From ad87a45db11573de5cf47a5793a3ccdeb0982bd8 Mon Sep 17 00:00:00 2001 From: Ethan Dalool Date: Thu, 13 Feb 2020 20:13:13 -0800 Subject: [PATCH] Multithreaded updating with threadpool. --- droidz.py | 135 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 93 insertions(+), 42 deletions(-) diff --git a/droidz.py b/droidz.py index 957bb31..0c64340 100644 --- a/droidz.py +++ b/droidz.py @@ -12,6 +12,7 @@ import types from voussoirkit import pathclass from voussoirkit import ratelimiter from voussoirkit import sqlhelpers +from voussoirkit import threadpool from voussoirkit import winwhich CATEGORIES = [ @@ -57,7 +58,6 @@ HEADERS = { 'User-Agent': USERAGENT } -REQUEST_RATELIMITER = ratelimiter.Ratelimiter(allowance=1, period=1) DOWNLOAD_RATELIMITER = ratelimiter.Ratelimiter(allowance=1, period=5) WINRAR = winwhich.which('winrar') @@ -65,18 +65,19 @@ WINRAR = winwhich.which('winrar') def get_now(): return datetime.datetime.now(datetime.timezone.utc).timestamp() -def request(url): - REQUEST_RATELIMITER.limit() - response = requests.get(url, headers=HEADERS) - response.raise_for_status() - return response - def id_from_direct_url(direct_url): id = direct_url.split('/direct/')[-1] id = id.split('/')[0].split('?')[0] return id -def insert_id(id): +# DB FUNCTIONS +################################################################################ +def select_stick(id): + cur = sql.cursor() + cur.execute('SELECT * FROM sticks WHERE id == ?', [id]) + return cur.fetchone() + +def insert_id(id, commit=True): cur = sql.cursor() cur.execute('SELECT 1 FROM sticks WHERE id == ?', [id]) existing = cur.fetchone() @@ -88,14 +89,53 @@ def insert_id(id): query = f'INSERT INTO sticks VALUES({qmarks})' cur.execute(query, bindings) + if commit: + sql.commit() + status = types.SimpleNamespace(id=id, is_new=not existing) return status +def insert_ids(ids, commit=True): + for id in ids: + insert_id(id, commit=False) + + if commit: + sql.commit() + +def insert_stick(data, commit=True): + cur = sql.cursor() + + cur.execute('SELECT 1 FROM sticks WHERE id == ?', [data['id']]) + existing = cur.fetchone() + if existing: + (qmarks, bindings) = sqlhelpers.update_filler(data, 'id') + query = f'UPDATE sticks {qmarks}' + else: + (qmarks, bindings) = sqlhelpers.insert_filler(SQL_COLUMNS['sticks'], data) + query = f'INSERT INTO sticks VALUES({qmarks})' + + cur.execute(query, bindings) + + if commit: + sql.commit() + +def insert_sticks(datas, commit=True): + for data in datas: + insert_stick(data, commit=False) + + if commit: + sql.commit() + # SCRAPE ################################################################################ -def scrape_direct(id): - url = f'http://droidz.org/direct/{id}' +def request(url): print(url) + response = requests.get(url, headers=HEADERS) + response.raise_for_status() + return response + +def scrape_direct(id, commit=True): + url = f'http://droidz.org/direct/{id}' response = request(url) text = response.text @@ -143,22 +183,35 @@ def scrape_direct(id): 'retrieved': retrieved, } - insert_id(id) - - cur = sql.cursor() - (qmarks, bindings) = sqlhelpers.update_filler(data, 'id') - query = f'UPDATE sticks {qmarks}' - cur.execute(query, bindings) - sql.commit() - return data +def scrape_directs(ids, threads=1, commit=True): + if threads < 1: + raise ValueError(threads) + + if threads == 1: + for id in ids: + yield scrape_direct(id) + + else: + pool = threadpool.ThreadPool(size=threads) + kwargss = [ + {'function': scrape_direct, 'args': [id], 'name': id} + for id in ids + ] + jobs = pool.add_many(kwargss) + while jobs: + job = jobs.pop(0) + job.join() + if job.exception: + raise job.exception + yield job.value + def scrape_category(category): page = 1 all_directs = set() while True: url = f'http://droidz.org/stickmain/{category}.php?page={page}' - print(url) response = request(url) soup = bs4.BeautifulSoup(response.text, 'html.parser') this_directs = soup.find_all('a', href=re.compile(r'/direct/\d+')) @@ -169,16 +222,10 @@ def scrape_category(category): page += 1 for direct in this_directs: id = id_from_direct_url(direct['href']) - insert_id(id) - - sql.commit() - - print(f'Got {len(all_directs)} directs.') - return all_directs + yield id def scrape_latest(): url = 'http://droidz.org/stickmain/' - print(url) response = request(url) soup = bs4.BeautifulSoup(response.text, 'html.parser') h2s = soup.find_all('h2') @@ -191,39 +238,42 @@ def scrape_latest(): directs = div.find_all('a', href=re.compile(r'/direct/\d+')) for direct in directs: id = id_from_direct_url(direct['href']) - status = insert_id(id) - if not status.is_new: - break - - return status + yield id # UPDATE ################################################################################ -def incremental_update(): - status = scrape_latest() +def incremental_update(threads=1): + latest_ids = scrape_latest() + for id in latest_ids: + status = insert_id(id, commit=False) + if status.is_new: print('The Latest box didn\'t contain everything.') print('Need to check the categories for new sticks.') for category in CATEGORIES: - scrape_category(category) + ids = scrape_category(category) + insert_ids(ids) + else: + print('No new sticks for incremental update.') cur = sql.cursor() cur.execute('SELECT id FROM sticks WHERE retrieved IS NULL') ids = [row[0] for row in cur.fetchall()] - for id in ids: - scrape_direct(id) + sticks = scrape_directs(ids, threads=threads) + insert_sticks(sticks) -def full_update(): +def full_update(threads=1): for category in CATEGORIES: - scrape_category(category) + ids = scrape_category(category) + insert_ids(ids) cur = sql.cursor() cur.execute('SELECT id FROM sticks ORDER BY retrieved ASC') ids = [row[0] for row in cur.fetchall()] - for id in ids: - scrape_direct(id) + sticks = scrape_directs(ids, threads=threads) + insert_sticks(sticks) # DOWNLOAD ################################################################################ @@ -319,9 +369,9 @@ DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS) def update_argparse(args): if args.full: - return full_update() + return full_update(threads=args.threads) else: - return incremental_update() + return incremental_update(threads=args.threads) def download_argparse(args): if args.extract and not WINRAR: @@ -337,6 +387,7 @@ subparsers = parser.add_subparsers() p_update = subparsers.add_parser('update') p_update.add_argument('--full', dest='full', action='store_true') +p_update.add_argument('--threads', dest='threads', type=int, default=1) p_update.set_defaults(func=update_argparse) p_download = subparsers.add_parser('download')