Leech/update scripts for zenius-i-vanisher.com
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

213 lines
7.0 KiB

import argparse
import functools
import os
import os.path
import pickle
import queue
import urllib
import urllib.parse
import zipfile
import requests
import requests.exceptions
from bs4 import BeautifulSoup
from frozendict import frozendict
def parse_args(*args):
    """Parse command-line arguments for the mirror script.

    Accepts an optional argv list (as argparse does) so tests can drive it;
    with no arguments it reads sys.argv.
    """
    parser = argparse.ArgumentParser(description="Mirror simfiles from ZIV")

    parser.add_argument('categories', type=str, nargs='*',
                        help='ZIV category pages to mirror')
    parser.add_argument('--songdir', type=str, help="Directory to keep songs in", default="songs")
    parser.add_argument('--zipdir', type=str, help="Directory to keep downloaded zip files in", default="zips")
    parser.add_argument('--recurse', '-r',
                        help='Recursively fetch the main categories for each song',
                        action='store_true')

    # --dry-run / --no-dry-run are mutually exclusive toggles on one dest;
    # the first-added action's default (False) wins when neither is given.
    feature = parser.add_mutually_exclusive_group(required=False)
    feature.add_argument('--dry-run', '-n',
                        help="Only perform a dry run; don't download any files",
                        dest='dry_run', action='store_true')
    feature.add_argument('--no-dry-run',
                        help="Download all files",
                        dest='dry_run', action='store_false')

    return parser.parse_args(*args)
10 months ago
# Accumulated (url, filename, message) tuples for every failed download,
# reported at the end of the run.
errors = []


def retrieve(url, filename, save_headers=None, extract=None, **kwargs):
    """Download *url* to *filename*, optionally extracting and caching headers.

    The file is streamed to ``<filename>.part`` and atomically renamed on
    success so an interrupted download never leaves a truncated target.
    If *extract* is a directory path, the downloaded zip is unpacked there.
    If *save_headers* is a path, the response headers are pickled to it
    (used later for If-None-Match / If-Modified-Since revalidation).
    Extra keyword arguments are passed through to ``requests.get``.

    Returns the response headers, or None when the request itself failed
    before a response was obtained. Errors are recorded in the module-level
    ``errors`` list rather than raised (best-effort mirroring), except
    KeyboardInterrupt which is re-raised after cleanup.
    """
    print(f'Downloading {url} -> {filename}')
    remove = False
    req = None  # stays None if requests.get itself raises

    def record_error(message):
        errors.append((url, filename, message))

    try:
        req = requests.get(url, **kwargs, stream=True)
        if req.status_code == 200:
            with open(f'{filename}.part', 'wb') as output:
                for chunk in req.iter_content(1024):
                    output.write(chunk)
            # Atomic rename: only a complete download replaces the target.
            os.replace(f'{filename}.part', filename)
            if extract:
                with zipfile.ZipFile(filename, 'r') as archive:
                    print(f'Extracting into {extract}')
                    os.makedirs(extract, exist_ok=True)
                    archive.extractall(extract)
            if save_headers:
                with open(save_headers, 'wb') as data:
                    pickle.dump(req.headers, data)
        elif req.status_code == 304:
            # Conditional request said our cached copy is still current.
            print("Not modified")
        else:
            record_error(f"Error: {req.status_code} {req.text}")
    except requests.exceptions.RequestException as e:
        # Base class of all requests errors (connection, timeout, ...).
        record_error(f'Error downloading: {e}')
    except zipfile.BadZipFile:
        record_error(f'Not a zip file: {filename}')
        remove = True
    except KeyboardInterrupt as e:
        record_error('Download aborting...')
        remove = True
        raise e
    except Exception as e:
        record_error(f'Unhandled error: {e}')
        remove = True

    if remove:
        # Drop the corrupt download and its cached headers so the next run
        # retries from scratch instead of revalidating bad data.
        if os.path.isfile(filename):
            print(f'Removing {filename}')
            os.unlink(filename)
        if save_headers and os.path.isfile(save_headers):
            print(f'Removing {save_headers}')
            os.unlink(save_headers)

    return req.headers if req is not None else None
def get_page(cat_url):
    """Fetch *cat_url* over HTTP and return the parsed BeautifulSoup tree."""
    response = requests.get(cat_url)
    soup = BeautifulSoup(response.text, features="html.parser")
    return soup
def load_prev_headers(filename, header_file):
    """Build conditional-request headers from a previously saved response.

    Returns a dict for requests' ``headers=`` argument carrying
    If-None-Match / If-Modified-Since when the pickled headers from the
    last successful download include an ETag / Last-Modified. Yields an
    empty dict unless both the downloaded file and the header cache exist
    (no file means there is nothing to revalidate against).
    """
    conditional = {}
    if not (os.path.isfile(header_file) and os.path.isfile(filename)):
        return conditional
    with open(header_file, 'rb') as data:
        saved = pickle.load(data)
    for response_key, request_key in (('etag', 'If-None-Match'),
                                      ('last-modified', 'If-Modified-Since')):
        if response_key in saved:
            conditional[request_key] = saved[response_key]
    return conditional
def mirror(cat_url, args):
    """Mirror every simfile listed on *cat_url*, then the group banners.

    Walks the table rows of a ZIV category page, downloading each simfile
    zip into ``args.zipdir`` and extracting it under
    ``args.songdir/<groupname>``. Afterwards fetches each discovered
    group page and saves its banner image. Honors ``args.dry_run``.

    Returns the set of group-page URLs discovered (for --recurse).
    """
    page = get_page(cat_url)
    group_urls = {}

    if 'viewsimfilecategory.php' in cat_url:
        # A category page names its own group in the header; rows may not
        # repeat the group link.
        simgroup = page.find('div', {'class': 'headertop'}).h1
        group_url = cat_url
    else:
        simgroup = None

    for row in page.find_all('tr'):
        simfile = row.find(
            "a", href=lambda href: href and "viewsimfile.php" in href)
        group_link = row.find(
            "a", href=lambda href: href and "viewsimfilecategory.php" in href)
        if group_link:
            # Rows carrying a group link change the current group context.
            simgroup = group_link
            group_url = group_link['href']
        if not (simfile and simgroup):
            continue

        # Collapse whitespace and strip '/' so names are safe path parts.
        songname = ' '.join(simfile.get_text().replace('/', '-').split())
        groupname = ' '.join(simgroup.get_text().replace('/', '-').split())
        print(f"collection: '{groupname}' simfile: '{songname}'")

        simlink = simfile['href']
        try:
            sim_id = urllib.parse.parse_qs(
                urllib.parse.urlparse(simlink).query)['simfileid'][0]
        except KeyError:
            print(f"WARNING: no simfileid found on URL {simlink}")
            continue

        group_urls[groupname] = urllib.parse.urljoin(cat_url, group_url)

        url = f'https://zenius-i-vanisher.com/v5.2/download.php?type=ddrsimfile&simfileid={sim_id}'
        if args.dry_run:
            print(f"Dry run requested, not downloading {url}")
            continue

        filename = os.path.join(args.zipdir, f'{sim_id}.zip')
        headers = os.path.join(args.zipdir, f'{sim_id}.headers')
        req_headers = load_prev_headers(filename, headers)
        retrieve(url, filename, extract=os.path.join(args.songdir, groupname),
                 headers=frozendict(req_headers), save_headers=headers)

    # Second pass: grab each group's banner image (skipping the site's
    # placeholder "no banner" graphic).
    for groupname, group_url in group_urls.items():
        page = get_page(group_url)
        banner_urls = {urllib.parse.urljoin(group_url, banner['src'])
                       for banner in page.select('p.centre img')
                       if 'simfileNoBanner.png' not in banner['src']}
        for url in banner_urls:
            filename = os.path.join(args.songdir, groupname, 'banner.png')
            headers = os.path.join(args.zipdir, f'{groupname}-banner.headers')
            req_headers = load_prev_headers(filename, headers)
            retrieve(url, filename, extract=False,
                     headers=frozendict(req_headers), save_headers=headers)

    return group_urls.values()
10 months ago
def main():
    """Entry point: mirror every requested category, breadth-first.

    Seeds a work queue with the CLI categories; when --recurse is set,
    group pages discovered by mirror() are enqueued too. ``seen_cats``
    prevents re-fetching a category reachable from multiple pages.
    """
    args = parse_args()

    os.makedirs(args.songdir, exist_ok=True)
    os.makedirs(args.zipdir, exist_ok=True)

    seen_cats = set()
    pending = queue.Queue()
    for url in args.categories:
        pending.put(url)

    while not pending.empty():
        url = pending.get()
        found = None
        if url not in seen_cats:
            found = mirror(url, args)
            seen_cats.add(url)
        if found and args.recurse:
            for url in found:
                print(f'Scheduling discovered category {url}')
                pending.put(url)
if __name__ == "__main__":
    main()

    # Summarize every download failure recorded during the run.
    if errors:
        print('Downloading got errors:')
        for url, filename, message in errors:
            print(f'{url} ({filename}): {message}')