Leech/update scripts for zenius-i-vanisher.com
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

212 lines
7.0 KiB

import argparse
import functools
import os
import os.path
import pickle
import queue
import urllib
import urllib.parse
import zipfile
import requests
import requests.exceptions
from bs4 import BeautifulSoup
from frozendict import frozendict
def parse_args(*args):
    """Parse the command line for the ZIV mirroring script.

    Any positional arguments are forwarded unchanged to
    ``ArgumentParser.parse_args``: call with no arguments to parse
    ``sys.argv``, or pass a list of strings to parse that list instead.

    Returns:
        The populated ``argparse.Namespace`` with the attributes
        ``categories``, ``songdir``, ``zipdir``, ``recurse`` and ``dry_run``.
    """
    parser = argparse.ArgumentParser(description="Mirror simfiles from ZIV")

    # NOTE(review): the recovered listing left this call unclosed, so a
    # trailing keyword (possibly a ``default=`` URL list) may have been lost.
    # Without a default, an empty command line yields ``categories == []``.
    parser.add_argument('categories', type=str, nargs='*',
                        help='ZIV category pages to mirror')

    parser.add_argument('--songdir', type=str,
                        help="Directory to keep songs in", default="songs")
    parser.add_argument('--zipdir', type=str,
                        help="Directory to keep downloaded zip files in",
                        default="zips")

    # The flag is only ever tested for truthiness, so it is a plain switch.
    parser.add_argument('--recurse', '-r',
                        help='Recursively fetch the main categories for each song',
                        action='store_true')

    feature = parser.add_mutually_exclusive_group(required=False)
    feature.add_argument('--dry-run', '-n',
                         help="Only perform a dry run; don't download any files",
                         dest='dry_run', action='store_true')
    # NOTE(review): the option string of the complementary flag was missing
    # from the recovered listing; '--no-dry-run' is the conventional name --
    # confirm against the upstream script.
    feature.add_argument('--no-dry-run',
                         help="Download all files",
                         dest='dry_run', action='store_false')
    # Both flags share one destination; make the default explicit rather than
    # relying on which action argparse happens to register first.
    parser.set_defaults(dry_run=False)

    return parser.parse_args(*args)
# Module-level log of failures: retrieve() appends (url, filename, message)
# tuples here, and the script entry point prints them once the run is over.
errors = []
def retrieve(url, filename, save_headers=None, extract=None, **kwargs):
    """Download *url* to *filename*, optionally unzipping it and saving headers.

    The body is streamed into ``<filename>.part`` and moved into place with
    ``os.replace`` once complete, so an interrupted transfer does not
    overwrite an existing *filename*.

    Args:
        url: Address to fetch.
        filename: Destination path for the downloaded body.
        save_headers: If truthy, path to which the response headers are
            pickled after a successful (HTTP 200) download.
        extract: If truthy, directory into which *filename* is extracted as
            a zip archive after a successful download.
        **kwargs: Passed through to ``requests.get`` (for example
            ``headers=`` for a conditional request).

    Returns:
        The response headers, or ``None`` when no response was obtained.

    Raises:
        KeyboardInterrupt: re-raised after the abort has been recorded and
            the possibly incomplete files have been removed.

    Failures other than ``KeyboardInterrupt`` are appended to the
    module-level ``errors`` list instead of being raised.
    """
    # NOTE(review): the file imports functools and the visible callers wrap
    # their headers in frozendict, which suggests this function may have been
    # memoized with functools.lru_cache upstream -- confirm before adding it.
    print(f'Downloading {url} -> {filename}')
    remove = False
    req = None  # stays None if the request itself fails

    def record_error(message):
        errors.append((url, filename, message))

    try:
        req = requests.get(url, **kwargs, stream=True)
        if req.status_code == 200:
            part_name = f'{filename}.part'
            with open(part_name, 'wb') as output:
                for chunk in req.iter_content(1024):
                    output.write(chunk)
            os.replace(part_name, filename)

            if extract:
                with zipfile.ZipFile(filename, 'r') as archive:
                    print(f'Extracting into {extract}')
                    os.makedirs(extract, exist_ok=True)
                    archive.extractall(extract)

            if save_headers:
                with open(save_headers, 'wb') as data:
                    pickle.dump(req.headers, data)
        elif req.status_code == 304:
            print("Not modified")
        else:
            record_error(f"Error: {req.status_code} {req.text}")
    except (requests.exceptions.RequestException,
            requests.exceptions.BaseHTTPError) as e:
        # Transport-level failure: keep whatever copy is already on disk.
        # These exceptions carry no ``msg`` attribute, so format the
        # exception object itself.
        record_error(f'Error downloading: {e}')
    except zipfile.BadZipFile:
        record_error(f'Not a zip file: {filename}')
        remove = True
    except KeyboardInterrupt:
        record_error('Download aborting...')
        remove = True
        raise
    except Exception as e:
        record_error(f'Unhandled error: {e}')
        remove = True
    finally:
        # In ``finally`` so the cleanup also runs while KeyboardInterrupt is
        # propagating out of this function.
        if remove:
            if os.path.isfile(filename):
                print(f'Removing {filename}')
                os.remove(filename)
            if save_headers and os.path.isfile(save_headers):
                print(f'Removing {save_headers}')
                os.remove(save_headers)

    return req.headers if req is not None else None
def get_page(cat_url):
    """Fetch *cat_url* and return its text parsed with BeautifulSoup."""
    response = requests.get(cat_url)
    soup = BeautifulSoup(response.text, features="html.parser")
    return soup
def load_prev_headers(filename, header_file):
    """Build conditional-request headers from a previously saved response.

    Args:
        filename: Path of the previously downloaded file.
        header_file: Path of the pickled response headers saved with it.

    Returns:
        A dict holding ``If-None-Match`` and/or ``If-Modified-Since`` when
        both files exist and the saved headers contain ``etag`` /
        ``last-modified``; otherwise an empty dict, which makes the caller
        issue an unconditional request.
    """
    req_headers = {}
    # Validators are only useful if the payload they describe is still there.
    if not (os.path.isfile(header_file) and os.path.isfile(filename)):
        return req_headers

    try:
        # NOTE: pickle must only be used on trusted data; this file is
        # expected to be one this script wrote itself.
        with open(header_file, 'rb') as data:
            prev_headers = pickle.load(data)
    except (OSError, EOFError, pickle.UnpicklingError,
            AttributeError, ImportError, IndexError) as e:
        # A truncated or corrupt cache file should not abort the whole run;
        # fall back to a plain download instead.
        print(f'Ignoring unreadable header cache {header_file}: {e}')
        return req_headers

    if 'etag' in prev_headers:
        req_headers['If-None-Match'] = prev_headers['etag']
    if 'last-modified' in prev_headers:
        req_headers['If-Modified-Since'] = prev_headers['last-modified']
    return req_headers
def mirror(cat_url, args):
    """Mirror every simfile listed on one ZIV page, plus the group banners.

    Each table row of the page at *cat_url* is scanned for a simfile link
    and, optionally, a category ("group") link. Every simfile found is
    downloaded as ``<zipdir>/<simfileid>.zip`` and extracted into
    ``<songdir>/<group name>``. Afterwards each group page that was seen is
    fetched and its banner image is saved as ``banner.png`` in the group's
    song directory.

    Args:
        cat_url: URL of the listing or category page to process.
        args: Parsed command-line namespace; ``dry_run``, ``zipdir`` and
            ``songdir`` are read.

    Returns:
        A view of the absolute URLs of all groups encountered on the page.
    """
    page = get_page(cat_url)

    group_urls = {}

    simgroup = None
    group_url = None
    if 'viewsimfilecategory.php' in cat_url:
        # A category page names its group once, in the page header, rather
        # than on each row.
        header = page.find('div', {'class': 'headertop'})
        if header is not None:
            simgroup = header.h1
        group_url = cat_url

    for row in page.find_all('tr'):
        simfile = row.find(
            "a", href=lambda href: href and "viewsimfile.php" in href)
        group_link = row.find(
            "a", href=lambda href: href and "viewsimfilecategory.php" in href)
        if group_link:
            # Remembered across rows: later rows without their own group
            # link reuse the most recent one.
            simgroup = group_link
            group_url = group_link['href']

        if not (simfile and simgroup):
            continue

        # Collapse whitespace and drop '/' so the names are usable as a
        # single path component.
        songname = ' '.join(simfile.get_text().replace('/', '-').split())
        groupname = ' '.join(simgroup.get_text().replace('/', '-').split())

        print(f"collection: '{groupname}' simfile: '{songname}'")

        simlink = simfile['href']
        try:
            sim_id = urllib.parse.parse_qs(
                urllib.parse.urlparse(simlink).query)['simfileid'][0]
        except KeyError:
            print(f"WARNING: no simfileid found on URL {simlink}")
            continue

        group_urls[groupname] = urllib.parse.urljoin(cat_url, group_url)

        url = f'https://zenius-i-vanisher.com/v5.2/download.php?type=ddrsimfile&simfileid={sim_id}'

        if args.dry_run:
            print(f"Dry run requested, not downloading {url}")
            continue

        filename = os.path.join(args.zipdir, f'{sim_id}.zip')
        headers = os.path.join(args.zipdir, f'{sim_id}.headers')
        req_headers = load_prev_headers(filename, headers)

        retrieve(url, filename, extract=os.path.join(args.songdir, groupname),
                 headers=frozendict(req_headers), save_headers=headers)

    for groupname, group_url in group_urls.items():
        page = get_page(group_url)
        banner_urls = {urllib.parse.urljoin(group_url, banner['src'])
                       for banner in page.select('p.centre img')
                       if 'simfileNoBanner.png' not in banner['src']}
        for url in banner_urls:
            if args.dry_run:
                # Banners are downloads too; honour --dry-run for them.
                print(f"Dry run requested, not downloading {url}")
                continue

            group_dir = os.path.join(args.songdir, groupname)
            # The directory is normally created while extracting a simfile,
            # but that step may have failed or been skipped.
            os.makedirs(group_dir, exist_ok=True)

            filename = os.path.join(group_dir, 'banner.png')
            headers = os.path.join(args.zipdir, f'{groupname}-banner.headers')
            req_headers = load_prev_headers(filename, headers)
            retrieve(url, filename, extract=False,
                     headers=frozendict(req_headers), save_headers=headers)

    return group_urls.values()
def main():
    """Mirror every requested category, optionally following discovered ones.

    Creates the song and zip directories, then works through a queue that is
    seeded with the categories from the command line. Each URL is mirrored
    at most once; with ``--recurse`` the group URLs reported by ``mirror``
    are queued as well.
    """
    args = parse_args()

    os.makedirs(args.songdir, exist_ok=True)
    os.makedirs(args.zipdir, exist_ok=True)

    seen_cats = set()
    pending = queue.Queue()
    for url in args.categories:
        pending.put(url)

    while not pending.empty():
        url = pending.get()
        if url in seen_cats:
            continue
        seen_cats.add(url)

        found = mirror(url, args)
        if not args.recurse:
            continue

        for discovered in found:
            # A group page reports its own URL as a group; skip anything
            # already handled so it is not announced and queued again.
            if discovered in seen_cats:
                continue
            print(f'Scheduling discovered category {discovered}')
            pending.put(discovered)
if __name__ == "__main__":
    try:
        main()
    finally:
        # Print the collected failures even when the run is cut short
        # (for example by Ctrl-C), so errors recorded so far are not lost.
        if errors:
            print('Downloading got errors:')
            for url, filename, message in errors:
                print(f'{url} ({filename}): {message}')