Add module ingest_jsonfile.

master
voussoir 2023-06-25 13:24:39 -07:00
parent 643386b88c
commit ed1b7dc4eb
3 changed files with 115 additions and 1 deletions

View File

@ -35,6 +35,10 @@ def get_wiki_gateway(args):
from timesearch_modules import get_wiki
get_wiki.get_wiki_argparse(args)
def ingest_jsonfile_gateway(args):
from timesearch_modules import ingest_jsonfile
ingest_jsonfile.ingest_jsonfile_argparse(args)
def livestream_gateway(args):
from timesearch_modules import livestream
livestream.livestream_argparse(args)
@ -208,6 +212,41 @@ def main(argv):
)
p_get_wiki.set_defaults(func=get_wiki_gateway)
# INGEST_JSONFILE
p_ingest_jsonfile = subparsers.add_parser(
'ingest_jsonfile',
description='''
This module was added after reddit's June 2023 API changes which
resulted in pushshift losing API access, and pushshift's own API was
disabled. The community has made archive files available for download.
These archive files contain 1 object (a submission or a comment) per
line in a JSON format.
You can ingest these into timesearch so that you can continue to use
timesearch's offline_reading or index features.
''',
)
p_ingest_jsonfile.add_argument(
'json_file',
help='''
Path to a file containing 1 json object per line. Each object must be
either a submission or a comment.
''',
)
p_ingest_jsonfile.add_argument(
'-r',
'--subreddit',
dest='subreddit',
default=None,
)
p_ingest_jsonfile.add_argument(
'-u',
'--user',
dest='username',
default=None,
)
p_ingest_jsonfile.set_defaults(func=ingest_jsonfile_gateway)
# LIVESTREAM
p_livestream = subparsers.add_parser(
'livestream',

View File

@ -0,0 +1,71 @@
import json
import time
import traceback
from voussoirkit import pathclass
from . import common
from . import exceptions
from . import pushshift
from . import tsdb
def is_submission(obj):
return (
obj.get('name', '').startswith('t3_')
or obj.get('over_18') is not None
)
def is_comment(obj):
return (
obj.get('name', '').startswith('t1_')
or obj.get('parent_id', '').startswith('t3_')
or obj.get('link_id', '').startswith('t3_')
)
def jsonfile_to_objects(filepath):
filepath = pathclass.Path(filepath)
filepath.assert_is_file()
with filepath.open('r', encoding='utf-8') as handle:
for line in handle:
line = line.strip()
if not line:
break
obj = json.loads(line)
if is_submission(obj):
yield pushshift.DummySubmission(**obj)
elif is_comment(obj):
yield pushshift.DummyComment(**obj)
else:
raise ValueError(f'Could not recognize object type {obj}.')
def ingest_jsonfile(
filepath,
subreddit=None,
username=None,
):
if not common.is_xor(subreddit, username):
raise exceptions.NotExclusive(['subreddit', 'username'])
if subreddit:
(database, subreddit) = tsdb.TSDB.for_subreddit(subreddit, fix_name=True)
elif username:
(database, username) = tsdb.TSDB.for_user(username, fix_name=True)
cur = database.sql.cursor()
objects = jsonfile_to_objects(filepath)
database.insert(objects)
cur.execute('SELECT COUNT(idint) FROM submissions')
submissioncount = cur.fetchone()[0]
cur.execute('SELECT COUNT(idint) FROM comments')
commentcount = cur.fetchone()[0]
print('Ended with %d submissions and %d comments in %s' % (submissioncount, commentcount, database.filepath.basename))
def ingest_jsonfile_argparse(args):
return ingest_jsonfile(
subreddit=args.subreddit,
username=args.username,
filepath=args.json_file,
)

View File

@ -326,7 +326,11 @@ class TSDB:
def insert(self, objects, commit=True):
if not isinstance(objects, (list, tuple, types.GeneratorType)):
objects = [objects]
log.debug('Trying to insert %d objects.', len(objects))
if isinstance(objects, types.GeneratorType):
log.debug('Trying to insert a generator of objects.')
else:
log.debug('Trying to insert %d objects.', len(objects))
new_values = {
'tsdb': self,