This commit is contained in:
unknown 2016-03-07 12:25:50 -08:00
parent 9b149ad4c1
commit 4d9871494b
3 changed files with 823 additions and 578 deletions

View file

@@ -2,3 +2,15 @@ Spinal
========
A couple of tools for copying files and directories.
2016 03 02
- Fixed an issue where the copy's path casing was taken from the input string instead of the path's actual casing on disk (Windows is case-insensitive, so both worked).
- Changed the returned written_bytes to 0 when the file did not need to be copied. This is better for tracking how much actually happens during each backup.
- Fixed encode errors caused by callback_v1's print statement.
2016 03 03
- Moved directory / filename exclusion logic into the walk_generator so the caller doesn't need to worry about it.
- walk_generator now yields absolute filenames since copy_dir no longer needs to process exclusions, and that was the only reason walk_generator used to yield them in parts.
2016 03 04
- Created a FilePath class to cache os.stat data, which should reduce the number of unnecessary filesystem calls.

View file

@@ -1,7 +1,11 @@
import collections
import glob
import json
import os
import ratelimiter
import shutil
import stat
import string
import sys
import time
@@ -10,8 +14,17 @@ KIBIBYTE = BYTE * 1024
MIBIBYTE = KIBIBYTE * 1024
GIBIBYTE = MIBIBYTE * 1024
TEBIBYTE = GIBIBYTE * 1024
SIZE_UNITS = (TEBIBYTE, GIBIBYTE, MIBIBYTE, KIBIBYTE, BYTE)
UNIT_STRINGS = {
BYTE: 'b',
KIBIBYTE: 'KiB',
MIBIBYTE: 'MiB',
GIBIBYTE: 'GiB',
TEBIBYTE: 'TiB',
}
CHUNK_SIZE = 128 * KIBIBYTE
# Number of bytes to read and write at a time
@@ -33,6 +46,69 @@ class SourceNotFile(Exception):
class SpinalError(Exception):
pass
class FilePath:
def __init__(self, path):
self.path = os.path.abspath(path)
self._stat = None
self._isdir = None
self._isfile = None
self._islink = None
self._size = None
def __hash__(self):
return self.path.__hash__()
def __repr__(self):
return repr(self.path)
@property
def isdir(self):
return self.type_getter('_isdir', stat.S_ISDIR)
@property
def isfile(self):
return self.type_getter('_isfile', stat.S_ISREG)
@property
def islink(self):
return self.type_getter('_islink', stat.S_ISLNK)
@property
def size(self):
if self._size is None:
if self.stat is False:
self._size = None
else:
self._size = self.stat.st_size
return self._size
@property
def stat(self):
if self._stat is None:
try:
self._stat = os.stat(self.path)
except FileNotFoundError:
self._stat = False
return self._stat
def type_getter(self, attr, resolution):
if getattr(self, attr) is None:
if self.stat is False:
return False
else:
setattr(self, attr, resolution(self.stat.st_mode))
return getattr(self, attr)
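A quick sketch of how the cache behaves (hypothetical path, not part of the commit): the first property access pays for the os.stat call, and everything after that reuses the cached result.
f = FilePath('C:\\folder\\file.txt')
f.size    # one os.stat call, result cached in _stat
f.isfile  # derived from the cached stat, no new filesystem call
f.isfile  # answered from _isfile immediately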
def bytes_to_unit_string(bytes):
size_unit = 1
for unit in SIZE_UNITS:
if bytes >= unit:
size_unit = unit
break
size_unit_string = UNIT_STRINGS[size_unit]
size_string = '%.3f %s' % ((bytes / size_unit), size_unit_string)
return size_string
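A couple of hand-computed spot checks, assuming BYTE = 1 as defined above:
bytes_to_unit_string(1536)  # '1.500 KiB'
bytes_to_unit_string(500)   # '500.000 b'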
def callback_exclusion(name, path_type):
'''
@@ -40,18 +116,19 @@ def callback_exclusion(name, path_type):
'''
print('Excluding', name)
def callback_v1(fpobj, written_bytes, total_bytes):
'''
Example of a copy callback function.
Prints "filename written/total (percent%)"
Prints "fpobj written/total (percent%)"
'''
filename = fpobj.path.encode('ascii', 'replace').decode()
if written_bytes >= total_bytes:
ends = '\n'
else:
ends = ''
percent = (100 * written_bytes) / total_bytes
percent = '%07.3f' % percent
written = '{:,}'.format(written_bytes)
total = '{:,}'.format(total_bytes)
written = written.rjust(len(total), ' ')
@@ -60,6 +137,197 @@ def callback_v1(filename, written_bytes, total_bytes):
print(status, end=ends)
sys.stdout.flush()
def copy(source, file_args=None, file_kwargs=None, dir_args=None, dir_kwargs=None):
'''
Perform copy_dir or copy_file as appropriate for the source path.
'''
source = str_to_fp(source)
if source.isfile:
file_args = file_args or tuple()
file_kwargs = file_kwargs or dict()
return copy_file(source, *file_args, **file_kwargs)
elif source.isdir:
dir_args = dir_args or tuple()
dir_kwargs = dir_kwargs or dict()
return copy_dir(source, *dir_args, **dir_kwargs)
raise SpinalError('Neither file nor dir: %s' % source)
def copy_dir(
source,
destination=None,
destination_new_root=None,
bytes_per_second=None,
callback_directory=None,
callback_file=None,
callback_permission_denied=None,
dry_run=False,
exclude_directories=None,
exclude_filenames=None,
exclusion_callback=None,
overwrite_old=True,
precalcsize=False,
):
'''
Copy all of the contents from source to destination,
including subdirectories.
source:
The directory which will be copied.
destination:
The directory in which copied files are placed. Alternatively, use
destination_new_root.
destination_new_root:
Determine the destination path by calling
`new_root(source, destination_new_root)`.
Thus, this path acts as a root and the rest of the path is matched.
bytes_per_second:
Restrict file copying to this many bytes per second. Can be an integer
or an existing Ratelimiter object.
The provided BYTE, KIBIBYTE, etc constants may help.
Default = None
callback_directory:
This function will be called after each file copy with three parameters:
name of file copied, number of bytes written to destination so far,
total bytes needed (from precalcsize).
Default = None
callback_file:
Will be passed into each individual `copy_file` operation as the
`callback` for that file.
Default = None
callback_permission_denied:
Will be passed into each individual `copy_file` operation as the
`callback_permission_denied` for that file.
Default = None
dry_run:
Do everything except the actual file copying.
Default = False
exclude_filenames:
Passed directly into `walk_generator`.
Default = None
exclude_directories:
Passed directly into `walk_generator`.
Default = None
exclusion_callback:
Passed directly into `walk_generator`.
Default = None
overwrite_old:
If True, overwrite the destination file if the source file
has a more recent "last modified" timestamp.
Default = True
precalcsize:
If True, calculate the size of source before beginning the
operation. This number can be used in the callback_directory function.
Else, callback_directory will receive written bytes as total bytes
(showing 100% always).
This can take a long time.
Default = False
Returns: [destination path, number of bytes written to destination]
(Written bytes is 0 if all files already existed.)
'''
# Prepare parameters
if not is_xor(destination, destination_new_root):
m = 'One and only one of `destination` and '
m += '`destination_new_root` can be passed'
raise ValueError(m)
source = str_to_fp(source)
source = get_path_casing(source)
if destination_new_root is not None:
destination = new_root(source, destination_new_root)
destination = str_to_fp(destination)
if is_subfolder(source, destination):
raise RecursiveDirectory(source, destination)
if not source.isdir:
raise SourceNotDirectory(source)
if destination.isfile:
raise DestinationIsFile(destination)
if precalcsize is True:
total_bytes = get_dir_size(source)
else:
total_bytes = 0
if isinstance(bytes_per_second, ratelimiter.Ratelimiter):
limiter = bytes_per_second
elif bytes_per_second is not None:
limiter = ratelimiter.Ratelimiter(allowance_per_period=bytes_per_second, period=1)
else:
limiter = None
# Copy
written_bytes = 0
walker = walk_generator(
source,
exclude_directories=exclude_directories,
exclude_filenames=exclude_filenames,
exclusion_callback=exclusion_callback,
)
for (source_abspath) in walker:
# Terminology:
# abspath: C:\folder\subfolder\filename.txt
# location: C:\folder\subfolder
# base_name: filename.txt
# folder: subfolder
destination_abspath = source_abspath.path.replace(source.path, destination.path)
destination_abspath = str_to_fp(destination_abspath)
if destination_abspath.isdir:
raise DestinationIsDirectory(destination_abspath)
destination_location = os.path.split(destination_abspath.path)[0]
if not os.path.isdir(destination_location):
os.makedirs(destination_location)
copied = copy_file(
source_abspath,
destination_abspath,
bytes_per_second=limiter,
callback=callback_file,
callback_permission_denied=callback_permission_denied,
dry_run=dry_run,
overwrite_old=overwrite_old,
)
copiedname = copied[0]
written_bytes += copied[1]
if callback_directory is not None:
if precalcsize is False:
callback_directory(copiedname, written_bytes, written_bytes)
else:
callback_directory(copiedname, written_bytes, total_bytes)
return [destination, written_bytes]
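A hedged usage sketch tying the parameters above together (paths and rate are hypothetical): mirror a folder into a backup root, throttled, with per-file progress from callback_v1.
result = copy_dir(
    'C:\\Music',
    destination_new_root='G:\\backups',
    bytes_per_second=10 * MIBIBYTE,
    callback_file=callback_v1,
    precalcsize=True,
)
print('Wrote', bytes_to_unit_string(result[1]))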
def copy_file(
source,
destination=None,
@@ -68,6 +336,7 @@ def copy_file(
callback=None,
dry_run=False,
overwrite_old=True,
callback_permission_denied=None,
):
'''
Copy a file from one place to another.
@@ -94,11 +363,20 @@ def copy_file(
callback:
If provided, this function will be called after writing
each CHUNK_SIZE bytes to destination with three parameters:
name of file being copied, number of bytes written so far,
the FilePath object being copied, number of bytes written so far,
total number of bytes needed.
Default = None
callback_permission_denied:
If provided, this function will be called when a source file denies
read access, with the file path and the exception object as parameters.
THE OPERATION WILL RETURN NORMALLY.
If not provided, the PermissionError is raised.
Default = None
dry_run:
Do everything except the actual file copying.
@@ -111,6 +389,7 @@ def copy_file(
Default = True
Returns: [destination filename, number of bytes written to destination]
(Written bytes is 0 if the file already existed.)
'''
# Prepare parameters
if not is_xor(destination, destination_new_root):
@@ -118,16 +397,17 @@ def copy_file(
m += '`destination_new_root` can be passed'
raise ValueError(m)
source = str_to_fp(source)
source = get_path_casing(source)
if destination_new_root is not None:
destination = new_root(source, destination_new_root)
destination = str_to_fp(destination)
if not source.isfile:
raise SourceNotFile(source)
if destination.isdir:
raise DestinationIsDirectory(destination)
if isinstance(bytes_per_second, ratelimiter.Ratelimiter):
@@ -137,35 +417,47 @@ def copy_file(
else:
limiter = None
# Determine overwrite
if destination.stat is not False:
destination_modtime = destination.stat.st_mtime
if overwrite_old is False:
return [destination, 0]
source_modtime = source.stat.st_mtime
if source_modtime == destination_modtime:
return [destination, 0]
# Copy
if dry_run:
if callback is not None:
callback(destination, 0, 0)
return [destination, 0]
source_bytes = source.size
destination_location = os.path.split(destination.path)[0]
if not os.path.exists(destination_location):
os.makedirs(destination_location)
written_bytes = 0
try:
source_file = open(source.path, 'rb')
destination_file = open(destination.path, 'wb')
except PermissionError as exception:
if callback_permission_denied is not None:
callback_permission_denied(source, exception)
return [destination, 0]
else:
raise
while True:
data_chunk = source_file.read(CHUNK_SIZE)
data_bytes = len(data_chunk)
if data_bytes == 0:
break
destination_file.write(data_chunk)
written_bytes += data_bytes
if limiter is not None:
@@ -176,64 +468,102 @@ def copy_file(
# Fin
source_file.close()
destination_file.close()
shutil.copystat(source.path, destination.path)
return [destination, written_bytes]
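The single-file counterpart, again with hypothetical paths; the permission handler here just logs and lets the operation return normally, as the docstring warns.
copy_file(
    'C:\\folder\\file.txt',
    destination_new_root='G:\\backups',
    callback=callback_v1,
    callback_permission_denied=lambda fp, exc: print('Skipped', fp),
)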
def get_path_casing(path):
'''
Take what is perhaps incorrectly cased input and get the path's actual
casing according to the filesystem.
Thank you
Ethan Furman http://stackoverflow.com/a/7133137/5430534
xvorsx http://stackoverflow.com/a/14742779/5430534
'''
p = str_to_fp(path)
path = p.path
(drive, subpath) = os.path.splitdrive(path)
pattern = ["%s[%s]" % (piece[:-1], piece[-1]) for piece in subpath.split(os.sep)[1:]]
pattern = os.sep.join(pattern)
pattern = drive.upper() + os.sep + pattern
try:
return str_to_fp(glob.glob(pattern)[0])
except IndexError:
return p
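For example, assuming the file is actually stored on disk as C:\Folder\File.TXT (hypothetical filesystem state):
get_path_casing('c:\\folder\\file.txt')
# FilePath('C:\\Folder\\File.TXT'): each component is matched through a
# case-insensitive glob pattern like 'folde[r]', so the filesystem reports
# its own casing back.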
def get_dir_size(path):
'''
Calculate the total number of bytes across all files in this directory
and its subdirectories.
'''
path = str_to_fp(path)
if not path.isdir:
raise SourceNotDirectory(path)
total_bytes = 0
for (directory, filename) in walk_generator(path):
filename = os.path.join(directory, filename)
filesize = os.path.getsize(filename)
total_bytes += filesize
return total_bytes
def is_subfolder(parent, child):
'''
Determine whether parent contains child.
'''
parent = normalize(str_to_fp(parent).path) + os.sep
child = normalize(str_to_fp(child).path) + os.sep
return child.startswith(parent)
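The trailing os.sep is what keeps sibling folders with a shared prefix from matching; two hypothetical checks:
is_subfolder('C:\\folder', 'C:\\folder\\sub\\file.txt')  # True
is_subfolder('C:\\folder', 'C:\\folderstuff')            # False: a shared prefix is not containment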
def is_xor(*args):
'''
Return True if and only if one arg is truthy.
'''
return [bool(a) for a in args].count(True) == 1
def new_root(filepath, root):
'''
Prepend `root` to `filepath`, drive letter included. For example:
"C:\\folder\\subfolder\\file.txt" and "C:\\backups" becomes
"C:\\backups\\C\\folder\\subfolder\\file.txt"
I use this so that my G: drive can have backups from my C: and D: drives
while preserving directory structure in G:\\D and G:\\C.
'''
filepath = str_to_fp(filepath).path
root = str_to_fp(root).path
filepath = filepath.replace(':', os.sep)
filepath = os.path.normpath(filepath)
filepath = os.path.join(root, filepath)
return str_to_fp(filepath)
def normalize(text):
'''
Apply os.path.normpath and os.path.normcase.
'''
return os.path.normpath(os.path.normcase(text))
def str_to_fp(path):
'''
If `path` is a string, create a FilePath object, otherwise just return it.
'''
if isinstance(path, str):
path = FilePath(path)
return path
def walk_generator(
path,
exclude_directories=None,
exclude_filenames=None,
exclusion_callback=None,
):
'''
Yield FilePath objects with absolute paths from the file tree, similar to os.walk.
Example value: FilePath('C:\\Users\\Michael\\Music\\song.mp3')
exclude_filenames:
A set of filenames that will not be copied. Entries can be absolute
@@ -256,193 +586,62 @@ def copy_dir(
two parameters: the path, and 'file' or 'directory'.
Default = None
overwrite_old:
If True, overwrite the destination file if the source file
has a more recent "last modified" timestamp.
Default = True
precalcsize:
If True, calculate the size of source_dir before beginning the
operation. This number can be used in the callback_directory function.
Else, callback_directory will receive written bytes as total bytes
(showing 100% always).
This can take a long time.
Default = False
Returns: [destination_dir path, number of bytes written to destination_dir]
'''
# Prepare parameters
if exclusion_callback is None:
exclusion_callback = lambda *x: None
if exclude_directories is None:
exclude_directories = set()
if exclude_filenames is None:
exclude_filenames = set()
exclude_filenames = {normalize(f) for f in exclude_filenames}
exclude_directories = {normalize(f) for f in exclude_directories}
path = str_to_fp(path).path
if normalize(path) in exclude_directories:
exclusion_callback(path, 'directory')
return
if normalize(os.path.split(path)[1]) in exclude_directories:
exclusion_callback(path, 'directory')
return
directory_queue = collections.deque()
directory_queue.append(path)
# This is a recursion-free workplace.
# Thank you for your cooperation.
while len(directory_queue) > 0:
location = directory_queue.popleft()
contents = os.listdir(location)
directories = []
for base_name in contents:
absolute_name = os.path.join(location, base_name)
if os.path.isdir(absolute_name):
if normalize(absolute_name) in exclude_directories:
exclusion_callback(absolute_name, 'directory')
continue
if normalize(base_name) in exclude_directories:
exclusion_callback(absolute_name, 'directory')
continue
directories.append(absolute_name)
if normalize(base_name) in exclude_filenames:
exclusion_callback(absolute_name, 'file')
continue
if normalize(absolute_name) in exclude_filenames:
exclusion_callback(absolute_name, 'file')
continue
yield(str_to_fp(absolute_name))
directories.reverse()
directory_queue.extendleft(directories)
def execute_spinaltask(task):
'''
Execute a spinal task.
'''
pass
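A hedged usage sketch of the generator (hypothetical tree): the exclusion sets are normalized internally, excluded entries are reported through callback_exclusion, and everything else arrives as FilePath objects.
walker = walk_generator(
    'C:\\Music',
    exclude_directories={'C:\\Music\\Podcasts'},
    exclude_filenames={'desktop.ini'},
    exclusion_callback=callback_exclusion,
)
for fp in walker:
    print(fp.path, fp.size)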

View file

@@ -1,3 +1,4 @@
import bs4
import json
import requests
import os
@@ -21,7 +22,7 @@ IMGUR_ALBUMFOLDERS = True
# Else, files will be named <album_id>_<img_id>.jpg and placed
# in the local folder.
GFYCAT_MP4 = False
# If True, download gfycat urls in .mp4
# Else, .webm
@@ -55,403 +56,436 @@ DO_GENERIC = True
last_request = 0
if DOWNLOAD_DIRECTORY != '':
if DOWNLOAD_DIRECTORY[-1] not in ['/', '\\']:
DOWNLOAD_DIRECTORY += '\\'
if not os.path.exists(DOWNLOAD_DIRECTORY):
os.makedirs(DOWNLOAD_DIRECTORY)
class StatusExc(Exception):
pass
def download_file(url, localname, headers={}):
localname = os.path.join(DOWNLOAD_DIRECTORY, localname)
dirname = os.path.split(localname)[0]
if dirname != '' and not os.path.exists(dirname):
os.makedirs(dirname)
if 'twimg' in url:
localname = localname.replace(':large', '')
localname = localname.replace(':small', '')
if os.path.exists(localname):
print('\t%s already exists!!' % localname)
return localname
print('\tDownloading %s' % localname)
downloading = request_get(url, stream=True, headers=headers)
localfile = open(localname, 'wb')
for chunk in downloading.iter_content(chunk_size=1024):
if chunk:
localfile.write(chunk)
localfile.close()
return localname
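For reference, a minimal call (URL borrowed from the test section below); an existing local file is reported and skipped rather than re-downloaded.
download_file('http://i.imgur.com/gvJUct0.jpg', 'gvJUct0.jpg')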
def request_get(url, stream=False, headers={}):
global last_request
now = time.time()
diff = now - last_request
if diff < SLEEPINESS:
diff = SLEEPINESS - diff
time.sleep(diff)
last_request = time.time()
h = HEADERS.copy()
h.update(headers)
req = requests.get(url, stream=stream, headers=h)
if req.status_code not in [200,206]:
raise StatusExc("Status code %d on url %s" % (req.status_code, url))
return req
##############################################################################
##
def handle_gfycat(url, customname=None):
print('Gfycat')
name = url.split('/')[-1]
name = name.split('.')[0]
if customname:
filename = customname
else:
filename = name
if GFYCAT_MP4:
name += '.mp4'
filename += '.mp4'
else:
name += '.webm'
filename += '.webm'
for subdomain in GFYCAT_SUBDOMAINS:
url = 'http://%s.gfycat.com/%s' % (subdomain, name)
try:
return download_file(url, filename)
except StatusExc:
pass
def handle_liveleak(url, customname=None):
print('Liveleak')
if customname:
name = customname
else:
name = url.split('=')[1]
name += '.mp4'
pagedata = request_get(url)
pagedata = pagedata.text
if LIVELEAK_YOUTUBEIFRAME in pagedata:
pagedata = pagedata.split('\n')
pagedata = [line for line in pagedata if LIVELEAK_YOUTUBEIFRAME in line]
pagedata = pagedata[0]
pagedata = pagedata.split('src="')[1]
pagedata = pagedata.split('"')[0]
print('\tFound youtube embed')
handle_master(pagedata, customname=customname)
else:
pagedata = pagedata.split('file: "')[1]
pagedata = pagedata.split('",')[0]
original = pagedata
pagedata = pagedata.split('.')
for spoti in range(len(pagedata)):
if 'h264_' in pagedata[spoti]:
pagedata[spoti] = 'LIVELEAKRESOLUTION'
pagedata = '.'.join(pagedata)
for res in LIVELEAK_RESOLUTIONS:
url = pagedata.replace('LIVELEAKRESOLUTION', res)
try:
return download_file(url, name)
except StatusExc:
pass
return download_file(original, name)
def handle_imgur_html(url):
print('Imgur')
pagedata = request_get(url)
pagedata = pagedata.text.replace(' ', '')
pagedata = pagedata.split('\n')
pagedata = [line for line in pagedata if IMGUR_ALBUM_INDV in line]
pagedata = [line.split('content="')[1] for line in pagedata]
links = [line.split('"')[0] for line in pagedata]
links = [line.split('?')[0] for line in links]
print(links)
return links
def handle_imgur(url, albumid='', customname=None):
print('Imgur')
name = url.split('/')[-1]
if 'imgur.com' in name:
# This link doesn't appear to have an image id
return
url = url.replace('/gallery/', '/a/')
basename = name.split('.')[0]
if '.' in name:
# This is a direct image link
if customname:
# replace the imgur ID with the customname, keep ext.
name = '%s.%s' % (customname, name.split('.')[-1])
if albumid and albumid != basename:
if IMGUR_ALBUMFOLDERS:
if not os.path.exists(DOWNLOAD_DIRECTORY + albumid):
os.makedirs(DOWNLOAD_DIRECTORY + albumid)
localpath = '%s\\%s' % (albumid, name)
else:
localpath = '%s_%s' % (albumid, name)
else:
localpath = name
return download_file(url, localpath)
else:
# Not a direct image link, let's read the html.
images = handle_imgur_html(url)
if customname:
name = customname
print('\tFound %d images' % len(images))
localfiles = []
if len(images) > 1:
for imagei in range(len(images)):
image = images[imagei]
iname = image.split('/')[-1]
iname = iname.split('.')[0]
x = handle_imgur(image, albumid=name, customname='%d_%s' % (imagei, iname))
localfiles.append(x)
else:
x = handle_imgur(images[0], customname=name)
localfiles.append(x)
return localfiles
def handle_twitter(url, customname=None):
print('Twitter')
pagedata = request_get(url)
pagedata = pagedata.text
idnumber = url.split('status/')[1].split('/')[0]
if customname:
name = customname
else:
name = idnumber
customname = idnumber
tweetpath = '%s.html' % (DOWNLOAD_DIRECTORY + name)
psplit = '<p class="TweetTextSize'
tweettext = pagedata.split(psplit)[1]
tweettext = tweettext.split('</p>')[0]
tweettext = psplit + tweettext + '</p>'
tweettext = '<html><body>%s</body></html>' % tweettext
tweettext = tweettext.replace('/hashtag/', 'http://twitter.com/hashtag/')
tweethtml = open(tweetpath, 'w', encoding='utf-8')
tweethtml.write(tweettext)
tweethtml.close()
print('\tSaved tweet text')
try:
link = pagedata.split('data-url="')[1]
link = link.split('"')[0]
if link != url:
handle_master(link, customname=customname)
return tweetpath
except IndexError:
try:
link = pagedata.split('data-expanded-url="')[1]
link = link.split('"')[0]
if link != url:
handle_master(link, customname=customname)
return tweetpath
except IndexError:
pass
return tweetpath
print('\tNo media detected')
def handle_vidble(url, customname=None):
print('Vidble')
if '/album/' in url:
pagedata = request_get(url)
pagedata.raise_for_status()
pagedata = pagedata.text
soup = bs4.BeautifulSoup(pagedata)
images = soup.find_all('img')
images = [i for i in images if i.attrs.get('src', None)]
images = [i.attrs['src'] for i in images]
images = [i for i in images if '/assets/' not in i]
images = [i for i in images if i[0] == '/']
if customname:
folder = customname
else:
folder = url.split('/album/')[1].split('/')[0]
for (index, image) in enumerate(images):
name = image.split('/')[-1]
localname = '{folder}\\{index}_{name}'.format(folder=folder, index=index, name=name)
image = 'https://vidble.com' + image
download_file(image, localname)
else:
localname = url.split('/')[-1]
extension = os.path.splitext(localname)[1]
localname = customname + extension
download_file(url, localname)
def handle_vidme(url, customname=None):
print('Vidme')
if customname is None:
customname = url.split('/')[-1]+'.mp4'
pagedata = request_get(url)
pagedata = pagedata.text
pagedata = pagedata.split('\n')
pagedata = [l for l in pagedata if '.mp4' in l and 'og:video:url' in l]
pagedata = pagedata[0]
pagedata = pagedata.split('content="')[1].split('"')[0]
pagedata = pagedata.replace('&amp;', '&')
headers = {'Referer': 'https://vid.me/',
'Range':'bytes=0-',
'Host':'d1wst0behutosd.cloudfront.net',
'Cache-Control':'max-age=0'}
return download_file(pagedata, customname, headers=headers)
def handle_vimeo(url, customname=None):
print('Vimeo')
name = url.split('/')[-1]
name = name.split('?')[0]
try:
int(name)
except ValueError as e:
print('Could not identify filename of %s' % url)
raise e
url = 'http://player.vimeo.com/video/%s' % name
pagedata = request_get(url)
pagedata = pagedata.text
pagedata = pagedata.replace('</script>', '<script')
pagedata = pagedata.split('<script>')
for chunk in pagedata:
if VIMEO_DICT_START in chunk:
break
chunk = chunk.split(VIMEO_DICT_START)[1]
chunk = chunk.split(VIMEO_DICT_END)[0]
chunk = json.loads(chunk)
for priority in VIMEO_PRIORITY:
if priority in chunk:
fileurl = chunk[priority]['url']
break
if customname:
filename = customname + '.mp4'
else:
filename = name + '.mp4'
return download_file(fileurl, filename)
def handle_youtube(url, customname=None):
print('Youtube')
url = url.replace('&amp;', '&')
url = url.replace('feature=player_embedded&', '')
url = url.replace('&feature=player_embedded', '')
if not customname:
os.system(YOUTUBE_DL_FORMAT.format(url=url, dir=DOWNLOAD_DIRECTORY, name='%(title)s'))
return
os.system(YOUTUBE_DL_FORMAT.format(url=url, dir=DOWNLOAD_DIRECTORY, name=customname))
if DOWNLOAD_DIRECTORY:
return '%s/%s.mp4' % (DOWNLOAD_DIRECTORY, customname)
return '%s.mp4' % customname
def handle_generic(url, customname=None):
print('Generic')
try:
if customname:
name = customname
else:
name = url.split('/')[-1]
base = name.split('.')[0]
ext = name.split('.')[-1]
if ext in [base, '']:
ext = 'html'
print(base)
print(ext)
name = '%s.%s' % (base, ext)
return download_file(url, name)
except:
pass
##
##############################################################################
HANDLERS = {
'gfycat.com': handle_gfycat,
'imgur.com': handle_imgur,
'liveleak.com': handle_liveleak,
'vid.me': handle_vidme,
'vidble.com': handle_vidble,
'vimeo.com': handle_vimeo,
'youtube.com': handle_youtube,
'youtu.be': handle_youtube,
'twitter.com': handle_twitter
}
def handle_master(url, customname=None):
print('Handling %s' % url)
for handlerkey in HANDLERS:
if handlerkey.lower() in url.lower():
return HANDLERS[handlerkey](url, customname=customname)
if DO_GENERIC:
return handle_generic(url, customname=customname)
def test_imgur():
# Imgur gallery album
handle_master('http://imgur.com/gallery/s4WLG')
# Imgur standard album with customname
handle_master('http://imgur.com/a/s4WLG', customname='album')
# Imgur indirect
handle_master('http://imgur.com/gvJUct0')
# Imgur indirect single with customname
handle_master('http://imgur.com/gvJUct0', customname='indirect')
# Imgur direct single
handle_master('http://i.imgur.com/gvJUct0.jpg')
def test_gfycat():
# Gfycat direct .gif
handle_master('http://giant.gfycat.com/FatherlyBruisedIberianchiffchaff.gif')
# Gfycat general link
handle_master('http://www.gfycat.com/RawWetFlatcoatretriever')
# Gfycat general link with customname
handle_master('http://www.gfycat.com/RawWetFlatcoatretriever', customname='gfycatgeneral')
def test_vimeo():
# Vimeo standard link
handle_master('https://vimeo.com/109405701')
# Vimeo player link with customname
handle_master('https://player.vimeo.com/video/109405701', customname='vimeoplayer')
def test_liveleak():
# LiveLeak standard link
handle_master('http://www.liveleak.com/view?i=9d1_1429192014')
# Liveleak article with youtube embed
handle_master('http://www.liveleak.com/view?i=ab8_1367941301')
# LiveLeak standard link with customname
handle_master('http://www.liveleak.com/view?i=9d1_1429192014', customname='liveleak')
def test_youtube():
# Youtube standard link
handle_master('https://www.youtube.com/watch?v=bEgeh5hA5ko')
# Youtube short link
handle_master('https://youtu.be/GjOBTstnW20', customname='youtube')
# Youtube player embed link
handle_master('https://www.youtube.com/watch?feature=player_embedded&amp;v=bEgeh5hA5ko')
def test_twitter():
# Twitter with twitter-image embed
handle_master('https://twitter.com/PetoLucem/status/599493836214272000')
# Twitter with twitter-image embed
handle_master('https://twitter.com/Jalopnik/status/598287843128188929')
# Twitter with twitter-image embed and customname
handle_master('https://twitter.com/Jalopnik/status/598287843128188929', customname='twits')
# Twitter with youtube embed
handle_master('https://twitter.com/cp_orange_x3/status/599705117420457984')
# Twitter plain text
handle_master('https://twitter.com/cp_orange_x3/status/599700702382817280')
# Twitter plain text
handle_master('https://twitter.com/SyriacMFS/status/556513635913437184')
# Twitter with arabic characters
handle_master('https://twitter.com/HadiAlabdallah/status/600885154991706113')
def test_generic():
# Some link that might work
handle_master('https://raw.githubusercontent.com/voussoir/reddit/master/SubredditBirthdays/show/statistics.txt')
# Some link that might work with customname
handle_master('https://raw.githubusercontent.com/voussoir/reddit/master/SubredditBirthdays/show/statistics.txt', customname='sss')
# Some link that might work
handle_master('https://github.com/voussoir/reddit/tree/master/SubredditBirthdays/show')
if __name__ == '__main__':
if len(sys.argv) > 1:
handle_master(sys.argv[1])
else:
#test_imgur()
#test_gfycat()
#test_vimeo()
test_liveleak()
test_youtube()
#test_twitter()
#test_generic()
pass