Leech/update scripts for zenius-i-vanisher.com
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

213 lines
7.0 KiB

import argparse
import functools
import os
import os.path
import pickle
import queue
import urllib
2 years ago
import urllib.parse
import zipfile
2 years ago
import requests
import requests.exceptions
2 years ago
from bs4 import BeautifulSoup
from frozendict import frozendict
2 years ago
def parse_args(*args):
    """Build the command-line parser and parse the given arguments.

    Any positional arguments are forwarded unchanged to
    ``ArgumentParser.parse_args``; calling this with no arguments makes
    argparse read ``sys.argv``.

    Returns:
        The parsed ``argparse.Namespace`` with the attributes
        ``categories``, ``songdir``, ``zipdir``, ``recurse`` and ``dry_run``.
    """
    parser = argparse.ArgumentParser(description="Mirror simfiles from ZIV")

    # NOTE(review): this call was truncated in the copy of the file this was
    # restored from; it may originally have carried a default category list.
    # Without one, an empty command line yields an empty list.
    parser.add_argument('categories', type=str, nargs='*',
                        help='ZIV category pages to mirror')

    parser.add_argument('--songdir', type=str,
                        help="Directory to keep songs in", default="songs")
    parser.add_argument('--zipdir', type=str,
                        help="Directory to keep downloaded zip files in",
                        default="zips")

    # NOTE(review): the action was lost with the truncated line; the flag is
    # only ever tested for truthiness, so a boolean switch is assumed.
    parser.add_argument('--recurse', '-r',
                        help='Recursively fetch the main categories for each song',
                        action='store_true')

    # Two switches write to the same destination, so they are mutually
    # exclusive.
    feature = parser.add_mutually_exclusive_group(required=False)
    feature.add_argument('--dry-run', '-n',
                         help="Only perform a dry run; don't download any files",
                         dest='dry_run', action='store_true')
    # NOTE(review): the option string of this switch was lost; '--no-dry-run'
    # is an assumption -- confirm against the upstream script.
    feature.add_argument('--no-dry-run',
                         help="Download all files",
                         dest='dry_run', action='store_false')
    # Make the default explicit instead of relying on which of the two
    # actions argparse happens to consult first.
    parser.set_defaults(dry_run=False)

    return parser.parse_args(*args)
# Accumulates (url, filename, message) tuples for every failed download so
# they can be reported together at the end of the run.
errors = []


def retrieve(url, filename, save_headers=None, extract=None, **kwargs):
    """Download ``url`` to ``filename``, optionally unzipping the result.

    The body is streamed to ``<filename>.part`` and moved over ``filename``
    only once complete. Failures are appended to the module-level ``errors``
    list rather than raised (except ``KeyboardInterrupt``, which is recorded
    and re-raised).

    Args:
        url: Address to fetch.
        filename: Destination path of the downloaded file.
        save_headers: If set, path where the response headers are pickled
            after a successful (HTTP 200) download.
        extract: If truthy, directory into which ``filename`` is extracted
            as a zip archive.
        **kwargs: Forwarded to ``requests.get`` (e.g. ``headers=``).

    Returns:
        The response headers, or ``None`` if no response was obtained.
    """
    print(f'Downloading {url} -> {filename}')

    remove = False
    response = None
    response_headers = None

    def record_error(message):
        errors.append((url, filename, message))

    try:
        response = requests.get(url, **kwargs, stream=True)
        response_headers = response.headers
        if response.status_code == 200:
            part_name = f'{filename}.part'
            with open(part_name, 'wb') as output:
                for chunk in response.iter_content(1024):
                    output.write(chunk)
            # Only replace the real file once the download is complete.
            os.replace(part_name, filename)

            if extract:
                with zipfile.ZipFile(filename, 'r') as archive:
                    print(f'Extracting into {extract}')
                    os.makedirs(extract, exist_ok=True)
                    archive.extractall(extract)

            if save_headers:
                with open(save_headers, 'wb') as data:
                    pickle.dump(response.headers, data)
        elif response.status_code == 304:
            print("Not modified")
        else:
            record_error(f"Error: {response.status_code} {response.text}")
    except requests.exceptions.RequestException as e:
        # requests exceptions have no ``msg`` attribute; format the
        # exception itself.
        record_error(f'Error downloading: {e}')
    except zipfile.BadZipFile:
        record_error(f'Not a zip file: {filename}')
        remove = True
    except KeyboardInterrupt:
        record_error('Download aborting...')
        remove = True
        raise
    except Exception as e:
        record_error(f'Unhandled error: {e}')
        remove = True
    finally:
        # Runs even when KeyboardInterrupt is re-raised above.
        if response is not None:
            response.close()
        if remove:
            if os.path.isfile(filename):
                print(f'Removing {filename}')
                os.remove(filename)
            if save_headers and os.path.isfile(save_headers):
                print(f'Removing {save_headers}')
                os.remove(save_headers)

    return response_headers
def get_page(cat_url):
    """Fetch ``cat_url`` and return its body parsed as a BeautifulSoup tree.

    The HTTP status is not checked; whatever text the server returns is
    handed to the ``html.parser`` backend.
    """
    response = requests.get(cat_url)
    return BeautifulSoup(response.text, features="html.parser")
def load_prev_headers(filename, header_file):
    """Build conditional-request headers from a previously saved response.

    Args:
        filename: Path of the previously downloaded file.
        header_file: Path of the pickled response headers saved with it.

    Returns:
        A dict holding ``If-None-Match`` and/or ``If-Modified-Since`` when
        both files exist and the saved headers contain ``etag`` /
        ``last-modified``; otherwise an empty dict.
    """
    req_headers = {}

    # Without the downloaded file itself a conditional request would be
    # pointless: a 304 reply would leave nothing on disk.
    if not (os.path.isfile(header_file) and os.path.isfile(filename)):
        return req_headers

    try:
        # NOTE(security): pickle.load executes arbitrary code from the file.
        # This is only acceptable because the header cache is written by
        # this script; never point it at untrusted data.
        with open(header_file, 'rb') as data:
            prev_headers = pickle.load(data)
    except (pickle.UnpicklingError, EOFError, AttributeError,
            ImportError, IndexError) as e:
        # A truncated or corrupt cache should just force a full download.
        print(f'Ignoring unreadable header cache {header_file}: {e}')
        return req_headers

    if 'etag' in prev_headers:
        req_headers['If-None-Match'] = prev_headers['etag']
    if 'last-modified' in prev_headers:
        req_headers['If-Modified-Since'] = prev_headers['last-modified']

    return req_headers
def mirror(cat_url, args):
    """Mirror every simfile listed on one category page.

    Each table row of the page is scanned for a simfile link and a category
    ("group") link. Every simfile that has a known group is downloaded as a
    zip into ``args.zipdir`` and extracted into ``args.songdir/<group>``,
    using saved response headers to make the request conditional.
    Afterwards each group's page is fetched to download its banner image.

    Args:
        cat_url: URL of the category page to process.
        args: Parsed command-line namespace; ``dry_run``, ``zipdir`` and
            ``songdir`` are read.

    Returns:
        A view of the absolute URLs of all groups seen on the page.
    """
    page = get_page(cat_url)

    group_urls = {}

    # On a category page the group is given by the page heading; on other
    # pages it is picked up from the rows as they are scanned.
    simgroup = None
    group_url = None
    if 'viewsimfilecategory.php' in cat_url:
        header = page.find('div', {'class': 'headertop'})
        # Guard against pages without the expected heading (e.g. an error
        # page) instead of failing with AttributeError on None.
        if header is not None:
            simgroup = header.h1
        group_url = cat_url

    for row in page.find_all('tr'):
        simfile = row.find(
            "a", href=lambda href: href and "viewsimfile.php" in href)
        group_link = row.find(
            "a", href=lambda href: href and "viewsimfilecategory.php" in href)
        if group_link:
            # A group link applies to this row and to the rows after it.
            simgroup = group_link
            group_url = group_link['href']

        if not (simfile and simgroup):
            continue

        # '/' would create nested directories; collapse whitespace runs.
        songname = ' '.join(simfile.get_text().replace('/', '-').split())
        groupname = ' '.join(simgroup.get_text().replace('/', '-').split())
        print(f"collection: '{groupname}' simfile: '{songname}'")

        simlink = simfile['href']
        try:
            sim_id = urllib.parse.parse_qs(
                urllib.parse.urlparse(simlink).query)['simfileid'][0]
        except KeyError:
            print(f"WARNING: no simfileid found on URL {simlink}")
            continue

        group_urls[groupname] = urllib.parse.urljoin(cat_url, group_url)

        url = f'https://zenius-i-vanisher.com/v5.2/download.php?type=ddrsimfile&simfileid={sim_id}'

        if args.dry_run:
            print(f"Dry run requested, not downloading {url}")
            continue

        filename = os.path.join(args.zipdir, f'{sim_id}.zip')
        headers = os.path.join(args.zipdir, f'{sim_id}.headers')
        req_headers = load_prev_headers(filename, headers)
        retrieve(url, filename, extract=os.path.join(args.songdir, groupname),
                 headers=frozendict(req_headers), save_headers=headers)

    # A dry run must not download anything, so banners are skipped too.
    if not args.dry_run:
        for groupname, group_url in group_urls.items():
            page = get_page(group_url)
            banner_urls = {urllib.parse.urljoin(group_url, banner['src'])
                           for banner in page.select('p.centre img')
                           if 'simfileNoBanner.png' not in banner['src']}
            for url in banner_urls:
                filename = os.path.join(args.songdir, groupname, 'banner.png')
                headers = os.path.join(args.zipdir,
                                       f'{groupname}-banner.headers')
                # The group directory only exists once a zip has been
                # extracted into it; create it so the banner can be saved.
                os.makedirs(os.path.dirname(filename), exist_ok=True)
                req_headers = load_prev_headers(filename, headers)
                retrieve(url, filename, extract=False,
                         headers=frozendict(req_headers),
                         save_headers=headers)

    return group_urls.values()
def main():
    """Mirror every requested category, optionally following discoveries.

    Creates the song and zip directories, then processes the category URLs
    from the command line through a work queue. With ``--recurse``, group
    URLs returned by ``mirror`` are queued as well; each URL is mirrored at
    most once.
    """
    args = parse_args()

    os.makedirs(args.songdir, exist_ok=True)
    os.makedirs(args.zipdir, exist_ok=True)

    seen_cats = set()
    pending = queue.Queue()
    for url in args.categories:
        pending.put(url)

    while not pending.empty():
        url = pending.get()
        if url in seen_cats:
            # Already mirrored; nothing new to discover from it either.
            continue
        seen_cats.add(url)

        found = mirror(url, args)
        if args.recurse and found:
            for discovered in found:
                print(f'Scheduling discovered category {discovered}')
                pending.put(discovered)
if __name__ == "__main__":
    try:
        main()
    finally:
        # Report in ``finally`` so that errors recorded before an aborted
        # run (e.g. Ctrl-C during a download) are still shown.
        if errors:
            print('Downloading got errors:')
            for url, filename, message in errors:
                print(f'{url} ({filename}): {message}')