import argparse import logging import requests import sqlite3 import sys import time from voussoirkit import backoff from voussoirkit import betterhelp from voussoirkit import ratelimiter from voussoirkit import sqlhelpers from voussoirkit import threadpool from voussoirkit import vlogging log = vlogging.getLogger('hnarchive') VERSION = 1 HEADERS = { 'User-Agent': f'voussoir/hnarchive v{VERSION}.', } DB_INIT = ''' PRAGMA user_version = 1; CREATE TABLE IF NOT EXISTS items( id INT PRIMARY KEY NOT NULL, deleted INT, type TEXT, author TEXT, time INT, text TEXT, dead INT, parent TEXT, poll TEXT, url TEXT, score INT, title TEXT, descendants INT, retrieved INT ); CREATE INDEX IF NOT EXISTS index_items_id on items(id); CREATE INDEX IF NOT EXISTS index_items_parent on items(parent); CREATE INDEX IF NOT EXISTS index_items_time on items(time); CREATE INDEX IF NOT EXISTS index_items_type_time on items(type, time); CREATE INDEX IF NOT EXISTS index_items_age_at_retrieval on items(retrieved - time); ''' COLUMNS = sqlhelpers.extract_table_column_map(DB_INIT) ITEMS_COLUMNS = COLUMNS['items'] sql = sqlite3.connect('hnarchive.db') sql.executescript(DB_INIT) # HELPERS ########################################################################################## def ctrlc_commit(function): def wrapped(*args, **kwargs): try: function(*args, **kwargs) except KeyboardInterrupt: commit() return wrapped def int_or_none(x): if x is None: return x return int(x) # API ############################################################################################## session = requests.Session() session.headers.update(HEADERS) def get(url, retries=1): start_time = time.time() bo = backoff.Quadratic(a=0.2, b=0, c=1, max=10) while retries > 0: log.loud(url) try: response = session.get(url, timeout=2) response.raise_for_status() break except requests.exceptions.HTTPError as exc: if exc.response.status_code == 429: pass elif 400 <= exc.response.status_code <= 499: raise retries -= 1 log.loud('Request failed, %d tries remain.', retries) time.sleep(bo.next()) except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout): log.loud('Request failed, %d tries remain.', retries) time.sleep(bo.next()) end_time = time.time() log.loud('%s took %s.', url, end_time - start_time) return response def get_item(id): url = f'https://hacker-news.firebaseio.com/v0/item/{id}.json' response = get(url, retries=8) item = response.json() if item is None: return None if 'time' not in item: # For example, 78692 from the api shows {"id": 78692, "type": "story"}, # but the web says "No such item." # https://hacker-news.firebaseio.com/v0/item/78692.json # https://news.ycombinator.com/item?id=78692 return None return item def get_items(ids, threads=None): if threads: return get_items_multithreaded(ids, threads) else: return get_items_singlethreaded(ids) def get_items_multithreaded(ids, threads): pool = threadpool.ThreadPool(threads, paused=True) job_gen = ({'function': get_item, 'kwargs': {'id': id}} for id in ids) pool.add_generator(job_gen) for job in pool.result_generator(buffer_size=250): if job.exception: raise job.exception if job.value is not None: yield job.value def get_items_singlethreaded(ids): for id in ids: item = get_item(id) if item is not None: yield item def get_latest_id(): url = 'https://hacker-news.firebaseio.com/v0/maxitem.json' response = get(url) latest_id = int(response.text) return latest_id def livestream(): bo = backoff.Linear(m=2, b=5, max=60) id = select_latest_id() or 1 # missed_loops: # Usually, livestream assumes that `item is None` means the requested item # id hasn't been published yet. But, if that item is actually just deleted, # we would be stuck waiting for it forever. missed_loops is used to # ocassionally check get_latest_id to see if new items are available, so we # know that the current id is really just deleted. # Items are released in small batches of < ~10 at a time. It is important # that the number in `latest > id+XXX` is big enough that we are sure the # requested item is really dead and not just part of a fresh batch that # beat our check in a race condition (consider that between the last # iteration which triggered the check and the call to get_latest_id, the # item we were waiting for is published in a new batch). I chose 50 because # catching up with 50 items is not a big deal. missed_loops = 0 while True: item = get_item(id) if item is None: log.debug('%s does not exist yet.', id) missed_loops += 1 if missed_loops % 5 == 0: latest = get_latest_id() if latest > (id+50): log.debug('Skipping %s because future ids exist.', id) id += 1 continue time.sleep(bo.next()) continue id += 1 missed_loops = 0 bo.rewind(2) yield item # DATABASE ######################################################################################### def commit(): log.info('Committing.') sql.commit() def insert_item(data): id = data['id'] retrieved = int(time.time()) existing = select_item(id) if existing is None: row = { 'id': id, 'deleted': bool(data.get('deleted', False)), 'type': data['type'], 'author': data.get('by', None), 'time': int(data['time']), 'text': data.get('text', None), 'dead': bool(data.get('dead', False)), 'parent': data.get('parent', None), 'poll': data.get('poll', None), 'url': data.get('url', None), 'score': int_or_none(data.get('score', None)), 'title': data.get('title', None), 'descendants': int_or_none(data.get('descendants', None)), 'retrieved': retrieved, } log.info('Inserting item %s.', id) (qmarks, bindings) = sqlhelpers.insert_filler(ITEMS_COLUMNS, row, require_all=True) query = f'INSERT INTO items VALUES({qmarks})' sql.execute(query, bindings) log.loud('Inserted item %s.', id) else: row = { 'id': id, 'deleted': bool(data.get('deleted', False)), 'type': data['type'], 'author': data.get('by', existing.get('author', None)), 'time': int(data['time']), 'text': data.get('text', existing.get('text', None)), 'dead': bool(data.get('dead', False)), 'parent': data.get('parent', None), 'poll': data.get('poll', existing.get('poll', None)), 'url': data.get('url', existing.get('url', None)), 'score': int_or_none(data.get('score', existing.get('score', None))), 'title': data.get('title', existing.get('title', None)), 'descendants': int_or_none(data.get('descendants', None)), 'retrieved': retrieved, } log.info('Updating item %s.', id) (qmarks, bindings) = sqlhelpers.update_filler(row, where_key='id') query = f'UPDATE items {qmarks}' sql.execute(query, bindings) log.loud('Updated item %s.', id) return {'row': row, 'is_new': existing is None} def insert_items(items, commit_period=200): ticker = 0 for item in items: insert_item(item) ticker = (ticker + 1) % commit_period if ticker == 0: commit() commit() def select_item(id): cur = sql.execute('SELECT * FROM items WHERE id == ?', [id]) row = cur.fetchone() if row is None: return None item = dict(zip(ITEMS_COLUMNS, row)) return item def select_latest_id(): cur = sql.execute('SELECT id FROM items ORDER BY id DESC LIMIT 1') row = cur.fetchone() if row is None: return None return row[0] # COMMAND LINE ##################################################################################### DOCSTRING = ''' hnarchive.py ============ {get} {livestream} {update} {update_items} TO SEE DETAILS ON EACH COMMAND, RUN > hnarchive.py --help '''.lstrip() SUB_DOCSTRINGS = dict( get=''' get: Get items between two IDs, inclusive. flags: --lower id: Lower bound item ID. --upper id: Upper bound item ID. --threads X: Use X threads to download items. Default = 1 thread. --commit_period X: Commit the database after every X insertions. Default = 200. '''.strip(), livestream=''' livestream: Watch for new items in an infinite loop. flags: --commit_period X: Commit the database after every X insertions. Default = 200. '''.strip(), update=''' update: Get new items, from the highest ID in the database to the present. flags: --threads X: Use X threads to download items. Default = 1 thread. --commit_period X: Commit the database after every X insertions. Default = 200. '''.strip(), update_items=''' update_items: Redownload items to update their scores, descendant counts, etc. flags: --days X: Update items where the retrieval date is less than X days ahead of the submission date. Stories are only open for comments for 14 days, so the `descendants` count of any story younger than 14 days should be considered volatile. It seems the upvote button does not disappear at any age, though I don't know whether votes on old submissions will actually count. Regardless, votes and comments tend to solidify within a day or two after submission so a small number should be sufficient. --threads X: Use X threads to download items. Default = 1 thread. --only_mature: If True, only update items where the submission date is more than 14 days ago. Without this, you will be updating items which are very close to the present time, an effort which you may find wasteful. --commit_period X: Commit the database after every X insertions. Default = 200. '''.strip(), ) DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS) @ctrlc_commit def get_argparse(args): lower = args.lower or 1 upper = args.upper or get_latest_id() ids = range(lower, upper+1) items = get_items(ids, threads=args.threads) insert_items(items, commit_period=args.commit_period) @ctrlc_commit def livestream_argparse(args): insert_items(livestream(), commit_period=args.commit_period) @ctrlc_commit def update_argparse(args): while True: lower = select_latest_id() or 1 upper = get_latest_id() if lower == upper: break ids = range(lower, upper+1) items = get_items(ids, threads=args.threads) insert_items(items, commit_period=args.commit_period) @ctrlc_commit def update_items_argparse(args): seconds = args.days * 86400 if args.only_mature: then = time.time() - (86400 * 14) query = 'SELECT id FROM items WHERE retrieved - time <= ? AND time < ?' bindings = [seconds, then] else: query = 'SELECT id FROM items WHERE retrieved - time <= ?' bindings = [seconds] cur = sql.execute(query, bindings) ids = cur.fetchall() log.info('Updating %d items.', len(ids)) ids = [id for (id,) in ids] items = get_items(ids, threads=args.threads) insert_items(items, commit_period=args.commit_period) def main(argv): argv = vlogging.set_level_by_argv(log, argv) parser = argparse.ArgumentParser(description=__doc__) subparsers = parser.add_subparsers() p_get = subparsers.add_parser('get') p_get.add_argument('--lower', type=int, default=None) p_get.add_argument('--upper', type=int, default=None) p_get.add_argument('--threads', type=int, default=None) p_get.add_argument('--commit_period', '--commit-period', type=int, default=200) p_get.set_defaults(func=get_argparse) p_livestream = subparsers.add_parser('livestream') p_livestream.add_argument('--commit_period', '--commit-period', type=int, default=200) p_livestream.set_defaults(func=livestream_argparse) p_update = subparsers.add_parser('update') p_update.add_argument('--threads', type=int, default=None) p_update.add_argument('--commit_period', '--commit-period', type=int, default=200) p_update.set_defaults(func=update_argparse) p_update_items = subparsers.add_parser('update_items', aliases=['update-items']) p_update_items.add_argument('--days', type=float, required=True) p_update_items.add_argument('--threads', type=int, default=None) p_update_items.add_argument('--only_mature', '--only-mature', action='store_true') p_update_items.add_argument('--commit_period', '--commit-period', type=int, default=200) p_update_items.set_defaults(func=update_items_argparse) return betterhelp.subparser_main( argv, parser, main_docstring=DOCSTRING, sub_docstrings=SUB_DOCSTRINGS, ) if __name__ == '__main__': raise SystemExit(main(sys.argv[1:]))