Use worms, add more ycli functions.

This commit is contained in:
voussoir 2021-10-15 21:00:04 -07:00
parent 0adeb55790
commit c22f20fcf8
No known key found for this signature in database
GPG key ID: 5F7554F8C26DACCB
15 changed files with 562 additions and 398 deletions

242
frontends/ycdl_cli.py Normal file
View file

@ -0,0 +1,242 @@
import argparse
import sys
import traceback
from voussoirkit import betterhelp
from voussoirkit import interactive
from voussoirkit import pipeable
from voussoirkit import vlogging
from voussoirkit import operatornotify
import ycdl
log = vlogging.getLogger(__name__, 'ycdl_cli')
# HELPERS ##########################################################################################
def closest_db():
return ycdl.ycdldb.YCDLDB.closest_ycdldb()
# ARGPARSE #########################################################################################
def add_channel_argparse(args):
ycdldb = closest_db()
ycdldb.add_channel(
channel_id=args.channel_id,
automark=args.automark,
download_directory=args.download_directory,
get_videos=args.get_videos,
name=args.name,
queuefile_extension=args.queuefile_extension,
)
if args.autoyes or interactive.getpermission('Commit?'):
ycdldb.commit()
return 0
def channel_list_argparse(args):
ycdldb = closest_db()
channels = sorted(ycdldb.get_channels(), key=lambda c: c.name.lower())
for channel in channels:
line = args.format.format(
automark=channel.automark,
autorefresh=channel.autorefresh,
id=channel.id,
name=channel.name,
queuefile_extension=channel.queuefile_extension,
uploads_playlist=channel.uploads_playlist,
)
pipeable.stdout(line)
return 0
def delete_channel_argparse(args):
ycdldb = closest_db()
for channel_id in args.channel_id:
channel = ycdldb.get_channel(channel_id)
channel.delete()
if args.autoyes or interactive.getpermission('Commit?'):
ycdldb.commit()
return 0
def init_argparse(args):
ycdldb = ycdl.ycdldb.YCDLDB(create=True)
ycdldb.commit()
pipeable.stdout(ycdldb.data_directory.absolute_path)
return 0
def refresh_channels_argparse(args):
needs_commit = False
status = 0
ycdldb = closest_db()
if args.channels:
channels = [ycdldb.get_channel(c) for c in args.channels]
for channel in channels:
try:
channel.refresh(force=args.force)
needs_commit = True
except Exception as exc:
log.warning(traceback.format_exc())
status = 1
else:
excs = ycdldb.refresh_all_channels(force=args.force, skip_failures=True)
needs_commit = True
if not needs_commit:
return status
if args.autoyes or interactive.getpermission('Commit?'):
ycdldb.commit()
return status
DOCSTRING = '''
YCDL CLI
========
{add_channel}
{channel_list}
{delete_channel}
{init}
{refresh_channels}
TO SEE DETAILS ON EACH COMMAND, RUN
> ycdl_cli.py <command> --help
'''
SUB_DOCSTRINGS = dict(
add_channel='''
add_channel:
Add a channel to the database.
> ycdl_cli.py add_channel channel_id <flags>
flags:
--automark X:
Set the channel's automark to this value, which should be 'pending',
'downloaded', or 'ignored'.
--download_directory X:
Set the channel's download directory to this path, which must
be a directory.
--name X:
Override the channel's own name with a name of your choosing.
--no_videos:
By default, the channel's videos will be fetched right away.
Add this argument if you don't want to do that yet.
--queuefile_extension X:
Set the queuefile extension for all videos downloaded from this channel.
Examples:
> ycdl_cli.py add_channel UCFhXFikryT4aFcLkLw2LBLA
'''.strip(),
channel_list='''
channel_list:
Print all channels in the database.
> ycdl_cli.py channel_list <flags>
--format X:
A string like "{id}: {name}" to format the attributes of the channel.
The available attributes are id, name, automark, autorefresh,
uploads_playlist queuefile_extension.
> ycdl_cli.py list_channels
'''.strip(),
delete_channel='''
delete_channel:
Delete a channel and all its videos from the database.
You can pass multiple channel IDs.
> ycdl_cli.py delete_channel channel_id [channel_id channel_id]
Examples:
> ycdl_cli.py delete_channel UCOYBuFGi8T3NM5fNAptCLCw
> ycdl_cli.py delete_channel UCOYBuFGi8T3NM5fNAptCLCw UCmu9PVIZBk-ZCi-Sk2F2utA UCEKJKJ3FO-9SFv5x5BzyxhQ
'''.strip(),
init='''
init:
Create a new YCDL database in the current directory.
> ycdl_cli.py init
'''.strip(),
refresh_channels='''
refresh_channels:
Refresh some or all channels in the database.
> ycdl_cli.py refresh_channels <flags>
flags:
--channels X Y Z:
Any number of channel IDs.
--force:
If omitted, only new videos are downloaded.
If included, channels are refreshed completely.
Examples:
> ycdl_cli.py refresh_channels --force
> ycdl_cli.py refresh_channels --channels UC1_uAIS3r8Vu6JjXWvastJg
'''
)
DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS)
@operatornotify.main_decorator(subject='ycdl_cli')
@vlogging.main_decorator
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
subparsers = parser.add_subparsers()
p_add_channel = subparsers.add_parser('add_channel', aliases=['add-channel'])
p_add_channel.add_argument('channel_id')
p_add_channel.add_argument('--automark', default='pending')
p_add_channel.add_argument('--download_directory', '--download-directory', default=None)
p_add_channel.add_argument('--name', default=None)
p_add_channel.add_argument('--no_videos', '--no-videos', dest='get_videos', action='store_false')
p_add_channel.add_argument('--queuefile_extension', '--queuefile-extension', default=None)
p_add_channel.add_argument('--yes', dest='autoyes', action='store_true')
p_add_channel.set_defaults(func=add_channel_argparse)
p_channel_list = subparsers.add_parser('channel_list', aliases=['channel-list'])
p_channel_list.add_argument('--format', default='{id}:{name}')
p_channel_list.set_defaults(func=channel_list_argparse)
p_delete_channel = subparsers.add_parser('delete_channel', aliases=['delete-channel'])
p_delete_channel.add_argument('channel_id', nargs='+')
p_delete_channel.add_argument('--yes', dest='autoyes', action='store_true')
p_delete_channel.set_defaults(func=delete_channel_argparse)
p_init = subparsers.add_parser('init')
p_init.set_defaults(func=init_argparse)
p_refresh_channels = subparsers.add_parser('refresh_channels', aliases=['refresh-channels'])
p_refresh_channels.add_argument('--channels', nargs='*')
p_refresh_channels.add_argument('--force', action='store_true')
p_refresh_channels.add_argument('--yes', dest='autoyes', action='store_true')
p_refresh_channels.set_defaults(func=refresh_channels_argparse)
try:
return betterhelp.subparser_main(argv, parser, DOCSTRING, SUB_DOCSTRINGS)
except ycdl.exceptions.NoClosestYCDLDB as exc:
pipeable.stderr(exc.error_message)
pipeable.stderr('Try `ycdl_cli.py init` to create the database.')
return 1
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -59,7 +59,7 @@ def after_request(response):
def init_ycdldb(*args, **kwargs):
global ycdldb
ycdldb = ycdl.ycdldb.YCDLDB(*args, **kwargs)
ycdldb = ycdl.ycdldb.YCDLDB.closest_ycdldb(*args, **kwargs)
def refresher_thread(rate):
while True:

View file

@ -115,7 +115,7 @@ def post_add_channel():
except ycdl.ytapi.ChannelNotFound:
return flasktools.json_response({}, status=404)
channel = common.ycdldb.add_channel(channel_id, get_videos=True)
channel = common.ycdldb.add_channel(channel_id, get_videos=True, commit=True)
return flasktools.json_response(channel.jsonify())
@site.route('/channel/<channel_id>/delete', methods=['POST'])
@ -125,7 +125,7 @@ def post_delete_channel(channel_id):
except ycdl.exceptions.NoSuchChannel as exc:
return flasktools.json_response(exc.jsonify(), status=404)
channel.delete()
channel.delete(commit=True)
return flasktools.json_response({})
@site.route('/channel/<channel_id>/refresh', methods=['POST'])
@ -137,14 +137,14 @@ def post_refresh_channel(channel_id):
except ycdl.exceptions.NoSuchChannel as exc:
return flasktools.json_response(exc.jsonify(), status=404)
channel.refresh(force=force)
channel.refresh(force=force, commit=True)
return flasktools.json_response(channel.jsonify())
@site.route('/refresh_all_channels', methods=['POST'])
def post_refresh_all_channels():
force = request.form.get('force', False)
force = stringtools.truthystring(force, False)
common.ycdldb.refresh_all_channels(force=force, skip_failures=True)
common.ycdldb.refresh_all_channels(force=force, skip_failures=True, commit=True)
return flasktools.json_response({})
@site.route('/channel/<channel_id>/set_automark', methods=['POST'])
@ -153,7 +153,7 @@ def post_set_automark(channel_id):
channel = common.ycdldb.get_channel(channel_id)
try:
channel.set_automark(state)
channel.set_automark(state, commit=True)
except ycdl.exceptions.InvalidVideoState:
flask.abort(400)
@ -167,7 +167,7 @@ def post_set_autorefresh(channel_id):
try:
autorefresh = stringtools.truthystring(autorefresh)
channel.set_autorefresh(autorefresh)
channel.set_autorefresh(autorefresh, commit=True)
except (ValueError, TypeError):
flask.abort(400)
@ -179,7 +179,7 @@ def post_set_download_directory(channel_id):
channel = common.ycdldb.get_channel(channel_id)
try:
channel.set_download_directory(download_directory)
channel.set_download_directory(download_directory, commit=True)
except pathclass.NotDirectory:
exc = {
'error_type': 'NOT_DIRECTORY',
@ -196,7 +196,7 @@ def post_set_queuefile_extension(channel_id):
extension = request.form['extension']
channel = common.ycdldb.get_channel(channel_id)
channel.set_queuefile_extension(extension)
channel.set_queuefile_extension(extension, commit=True)
response = {'queuefile_extension': channel.queuefile_extension}
return flasktools.json_response(response)

View file

@ -19,7 +19,7 @@ def post_mark_video_state():
for video_id in video_ids:
video = common.ycdldb.get_video(video_id)
video.mark_state(state, commit=False)
common.ycdldb.sql.commit()
common.ycdldb.commit()
except ycdl.exceptions.NoSuchVideo:
common.ycdldb.rollback()
@ -40,7 +40,7 @@ def post_start_download():
video_ids = video_ids.split(',')
for video_id in video_ids:
common.ycdldb.download_video(video_id, commit=False)
common.ycdldb.sql.commit()
common.ycdldb.commit()
except ycdl.ytapi.VideoNotFound:
common.ycdldb.rollback()

View file

@ -259,6 +259,8 @@ https://stackoverflow.com/a/35153397
</div> <!-- tab-videos -->
<div class="tab" data-tab-title="Settings">
<div>Channel ID: <code>{{channel.id}}</code></div>
<div>
<label><input type="checkbox" id="set_autorefresh_checkbox" {{"checked" if channel.autorefresh else ""}} onchange="return set_autorefresh_form(event);"/> Automatically refresh this channel regularly.</label>
<span id="set_autorefresh_spinner" class="hidden">Working...</span>

View file

@ -40,7 +40,6 @@ from voussoirkit import vlogging
log = vlogging.getLogger(__name__, 'ycdl_flask_dev')
import ycdl
import youtube_credentials
import backend
site = backend.site
@ -54,7 +53,6 @@ HTTPS_DIR = pathclass.Path(__file__).parent.with_child('https')
def ycdl_flask_launch(
*,
create,
localhost_only,
port,
refresh_rate,
@ -79,8 +77,12 @@ def ycdl_flask_launch(
if localhost_only:
site.localhost_only = True
youtube_core = ycdl.ytapi.Youtube(youtube_credentials.get_youtube_key())
backend.common.init_ycdldb(youtube_core, create=create)
try:
backend.common.init_ycdldb()
except ycdl.exceptions.NoClosestYCDLDB as exc:
log.error(exc.error_message)
log.error('Try `ycdl_cli.py init` to create the database.')
return 1
message = f'Starting server on port {port}, pid={os.getpid()}.'
if use_https:
@ -100,7 +102,6 @@ def ycdl_flask_launch(
def ycdl_flask_launch_argparse(args):
return ycdl_flask_launch(
create=args.create,
localhost_only=args.localhost_only,
port=args.port,
refresh_rate=args.refresh_rate,
@ -113,7 +114,6 @@ def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('port', nargs='?', type=int, default=5000)
parser.add_argument('--dont_create', '--dont-create', '--no-create', dest='create', action='store_false', default=True)
parser.add_argument('--https', dest='use_https', action='store_true', default=None)
parser.add_argument('--localhost_only', '--localhost-only', dest='localhost_only', action='store_true')
parser.add_argument('--refresh_rate', '--refresh-rate', dest='refresh_rate', type=int, default=None)

View file

@ -9,7 +9,6 @@ gunicorn ycdl_flask_prod:site --bind "0.0.0.0:PORT" --access-logfile "-"
import werkzeug.middleware.proxy_fix
import ycdl
import youtube_credentials
from ycdl_flask import backend
@ -19,6 +18,5 @@ site = backend.site
site.debug = False
# NOTE: Consider adding a local .json config file.
youtube_core = ycdl.ytapi.Youtube(youtube_credentials.get_youtube_key())
backend.common.init_ycdldb(youtube_core, create=False)
backend.common.init_ycdldb()
backend.common.start_refresher_thread(86400)

View file

@ -1,13 +1,47 @@
'''
Run `python -i ycdl_repl.py to get an interpreter
session with these variables preloaded.
'''
import logging
logging.basicConfig()
logging.getLogger('ycdl').setLevel(logging.DEBUG)
import argparse
import code
import sys
from voussoirkit import interactive
from voussoirkit import pipeable
from voussoirkit import vlogging
import ycdl
import youtube_credentials
youtube = ycdl.ytapi.Youtube(youtube_credentials.get_youtube_key())
Y = ycdl.ycdldb.YCDLDB(youtube)
def yrepl_argparse(args):
global Y
try:
Y = ycdl.ycdldb.YCDLDB.closest_ycdldb()
except ycdl.exceptions.NoClosestYCDLDB as exc:
pipeable.stderr(exc.error_message)
pipeable.stderr('Try `ycdl_cli.py init` to create the database.')
return 1
if args.exec_statement:
exec(args.exec_statement)
Y.commit()
else:
while True:
try:
code.interact(banner='', local=dict(globals(), **locals()))
except SystemExit:
pass
if len(Y.savepoints) == 0:
break
print('You have uncommited changes, are you sure you want to quit?')
if interactive.getpermission():
break
@vlogging.main_decorator
def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument('--exec', dest='exec_statement', default=None)
parser.set_defaults(func=yrepl_argparse)
args = parser.parse_args(argv)
return args.func(args)
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -2,7 +2,6 @@ import argparse
import sys
import ycdl
import youtube_credentials
class Migrator:
'''
@ -320,8 +319,7 @@ def upgrade_all(data_directory):
Given the directory containing a ycdl database, apply all of the
needed upgrade_x_to_y functions in order.
'''
youtube = ycdl.ytapi.Youtube(youtube_credentials.get_youtube_key())
ycdldb = ycdl.ycdldb.YCDLDB(youtube, data_directory, skip_version_check=True)
ycdldb = ycdl.ycdldb.YCDLDB(data_directory, skip_version_check=True)
cur = ycdldb.sql.cursor()

View file

@ -4,10 +4,8 @@ import traceback
from voussoirkit import downloady
import ycdl
import youtube_credentials
youtube_core = ycdl.ytapi.Youtube(youtube_credentials.get_youtube_key())
ycdldb = ycdl.ycdldb.YCDLDB(youtube_core)
ycdldb = ycdl.ycdldb.YCDLDB()
DIRECTORY = '.\\youtube thumbnails'

View file

@ -50,7 +50,7 @@ COMMIT;
SQL_COLUMNS = sqlhelpers.extract_table_column_map(DB_INIT)
SQL_INDEX = sqlhelpers.reverse_table_column_map(SQL_COLUMNS)
DEFAULT_DATADIR = '.'
DEFAULT_DATADIR = '_ycdl'
DEFAULT_DBNAME = 'ycdl.db'
DEFAULT_CONFIGNAME = 'ycdl.json'

View file

@ -54,6 +54,11 @@ class NoSuchVideo(YCDLException):
class NoVideos(YCDLException):
error_message = 'Channel {} has no videos.'
# CHANNEL ERRORS ###################################################################################
class ChannelRefreshFailed(YCDLException):
error_message = 'failed to refresh {channel} ({exc}).'
# VIDEO ERRORS #####################################################################################
class InvalidVideoState(YCDLException):
@ -64,14 +69,6 @@ class InvalidVideoState(YCDLException):
class RSSAssistFailed(YCDLException):
error_message = '{}'
# SQL ERRORS #######################################################################################
class BadSQL(YCDLException):
pass
class BadTable(BadSQL):
error_message = 'Table "{}" does not exist.'
# GENERAL ERRORS ###################################################################################
class BadDataDirectory(YCDLException):
@ -86,3 +83,10 @@ Please run utilities\\database_upgrader.py "{filepath.absolute_path}"
'''.strip()
class DatabaseOutOfDate(YCDLException):
error_message = OUTOFDATE
class NoClosestYCDLDB(YCDLException):
'''
For calls to YCDLDB.closest_ycdldb where none exists between cwd and
drive root.
'''
error_message = 'There is no YCDLDB in "{}" or its parents.'

View file

@ -1,41 +1,33 @@
import datetime
import googleapiclient.errors
import typing
from voussoirkit import pathclass
from voussoirkit import stringtools
from voussoirkit import vlogging
from voussoirkit import worms
log = vlogging.getLogger(__name__)
from . import constants
from . import exceptions
from . import ytrss
def normalize_db_row(db_row, table) -> dict:
'''
Raises KeyError if table is not one of the recognized tables.
Raises TypeError if db_row is not the right type.
'''
if isinstance(db_row, dict):
return db_row
if isinstance(db_row, (list, tuple)):
return dict(zip(constants.SQL_COLUMNS[table], db_row))
raise TypeError(f'db_row should be {dict}, {list}, or {tuple}, not {type(db_row)}.')
class Base:
class ObjectBase(worms.Object):
def __init__(self, ycdldb):
super().__init__()
super().__init__(ycdldb)
self.ycdldb = ycdldb
class Channel(Base):
class Channel(ObjectBase):
table = 'channels'
no_such_exception = exceptions.NoSuchChannel
def __init__(self, ycdldb, db_row):
super().__init__(ycdldb)
db_row = normalize_db_row(db_row, self.table)
db_row = self.ycdldb.normalize_db_row(db_row, self.table)
self.id = db_row['id']
self.name = db_row['name']
self.name = db_row['name'] or self.id
self.uploads_playlist = db_row['uploads_playlist']
self.download_directory = self.normalize_download_directory(
db_row['download_directory'],
@ -48,6 +40,9 @@ class Channel(Base):
def __repr__(self):
return f'Channel:{self.id}'
def __str__(self):
return f'Channel:{self.id}:{self.name}'
@staticmethod
def normalize_autorefresh(autorefresh):
if isinstance(autorefresh, (str, int)):
@ -82,6 +77,20 @@ class Channel(Base):
return download_directory
@staticmethod
def normalize_name(name):
if name is None:
return None
if not isinstance(name, str):
raise TypeError(f'name should be {str}, not {type(name)}.')
name = name.strip()
if not name:
return None
return name
@staticmethod
def normalize_queuefile_extension(queuefile_extension) -> typing.Optional[str]:
if queuefile_extension is None:
@ -112,19 +121,17 @@ class Channel(Base):
videos = self.ycdldb.youtube.get_videos(new_ids)
return videos
def delete(self, commit=True):
self.ycdldb.log.info('Deleting %s.', self)
@worms.transaction
def delete(self):
log.info('Deleting %s.', self)
self.ycdldb.sql_delete(table='videos', pairs={'author_id': self.id})
self.ycdldb.sql_delete(table='channels', pairs={'id': self.id})
if commit:
self.ycdldb.commit()
self.ycdldb.delete(table='videos', pairs={'author_id': self.id})
self.ycdldb.delete(table='channels', pairs={'id': self.id})
def get_most_recent_video_id(self):
query = 'SELECT id FROM videos WHERE author_id == ? ORDER BY published DESC LIMIT 1'
bindings = [self.id]
row = self.ycdldb.sql_select_one(query, bindings)
row = self.ycdldb.select_one(query, bindings)
if row is None:
raise exceptions.NoVideos(self)
return row[0]
@ -132,7 +139,7 @@ class Channel(Base):
def has_pending(self):
query = 'SELECT 1 FROM videos WHERE author_id == ? AND state == "pending" LIMIT 1'
bindings = [self.id]
return self.ycdldb.sql_select_one(query, bindings) is not None
return self.ycdldb.select_one(query, bindings) is not None
def jsonify(self):
j = {
@ -142,8 +149,9 @@ class Channel(Base):
}
return j
def refresh(self, *, force=False, rss_assisted=True, commit=True):
self.ycdldb.log.info('Refreshing %s.', self.id)
@worms.transaction
def refresh(self, *, force=False, rss_assisted=True):
log.info('Refreshing %s.', self.id)
if force or (not self.uploads_playlist):
self.reset_uploads_playlist_id()
@ -154,16 +162,20 @@ class Channel(Base):
try:
video_generator = self._rss_assisted_videos()
except exceptions.RSSAssistFailed as exc:
self.ycdldb.log.debug('Caught %s.', exc)
log.debug('Caught %s.', exc)
video_generator = self.ycdldb.youtube.get_playlist_videos(self.uploads_playlist)
seen_ids = set()
try:
for video in video_generator:
seen_ids.add(video.id)
status = self.ycdldb.ingest_video(video, commit=False)
status = self.ycdldb.ingest_video(video)
if (not status['new']) and (not force):
break
except googleapiclient.errors.HttpError as exc:
raise exceptions.ChannelRefreshFailed(channel=self.id, exc=exc)
# Now we will refresh some other IDs that may not have been refreshed
# by the previous loop.
@ -186,15 +198,12 @@ class Channel(Base):
refresh_ids.update(v.id for v in videos)
if refresh_ids:
self.ycdldb.log.debug('Refreshing %d ids separately.', len(refresh_ids))
log.debug('Refreshing %d ids separately.', len(refresh_ids))
# We call ingest_video instead of insert_video so that
# premieres / livestreams which have finished can be automarked.
for video_id in self.ycdldb.youtube.get_videos(refresh_ids):
self.ycdldb.ingest_video(video_id, commit=False)
if commit:
self.ycdldb.commit()
self.ycdldb.ingest_video(video_id)
def reset_uploads_playlist_id(self):
'''
@ -204,7 +213,8 @@ class Channel(Base):
self.set_uploads_playlist_id(self.uploads_playlist)
return self.uploads_playlist
def set_automark(self, state, commit=True):
@worms.transaction
def set_automark(self, state):
if state not in constants.VIDEO_STATES:
raise exceptions.InvalidVideoState(state)
@ -212,53 +222,56 @@ class Channel(Base):
'id': self.id,
'automark': state,
}
self.ycdldb.sql_update(table='channels', pairs=pairs, where_key='id')
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.automark = state
if commit:
self.ycdldb.commit()
def set_autorefresh(self, autorefresh, commit=True):
@worms.transaction
def set_autorefresh(self, autorefresh):
autorefresh = self.normalize_autorefresh(autorefresh)
pairs = {
'id': self.id,
'autorefresh': autorefresh,
}
self.ycdldb.sql_update(table='channels', pairs=pairs, where_key='id')
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.autorefresh = autorefresh
if commit:
self.ycdldb.commit()
def set_download_directory(self, download_directory, commit=True):
@worms.transaction
def set_download_directory(self, download_directory):
download_directory = self.normalize_download_directory(download_directory)
pairs = {
'id': self.id,
'download_directory': download_directory.absolute_path if download_directory else None,
}
self.ycdldb.sql_update(table='channels', pairs=pairs, where_key='id')
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.download_directory = download_directory
if commit:
self.ycdldb.commit()
@worms.transaction
def set_name(self, name):
name = self.normalize_name(name)
def set_queuefile_extension(self, queuefile_extension, commit=True):
pairs = {
'id': self.id,
'name': name,
}
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.name = name
@worms.transaction
def set_queuefile_extension(self, queuefile_extension):
queuefile_extension = self.normalize_queuefile_extension(queuefile_extension)
pairs = {
'id': self.id,
'queuefile_extension': queuefile_extension,
}
self.ycdldb.sql_update(table='channels', pairs=pairs, where_key='id')
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.queuefile_extension = queuefile_extension
if commit:
self.ycdldb.commit()
def set_uploads_playlist_id(self, playlist_id, commit=True):
self.ycdldb.log.debug('Setting %s upload playlist to %s.', self.id, playlist_id)
@worms.transaction
def set_uploads_playlist_id(self, playlist_id):
log.debug('Setting %s upload playlist to %s.', self.id, playlist_id)
if not isinstance(playlist_id, str):
raise TypeError(f'Playlist id must be a string, not {type(playlist_id)}.')
@ -266,18 +279,16 @@ class Channel(Base):
'id': self.id,
'uploads_playlist': playlist_id,
}
self.ycdldb.sql_update(table='channels', pairs=pairs, where_key='id')
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.uploads_playlist = playlist_id
if commit:
self.ycdldb.commit()
class Video(Base):
class Video(ObjectBase):
table = 'videos'
no_such_exception = exceptions.NoSuchVideo
def __init__(self, ycdldb, db_row):
super().__init__(ycdldb)
db_row = normalize_db_row(db_row, self.table)
db_row = self.ycdldb.normalize_db_row(db_row, self.table)
self.id = db_row['id']
self.published = db_row['published']
@ -300,13 +311,11 @@ class Video(Base):
except exceptions.NoSuchChannel:
return None
def delete(self, commit=True):
self.ycdldb.log.info('Deleting %s.', self)
@worms.transaction
def delete(self):
log.info('Deleting %s.', self)
self.ycdldb.sql_delete(table='videos', pairs={'id': self.id})
if commit:
self.ycdldb.commit()
self.ycdldb.delete(table='videos', pairs={'id': self.id})
def jsonify(self):
j = {
@ -322,24 +331,25 @@ class Video(Base):
}
return j
def mark_state(self, state, commit=True):
@worms.transaction
def mark_state(self, state):
'''
Mark the video as ignored, pending, or downloaded.
Note: Marking as downloaded will not create the queue file, this only
updates the database. See yclddb.download_video.
'''
if state not in constants.VIDEO_STATES:
raise exceptions.InvalidVideoState(state)
self.ycdldb.log.info('Marking %s as %s.', self, state)
log.info('Marking %s as %s.', self, state)
pairs = {
'id': self.id,
'state': state,
}
self.state = state
self.ycdldb.sql_update(table='videos', pairs=pairs, where_key='id')
if commit:
self.ycdldb.commit()
self.ycdldb.update(table='videos', pairs=pairs, where_key='id')
@property
def published_string(self):

View file

@ -1,176 +1,92 @@
import json
import sqlite3
from voussoirkit import cacheclass
from voussoirkit import configlayers
from voussoirkit import pathclass
from voussoirkit import vlogging
from voussoirkit import worms
log = vlogging.getLogger(__name__)
from . import constants
from . import exceptions
from . import objects
from . import ytapi
from . import ytrss
from voussoirkit import cacheclass
from voussoirkit import configlayers
from voussoirkit import pathclass
from voussoirkit import sqlhelpers
from voussoirkit import vlogging
class YCDLDBCacheManagerMixin:
_THING_CLASSES = {
'channel':
{
'class': objects.Channel,
'exception': exceptions.NoSuchChannel,
},
'video':
{
'class': objects.Video,
'exception': exceptions.NoSuchVideo,
},
}
def __init__(self):
super().__init__()
def get_cached_instance(self, thing_type, db_row):
'''
Check if there is already an instance in the cache and return that.
Otherwise, a new instance is created, cached, and returned.
Note that in order to call this method you have to already have a
db_row which means performing some select. If you only have the ID,
use get_thing_by_id, as there may already be a cached instance to save
you the select.
'''
thing_map = self._THING_CLASSES[thing_type]
thing_class = thing_map['class']
thing_table = thing_class.table
thing_cache = self.caches[thing_type]
if isinstance(db_row, dict):
thing_id = db_row['id']
else:
thing_index = constants.SQL_INDEX[thing_table]
thing_id = db_row[thing_index['id']]
try:
thing = thing_cache[thing_id]
except KeyError:
thing = thing_class(self, db_row)
thing_cache[thing_id] = thing
return thing
def get_thing_by_id(self, thing_type, thing_id):
'''
This method will first check the cache to see if there is already an
instance with that ID, in which case we don't need to perform any SQL
select. If it is not in the cache, then a new instance is created,
cached, and returned.
'''
thing_map = self._THING_CLASSES[thing_type]
thing_class = thing_map['class']
if isinstance(thing_id, thing_class):
# This could be used to check if your old reference to an object is
# still in the cache, or re-select it from the db to make sure it
# still exists and re-cache.
# Probably an uncommon need but... no harm I think.
thing_id = thing_id.id
thing_cache = self.caches[thing_type]
try:
return thing_cache[thing_id]
except KeyError:
pass
query = f'SELECT * FROM {thing_class.table} WHERE id == ?'
bindings = [thing_id]
thing_row = self.sql_select_one(query, bindings)
if thing_row is None:
raise thing_map['exception'](thing_id)
thing = thing_class(self, thing_row)
thing_cache[thing_id] = thing
return thing
def get_things(self, thing_type):
'''
Yield things, unfiltered, in whatever order they appear in the database.
'''
thing_map = self._THING_CLASSES[thing_type]
table = thing_map['class'].table
query = f'SELECT * FROM {table}'
things = self.sql_select(query)
for thing_row in things:
thing = self.get_cached_instance(thing_type, thing_row)
yield thing
def get_things_by_sql(self, thing_type, query, bindings=None):
'''
Use an arbitrary SQL query to select things from the database.
Your query select *, all the columns of the thing's table.
'''
thing_rows = self.sql_select(query, bindings)
for thing_row in thing_rows:
yield self.get_cached_instance(thing_type, thing_row)
import youtube_credentials
class YCDLDBChannelMixin:
def __init__(self):
super().__init__()
@worms.transaction
def add_channel(
self,
channel_id,
*,
commit=True,
automark='pending',
download_directory=None,
queuefile_extension=None,
get_videos=False,
name=None,
):
'''
Raises exceptions.InvalidVideoState if automark is not
one of constants.VIDEO_STATES.
Raises TypeError if name is not a string.
Raises TypeError if queuefile_extension is not a string.
Raises pathclass.NotDirectory is download_directory is not an existing
directory (via objects.Channel.normalize_download_directory).
'''
try:
return self.get_channel(channel_id)
except exceptions.NoSuchChannel:
pass
if automark not in constants.VIDEO_STATES:
raise exceptions.InvalidVideoState(automark)
name = objects.Channel.normalize_name(name)
if name is None:
name = self.youtube.get_user_name(channel_id)
download_directory = objects.Channel.normalize_download_directory(download_directory)
download_directory = download_directory.absolute_path if download_directory else None
queuefile_extension = objects.Channel.normalize_queuefile_extension(queuefile_extension)
self.log.info('Adding channel %s %s', channel_id, name)
log.info('Adding channel %s %s', channel_id, name)
data = {
'id': channel_id,
'name': name,
'uploads_playlist': self.youtube.get_user_uploads_playlist_id(channel_id),
'download_directory': download_directory.absolute_path if download_directory else None,
'download_directory': download_directory,
'queuefile_extension': queuefile_extension,
'automark': 'pending',
'automark': automark,
'autorefresh': True,
}
self.sql_insert(table='channels', data=data)
self.insert(table='channels', data=data)
channel = objects.Channel(self, data)
self.caches['channel'][channel_id] = channel
if get_videos:
channel.refresh(commit=False)
channel.refresh()
if commit:
self.commit()
return channel
def get_channel(self, channel_id):
return self.get_thing_by_id('channel', channel_id)
return self.get_object_by_id(objects.Channel, channel_id)
def get_channels(self):
return self.get_things(thing_type='channel')
return self.get_objects(objects.Channel)
def get_channels_by_sql(self, query, bindings=None):
return self.get_things_by_sql('channel', query, bindings)
return self.get_objects_by_sql(objects.Channel, query, bindings)
def _rss_assisted_refresh(self, channels, skip_failures=False, commit=True):
@worms.transaction
def _rss_assisted_refresh(self, channels, skip_failures=False):
'''
Youtube provides RSS feeds for every channel. These feeds do not
require the API token and seem to have generous ratelimits, or
@ -200,6 +116,7 @@ class YCDLDBChannelMixin:
channel.refresh(rss_assisted=False)
except Exception as exc:
if skip_failures:
log.warning(exc)
excs.append(exc)
else:
raise
@ -210,7 +127,7 @@ class YCDLDBChannelMixin:
new_ids = ytrss.get_user_videos_since(channel.id, most_recent_video)
yield from new_ids
except (exceptions.NoVideos, exceptions.RSSAssistFailed) as exc:
self.log.debug(
log.debug(
'RSS assist for %s failed "%s", using traditional refresh.',
channel.id,
exc.error_message
@ -219,115 +136,43 @@ class YCDLDBChannelMixin:
new_ids = (id for channel in channels for id in assisted(channel))
for video in self.youtube.get_videos(new_ids):
self.ingest_video(video, commit=False)
if commit:
self.commit()
self.ingest_video(video)
return excs
@worms.transaction
def refresh_all_channels(
self,
*,
force=False,
rss_assisted=True,
skip_failures=False,
commit=True,
):
self.log.info('Refreshing all channels.')
log.info('Refreshing all channels.')
channels = self.get_channels_by_sql('SELECT * FROM channels WHERE autorefresh == 1')
if rss_assisted and not force:
return self._rss_assisted_refresh(channels, skip_failures=skip_failures, commit=commit)
return self._rss_assisted_refresh(channels, skip_failures=skip_failures)
excs = []
for channel in channels:
try:
channel.refresh(force=force, commit=commit)
channel.refresh(force=force)
except Exception as exc:
if skip_failures:
self.log.warning(exc)
log.warning(exc)
excs.append(exc)
else:
raise
if commit:
self.commit()
return excs
class YCDLSQLMixin:
def __init__(self):
super().__init__()
self._cached_sql_tables = None
def assert_table_exists(self, table):
if not self._cached_sql_tables:
self._cached_sql_tables = self.get_sql_tables()
if table not in self._cached_sql_tables:
raise exceptions.BadTable(table)
def commit(self, message=None):
if message is not None:
self.log.debug('Committing - %s.', message)
self.sql.commit()
def get_sql_tables(self):
query = 'SELECT name FROM sqlite_master WHERE type = "table"'
cur = self.sql_execute(query)
tables = set(row[0] for row in cur.fetchall())
return tables
def rollback(self):
self.log.debug('Rolling back.')
self.sql_execute('ROLLBACK')
def sql_delete(self, table, pairs):
self.assert_table_exists(table)
(qmarks, bindings) = sqlhelpers.delete_filler(pairs)
query = f'DELETE FROM {table} {qmarks}'
self.sql_execute(query, bindings)
def sql_execute(self, query, bindings=[]):
if bindings is None:
bindings = []
cur = self.sql.cursor()
self.log.loud('%s %s', query, bindings)
cur.execute(query, bindings)
return cur
def sql_insert(self, table, data):
self.assert_table_exists(table)
column_names = constants.SQL_COLUMNS[table]
(qmarks, bindings) = sqlhelpers.insert_filler(column_names, data)
query = f'INSERT INTO {table} VALUES({qmarks})'
self.sql_execute(query, bindings)
def sql_select(self, query, bindings=None):
cur = self.sql_execute(query, bindings)
while True:
fetch = cur.fetchone()
if fetch is None:
break
yield fetch
def sql_select_one(self, query, bindings=None):
cur = self.sql_execute(query, bindings)
return cur.fetchone()
def sql_update(self, table, pairs, where_key):
self.assert_table_exists(table)
(qmarks, bindings) = sqlhelpers.update_filler(pairs, where_key=where_key)
query = f'UPDATE {table} {qmarks}'
self.sql_execute(query, bindings)
class YCDLDBVideoMixin:
def __init__(self):
super().__init__()
def download_video(self, video, commit=True, force=False):
@worms.transaction
def download_video(self, video, force=False):
'''
Create the queuefile within the channel's associated directory, or
the default directory from the config file.
@ -342,7 +187,7 @@ class YCDLDBVideoMixin:
raise TypeError(video)
if video.state != 'pending' and not force:
self.log.debug('%s does not need to be downloaded.', video.id)
log.debug('%s does not need to be downloaded.', video.id)
return
try:
@ -356,18 +201,17 @@ class YCDLDBVideoMixin:
download_directory = pathclass.Path(download_directory)
queuefile = download_directory.with_child(video.id).replace_extension(extension)
self.log.info('Creating %s.', queuefile.absolute_path)
def create_queuefile():
log.info('Creating %s.', queuefile.absolute_path)
download_directory.makedirs(exist_ok=True)
queuefile.touch()
video.mark_state('downloaded', commit=False)
if commit:
self.commit()
self.on_commit_queue.append({'action': create_queuefile})
video.mark_state('downloaded')
def get_video(self, video_id):
return self.get_thing_by_id('video', video_id)
return self.get_object_by_id(objects.Video, video_id)
def get_videos(self, channel_id=None, *, state=None, orderby=None):
wheres = []
@ -403,32 +247,31 @@ class YCDLDBVideoMixin:
query = 'SELECT * FROM videos' + wheres + orderbys
self.log.debug('%s %s', query, bindings)
explain = self.sql_execute('EXPLAIN QUERY PLAN ' + query, bindings)
self.log.debug('\n'.join(str(x) for x in explain.fetchall()))
log.debug('%s %s', query, bindings)
explain = self.execute('EXPLAIN QUERY PLAN ' + query, bindings)
log.debug('\n'.join(str(x) for x in explain.fetchall()))
rows = self.sql_select(query, bindings)
rows = self.select(query, bindings)
for row in rows:
yield self.get_cached_instance('video', row)
yield self.get_cached_instance(objects.Video, row)
def get_videos_by_sql(self, query, bindings=None):
return self.get_things_by_sql('video', query, bindings)
return self.get_objects_by_sql(objects.Video, query, bindings)
def insert_playlist(self, playlist_id, commit=True):
@worms.transaction
def insert_playlist(self, playlist_id):
video_generator = self.youtube.get_playlist_videos(playlist_id)
results = [self.insert_video(video, commit=False) for video in video_generator]
if commit:
self.commit()
results = [self.insert_video(video) for video in video_generator]
return results
def ingest_video(self, video, commit=True):
@worms.transaction
def ingest_video(self, video):
'''
Call `insert_video`, and additionally use the channel's automark to
mark this video's state.
'''
status = self.insert_video(video, commit=False)
status = self.insert_video(video)
if not status['new']:
return status
@ -444,28 +287,26 @@ class YCDLDBVideoMixin:
if author.automark == 'downloaded':
if video.live_broadcast is not None:
self.log.debug(
log.debug(
'Not downloading %s because live_broadcast=%s.',
video.id,
video.live_broadcast,
)
return status
# download_video contains a call to mark_state.
self.download_video(video.id, commit=False)
self.download_video(video.id)
else:
video.mark_state(author.automark, commit=False)
if commit:
self.commit()
video.mark_state(author.automark)
return status
def insert_video(self, video, *, add_channel=True, commit=True):
@worms.transaction
def insert_video(self, video, *, add_channel=True):
if not isinstance(video, ytapi.Video):
video = self.youtube.get_video(video)
if add_channel:
self.add_channel(video.author_id, get_videos=False, commit=False)
self.add_channel(video.author_id, get_videos=False)
try:
existing = self.get_video(video.id)
@ -490,19 +331,16 @@ class YCDLDBVideoMixin:
}
if existing:
self.log.loud('Updating Video %s.', video.id)
self.sql_update(table='videos', pairs=data, where_key='id')
log.loud('Updating Video %s.', video.id)
self.update(table='videos', pairs=data, where_key='id')
else:
self.log.loud('Inserting Video %s.', video.id)
self.sql_insert(table='videos', data=data)
log.loud('Inserting Video %s.', video.id)
self.insert(table='videos', data=data)
# Override the cached copy with the new copy so that the cache contains
# updated information (view counts etc.).
video = objects.Video(self, data)
self.caches['video'][video.id] = video
if commit:
self.commit()
self.caches[objects.Video][video.id] = video
# For the benefit of ingest_video, which will only apply the channel's
# automark to newly released videos, let's consider the video to be
@ -516,20 +354,21 @@ class YCDLDBVideoMixin:
return {'new': is_new, 'video': video}
class YCDLDB(
YCDLDBCacheManagerMixin,
YCDLDBChannelMixin,
YCDLDBVideoMixin,
YCDLSQLMixin,
worms.DatabaseWithCaching,
):
def __init__(
self,
youtube,
youtube=None,
*,
create=True,
create=False,
data_directory=None,
skip_version_check=False,
):
super().__init__()
if youtube is None:
youtube = ytapi.Youtube(youtube_credentials.get_youtube_key())
self.youtube = youtube
# DATA DIR PREP
@ -541,10 +380,41 @@ class YCDLDB(
if self.data_directory.exists and not self.data_directory.is_dir:
raise exceptions.BadDataDirectory(self.data_directory.absolute_path)
# LOGGING
self.log = vlogging.getLogger(f'{__name__}:{self.data_directory.absolute_path}')
# DATABASE
self._init_sql(create=create, skip_version_check=skip_version_check)
# CONFIG
self.config_filepath = self.data_directory.with_child(constants.DEFAULT_CONFIGNAME)
self.load_config()
# WORMS
self._init_column_index()
self._init_caches()
def _check_version(self):
'''
Compare database's user_version against constants.DATABASE_VERSION,
raising exceptions.DatabaseOutOfDate if not correct.
'''
existing = self.execute('PRAGMA user_version').fetchone()[0]
if existing != constants.DATABASE_VERSION:
raise exceptions.DatabaseOutOfDate(
existing=existing,
new=constants.DATABASE_VERSION,
filepath=self.data_directory,
)
def _init_caches(self):
self.caches = {
objects.Channel: cacheclass.Cache(maxlen=20_000),
objects.Video: cacheclass.Cache(maxlen=50_000),
}
def _init_column_index(self):
self.COLUMNS = constants.SQL_COLUMNS
self.COLUMN_INDEX = constants.SQL_INDEX
def _init_sql(self, create, skip_version_check):
self.database_filepath = self.data_directory.with_child(constants.DEFAULT_DBNAME)
existing_database = self.database_filepath.exists
if not existing_database and not create:
@ -561,38 +431,46 @@ class YCDLDB(
else:
self._first_time_setup()
# CONFIG
self.config_filepath = self.data_directory.with_child(constants.DEFAULT_CONFIGNAME)
self.load_config()
self.caches = {
'channel': cacheclass.Cache(maxlen=20_000),
'video': cacheclass.Cache(maxlen=50_000),
}
def _check_version(self):
'''
Compare database's user_version against constants.DATABASE_VERSION,
raising exceptions.DatabaseOutOfDate if not correct.
'''
existing = self.sql.execute('PRAGMA user_version').fetchone()[0]
if existing != constants.DATABASE_VERSION:
raise exceptions.DatabaseOutOfDate(
existing=existing,
new=constants.DATABASE_VERSION,
filepath=self.data_directory,
)
def _first_time_setup(self):
self.log.info('Running first-time database setup.')
self.sql.executescript(constants.DB_INIT)
log.info('Running first-time database setup.')
self.executescript(constants.DB_INIT)
self.commit()
def _load_pragmas(self):
self.log.debug('Reloading pragmas.')
self.sql.executescript(constants.DB_PRAGMAS)
log.debug('Reloading pragmas.')
self.executescript(constants.DB_PRAGMAS)
self.commit()
@classmethod
def closest_ycdldb(cls, youtube=None, path='.', *args, **kwargs):
'''
Starting from the given path and climbing upwards towards the filesystem
root, look for an existing YCDL data directory and return the
YCDLDB object. If none exists, raise exceptions.NoClosestYCDLDB.
'''
path = pathclass.Path(path)
starting = path
while True:
possible = path.with_child(constants.DEFAULT_DATADIR)
if possible.is_dir:
break
parent = path.parent
if path == parent:
raise exceptions.NoClosestYCDLDB(starting.absolute_path)
path = parent
path = possible
ycdldb = cls(
youtube=youtube,
data_directory=path,
create=False,
*args,
**kwargs,
)
log.debug('Found closest YCDLDB at %s.', path)
return ycdldb
def get_all_states(self):
'''
Get a list of all the different states that are currently in use in
@ -602,7 +480,7 @@ class YCDLDB(
# arbitrarily many states for user-defined purposes, but I kind of went
# back on that so I'm not sure if it will be useful.
query = 'SELECT DISTINCT state FROM videos'
states = self.sql_select(query)
states = self.select(query)
states = [row[0] for row in states]
return sorted(states)

View file

@ -1,4 +1,4 @@
import apiclient.discovery
import googleapiclient.discovery
import isodate
from voussoirkit import gentools
@ -53,7 +53,7 @@ class Video:
class Youtube:
def __init__(self, key):
self.youtube = apiclient.discovery.build(
self.youtube = googleapiclient.discovery.build(
cache_discovery=False,
developerKey=key,
serviceName='youtube',