Multithreaded updating with threadpool.

This commit is contained in:
Ethan Dalool 2020-02-13 20:13:13 -08:00
parent 2ee9d072fe
commit ad87a45db1
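
In outline, the update flow after this commit (names as defined in the diff below; a sketch of the call pattern, not a verbatim excerpt):

    for category in CATEGORIES:
        insert_ids(scrape_category(category))    # collect ids, one commit per batch
    ids = ...                                    # ids still lacking details, per the db query
    sticks = scrape_directs(ids, threads=4)      # detail pages scraped by 4 worker threads
    insert_sticks(sticks)                        # upsert all rows, one commit for the batch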

droidz.py

@@ -12,6 +12,7 @@ import types
 from voussoirkit import pathclass
 from voussoirkit import ratelimiter
 from voussoirkit import sqlhelpers
+from voussoirkit import threadpool
 from voussoirkit import winwhich
 
 CATEGORIES = [
@@ -57,7 +58,6 @@ HEADERS = {
     'User-Agent': USERAGENT
 }
 
-REQUEST_RATELIMITER = ratelimiter.Ratelimiter(allowance=1, period=1)
 DOWNLOAD_RATELIMITER = ratelimiter.Ratelimiter(allowance=1, period=5)
 
 WINRAR = winwhich.which('winrar')
@@ -65,18 +65,19 @@ WINRAR = winwhich.which('winrar')
 def get_now():
     return datetime.datetime.now(datetime.timezone.utc).timestamp()
 
-def request(url):
-    REQUEST_RATELIMITER.limit()
-    response = requests.get(url, headers=HEADERS)
-    response.raise_for_status()
-    return response
-
 def id_from_direct_url(direct_url):
     id = direct_url.split('/direct/')[-1]
     id = id.split('/')[0].split('?')[0]
     return id
 
-def insert_id(id):
+# DB FUNCTIONS
+################################################################################
+def select_stick(id):
+    cur = sql.cursor()
+    cur.execute('SELECT * FROM sticks WHERE id == ?', [id])
+    return cur.fetchone()
+
+def insert_id(id, commit=True):
     cur = sql.cursor()
     cur.execute('SELECT 1 FROM sticks WHERE id == ?', [id])
     existing = cur.fetchone()
@@ -88,14 +89,53 @@ def insert_id(id):
     query = f'INSERT INTO sticks VALUES({qmarks})'
     cur.execute(query, bindings)
 
+    if commit:
+        sql.commit()
+
     status = types.SimpleNamespace(id=id, is_new=not existing)
     return status
 
+def insert_ids(ids, commit=True):
+    for id in ids:
+        insert_id(id, commit=False)
+    if commit:
+        sql.commit()
+
+def insert_stick(data, commit=True):
+    cur = sql.cursor()
+    cur.execute('SELECT 1 FROM sticks WHERE id == ?', [data['id']])
+    existing = cur.fetchone()
+
+    if existing:
+        (qmarks, bindings) = sqlhelpers.update_filler(data, 'id')
+        query = f'UPDATE sticks {qmarks}'
+    else:
+        (qmarks, bindings) = sqlhelpers.insert_filler(SQL_COLUMNS['sticks'], data)
+        query = f'INSERT INTO sticks VALUES({qmarks})'
+    cur.execute(query, bindings)
+
+    if commit:
+        sql.commit()
+
+def insert_sticks(datas, commit=True):
+    for data in datas:
+        insert_stick(data, commit=False)
+    if commit:
+        sql.commit()
+
 # SCRAPE
 ################################################################################
-def scrape_direct(id):
-    url = f'http://droidz.org/direct/{id}'
+def request(url):
     print(url)
+    response = requests.get(url, headers=HEADERS)
+    response.raise_for_status()
+    return response
+
+def scrape_direct(id, commit=True):
+    url = f'http://droidz.org/direct/{id}'
     response = request(url)
     text = response.text
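
The commit flag threaded through the DB helpers above lets callers batch writes into a single transaction; for instance (ids illustrative):

    insert_ids(['123', '456'])        # two inserts, one commit at the end
    insert_id('789', commit=False)    # no commit; caller runs sql.commit() later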
@@ -143,22 +183,35 @@ def scrape_direct(id):
         'retrieved': retrieved,
     }
 
-    insert_id(id)
-    cur = sql.cursor()
-    (qmarks, bindings) = sqlhelpers.update_filler(data, 'id')
-    query = f'UPDATE sticks {qmarks}'
-    cur.execute(query, bindings)
-    sql.commit()
-
     return data
 
+def scrape_directs(ids, threads=1, commit=True):
+    if threads < 1:
+        raise ValueError(threads)
+
+    if threads == 1:
+        for id in ids:
+            yield scrape_direct(id)
+    else:
+        pool = threadpool.ThreadPool(size=threads)
+        kwargss = [
+            {'function': scrape_direct, 'args': [id], 'name': id}
+            for id in ids
+        ]
+        jobs = pool.add_many(kwargss)
+        while jobs:
+            job = jobs.pop(0)
+            job.join()
+            if job.exception:
+                raise job.exception
+            yield job.value
+
 def scrape_category(category):
     page = 1
     all_directs = set()
     while True:
         url = f'http://droidz.org/stickmain/{category}.php?page={page}'
-        print(url)
         response = request(url)
         soup = bs4.BeautifulSoup(response.text, 'html.parser')
         this_directs = soup.find_all('a', href=re.compile(r'/direct/\d+'))
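
For readers unfamiliar with voussoirkit.threadpool, the fan-out in scrape_directs above is roughly equivalent to this standard-library sketch (hypothetical name scrape_directs_stdlib; not part of the commit):

    import concurrent.futures

    def scrape_directs_stdlib(ids, threads=1):
        if threads == 1:
            for id in ids:
                yield scrape_direct(id)
            return
        with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
            futures = [executor.submit(scrape_direct, id) for id in ids]
            # Yield in submission order; .result() re-raises a worker's
            # exception, matching the `raise job.exception` above.
            for future in futures:
                yield future.result()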
@@ -169,16 +222,10 @@ def scrape_category(category):
         page += 1
 
         for direct in this_directs:
             id = id_from_direct_url(direct['href'])
-            insert_id(id)
-
-    sql.commit()
-    print(f'Got {len(all_directs)} directs.')
-    return all_directs
+            yield id
 
 def scrape_latest():
     url = 'http://droidz.org/stickmain/'
-    print(url)
     response = request(url)
     soup = bs4.BeautifulSoup(response.text, 'html.parser')
     h2s = soup.find_all('h2')
@@ -191,39 +238,42 @@ def scrape_latest():
     directs = div.find_all('a', href=re.compile(r'/direct/\d+'))
     for direct in directs:
         id = id_from_direct_url(direct['href'])
-        status = insert_id(id)
-        if not status.is_new:
-            break
-
-    return status
+        yield id
 
 # UPDATE
 ################################################################################
-def incremental_update():
-    status = scrape_latest()
+def incremental_update(threads=1):
+    latest_ids = scrape_latest()
+    for id in latest_ids:
+        status = insert_id(id, commit=False)
 
     if status.is_new:
         print('The Latest box didn\'t contain everything.')
         print('Need to check the categories for new sticks.')
         for category in CATEGORIES:
-            scrape_category(category)
+            ids = scrape_category(category)
+            insert_ids(ids)
+    else:
+        print('No new sticks for incremental update.')
 
     cur = sql.cursor()
     cur.execute('SELECT id FROM sticks WHERE retrieved IS NULL')
     ids = [row[0] for row in cur.fetchall()]
-    for id in ids:
-        scrape_direct(id)
+    sticks = scrape_directs(ids, threads=threads)
+    insert_sticks(sticks)
 
-def full_update():
+def full_update(threads=1):
     for category in CATEGORIES:
-        scrape_category(category)
+        ids = scrape_category(category)
+        insert_ids(ids)
 
     cur = sql.cursor()
     cur.execute('SELECT id FROM sticks ORDER BY retrieved ASC')
     ids = [row[0] for row in cur.fetchall()]
-    for id in ids:
-        scrape_direct(id)
+    sticks = scrape_directs(ids, threads=threads)
+    insert_sticks(sticks)
 
 # DOWNLOAD
 ################################################################################
@@ -319,9 +369,9 @@ DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS)
 def update_argparse(args):
     if args.full:
-        return full_update()
+        return full_update(threads=args.threads)
     else:
-        return incremental_update()
+        return incremental_update(threads=args.threads)
 
 def download_argparse(args):
     if args.extract and not WINRAR:
@@ -337,6 +387,7 @@ subparsers = parser.add_subparsers()
 p_update = subparsers.add_parser('update')
 p_update.add_argument('--full', dest='full', action='store_true')
+p_update.add_argument('--threads', dest='threads', type=int, default=1)
 p_update.set_defaults(func=update_argparse)
 
 p_download = subparsers.add_parser('download')
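
With the new argument wired up, a threaded update is invoked along these lines (thread counts illustrative):

    python droidz.py update --threads 4
    python droidz.py update --full --threads 8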