# hnarchive/hnarchive.py
import argparse
import functools
import logging
import sqlite3
import sys
import time

import requests

from voussoirkit import backoff
from voussoirkit import betterhelp
from voussoirkit import ratelimiter
from voussoirkit import sqlhelpers
from voussoirkit import threadpool
from voussoirkit import vlogging
# Module-level logger. vlogging provides extra verbosity levels such as
# `loud`, which this file uses for per-request logging.
log = vlogging.getLogger('hnarchive')

VERSION = 1

# Identify this client to the HN API with a descriptive User-Agent.
HEADERS = {
'User-Agent': f'voussoir/hnarchive v{VERSION}.',
}

# One shared session so connections are reused across requests.
session = requests.Session()
session.headers.update(HEADERS)
# Database schema. `time` is the item's submission timestamp as reported by
# the API; `retrieved` is when this program fetched the row. The
# (retrieved - time) index supports the `update_items --days` query below.
DB_INIT = '''
PRAGMA user_version = 1;
CREATE TABLE IF NOT EXISTS items(
id INT PRIMARY KEY NOT NULL,
deleted INT,
type TEXT,
author TEXT,
time INT,
text TEXT,
dead INT,
parent TEXT,
poll TEXT,
url TEXT,
score INT,
title TEXT,
descendants INT,
retrieved INT
);
CREATE INDEX IF NOT EXISTS index_items_id on items(id);
CREATE INDEX IF NOT EXISTS index_items_parent on items(parent);
CREATE INDEX IF NOT EXISTS index_items_time on items(time);
CREATE INDEX IF NOT EXISTS index_items_type_time on items(type, time);
CREATE INDEX IF NOT EXISTS index_items_age_at_retrieval on items(retrieved - time);
'''
# Column names per table, parsed out of the schema text above.
COLUMNS = sqlhelpers.extract_table_column_map(DB_INIT)
ITEMS_COLUMNS = COLUMNS['items']

# Single module-wide connection; the database lives in the working directory.
sql = sqlite3.connect('hnarchive.db')
sql.executescript(DB_INIT)
# HELPERS ##########################################################################################
def ctrlc_commit(function):
    '''
    Decorator for long-running commands: if the user presses ctrl+c, commit
    the database before exiting instead of losing uncommitted inserts.

    The original wrapper discarded the wrapped function's return value and
    lost its metadata; this version returns the value and uses
    functools.wraps.
    '''
    @functools.wraps(function)
    def wrapped(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except KeyboardInterrupt:
            # Deliberately swallow the interrupt after committing so the
            # process shuts down cleanly.
            commit()
    return wrapped
def int_or_none(x):
    '''Cast x to int, passing None through unchanged.'''
    return None if x is None else int(x)
# API ##############################################################################################
def get(url, retries=1):
    '''
    GET the url with retry and quadratic backoff.

    HTTP 429 (rate limited) and 5xx responses consume a retry; other 4xx
    errors raise immediately. Connection errors and read timeouts are
    retried indefinitely without consuming a retry (presumably so a
    transient network outage does not abort an archive run — confirm this
    is intentional).

    Returns the successful response. If retries are exhausted, the last
    failed (non-2xx) response is returned, matching historical behavior.
    If no attempt ever produced a response at all, the last exception is
    raised instead of the previous behavior of raising NameError on the
    unbound local.
    '''
    start_time = time.time()
    bo = backoff.Quadratic(a=0.2, b=0, c=1, max=10)
    response = None
    last_error = None
    while retries > 0:
        log.loud(url)
        try:
            response = session.get(url, timeout=2)
            response.raise_for_status()
            break
        except requests.exceptions.HTTPError as exc:
            if exc.response.status_code == 429:
                # Rate limited; worth retrying.
                pass
            elif 400 <= exc.response.status_code <= 499:
                # Other client errors will not improve with retries.
                raise
            last_error = exc
            retries -= 1
            log.loud('Request failed, %d tries remain.', retries)
            time.sleep(bo.next())
        except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as exc:
            # Note: does not decrement retries.
            last_error = exc
            log.loud('Request failed, %d tries remain.', retries)
            time.sleep(bo.next())
    end_time = time.time()
    log.loud('%s took %s.', url, end_time - start_time)
    if response is None:
        # No attempt produced a response (e.g. called with retries <= 0).
        if last_error is not None:
            raise last_error
        raise ValueError(f'Could not get {url} and no attempts were made.')
    return response
def get_item(id):
    '''
    Fetch a single item from the HN API.

    Returns the item as a dict, or None when the item does not exist.
    Some ids (e.g. 78692) return a stub like {"id": ..., "type": "story"}
    with no 'time' field while the website says "No such item"; those are
    treated as nonexistent too.
    '''
    url = f'https://hacker-news.firebaseio.com/v0/item/{id}.json'
    item = get(url, retries=8).json()
    if item is None or 'time' not in item:
        return None
    return item
def get_items(ids, threads=None):
    '''Fetch many items, using a thread pool when `threads` is truthy.'''
    if not threads:
        return get_items_singlethreaded(ids)
    return get_items_multithreaded(ids, threads)
def get_items_multithreaded(ids, threads):
    '''
    Yield items fetched by a pool of worker threads, in result order,
    skipping items that came back as None. Worker exceptions are re-raised.
    '''
    worker_pool = threadpool.ThreadPool(threads, paused=True)
    worker_pool.add_generator(
        {'function': get_item, 'kwargs': {'id': id}}
        for id in ids
    )
    for job in worker_pool.result_generator(buffer_size=250):
        if job.exception:
            raise job.exception
        if job.value is None:
            continue
        yield job.value
def get_items_singlethreaded(ids):
    '''Yield each item one request at a time, skipping nonexistent items.'''
    fetched = (get_item(id) for id in ids)
    yield from (item for item in fetched if item is not None)
def get_latest_id():
    '''Return the highest item id the API currently reports.'''
    response = get('https://hacker-news.firebaseio.com/v0/maxitem.json')
    return int(response.text)
def livestream():
    '''
    Infinite generator: starting from the latest id in the database (or 1),
    fetch each successive item as it is published and yield it. Waits with
    linear backoff while the next id does not exist yet.
    '''
    bo = backoff.Linear(m=2, b=5, max=60)
    id = select_latest_id() or 1
    # missed_loops:
    # Usually, livestream assumes that `item is None` means the requested item
    # id hasn't been published yet. But, if that item is actually just deleted,
    # we would be stuck waiting for it forever. missed_loops is used to
    # ocassionally check get_latest_id to see if new items are available, so we
    # know that the current id is really just deleted.
    # Items are released in small batches of < ~10 at a time. It is important
    # that the number in `latest > id+XXX` is big enough that we are sure the
    # requested item is really dead and not just part of a fresh batch that
    # beat our check in a race condition (consider that between the last
    # iteration which triggered the check and the call to get_latest_id, the
    # item we were waiting for is published in a new batch). I chose 50 because
    # catching up with 50 items is not a big deal.
    missed_loops = 0
    while True:
        item = get_item(id)
        if item is None:
            log.debug('%s does not exist yet.', id)
            missed_loops += 1
            # Every 5th consecutive miss, ask the API whether much newer ids
            # already exist; if so, this id is dead and we skip past it.
            if missed_loops % 5 == 0:
                latest = get_latest_id()
                if latest > (id+50):
                    log.debug('Skipping %s because future ids exist.', id)
                    id += 1
                    continue
            time.sleep(bo.next())
            continue
        id += 1
        missed_loops = 0
        # Successful fetch: rewind the backoff so the next wait is short.
        bo.rewind(2)
        yield item
# DATABASE #########################################################################################
def commit():
    '''Commit the module-level database connection, logging that we did.'''
    log.info('Committing.')
    sql.commit()
def _merge_row(data, existing, retrieved):
    '''
    Build a database row from an API item dict.

    `existing` is the previously stored row (or {} for a brand-new item);
    for text/poll/url/score/title/author, values the API omits fall back to
    the stored values so an update cannot erase data we already have.
    '''
    return {
        'id': data['id'],
        'deleted': bool(data.get('deleted', False)),
        'type': data['type'],
        'author': data.get('by', existing.get('author', None)),
        'time': int(data['time']),
        'text': data.get('text', existing.get('text', None)),
        'dead': bool(data.get('dead', False)),
        'parent': data.get('parent', None),
        'poll': data.get('poll', existing.get('poll', None)),
        'url': data.get('url', existing.get('url', None)),
        'score': int_or_none(data.get('score', existing.get('score', None))),
        'title': data.get('title', existing.get('title', None)),
        # NOTE(review): unlike score/title, descendants has no fallback to
        # the stored value, so an API response missing it nulls the column.
        # This matches the original behavior — confirm it is intentional.
        'descendants': int_or_none(data.get('descendants', None)),
        'retrieved': retrieved,
    }

def insert_item(data):
    '''
    Insert or update one item.

    Returns {'row': row_dict, 'is_new': bool}. Does not commit; callers
    batch commits (see insert_items).
    '''
    id = data['id']
    retrieved = int(time.time())
    existing = select_item(id)
    row = _merge_row(data, existing or {}, retrieved)
    if existing is None:
        log.info('Inserting item %s.', id)
        (qmarks, bindings) = sqlhelpers.insert_filler(ITEMS_COLUMNS, row, require_all=True)
        query = f'INSERT INTO items VALUES({qmarks})'
        sql.execute(query, bindings)
        log.loud('Inserted item %s.', id)
    else:
        log.info('Updating item %s.', id)
        (qmarks, bindings) = sqlhelpers.update_filler(row, where_key='id')
        query = f'UPDATE items {qmarks}'
        sql.execute(query, bindings)
        log.loud('Updated item %s.', id)
    return {'row': row, 'is_new': existing is None}
def insert_items(items, commit_period=200):
    '''Insert every item from the iterable, committing every commit_period
    insertions and once more at the end.'''
    for (index, item) in enumerate(items, start=1):
        insert_item(item)
        if index % commit_period == 0:
            commit()
    commit()
def select_item(id):
    '''Return the stored item as a {column: value} dict, or None if absent.'''
    row = sql.execute('SELECT * FROM items WHERE id == ?', [id]).fetchone()
    if row is None:
        return None
    return dict(zip(ITEMS_COLUMNS, row))
def select_latest_id():
    '''Return the highest item id in the database, or None if it is empty.'''
    row = sql.execute('SELECT id FROM items ORDER BY id DESC LIMIT 1').fetchone()
    return row[0] if row is not None else None
# COMMAND LINE #####################################################################################
# Top-level help text. The {command} placeholders are replaced with
# one-line previews of each subcommand by betterhelp.add_previews below.
DOCSTRING = '''
hnarchive.py
============
{get}
{livestream}
{update}
{update_items}
TO SEE DETAILS ON EACH COMMAND, RUN
> hnarchive.py <command> --help
'''.lstrip()

# Per-subcommand help text, shown for `hnarchive.py <command> --help`.
SUB_DOCSTRINGS = dict(
get='''
get:
Get items between two IDs, inclusive.
flags:
--lower id:
Lower bound item ID.
--upper id:
Upper bound item ID.
--threads X:
Use X threads to download items. Default = 1 thread.
--commit_period X:
Commit the database after every X insertions. Default = 200.
'''.strip(),
livestream='''
livestream:
Watch for new items in an infinite loop.
flags:
--commit_period X:
Commit the database after every X insertions. Default = 200.
'''.strip(),
update='''
update:
Get new items, from the highest ID in the database to the present.
flags:
--threads X:
Use X threads to download items. Default = 1 thread.
--commit_period X:
Commit the database after every X insertions. Default = 200.
'''.strip(),
update_items='''
update_items:
Redownload items to update their scores, descendant counts, etc.
flags:
--days X:
Update items where the retrieval date is less than X days ahead of the
submission date.
Stories are only open for comments for 14 days, so the `descendants`
count of any story younger than 14 days should be considered volatile.
It seems the upvote button does not disappear at any age, though I
don't know whether votes on old submissions will actually count.
Regardless, votes and comments tend to solidify within a day or two
after submission so a small number should be sufficient.
--threads X:
Use X threads to download items. Default = 1 thread.
--only_mature:
If True, only update items where the submission date is more than 14
days ago. Without this, you will be updating items which are very close
to the present time, an effort which you may find wasteful.
--commit_period X:
Commit the database after every X insertions. Default = 200.
'''.strip(),
)

# Splice the subcommand previews into the main docstring.
DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS)
@ctrlc_commit
def get_argparse(args):
    '''Handle the `get` command: archive every item from --lower to --upper.'''
    lower = args.lower or 1
    upper = args.upper or get_latest_id()
    item_range = range(lower, upper+1)
    new_items = get_items(item_range, threads=args.threads)
    insert_items(new_items, commit_period=args.commit_period)
@ctrlc_commit
def livestream_argparse(args):
    '''Handle the `livestream` command: archive new items forever.'''
    new_items = livestream()
    insert_items(new_items, commit_period=args.commit_period)
@ctrlc_commit
def update_argparse(args):
    '''
    Handle the `update` command: repeatedly fetch everything between the
    newest stored id and the newest id the API reports, until they match.
    '''
    while True:
        lower = select_latest_id() or 1
        upper = get_latest_id()
        if lower == upper:
            return
        new_items = get_items(range(lower, upper+1), threads=args.threads)
        insert_items(new_items, commit_period=args.commit_period)
@ctrlc_commit
def update_items_argparse(args):
    '''
    Handle the `update_items` command: re-download items whose stored copy
    was retrieved within --days of their submission, optionally restricted
    to items more than 14 days old (--only_mature).
    '''
    seconds = args.days * 86400
    query = 'SELECT id FROM items WHERE retrieved - time <= ?'
    bindings = [seconds]
    if args.only_mature:
        then = time.time() - (86400 * 14)
        query += ' AND time < ?'
        bindings.append(then)
    rows = sql.execute(query, bindings).fetchall()
    log.info('Updating %d items.', len(rows))
    if not rows:
        return
    stale_ids = [row[0] for row in rows]
    new_items = get_items(stale_ids, threads=args.threads)
    insert_items(new_items, commit_period=args.commit_period)
def main(argv):
    '''Parse the command line and dispatch to the chosen subcommand handler.'''
    # Consume verbosity flags before argparse sees the rest of argv.
    argv = vlogging.set_level_by_argv(log, argv)

    parser = argparse.ArgumentParser(description=__doc__)
    subparsers = parser.add_subparsers()

    # get: archive a fixed range of ids.
    p_get = subparsers.add_parser('get')
    p_get.add_argument('--lower', type=int, default=None)
    p_get.add_argument('--upper', type=int, default=None)
    p_get.add_argument('--threads', type=int, default=None)
    p_get.add_argument('--commit_period', '--commit-period', type=int, default=200)
    p_get.set_defaults(func=get_argparse)

    # livestream: follow new items forever.
    p_livestream = subparsers.add_parser('livestream')
    p_livestream.add_argument('--commit_period', '--commit-period', type=int, default=200)
    p_livestream.set_defaults(func=livestream_argparse)

    # update: catch up from the newest stored id to the present.
    p_update = subparsers.add_parser('update')
    p_update.add_argument('--threads', type=int, default=None)
    p_update.add_argument('--commit_period', '--commit-period', type=int, default=200)
    p_update.set_defaults(func=update_argparse)

    # update_items: refresh scores/descendants of recently retrieved items.
    p_update_items = subparsers.add_parser('update_items', aliases=['update-items'])
    p_update_items.add_argument('--days', type=float, required=True)
    p_update_items.add_argument('--threads', type=int, default=None)
    p_update_items.add_argument('--only_mature', '--only-mature', action='store_true')
    p_update_items.add_argument('--commit_period', '--commit-period', type=int, default=200)
    p_update_items.set_defaults(func=update_items_argparse)

    # betterhelp wraps argparse dispatch and renders the custom docstrings.
    return betterhelp.subparser_main(
        argv,
        parser,
        main_docstring=DOCSTRING,
        sub_docstrings=SUB_DOCSTRINGS,
    )
# Script entry point; main's return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv[1:]))