2019-06-12 05:41:31 +00:00
|
|
|
'''
|
2021-12-03 03:53:43 +00:00
|
|
|
Recompress jpeg images to save disk space.
|
2019-06-12 05:41:31 +00:00
|
|
|
'''
|
2020-12-24 04:05:35 +00:00
|
|
|
import argparse
|
2019-06-12 05:41:31 +00:00
|
|
|
import io
|
|
|
|
import os
|
|
|
|
import PIL.Image
|
|
|
|
import PIL.ImageFile
|
|
|
|
import sys
|
|
|
|
|
2020-12-24 04:05:35 +00:00
|
|
|
from voussoirkit import bytestring
|
2021-01-06 04:29:54 +00:00
|
|
|
from voussoirkit import imagetools
|
2021-02-08 10:52:08 +00:00
|
|
|
from voussoirkit import pipeable
|
|
|
|
from voussoirkit import spinal
|
2021-09-24 06:42:34 +00:00
|
|
|
from voussoirkit import vlogging
|
|
|
|
|
|
|
|
# Module-level logger used by the functions in this script.
# NOTE(review): the second argument presumably supplies the logger name used
# when this file runs as a script -- confirm against voussoirkit.vlogging.
log = vlogging.getLogger(__name__, 'rejpg')
|
2020-12-24 04:05:35 +00:00
|
|
|
|
2019-06-12 05:41:31 +00:00
|
|
|
# Process-wide Pillow switch: allow truncated image files to be loaded, so a
# partially written jpeg can still be opened and re-encoded.
PIL.ImageFile.LOAD_TRUNCATED_IMAGES = True
|
|
|
|
|
2022-11-20 18:22:00 +00:00
|
|
|
def compress_to_filesize(image, target_size, *, exif=None, icc_profile=None):
    '''
    Encode `image` as jpeg at the highest quality setting (1-100) whose
    encoded size is no larger than `target_size` bytes.

    image:
        An object with a PIL-style `save(fp, format=..., quality=..., ...)`
        method.
    target_size:
        Maximum allowed size of the encoded output, in bytes.
    exif:
        Optional exif data to embed. Only forwarded to `save` when given.
    icc_profile:
        Optional ICC profile to embed. Only forwarded to `save` when given.

    Returns an io.BytesIO containing the encoded jpeg, rewound to offset 0.

    Raises RuntimeError if even quality 1 produces more than `target_size`
    bytes.

    The search assumes the encoded size does not shrink as quality rises.
    '''
    # Leave out metadata arguments the caller did not supply so that the
    # encoder falls back to its own defaults instead of receiving None.
    save_kwargs = {'format': 'jpeg'}
    if exif is not None:
        save_kwargs['exif'] = exif
    if icc_profile is not None:
        save_kwargs['icc_profile'] = icc_profile

    def encode(quality):
        # Encode once at the given quality; return (buffer, size_in_bytes).
        bio = io.BytesIO()
        image.save(bio, quality=quality, **save_kwargs)
        size = bio.seek(0, io.SEEK_END)
        return (bio, size)

    # Fast path: if the best quality already fits there is nothing to search.
    # Without this check, a search whose bounds both land on 100 has no way
    # to terminate.
    (best_bio, size) = encode(100)
    if size > target_size:
        best_bio = None
        # Inclusive binary search over the remaining qualities, so that
        # quality 1 is actually tried before giving up.
        lower = 1
        upper = 99
        while lower <= upper:
            current = (lower + upper) // 2
            (bio, size) = encode(current)
            if size <= target_size:
                best_bio = bio
                lower = current + 1
            else:
                upper = current - 1

    if best_bio is None:
        raise RuntimeError(f'Could not compress below {target_size}')

    best_bio.seek(0)
    return best_bio
|
|
|
|
|
2020-12-24 04:05:35 +00:00
|
|
|
def rejpg_argparse(args):
    '''
    Re-encode every file matched by args.patterns as jpeg and overwrite the
    file in place when the new encoding is smaller than what is on disk.

    args:
        The argparse namespace. Reads `patterns`, `recurse`, `filesize` and
        `quality`. When `filesize` is set, each image is compressed to fit
        under that size; otherwise `quality` is used directly.

    Logs the total bytes saved and the total size of the files afterwards.
    Returns 0.
    '''
    patterns = pipeable.input_many(args.patterns, skip_blank=True, strip=True)
    files = spinal.walk(recurse=args.recurse, glob_filenames=patterns)
    files = [f.absolute_path for f in files]

    # The target size is the same for every file, so parse it only once.
    target_size = bytestring.parsebytes(args.filesize) if args.filesize else None

    bytes_saved = 0
    remaining_size = 0
    for filename in files:
        log.info('Processing %s.', filename)

        # Finish all reading and encoding inside the `with` so the source
        # file handle is released before the same path is opened for writing.
        with PIL.Image.open(filename) as original:
            icc_profile = original.info.get('icc_profile')
            (image, exif) = imagetools.rotate_by_exif(original)

            if target_size is not None:
                # NOTE(review): icc_profile is not forwarded on this path, so
                # the color profile is only preserved in the fixed-quality
                # branch below.
                bytesio = compress_to_filesize(image, target_size, exif=exif)
            else:
                bytesio = io.BytesIO()
                image.save(
                    bytesio,
                    format='jpeg',
                    exif=exif,
                    quality=args.quality,
                    icc_profile=icc_profile,
                )

        new_bytes = bytesio.getvalue()
        old_size = os.path.getsize(filename)
        new_size = len(new_bytes)

        if new_size < old_size:
            bytes_saved += (old_size - new_size)
            remaining_size += new_size
            with open(filename, 'wb') as handle:
                handle.write(new_bytes)
        else:
            # The file is left untouched, so it still occupies its old size.
            remaining_size += old_size

    log.info('Saved %s.', bytestring.bytestring(bytes_saved))
    log.info('Remaining are %s.', bytestring.bytestring(remaining_size))
    return 0
|
2020-12-24 04:05:35 +00:00
|
|
|
|
2021-09-24 06:42:34 +00:00
|
|
|
@vlogging.main_decorator
def main(argv):
    '''
    Build the command line parser, parse `argv`, and hand the resulting
    namespace to the function registered as `func`. Returns whatever that
    function returns.
    '''
    cli = argparse.ArgumentParser(description=__doc__)

    # NOTE(review): with nargs='+' argparse requires at least one pattern, so
    # this default does not appear to ever be applied -- confirm whether
    # running with no patterns was meant to be allowed.
    cli.add_argument('patterns', nargs='+', default={'*.jpg', '*.jpeg'})
    cli.add_argument('--quality', default=80, type=int)
    cli.add_argument('--filesize', default=None, type=str)
    cli.add_argument('--recurse', action='store_true')
    cli.set_defaults(func=rejpg_argparse)

    namespace = cli.parse_args(argv)
    handler = namespace.func
    return handler(namespace)
|
|
|
|
|
|
|
|
# Script entry point: run main on the command line arguments (minus the
# program name) and use its return value as the process exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
|