# zivleech/zenius.py
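"""Mirror simfiles from Zenius-I-Vanisher (ZIV).

A typical invocation, using the script's own built-in default category URL
and directory defaults (see parse_args below):

    python zenius.py --recurse \
        'https://zenius-i-vanisher.com/v5.2/simfiles.php?category=latest20official'

Downloaded zips are kept in --zipdir (default "zips") and extracted into
per-group folders under --songdir (default "songs").
"""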

import argparse
import functools
import os
import os.path
import pickle
import queue
import urllib
import urllib.parse
import zipfile
import requests
import requests.exceptions
from bs4 import BeautifulSoup
from frozendict import frozendict


def parse_args(*args):
    parser = argparse.ArgumentParser(description="Mirror simfiles from ZIV")
    parser.add_argument('categories', type=str, nargs='*',
                        help='ZIV category pages to mirror',
                        default=['https://zenius-i-vanisher.com/v5.2/simfiles.php?category=latest20official'])
    parser.add_argument('--songdir', type=str, help="Directory to keep songs in", default="songs")
    parser.add_argument('--zipdir', type=str, help="Directory to keep downloaded zip files in", default="zips")
    parser.add_argument('--recurse', '-r',
                        help='Recursively fetch the main categories for each song',
                        action='store_true')
    feature = parser.add_mutually_exclusive_group(required=False)
    feature.add_argument('--dry-run', '-n',
                         help="Only perform a dry run; don't download any files",
                         dest='dry_run', action='store_true')
    feature.add_argument('--no-dry-run',
                         help="Download all files",
                         dest='dry_run', action='store_false')
    feature.set_defaults(dry_run=False)
    return parser.parse_args(*args)


errors = []
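

# retrieve() is memoized with lru_cache so that a zip reachable from several
# category pages is only fetched once per run. lru_cache requires every
# argument to be hashable, which is why callers pass the conditional-request
# headers as a frozendict rather than a plain dict.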
@functools.lru_cache()
def retrieve(url, filename, save_headers=None, extract=None, **kwargs):
    print(f'Downloading {url} -> {filename}')
    req = None
    remove = False

    def record_error(message):
        errors.append((url, filename, message))
        print(message)

    try:
        req = requests.get(url, **kwargs, stream=True)
        if req.status_code == 200:
            # Download to a .part file first so an interrupted transfer never
            # masquerades as a complete file.
            with open(f'{filename}.part', 'wb') as output:
                for chunk in req.iter_content(1024):
                    output.write(chunk)
            os.replace(f'{filename}.part', filename)
            if extract:
                with zipfile.ZipFile(filename, 'r') as archive:
                    print(f'Extracting into {extract}')
                    os.makedirs(extract, exist_ok=True)
                    archive.extractall(extract)
            if save_headers:
                with open(save_headers, 'wb') as data:
                    pickle.dump(req.headers, data)
        elif req.status_code == 304:
            print("Not modified")
        else:
            record_error(f"Error: {req.status_code} {req.text}")
    except requests.exceptions.RequestException as e:
        # requests raises subclasses of RequestException on network errors.
        record_error(f'Error downloading: {e}')
    except zipfile.BadZipFile:
        record_error(f'Not a zip file: {filename}')
        remove = True
    except KeyboardInterrupt:
        record_error('Download aborting...')
        remove = True
        raise
    except Exception as e:
        record_error(f'Unhandled error: {e}')
        remove = True
    finally:
        if remove:
            if os.path.isfile(filename):
                print(f'Removing {filename}')
                os.remove(filename)
            if save_headers and os.path.isfile(save_headers):
                print(f'Removing {save_headers}')
                os.remove(save_headers)
    return req.headers if req is not None else None


@functools.lru_cache()
def get_page(cat_url):
    request = requests.get(cat_url)
    return BeautifulSoup(request.text, features="html.parser")
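

# HTTP conditional requests: if the ETag / Last-Modified headers from a
# previous download were saved, replay them as If-None-Match /
# If-Modified-Since. The server can then answer 304 Not Modified instead of
# resending an unchanged file, which retrieve() treats as a no-op.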
def load_prev_headers(filename, header_file):
    req_headers = {}
    if os.path.isfile(header_file) and os.path.isfile(filename):
        with open(header_file, 'rb') as data:
            prev_headers = pickle.load(data)
        if 'etag' in prev_headers:
            req_headers['If-None-Match'] = prev_headers['etag']
        if 'last-modified' in prev_headers:
            req_headers['If-Modified-Since'] = prev_headers['last-modified']
    return req_headers


def mirror(cat_url, args):
    page = get_page(cat_url)
    group_urls = {}

    # On a group page the heading itself names the group; on listing pages
    # the group is discovered row by row from viewsimfilecategory.php links.
    if 'viewsimfilecategory.php' in cat_url:
        simgroup = page.find('div', {'class': 'headertop'}).h1
        group_url = cat_url
    else:
        simgroup = None

    for row in page.find_all('tr'):
        simfile = row.find(
            "a", href=lambda href: href and "viewsimfile.php" in href)
        group_link = row.find(
            "a", href=lambda href: href and "viewsimfilecategory.php" in href)
        if group_link:
            simgroup = group_link
            group_url = group_link['href']
        if not (simfile and simgroup):
            continue

        songname = ' '.join(simfile.get_text().replace('/', '-').split())
        groupname = ' '.join(simgroup.get_text().replace('/', '-').split())
        print(f"collection: '{groupname}' simfile: '{songname}'")

        simlink = simfile['href']
        try:
            sim_id = urllib.parse.parse_qs(urllib.parse.urlparse(
                simfile['href']).query)['simfileid'][0]
        except KeyError:
            print(f"WARNING: no simfileid found on URL {simlink}")
            continue

        group_urls[groupname] = urllib.parse.urljoin(cat_url, group_url)

        url = f'https://zenius-i-vanisher.com/v5.2/download.php?type=ddrsimfile&simfileid={sim_id}'
        if args.dry_run:
            print(f"Dry run requested, not downloading {url}")
            continue

        filename = os.path.join(args.zipdir, f'{sim_id}.zip')
        headers = os.path.join(args.zipdir, f'{sim_id}.headers')
        req_headers = load_prev_headers(filename, headers)
        retrieve(url, filename, extract=os.path.join(args.songdir, groupname),
                 headers=frozendict(req_headers), save_headers=headers)

    # Fetch each group's banner image as well.
    for groupname, group_url in group_urls.items():
        page = get_page(group_url)
        banner_urls = {urllib.parse.urljoin(group_url, banner['src'])
                       for banner in page.select('p.centre img')
                       if 'simfileNoBanner.png' not in banner['src']}
        for url in banner_urls:
            filename = os.path.join(args.songdir, groupname, 'banner.png')
            headers = os.path.join(args.zipdir, f'{groupname}-banner.headers')
            req_headers = load_prev_headers(filename, headers)
            retrieve(url, filename, extract=False,
                     headers=frozendict(req_headers), save_headers=headers)

    return group_urls.values()


def main():
    args = parse_args()
    os.makedirs(args.songdir, exist_ok=True)
    os.makedirs(args.zipdir, exist_ok=True)

    # Breadth-first traversal of category pages; seen_cats prevents cycles.
    seen_cats = set()
    pending = queue.Queue()
    for url in args.categories:
        pending.put(url)

    while not pending.empty():
        url = pending.get()
        found = None
        if url not in seen_cats:
            seen_cats.add(url)
            found = mirror(url, args)
        # found stays None when the URL was already mirrored, so guard it.
        if args.recurse and found:
            for url in found:
                print(f'Scheduling discovered category {url}')
                pending.put(url)


if __name__ == "__main__":
    main()

    if errors:
        print('Errors during download:')
        for url, filename, message in errors:
            print(f'{url} ({filename}): {message}')