Update to new worms version.

This commit is contained in:
voussoir 2022-07-15 22:30:06 -07:00
parent 12400b407c
commit ca353e9977
No known key found for this signature in database
GPG key ID: 5F7554F8C26DACCB
9 changed files with 131 additions and 122 deletions

View file

@ -56,6 +56,7 @@ def get_videos_from_args(args):
def add_channel_argparse(args):
ycdldb = closest_db()
with ycdldb.transaction:
ycdldb.add_channel(
channel_id=args.channel_id,
automark=args.automark,
@ -65,8 +66,8 @@ def add_channel_argparse(args):
queuefile_extension=args.queuefile_extension,
)
if args.autoyes or interactive.getpermission('Commit?'):
ycdldb.commit()
if not (args.autoyes or interactive.getpermission('Commit?')):
ycdldb.rollback()
return 0
@ -97,12 +98,16 @@ def delete_channel_argparse(args):
ycdldb = closest_db()
needs_commit = False
with ycdldb.transaction:
for channel in get_channels_from_args(args):
channel.delete()
needs_commit = True
if args.autoyes or interactive.getpermission('Commit?'):
ycdldb.commit()
if not needs_commit:
return 0
if not (args.autoyes or interactive.getpermission('Commit?')):
ycdldb.rollback()
return 0
@ -110,6 +115,7 @@ def download_video_argparse(args):
ycdldb = closest_db()
needs_commit = False
with ycdldb.transaction:
for video in get_videos_from_args(args):
queuefile = ycdldb.download_video(
video,
@ -123,14 +129,13 @@ def download_video_argparse(args):
if not needs_commit:
return 0
if args.autoyes or interactive.getpermission('Commit?'):
ycdldb.commit()
if not (args.autoyes or interactive.getpermission('Commit?')):
ycdldb.rollback()
return 0
def init_argparse(args):
ycdldb = ycdl.ycdldb.YCDLDB(create=True)
ycdldb.commit()
pipeable.stdout(ycdldb.data_directory.absolute_path)
return 0
@ -139,6 +144,7 @@ def refresh_channels_argparse(args):
status = 0
ycdldb = closest_db()
with ycdldb.transaction:
if args.channels:
channels = [ycdldb.get_channel(c) for c in args.channels]
for channel in channels:
@ -155,8 +161,8 @@ def refresh_channels_argparse(args):
if not needs_commit:
return status
if args.autoyes or interactive.getpermission('Commit?'):
ycdldb.commit()
if not (args.autoyes or interactive.getpermission('Commit?')):
ycdldb.rollback()
return status

View file

@ -73,6 +73,10 @@ def init_ycdldb(*args, **kwargs):
global ycdldb
ycdldb = ycdl.ycdldb.YCDLDB.closest_ycdldb(*args, **kwargs)
def refresh_all_channels():
with ycdldb.transaction:
ycdldb.refresh_all_channels(force=False, skip_failures=True)
def refresher_thread(rate):
global last_refresh
while True:
@ -87,8 +91,7 @@ def refresher_thread(rate):
log.info('Starting refresh job.')
refresh_job = threading.Thread(
target=ycdldb.refresh_all_channels,
kwargs={'force': False, 'skip_failures': True, 'commit': True},
target=refresh_all_channels,
daemon=True,
)
refresh_job.start()

View file

@ -18,6 +18,7 @@ def _get_or_insert_video(video_id):
try:
video = common.ycdldb.get_video(video_id)
except ycdl.exceptions.NoSuchVideo:
with common.ycdldb.transaction:
video = common.ycdldb.insert_video(video_id)['video']
return video
@ -67,7 +68,11 @@ def _render_videos_listing(videos, channel, state, orderby):
@site.route('/channel/<channel_id>/<state>')
def get_channel(channel_id, state=None):
try:
channel = common.ycdldb.add_channel(channel_id, commit=True)
channel = common.ycdldb.get_channel(channel_id)
except ycdl.exceptions.NoSuchChannel:
try:
with common.ycdldb.transaction:
channel = common.ycdldb.add_channel(channel_id)
except ycdl.ytapi.ChannelNotFound:
flask.abort(404)
@ -116,7 +121,8 @@ def post_add_channel():
except ycdl.ytapi.ChannelNotFound:
return flasktools.json_response({}, status=404)
channel = common.ycdldb.add_channel(channel_id, get_videos=True, commit=True)
with common.ycdldb.transaction:
channel = common.ycdldb.add_channel(channel_id, get_videos=True)
return flasktools.json_response(channel.jsonify())
@site.route('/channel/<channel_id>/delete', methods=['POST'])
@ -126,7 +132,8 @@ def post_delete_channel(channel_id):
except ycdl.exceptions.NoSuchChannel as exc:
return flasktools.json_response(exc.jsonify(), status=404)
channel.delete(commit=True)
with common.ycdldb.transaction:
channel.delete()
response = {'id': channel.id, 'deleted': channel.deleted}
return flasktools.json_response(response)
@ -139,14 +146,16 @@ def post_refresh_channel(channel_id):
except ycdl.exceptions.NoSuchChannel as exc:
return flasktools.json_response(exc.jsonify(), status=404)
channel.refresh(force=force, commit=True)
with common.ycdldb.transaction:
channel.refresh(force=force)
return flasktools.json_response(channel.jsonify())
@site.route('/refresh_all_channels', methods=['POST'])
def post_refresh_all_channels():
force = request.form.get('force', False)
force = stringtools.truthystring(force, False)
common.ycdldb.refresh_all_channels(force=force, skip_failures=True, commit=True)
with common.ycdldb.transaction:
common.ycdldb.refresh_all_channels(force=force, skip_failures=True)
common.last_refresh = time.time()
return flasktools.json_response({})
@ -157,7 +166,8 @@ def post_set_automark(channel_id):
channel = common.ycdldb.get_channel(channel_id)
try:
channel.set_automark(state, commit=True)
with common.ycdldb.transaction:
channel.set_automark(state)
except ycdl.exceptions.InvalidVideoState as exc:
return flasktools.json_response(exc.jsonify(), status=400)
@ -172,7 +182,8 @@ def post_set_autorefresh(channel_id):
try:
autorefresh = stringtools.truthystring(autorefresh)
channel.set_autorefresh(autorefresh, commit=True)
with common.ycdldb.transaction:
channel.set_autorefresh(autorefresh)
except (ValueError, TypeError):
flask.abort(400)
@ -186,7 +197,8 @@ def post_set_download_directory(channel_id):
channel = common.ycdldb.get_channel(channel_id)
try:
channel.set_download_directory(download_directory, commit=True)
with common.ycdldb.transaction:
channel.set_download_directory(download_directory)
except pathclass.NotDirectory:
exc = {
'error_type': 'NOT_DIRECTORY',
@ -204,7 +216,8 @@ def post_set_name(channel_id):
name = request.form['name']
channel = common.ycdldb.get_channel(channel_id)
channel.set_name(name, commit=True)
with common.ycdldb.transaction:
channel.set_name(name)
response = {'id': channel.id, 'name': channel.name}
return flasktools.json_response(response)
@ -215,7 +228,8 @@ def post_set_queuefile_extension(channel_id):
extension = request.form['extension']
channel = common.ycdldb.get_channel(channel_id)
channel.set_queuefile_extension(extension, commit=True)
with common.ycdldb.transaction:
channel.set_queuefile_extension(extension)
response = {'id': channel.id, 'queuefile_extension': channel.queuefile_extension}
return flasktools.json_response(response)

View file

@ -22,9 +22,9 @@ def post_mark_video_state():
return flasktools.json_response(exc.jsonify(), status=404)
try:
with common.ycdldb.transaction:
for video in videos:
video.mark_state(state, commit=False)
common.ycdldb.commit()
video.mark_state(state)
except ycdl.exceptions.InvalidVideoState as exc:
common.ycdldb.rollback()
return flasktools.json_response(exc.jsonify(), status=400)
@ -42,8 +42,8 @@ def post_start_download():
except ycdl.exceptions.NoSuchVideo as exc:
return flasktools.json_response(exc.jsonify(), status=404)
with common.ycdldb.transaction:
for video in videos:
common.ycdldb.download_video(video, commit=False)
common.ycdldb.commit()
common.ycdldb.download_video(video)
return flasktools.json_response({'video_ids': video_ids, 'state': 'downloaded'})

View file

@ -44,7 +44,7 @@ class Migrator:
# be pointing to the version of B which has not been reconstructed yet,
# which is about to get renamed to B_old and then A's reference will be
# broken.
self.ycdldb.sql_execute('PRAGMA foreign_keys = OFF')
self.ycdldb.pragma_write('foreign_keys', 'OFF')
self.ycdldb.sql_execute('BEGIN')
for (name, table) in self.tables.items():
if name not in self.existing_tables:
@ -323,8 +323,7 @@ def upgrade_all(data_directory):
cur = ycdldb.sql.cursor()
cur.execute('PRAGMA user_version')
current_version = cur.fetchone()[0]
current_version = ycdldb.pragma_read('user_version')
needed_version = ycdl.constants.DATABASE_VERSION
if current_version == needed_version:
@ -336,15 +335,10 @@ def upgrade_all(data_directory):
upgrade_function = 'upgrade_%d_to_%d' % (current_version, version_number)
upgrade_function = eval(upgrade_function)
try:
ycdldb.sql.execute('PRAGMA foreign_keys = ON')
with ycdldb.transaction:
ycdldb.pragma_write('foreign_keys', 'ON')
upgrade_function(ycdldb)
except Exception as exc:
ycdldb.rollback()
raise
else:
ycdldb.sql.cursor().execute('PRAGMA user_version = %d' % version_number)
ycdldb.commit()
ycdldb.pragma_write('user_version', version_number)
current_version = version_number
print('Upgrades finished.')

View file

@ -24,8 +24,8 @@ def merge_db(from_db_path, to_db_path, channel):
to_db = sqlite3.connect(to_db_path)
from_db = sqlite3.connect(from_db_path)
to_version = to_db.execute('PRAGMA user_version').fetchone()[0]
from_version = from_db.execute('PRAGMA user_version').fetchone()[0]
to_version = to_db.pragma_read('user_version')
from_version = from_db.pragma_read('user_version')
if to_version != from_version:
raise Exception(f'Databases have different versions: to={to_version}, from={from_version}.')

View file

@ -1,19 +1,8 @@
from voussoirkit import sqlhelpers
DATABASE_VERSION = 10
DB_VERSION_PRAGMA = f'''
PRAGMA user_version = {DATABASE_VERSION};
'''
DB_PRAGMAS = f'''
PRAGMA cache_size = 10000;
'''
DB_INIT = f'''
BEGIN;
----------------------------------------------------------------------------------------------------
{DB_PRAGMAS}
{DB_VERSION_PRAGMA}
CREATE TABLE IF NOT EXISTS channels(
id TEXT,
name TEXT,
@ -23,6 +12,8 @@ CREATE TABLE IF NOT EXISTS channels(
automark TEXT,
autorefresh INT
);
CREATE INDEX IF NOT EXISTS index_channel_id on channels(id);
----------------------------------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS videos(
id TEXT,
published INT,
@ -35,15 +26,11 @@ CREATE TABLE IF NOT EXISTS videos(
live_broadcast TEXT,
state TEXT
);
CREATE INDEX IF NOT EXISTS index_channel_id on channels(id);
CREATE INDEX IF NOT EXISTS index_video_author_published on videos(author_id, published);
CREATE INDEX IF NOT EXISTS index_video_author_state_published on videos(author_id, state, published);
CREATE INDEX IF NOT EXISTS index_video_id on videos(id);
CREATE INDEX IF NOT EXISTS index_video_published on videos(published);
CREATE INDEX IF NOT EXISTS index_video_state_published on videos(state, published);
----------------------------------------------------------------------------------------------------
COMMIT;
'''
SQL_COLUMNS = sqlhelpers.extract_table_column_map(DB_INIT)

View file

@ -133,12 +133,13 @@ class Channel(ObjectBase):
videos = self.ycdldb.youtube.get_videos(new_ids)
return videos
@worms.transaction
@worms.atomic
def delete(self):
log.info('Deleting %s.', self)
self.ycdldb.delete(table='videos', pairs={'author_id': self.id})
self.ycdldb.delete(table='channels', pairs={'id': self.id})
self.deleted = True
def get_most_recent_video_id(self) -> str:
'''
@ -172,7 +173,7 @@ class Channel(ObjectBase):
}
return j
@worms.transaction
@worms.atomic
def refresh(self, *, force=False, rss_assisted=True):
'''
Fetch new videos on the channel.
@ -251,7 +252,7 @@ class Channel(ObjectBase):
self.set_uploads_playlist_id(self.uploads_playlist)
return self.uploads_playlist
@worms.transaction
@worms.atomic
def set_automark(self, state):
self.ycdldb.assert_valid_state(state)
@ -262,7 +263,7 @@ class Channel(ObjectBase):
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.automark = state
@worms.transaction
@worms.atomic
def set_autorefresh(self, autorefresh):
autorefresh = self.normalize_autorefresh(autorefresh)
@ -273,7 +274,7 @@ class Channel(ObjectBase):
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.autorefresh = autorefresh
@worms.transaction
@worms.atomic
def set_download_directory(self, download_directory):
download_directory = self.normalize_download_directory(download_directory)
@ -284,7 +285,7 @@ class Channel(ObjectBase):
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.download_directory = download_directory
@worms.transaction
@worms.atomic
def set_name(self, name):
name = self.normalize_name(name)
@ -295,7 +296,7 @@ class Channel(ObjectBase):
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.name = name
@worms.transaction
@worms.atomic
def set_queuefile_extension(self, queuefile_extension):
queuefile_extension = self.normalize_queuefile_extension(queuefile_extension)
@ -306,7 +307,7 @@ class Channel(ObjectBase):
self.ycdldb.update(table='channels', pairs=pairs, where_key='id')
self.queuefile_extension = queuefile_extension
@worms.transaction
@worms.atomic
def set_uploads_playlist_id(self, playlist_id):
log.debug('Setting %s upload playlist to %s.', self, playlist_id)
if not isinstance(playlist_id, str):
@ -347,11 +348,12 @@ class Video(ObjectBase):
except exceptions.NoSuchChannel:
return None
@worms.transaction
@worms.atomic
def delete(self):
log.info('Deleting %s.', self)
self.ycdldb.delete(table='videos', pairs={'id': self.id})
self.deleted = True
def jsonify(self):
j = {
@ -367,7 +369,7 @@ class Video(ObjectBase):
}
return j
@worms.transaction
@worms.atomic
def mark_state(self, state):
'''
Mark the video as ignored, pending, or downloaded.

View file

@ -22,7 +22,7 @@ class YCDLDBChannelMixin:
def __init__(self):
super().__init__()
@worms.transaction
@worms.atomic
def add_channel(
self,
channel_id,
@ -88,7 +88,7 @@ class YCDLDBChannelMixin:
def get_channels_by_sql(self, query, bindings=None):
return self.get_objects_by_sql(objects.Channel, query, bindings)
@worms.transaction
@worms.atomic
def _rss_assisted_refresh(self, channels, skip_failures=False):
'''
Youtube provides RSS feeds for every channel. These feeds do not
@ -155,7 +155,7 @@ class YCDLDBChannelMixin:
return excs
@worms.transaction
@worms.atomic
def refresh_all_channels(
self,
*,
@ -186,7 +186,7 @@ class YCDLDBVideoMixin:
def __init__(self):
super().__init__()
@worms.transaction
@worms.atomic
def download_video(
self,
video,
@ -300,9 +300,9 @@ class YCDLDBVideoMixin:
query = 'SELECT * FROM videos' + wheres + orderbys
log.debug('%s %s', query, bindings)
explain = self.execute('EXPLAIN QUERY PLAN ' + query, bindings)
log.debug('\n'.join(str(x) for x in explain.fetchall()))
# log.debug('%s %s', query, bindings)
# explain = self.execute('EXPLAIN QUERY PLAN ' + query, bindings)
# log.debug('\n'.join(str(x) for x in explain.fetchall()))
rows = self.select(query, bindings)
for row in rows:
@ -311,14 +311,14 @@ class YCDLDBVideoMixin:
def get_videos_by_sql(self, query, bindings=None):
return self.get_objects_by_sql(objects.Video, query, bindings)
@worms.transaction
@worms.atomic
def insert_playlist(self, playlist_id):
video_generator = self.youtube.get_playlist_videos(playlist_id)
results = [self.insert_video(video) for video in video_generator]
return results
@worms.transaction
@worms.atomic
def ingest_video(self, video):
'''
Call `insert_video`, and additionally use the channel's automark to
@ -353,7 +353,7 @@ class YCDLDBVideoMixin:
return status
@worms.transaction
@worms.atomic
def insert_video(self, video, *, add_channel=True):
if not isinstance(video, ytapi.Video):
video = self.youtube.get_video(video)
@ -443,13 +443,14 @@ class YCDLDB(
# WORMS
self._init_column_index()
self._init_caches()
self.id_type = str
def _check_version(self):
'''
Compare database's user_version against constants.DATABASE_VERSION,
raising exceptions.DatabaseOutOfDate if not correct.
'''
existing = self.execute('PRAGMA user_version').fetchone()[0]
existing = self.pragma_read('user_version')
if existing != constants.DATABASE_VERSION:
raise exceptions.DatabaseOutOfDate(
existing=existing,
@ -475,25 +476,27 @@ class YCDLDB(
raise FileNotFoundError(msg)
self.data_directory.makedirs(exist_ok=True)
self.sql = sqlite3.connect(self.database_filepath)
self.sql.row_factory = sqlite3.Row
self.sql_read = self._make_sqlite_read_connection(self.database_filepath)
self.sql_write = self._make_sqlite_write_connection(self.database_filepath)
if existing_database:
if not skip_version_check:
self._check_version()
with self.transaction:
self._load_pragmas()
else:
self._first_time_setup()
def _first_time_setup(self):
log.info('Running first-time database setup.')
with self.transaction:
self._load_pragmas()
self.pragma_write('user_version', constants.DATABASE_VERSION)
self.executescript(constants.DB_INIT)
self.commit()
def _load_pragmas(self):
log.debug('Reloading pragmas.')
self.executescript(constants.DB_PRAGMAS)
self.commit()
self.pragma_write('cache_size', 10000)
@classmethod
def closest_ycdldb(cls, youtube=None, path='.', *args, **kwargs):