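"""Mirror simfiles from zenius-i-vanisher.com (ZIV).

Requires the third-party packages requests, beautifulsoup4, and frozendict.

A minimal usage sketch (the script filename ziv_mirror.py is an assumption,
not given by the source); with no arguments it mirrors the latest official
category:

    python ziv_mirror.py --recurse \
        'https://zenius-i-vanisher.com/v5.2/simfiles.php?category=latest20official'
"""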
import argparse
import functools
import os
import os.path
import pickle
import queue
import urllib.parse
import zipfile

import requests
import requests.exceptions
from bs4 import BeautifulSoup
from frozendict import frozendict


def parse_args(*args):
    parser = argparse.ArgumentParser(description="Mirror simfiles from ZIV")

    parser.add_argument('categories', type=str, nargs='*',
                        help='ZIV category pages to mirror',
                        default=['https://zenius-i-vanisher.com/v5.2/simfiles.php?category=latest20official'])

    parser.add_argument('--songdir', type=str, help="Directory to keep songs in", default="songs")
    parser.add_argument('--zipdir', type=str, help="Directory to keep downloaded zip files in", default="zips")

    parser.add_argument('--recurse', '-r',
                        help='Recursively fetch the main categories for each song',
                        action='store_true')

    feature = parser.add_mutually_exclusive_group(required=False)
    feature.add_argument('--dry-run', '-n',
                         help="Only perform a dry run; don't download any files",
                         dest='dry_run', action='store_true')
    feature.add_argument('--no-dry-run',
                         help="Download all files",
                         dest='dry_run', action='store_false')
    feature.set_defaults(dry_run=False)

    return parser.parse_args(*args)


# (url, filename, message) tuples collected for the final error report
errors = []


@functools.lru_cache()
def retrieve(url, filename, save_headers=None, extract=None, **kwargs):
    """Download url to filename; optionally unzip it and pickle the response headers."""
    print(f'Downloading {url} -> {filename}')
    req = None
    remove = False

    def record_error(message):
        errors.append((url, filename, message))
        print(message)

    try:
        req = requests.get(url, **kwargs, stream=True)
        if req.status_code == 200:
            # Stream into a temporary file, then move it into place atomically
            with open(f'{filename}.part', 'wb') as output:
                for chunk in req.iter_content(1024):
                    output.write(chunk)
            os.replace(f'{filename}.part', filename)

            if extract:
                with zipfile.ZipFile(filename, 'r') as archive:
                    print(f'Extracting into {extract}')
                    os.makedirs(extract, exist_ok=True)
                    archive.extractall(extract)

            if save_headers:
                with open(save_headers, 'wb') as data:
                    pickle.dump(req.headers, data)
        elif req.status_code == 304:
            print("Not modified")
        else:
            record_error(f"Error: {req.status_code} {req.text}")
    except requests.exceptions.RequestException as e:
        record_error(f'Error downloading: {e}')
    except zipfile.BadZipFile:
        record_error(f'Not a zip file: {filename}')
        remove = True
    except KeyboardInterrupt:
        record_error('Download aborting...')
        remove = True
        raise
    except Exception as e:
        record_error(f'Unhandled error: {e}')
        remove = True
    finally:
        if remove:
            if os.path.isfile(filename):
                print(f'Removing {filename}')
                os.remove(filename)
            if save_headers and os.path.isfile(save_headers):
                print(f'Removing {save_headers}')
                os.remove(save_headers)

    return req.headers if req is not None else None


@functools.lru_cache()
def get_page(cat_url):
    """Fetch and parse a page, caching the result per URL."""
    request = requests.get(cat_url)
    return BeautifulSoup(request.text, features="html.parser")


def load_prev_headers(filename, header_file):
    """Build conditional-GET request headers from a pickled header cache.

    Only applies when both the previously-downloaded file and its saved
    headers still exist.
    """
    req_headers = {}
    if os.path.isfile(header_file) and os.path.isfile(filename):
        with open(header_file, 'rb') as data:
            prev_headers = pickle.load(data)
        if 'etag' in prev_headers:
            req_headers['If-None-Match'] = prev_headers['etag']
        if 'last-modified' in prev_headers:
            req_headers['If-Modified-Since'] = prev_headers['last-modified']
    return req_headers


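# A minimal sketch of how load_prev_headers() and retrieve() cooperate (the
# simfile id 12345 and the paths here are hypothetical): the pickled headers
# from the last download become If-None-Match/If-Modified-Since request
# headers, so the server can answer 304 and skip the re-download.
#
#   req_headers = load_prev_headers('zips/12345.zip', 'zips/12345.headers')
#   retrieve('https://zenius-i-vanisher.com/v5.2/download.php?type=ddrsimfile&simfileid=12345',
#            'zips/12345.zip', extract='songs/Example Group',
#            headers=frozendict(req_headers), save_headers='zips/12345.headers')

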
def mirror(cat_url, args):
    """Mirror every simfile linked from cat_url; return the group URLs found."""
    page = get_page(cat_url)

    group_urls = {}

    if 'viewsimfilecategory.php' in cat_url:
        simgroup = page.find('div', {'class': 'headertop'}).h1
        group_url = cat_url
    else:
        simgroup = None

    for row in page.find_all('tr'):
        simfile = row.find(
            "a", href=lambda href: href and "viewsimfile.php" in href)
        group_link = row.find(
            "a", href=lambda href: href and "viewsimfilecategory.php" in href)
        if group_link:
            simgroup = group_link
            group_url = group_link['href']

        if not (simfile and simgroup):
            continue

        # Collapse whitespace and strip path separators from the names
        songname = ' '.join(simfile.get_text().replace('/', '-').split())
        groupname = ' '.join(simgroup.get_text().replace('/', '-').split())

        print(f"collection: '{groupname}' simfile: '{songname}'")

        simlink = simfile['href']
        try:
            sim_id = urllib.parse.parse_qs(urllib.parse.urlparse(
                simfile['href']).query)['simfileid'][0]
        except KeyError:
            print(f"WARNING: no simfileid found on URL {simlink}")
            continue

        group_urls[groupname] = urllib.parse.urljoin(cat_url, group_url)

        url = f'https://zenius-i-vanisher.com/v5.2/download.php?type=ddrsimfile&simfileid={sim_id}'

        if args.dry_run:
            print(f"Dry run requested, not downloading {url}")
            continue

        filename = os.path.join(args.zipdir, f'{sim_id}.zip')
        headers = os.path.join(args.zipdir, f'{sim_id}.headers')
        req_headers = load_prev_headers(filename, headers)

        retrieve(url, filename, extract=os.path.join(args.songdir, groupname),
                 headers=frozendict(req_headers), save_headers=headers)

    # Fetch each group's banner image as well
    for groupname, group_url in group_urls.items():
        page = get_page(group_url)
        banner_urls = {urllib.parse.urljoin(group_url, banner['src'])
                       for banner in page.select('p.centre img')
                       if 'simfileNoBanner.png' not in banner['src']}
        for url in banner_urls:
            filename = os.path.join(args.songdir, groupname, 'banner.png')
            headers = os.path.join(args.zipdir, f'{groupname}-banner.headers')
            req_headers = load_prev_headers(filename, headers)
            retrieve(url, filename, extract=False,
                     headers=frozendict(req_headers), save_headers=headers)

    return group_urls.values()


def main():
    args = parse_args()

    os.makedirs(args.songdir, exist_ok=True)
    os.makedirs(args.zipdir, exist_ok=True)

    seen_cats = set()
    pending = queue.Queue()
    for url in args.categories:
        pending.put(url)

    while not pending.empty():
        url = pending.get()
        found = None
        if url not in seen_cats:
            seen_cats.add(url)
            found = mirror(url, args)
        if args.recurse and found:
            for url in found:
                print(f'Scheduling discovered category {url}')
                pending.put(url)


if __name__ == "__main__":
    main()

    if errors:
        print('Errors encountered while downloading:')
        for url, filename, message in errors:
            print(f'{url} ({filename}): {message}')