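"""Mirror simfiles from Zenius-I-Vanisher (ZIV).

Fetches category pages, downloads each simfile zip into --zipdir, and
extracts it into --songdir; with no arguments it mirrors the 20 latest
official simfiles.
"""
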
import argparse
import functools
import os
import pickle
import queue
import urllib.parse
import zipfile

import requests
import requests.exceptions

from bs4 import BeautifulSoup
from frozendict import frozendict


def parse_args(*args):
    parser = argparse.ArgumentParser(description="Mirror simfiles from ZIV")

    parser.add_argument('categories', type=str, nargs='*',
                        help='ZIV category pages to mirror',
                        default=['https://zenius-i-vanisher.com/v5.2/simfiles.php?category=latest20official'])

    parser.add_argument('--songdir', type=str, help="Directory to keep songs in", default="songs")
    parser.add_argument('--zipdir', type=str, help="Directory to keep downloaded zip files in", default="zips")

    parser.add_argument('--recurse', '-r',
                        help='Recursively fetch the main categories for each song',
                        action='store_true')

    feature = parser.add_mutually_exclusive_group(required=False)
    feature.add_argument('--dry-run', '-n',
                         help="Only perform a dry run; don't download any files",
                         dest='dry_run', action='store_true')
    feature.add_argument('--no-dry-run',
                         help="Download all files",
                         dest='dry_run', action='store_false')
    feature.set_defaults(dry_run=False)

    return parser.parse_args(*args)


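# Failed downloads are recorded here as (url, filename, message) tuples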
errors = []


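# lru_cache keeps each (url, filename, ...) combination from being
# downloaded more than once per run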
@functools.lru_cache()
def retrieve(url, filename, save_headers=None, extract=None, **kwargs):
    print(f'Downloading {url} -> {filename}')
    remove = False
    req = None

    def record_error(message):
        errors.append((url, filename, message))
        print(message)

    try:
        req = requests.get(url, **kwargs, stream=True)
        if req.status_code == 200:
            # Write to a temp file first so a partial download never
            # clobbers a good copy
            with open(f'{filename}.part', 'wb') as output:
                for chunk in req.iter_content(1024):
                    output.write(chunk)

            os.replace(f'{filename}.part', filename)

            if extract:
                with zipfile.ZipFile(filename, 'r') as zip:
                    print(f'Extracting into {extract}')
                    os.makedirs(extract, exist_ok=True)
                    zip.extractall(extract)

            if save_headers:
                with open(save_headers, 'wb') as data:
                    pickle.dump(req.headers, data)
        elif req.status_code == 304:
            print("Not modified")
        else:
            record_error(f"Error: {req.status_code} {req.text}")
    except requests.exceptions.RequestException as e:
        record_error(f'Error downloading: {e}')
    except zipfile.BadZipFile:
        record_error(f'Not a zip file: {filename}')
        remove = True
    except KeyboardInterrupt:
        record_error('Download aborting...')
        remove = True
        raise
    except Exception as e:
        record_error(f'Unhandled error: {e}')
        remove = True
    finally:
        if remove:
            if os.path.isfile(filename):
                print(f'Removing {filename}')
                os.remove(filename)
            if save_headers and os.path.isfile(save_headers):
                print(f'Removing {save_headers}')
                os.remove(save_headers)

    return req.headers if req is not None else None


@functools.lru_cache()
def get_page(cat_url):
    request = requests.get(cat_url)
    return BeautifulSoup(request.text, features="html.parser")


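# Turn a previously saved response-header pickle into conditional request
# headers (If-None-Match / If-Modified-Since) so unchanged files come back 304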
def load_prev_headers(filename, header_file):
    req_headers = {}
    if os.path.isfile(header_file) and os.path.isfile(filename):
        with open(header_file, 'rb') as data:
            prev_headers = pickle.load(data)
        if 'etag' in prev_headers:
            req_headers['If-None-Match'] = prev_headers['etag']
        if 'last-modified' in prev_headers:
            req_headers['If-Modified-Since'] = prev_headers['last-modified']
    return req_headers


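# Scrape one category page: download every simfile zip it lists and
# collect the group pages it links to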
def mirror(cat_url, args):
    page = get_page(cat_url)

    group_urls = {}

    if 'viewsimfilecategory.php' in cat_url:
        simgroup = page.find('div', {'class': 'headertop'}).h1
        group_url = cat_url
    else:
        simgroup = None

    for row in page.find_all('tr'):
        simfile = row.find(
            "a", href=lambda href: href and "viewsimfile.php" in href)
        group_link = row.find(
            "a", href=lambda href: href and "viewsimfilecategory.php" in href)
        if group_link:
            simgroup = group_link
            group_url = group_link['href']

        if not (simfile and simgroup):
            continue

        # Normalize names: collapse whitespace and strip path separators
        songname = ' '.join(simfile.get_text().replace('/', '-').split())
        groupname = ' '.join(simgroup.get_text().replace('/', '-').split())

        print(f"collection: '{groupname}' simfile: '{songname}'")

        simlink = simfile['href']
        try:
            sim_id = urllib.parse.parse_qs(urllib.parse.urlparse(
                simfile['href']).query)['simfileid'][0]
        except KeyError:
            print(f"WARNING: no simfileid found on URL {simlink}")
            continue

        group_urls[groupname] = urllib.parse.urljoin(cat_url, group_url)

        url = f'https://zenius-i-vanisher.com/v5.2/download.php?type=ddrsimfile&simfileid={sim_id}'

        if args.dry_run:
            print(f"Dry run requested, not downloading {url}")
            continue

        filename = os.path.join(args.zipdir, f'{sim_id}.zip')
        headers = os.path.join(args.zipdir, f'{sim_id}.headers')
        req_headers = load_prev_headers(filename, headers)

        retrieve(url, filename, extract=os.path.join(args.songdir, groupname),
                 headers=frozendict(req_headers), save_headers=headers)

    # Fetch each group's banner image alongside its songs
    for groupname, group_url in group_urls.items():
        page = get_page(group_url)
        banner_urls = {urllib.parse.urljoin(group_url, banner['src'])
                       for banner in page.select('p.centre img')
                       if 'simfileNoBanner.png' not in banner['src']}
        for url in banner_urls:
            filename = os.path.join(args.songdir, groupname, 'banner.png')
            headers = os.path.join(args.zipdir, f'{groupname}-banner.headers')
            req_headers = load_prev_headers(filename, headers)
            retrieve(url, filename, extract=False,
                     headers=frozendict(req_headers), save_headers=headers)

    return group_urls.values()


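# Breadth-first crawl: visit each category once, queueing categories
# discovered along the way when --recurse is set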
def main():
    args = parse_args()

    os.makedirs(args.songdir, exist_ok=True)
    os.makedirs(args.zipdir, exist_ok=True)

    seen_cats = set()
    pending = queue.Queue()
    for url in args.categories:
        pending.put(url)

    while not pending.empty():
        url = pending.get()
        found = None
        if url not in seen_cats:
            seen_cats.add(url)
            found = mirror(url, args)
        # found stays None for already-visited URLs, so guard before iterating
        if args.recurse and found:
            for url in found:
                print(f'Scheduling discovered category {url}')
                pending.put(url)


if __name__ == "__main__":
    main()

    if errors:
        print('Downloading encountered errors:')
        for url, filename, message in errors:
            print(f'{url} ({filename}): {message}')