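'''
Download a single file over HTTP by splitting it into fixed-size pieces and
fetching several pieces at once, each on its own thread. Progress is tracked
in a sqlite database stored next to the output file, so an interrupted
download can be resumed later.

Subcommands (defined in main() below):
    init <url> [localname] [--piece-size SIZE]
    reset <databasename>
    download <databasename> [--threads N]
'''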
import argparse
import os
import sqlite3
import sys
import threading
import time
import traceback

from voussoirkit import bytestring
from voussoirkit import downloady
from voussoirkit import pipeable

DEFAULT_PIECE_SIZE = bytestring.MEBIBYTE
DEFAULT_THREAD_COUNT = 10

def init(url, localname=None, piece_size=DEFAULT_PIECE_SIZE):
    localname = localname or downloady.basename_from_url(url) or str(int(time.time()))
    plan = downloady.prepare_plan(url, localname, headers={'range': 'bytes=0-'})
    sql_name = localname + '.db'
    if os.path.exists(sql_name):
        raise ValueError('database already exists: %s' % sql_name)

    sql = sqlite3.connect(sql_name)
    cur = sql.cursor()
    cur.execute('CREATE TABLE IF NOT EXISTS meta (url TEXT)')
    cur.execute('CREATE TABLE IF NOT EXISTS pieces (indx INT, start INT, end INT, done INT)')
    cur.execute('INSERT INTO meta VALUES(?)', [url])
    cur.execute('CREATE INDEX IF NOT EXISTS index_done ON pieces(done)')

    # Split the file into pieces of piece_size bytes. `start` is inclusive and
    # `end` is exclusive; the final piece is clamped to the file size.
    index = 0
    while True:
        start = index * piece_size
        end = min(start + piece_size, plan['remote_total_bytes'])
        done = 0
        cur.execute('INSERT INTO pieces VALUES(?, ?, ?, ?)', [index, start, end, done])
        start += piece_size
        # >= instead of > so that a file which is an exact multiple of
        # piece_size does not get an extra zero-length piece.
        if start >= plan['remote_total_bytes']:
            break
        index += 1
    sql.commit()
    print('Initialized %s with %d pieces' % (sql_name, index + 1))

def reset(databasename):
    # Mark every piece as not-done so the whole file will be fetched again.
    sql = sqlite3.connect(databasename)
    cur = sql.cursor()
    cur.execute('UPDATE pieces SET done = 0 WHERE done = 1')
    sql.commit()

def piece_thread(entry):
    try:
        # HTTP range headers are inclusive of the final byte, while the piece
        # table stores `end` as an exclusive boundary, hence the -1.
        headers = {'range': 'bytes=%d-%d' % (entry['start'], entry['end'] - 1)}
        result = downloady.download_file(entry['url'], entry['localname'], headers=headers, timeout=10)
        entry['job'].finish()
        return result
    except Exception:
        traceback.print_exc()
        # Return the piece to the pending queue so it will be retried.
        entry['job'].reset()

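# The Downloader polls its jobs every couple of seconds: finished pieces are
# recorded in the database, failed pieces fall back to 'pending', and new
# threads are started whenever fewer than thread_count jobs are in progress.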
class Downloader:
    def __init__(self, sql_name, thread_count=DEFAULT_THREAD_COUNT):
        self.thread_count = thread_count
        self.sql_name = sql_name
        # 'foo.zip.db' -> 'foo.zip'. maxsplit=1 so that '.db' appearing
        # elsewhere in the name is left alone.
        self.localname = self.sql_name.rsplit('.db', 1)[0]
        self.sql = sqlite3.connect(self.sql_name)
        self.cur = self.sql.cursor()
        self.url = self.cur.execute('SELECT url FROM meta').fetchone()[0]

    def start(self):
        # Create one Job for every piece that has not been downloaded yet.
        self.cur.execute('SELECT indx, start, end FROM pieces WHERE done = 0')
        jobs = []
        for (index, start, end) in self.cur.fetchall():
            entry = {
                'url': self.url,
                'localname': self.localname,
                'index': index,
                'start': start,
                'end': end,
            }
            jobs.append(Job(entry))

        while len(jobs) > 0:
            finished_jobs = []
            active_jobs = []
            pending_jobs = []

            for job in jobs:
                if job.state == 'finished':
                    finished_jobs.append(job)
                elif job.state == 'in progress':
                    active_jobs.append(job)
                else:
                    pending_jobs.append(job)

            # Record finished pieces immediately so an interrupted run
            # can resume later.
            for job in finished_jobs:
                self.cur.execute('UPDATE pieces SET done = 1 WHERE indx = ?', [job.entry['index']])
                print('finished #%d' % job.entry['index'])
                self.sql.commit()

            # Launch pending jobs until thread_count jobs are running.
            need = self.thread_count - len(active_jobs)
            while need > 0 and len(pending_jobs) > 0:
                job = pending_jobs.pop()
                print('starting #%d' % job.entry['index'])
                job.start()
                active_jobs.append(job)
                need -= 1

            jobs = pending_jobs + active_jobs
            print('%d jobs in progress' % len(active_jobs))
            print('%d jobs waiting' % len(pending_jobs))
            time.sleep(2)

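# A Job is a small state machine for one piece: 'pending' -> 'in progress'
# when start() spawns its thread, then 'finished' on success or back to
# 'pending' on failure so the Downloader will retry it.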
class Job:
    def __init__(self, entry):
        self.state = 'pending'
        self.entry = entry

    def start(self):
        self.state = 'in progress'
        # Give the thread a backreference so it can call finish() or reset().
        self.entry['job'] = self
        thread = threading.Thread(target=piece_thread, args=[self.entry])
        thread.daemon = True
        thread.start()

    def finish(self):
        self.state = 'finished'

    def reset(self):
        self.state = 'pending'

def init_argparse(args):
    # --piece-size arrives as a string on the command line; parse it into
    # a byte count.
    if isinstance(args.piece_size, str):
        piece_size = bytestring.parsebytes(args.piece_size)
    else:
        piece_size = args.piece_size
    url = pipeable.input(args.url, split_lines=False)
    init(url, localname=args.localname, piece_size=piece_size)

def reset_argparse(args):
    reset(args.databasename)

def download_argparse(args):
    downloader = Downloader(args.databasename, int(args.thread_count))
    downloader.start()

def main(argv):
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()
    subparsers.required = True
    subparsers.dest = 'command'

    p_init = subparsers.add_parser('init')
    p_init.add_argument('url')
    p_init.add_argument('localname', nargs='?', default=None)
    p_init.add_argument('--piece-size', dest='piece_size', default=DEFAULT_PIECE_SIZE)
    p_init.set_defaults(func=init_argparse)

    p_reset = subparsers.add_parser('reset')
    p_reset.add_argument('databasename')
    p_reset.set_defaults(func=reset_argparse)

    p_download = subparsers.add_parser('download')
    p_download.add_argument('databasename')
    p_download.add_argument('--threads', dest='thread_count', default=DEFAULT_THREAD_COUNT)
    p_download.set_defaults(func=download_argparse)

    args = parser.parse_args(argv)
    return args.func(args)

if __name__ == '__main__':
    raise SystemExit(main(sys.argv[1:]))