pixelcanvas/pixelcanvas.py

580 lines
19 KiB
Python

import argparse
import datetime
import gzip
import PIL.Image
import random
import requests
import sqlite3
import sys
import time
from voussoirkit import threadpool
from voussoirkit import vlogging
# Module-level logger used by every function in this file.
# NOTE(review): the second argument is presumably the logger name to use when
# this file is run as a script (__name__ == '__main__') -- confirm against
# voussoirkit.vlogging.
log = vlogging.getLogger(__name__, 'pixelcanvasdl')
# Raise urllib3's connection pool logger to CRITICAL so its records below that
# level are not emitted alongside this tool's own messages.
vlogging.getLogger('urllib3.connectionpool').setLevel(vlogging.CRITICAL)
# The sixteen palette colors, each as an (R, G, B) tuple.
WHITE = (255, 255, 255)
LIGHTGRAY = (228, 228, 228)
DARKGRAY = (136, 136, 136)
BLACK = (34, 34, 34)
PINK = (255, 167, 209)
RED = (229, 0, 0)
ORANGE = (229, 149, 0)
BROWN = (160, 106, 66)
YELLOW = (229, 217, 0)
LIGHTGREEN = (148, 224, 68)
DARKGREEN = (2, 190, 1)
LIGHTBLUE = (0, 211, 221)
MEDIUMBLUE = (0, 131, 199)
DARKBLUE = (0, 0, 234)
LIGHTPURPLE = (207, 110, 228)
DARKPURPLE = (130, 0, 128)

# Lookup from palette index (0-15) to RGB tuple. The position of each color
# in this sequence is its index.
COLOR_MAP = dict(enumerate((
    WHITE,
    LIGHTGRAY,
    DARKGRAY,
    BLACK,
    PINK,
    RED,
    ORANGE,
    BROWN,
    YELLOW,
    LIGHTGREEN,
    DARKGREEN,
    LIGHTBLUE,
    MEDIUMBLUE,
    DARKBLUE,
    LIGHTPURPLE,
    DARKPURPLE,
)))
# Side length of a single chunk, in pixels.
CHUNK_SIZE_PIX = 64
# Byte length of a full chunk. Every byte packs two 4-bit pixels, so a chunk
# is 32 bytes wide by 64 rows tall.
CHUNK_SIZE_BYTES = (CHUNK_SIZE_PIX * CHUNK_SIZE_PIX) // 2
# Side length of a bigchunk, in chunks.
BIGCHUNK_SIZE_CHUNKS = 15
# Side length of a bigchunk, in pixels.
BIGCHUNK_SIZE_PIX = CHUNK_SIZE_PIX * BIGCHUNK_SIZE_CHUNKS
# Byte length of a full bigchunk (again two pixels per byte).
BIGCHUNK_SIZE_BYTES = (BIGCHUNK_SIZE_PIX * BIGCHUNK_SIZE_PIX) // 2
# Chunk 0, 0 sits at pixel coordinate -448, -448 for some reason, so pixel
# coordinates are shifted by this much before being divided into chunks.
ORIGIN_OFFSET_X = 448
ORIGIN_OFFSET_Y = 448
# Schema bootstrap script. The `chunks` table holds one row per stored chunk
# revision: chunk coordinates, the data blob (insert_chunk stores it
# gzip-compressed) and the timestamp at which the row was written.
# The (x, y) index serves the per-chunk lookups.
DB_INIT = '''
BEGIN;
CREATE TABLE IF NOT EXISTS chunks (x INT, y INT, data BLOB, updated_at REAL);
CREATE INDEX IF NOT EXISTS chunks_x_y ON chunks(x, y);
COMMIT;
'''
# NOTE: these run at import time. The database path is relative, so the file
# is opened in whatever the current working directory is.
sql = sqlite3.connect('pixelcanvas.db')
# Single shared cursor used by all of the DB functions in this module.
cur = sql.cursor()
cur.executescript(DB_INIT)
# Single shared HTTP session used by request().
session = requests.Session()
# HELPER FUNCTIONS
################################################################################
def now():
    '''
    Return the current time as a unix timestamp (float), taken from a
    timezone-aware UTC datetime.
    '''
    return datetime.datetime.now(datetime.timezone.utc).timestamp()
# DB FUNCTIONS
################################################################################
def db_commit():
    '''
    Commit the pending transaction on the module-level database connection,
    logging a debug message first.
    '''
    log.debug('Committing')
    sql.commit()
def get_chunk_from_db(chunk_x, chunk_y, as_of=None):
    '''
    Look up the most recently stored revision of a chunk.

    chunk_x, chunk_y:
        Chunk coordinates to look up.
    as_of:
        Optional timestamp. When given, only revisions whose updated_at is
        less than or equal to it are considered.

    Returns (x, y, data) where data has been gzip-decompressed.
    Raises IndexError if no matching row exists.
    '''
    has_cutoff = as_of is not None
    # Assembled line by line so the statement text is the same whether or not
    # the optional cutoff clause is present.
    query = '\n'.join([
        '',
        'SELECT x, y, data FROM chunks',
        'WHERE x == ? AND y == ?',
        'AND updated_at <= ?' if has_cutoff else '',
        'ORDER BY updated_at DESC',
        'LIMIT 1',
        '',
    ])
    bindings = [chunk_x, chunk_y]
    if has_cutoff:
        bindings.append(as_of)

    cur.execute(query, bindings)
    row = cur.fetchone()
    if row is None:
        raise IndexError(f'{chunk_x}, {chunk_y}')

    (x, y, compressed) = row
    return (x, y, gzip.decompress(compressed))
def get_chunk(chunk_x, chunk_y, *args, **kwargs):
    '''
    Return the chunk from the database, downloading it first if it is absent.

    Extra positional and keyword arguments are forwarded to
    get_chunk_from_db. On a miss, the whole bigchunk containing this chunk is
    downloaded and inserted, then the lookup is repeated (and may still raise
    IndexError).
    '''
    try:
        chunk = get_chunk_from_db(chunk_x, chunk_y, *args, **kwargs)
    except IndexError:
        bigchunk_xy = chunk_to_bigchunk(chunk_x, chunk_y)
        insert_chunks(download_bigchunk(*bigchunk_xy))
        chunk = get_chunk_from_db(chunk_x, chunk_y, *args, **kwargs)
    return chunk
def insert_chunk(chunk_x, chunk_y, data, commit=True):
    '''
    Store a new revision of a chunk, unless the newest stored revision already
    has identical data.

    chunk_x, chunk_y:
        Chunk coordinates.
    data:
        Uncompressed chunk bytes. They are gzip-compressed before insertion.
    commit:
        If True, commit right after the insert.
    '''
    try:
        (_, _, stored_data) = get_chunk_from_db(chunk_x, chunk_y)
    except IndexError:
        # Nothing stored for this chunk yet, so there is nothing to compare.
        stored_data = None

    if stored_data is not None and data == stored_data:
        return

    # This message is emitted for first-time inserts as well as for changes.
    log.debug('Updating chunk %s %s', chunk_x, chunk_y)
    compressed = gzip.compress(data)
    row = [chunk_x, chunk_y, compressed, now()]
    cur.execute('INSERT INTO chunks VALUES(?, ?, ?, ?)', row)
    if commit:
        db_commit()
def insert_chunks(chunks, commit=True):
    '''
    Insert every (chunk_x, chunk_y, data) tuple from the iterable `chunks`.

    When commit is True, a commit is issued before every 25000th chunk
    (including the very first one) and once more after the last chunk.
    When commit is False, nothing is committed here.
    '''
    for (position, (chunk_x, chunk_y, data)) in enumerate(chunks):
        if commit and position % 25000 == 0:
            db_commit()
        insert_chunk(chunk_x, chunk_y, data, commit=False)

    if commit:
        db_commit()
# API FUNCTIONS
################################################################################
def url_for_bigchunk(bigchunk_x, bigchunk_y):
    '''
    Build the API URL of the bigchunk file for the given bigchunk coordinates.
    '''
    filename = f'{bigchunk_x}.{bigchunk_y}.bmp'
    return 'http://api.pixelcanvas.io/api/bigchunk/' + filename
def request(url):
    '''
    GET the url using the module-level session and return the response.

    raise_for_status is called on the response before returning, so an HTTP
    error status surfaces as an exception instead of a returned response.
    '''
    response = session.get(url)
    response.raise_for_status()
    return response
def download_bigchunk(bigchunk_x, bigchunk_y):
    '''
    Download one bigchunk and return it as a list of
    (chunk_x, chunk_y, chunk_data) tuples.

    Raises ValueError if the response body is not exactly
    BIGCHUNK_SIZE_BYTES long.
    '''
    url = url_for_bigchunk(bigchunk_x, bigchunk_y)
    log.info('Downloading %s', url)
    bigchunk_data = request(url).content

    received = len(bigchunk_data)
    if received != BIGCHUNK_SIZE_BYTES:
        raise ValueError(
            'Received bigchunk does not matched the expected byte size!\n'
            'Got %d instead of %d' % (received, BIGCHUNK_SIZE_BYTES)
        )

    return split_bigchunk(bigchunk_x, bigchunk_y, bigchunk_data)
def download_bigchunk_range(bigchunk_xy1, bigchunk_xy2, shuffle=False, threads=1):
    '''
    Download every bigchunk between (UPPERLEFT_X, UPPERLEFT_Y) and
    (LOWERRIGHT_X, LOWERRIGHT_Y), yielding the small chunks one at a time.

    shuffle:
        Visit the bigchunks in random order instead of iteration order.
    threads:
        1 downloads sequentially in this thread; anything higher hands the
        downloads to a threadpool of that size.

    Raises ValueError if threads < 1. Because this is a generator, that check
    only runs once iteration begins. An exception from a threaded download is
    re-raised here when its job is reached.
    '''
    if threads < 1:
        raise ValueError(threads)

    log.debug('Downloading bigchunk range %s-%s', bigchunk_xy1, bigchunk_xy2)
    coordinates = bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2)
    if shuffle:
        # random.shuffle works in place, so the iterator must become a list.
        coordinates = list(coordinates)
        random.shuffle(coordinates)

    if threads == 1:
        for bigchunk_xy in coordinates:
            yield from download_bigchunk(*bigchunk_xy)
        return

    pool = threadpool.ThreadPool(size=threads)
    jobs = (
        {'function': download_bigchunk, 'args': bigchunk_xy, 'name': bigchunk_xy}
        for bigchunk_xy in coordinates
    )
    pool.add_generator(jobs)
    for job in pool.result_generator():
        job.join()
        if job.exception:
            raise job.exception
        yield from job.value
# CHUNK FUNCTIONS
################################################################################
def chunk_range_iterator(chunk_xy1, chunk_xy2):
    '''
    Yield every (x, y) chunk coordinate from (UPPERLEFT_X, UPPERLEFT_Y)
    through (LOWERRIGHT_X, LOWERRIGHT_Y), both corners included.

    x is the outer loop and y the inner loop.
    '''
    columns = range(chunk_xy1[0], chunk_xy2[0] + 1)
    rows = range(chunk_xy1[1], chunk_xy2[1] + 1)
    yield from ((x, y) for x in columns for y in rows)
def bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2):
    '''
    Yield every (x, y) bigchunk coordinate from (UPPERLEFT_X, UPPERLEFT_Y)
    through (LOWERRIGHT_X, LOWERRIGHT_Y), both corners included.

    Coordinates advance in steps of BIGCHUNK_SIZE_CHUNKS. x is the outer loop
    and y the inner loop.
    '''
    step = BIGCHUNK_SIZE_CHUNKS
    # Pushing the stop value one step past the lower-right corner keeps that
    # corner inside the range.
    columns = range(bigchunk_xy1[0], bigchunk_xy2[0] + step, step)
    rows = range(bigchunk_xy1[1], bigchunk_xy2[1] + step, step)
    yield from ((x, y) for x in columns for y in rows)
def chunk_to_bigchunk(chunk_x, chunk_y):
    '''
    Return the (x, y) of the bigchunk containing the given chunk, i.e. each
    coordinate floored to a multiple of BIGCHUNK_SIZE_CHUNKS.
    '''
    bigchunk_xy = (
        (chunk_x // BIGCHUNK_SIZE_CHUNKS) * BIGCHUNK_SIZE_CHUNKS,
        (chunk_y // BIGCHUNK_SIZE_CHUNKS) * BIGCHUNK_SIZE_CHUNKS,
    )
    log.loud('Converted chunk %s, %s to bigchunk %s, %s', chunk_x, chunk_y, *bigchunk_xy)
    return bigchunk_xy
def chunk_range_to_bigchunk_range(chunk_xy1, chunk_xy2):
    '''
    Convert both corners of a chunk range to bigchunk coordinates and return
    them as (upper_left, lower_right).
    '''
    upper_left = chunk_to_bigchunk(*chunk_xy1)
    lower_right = chunk_to_bigchunk(*chunk_xy2)
    return (upper_left, lower_right)
def chunk_to_pixel(chunk_x, chunk_y):
    '''
    Return the world pixel coordinate of the chunk's first pixel
    (chunk 0, 0 maps to -ORIGIN_OFFSET_X, -ORIGIN_OFFSET_Y).
    '''
    pixel_xy = (
        (chunk_x * CHUNK_SIZE_PIX) - ORIGIN_OFFSET_X,
        (chunk_y * CHUNK_SIZE_PIX) - ORIGIN_OFFSET_Y,
    )
    log.loud('Converted chunk %s, %s to pixel %s, %s', chunk_x, chunk_y, *pixel_xy)
    return pixel_xy
def chunk_range_to_pixel_range(chunk_xy1, chunk_xy2):
    '''
    Convert both corners of a chunk range with chunk_to_pixel and return them
    as (upper_left, lower_right).

    Both corners are converted the same way, so the second pixel coordinate is
    the first pixel of the lower-right chunk, not its last.
    '''
    upper_left = chunk_to_pixel(*chunk_xy1)
    lower_right = chunk_to_pixel(*chunk_xy2)
    return (upper_left, lower_right)
def pixel_to_chunk(pixel_x, pixel_y):
    '''
    Return the (x, y) of the chunk containing the given world pixel.
    '''
    # Shift by the origin offset first, then floor-divide into chunk units.
    chunk_xy = (
        (pixel_x + ORIGIN_OFFSET_X) // CHUNK_SIZE_PIX,
        (pixel_y + ORIGIN_OFFSET_Y) // CHUNK_SIZE_PIX,
    )
    log.loud('Converted pixel %s, %s to chunk %s, %s', pixel_x, pixel_y, *chunk_xy)
    return chunk_xy
def pixel_range_to_chunk_range(pixel_xy1, pixel_xy2):
    '''
    Convert both corners of a pixel range to chunk coordinates and return
    them as (upper_left, lower_right).
    '''
    upper_left = pixel_to_chunk(*pixel_xy1)
    lower_right = pixel_to_chunk(*pixel_xy2)
    log.loud(
        'Converted pixel range %s, %s to chunk range %s, %s',
        pixel_xy1,
        pixel_xy2,
        upper_left,
        lower_right,
    )
    return (upper_left, lower_right)
def pixel_to_bigchunk(pixel_x, pixel_y):
    '''
    Return the (x, y) of the bigchunk containing the given world pixel.

    Bigchunk coordinates are expressed in chunk units, so the result is always
    a multiple of BIGCHUNK_SIZE_CHUNKS.
    '''
    bigchunk_xy = (
        ((pixel_x + ORIGIN_OFFSET_X) // BIGCHUNK_SIZE_PIX) * BIGCHUNK_SIZE_CHUNKS,
        ((pixel_y + ORIGIN_OFFSET_Y) // BIGCHUNK_SIZE_PIX) * BIGCHUNK_SIZE_CHUNKS,
    )
    log.loud('Converted pixel %s, %s to bigchunk %s, %s', pixel_x, pixel_y, *bigchunk_xy)
    return bigchunk_xy
def pixel_range_to_bigchunk_range(pixel_xy1, pixel_xy2):
    '''
    Convert both corners of a pixel range to bigchunk coordinates and return
    them as (upper_left, lower_right).
    '''
    upper_left = pixel_to_bigchunk(*pixel_xy1)
    lower_right = pixel_to_bigchunk(*pixel_xy2)
    log.loud(
        'Converted pixel range %s, %s to bigchunk range %s, %s',
        pixel_xy1,
        pixel_xy2,
        upper_left,
        lower_right,
    )
    return (upper_left, lower_right)
def split_bigchunk(bigchunk_x, bigchunk_y, bigchunk_data):
    '''
    Cut the bytes of a downloaded bigchunk into its individual chunks.

    A bigchunk is a grid of chunks stored one after another, left to right
    and then top to bottom, each chunk occupying CHUNK_SIZE_BYTES consecutive
    bytes. With a 3x3 grid of 4-byte chunks, for example, the stream

        000011112222333344445555666677778888

    lays out as

        001122
        001122
        334455
        334455
        667788
        667788

    Returns a list of (chunk_x, chunk_y, chunk_data) tuples, where the chunk
    coordinates are offset from (bigchunk_x, bigchunk_y).
    '''
    chunk_count = len(bigchunk_data) // CHUNK_SIZE_BYTES
    chunks = []
    for chunk_index in range(chunk_count):
        # The position in the stream gives the grid row and column.
        (row, column) = divmod(chunk_index, BIGCHUNK_SIZE_CHUNKS)
        start = chunk_index * CHUNK_SIZE_BYTES
        chunk_data = bigchunk_data[start:start + CHUNK_SIZE_BYTES]
        chunks.append((column + bigchunk_x, row + bigchunk_y, chunk_data))
    return chunks
# IMAGE FUNCTIONS
################################################################################
def chunk_to_rgb(chunk_data):
    '''
    Expand packed chunk bytes into a flat list of RGB tuples, one per pixel.

    Each byte carries two horizontally adjacent pixels of 4 bits each (16
    colors): the high nibble is the left pixel and the low nibble the right
    one, so 0x8F is the pixels 8, F. The list is in row-major order and has
    twice as many entries as chunk_data has bytes.
    '''
    return [
        COLOR_MAP[nibble]
        for byte in chunk_data
        for nibble in (byte >> 4, byte & 0xf)
    ]
def rgb_to_image(matrix):
    '''
    Build a CHUNK_SIZE_PIX x CHUNK_SIZE_PIX RGB image from a flat, row-major
    list of (R, G, B) tuples.
    '''
    # Flatten [(r, g, b), ...] into the raw r, g, b, r, g, b... byte string.
    raw = bytes(channel for rgb in matrix for channel in rgb)
    size = (CHUNK_SIZE_PIX, CHUNK_SIZE_PIX)
    return PIL.Image.frombytes(mode='RGB', size=size, data=raw)
def chunk_to_image(chunk_data, scale=1):
    '''
    Render packed chunk bytes as an image, optionally resized.

    scale:
        Multiplier applied to both dimensions, using nearest-neighbor
        resampling. None or 1 returns the image at native size.
    '''
    image = rgb_to_image(chunk_to_rgb(chunk_data))
    if scale is None or scale == 1:
        return image

    (width, height) = (image.size[0], image.size[1])
    new_size = (int(width * scale), int(height * scale))
    return image.resize(new_size, resample=PIL.Image.NEAREST)
def chunks_to_image(chunks, scale=1):
    '''
    Paste the given (chunk_x, chunk_y, chunk_data) tuples into one image.

    The canvas covers the bounding box of the chunk coordinates and starts
    out white, so any missing chunk inside the box stays white. `chunks` must
    be a non-empty sized collection: len() is taken of it, and min()/max()
    raise ValueError when it is empty.
    '''
    log.debug('Creating image from %s chunks', len(chunks))

    xs = [chunk[0] for chunk in chunks]
    ys = [chunk[1] for chunk in chunks]
    (min_x, max_x) = (min(xs), max(xs))
    (min_y, max_y) = (min(ys), max(ys))

    # Pixel size of one chunk after scaling. It may be fractional, which is
    # why int() is applied after each multiplication below.
    size = (
        int((max_x - min_x + 1) * CHUNK_SIZE_PIX * scale),
        int((max_y - min_y + 1) * CHUNK_SIZE_PIX * scale),
    )
    img = PIL.Image.new(mode='RGB', size=size, color=WHITE)

    for (chunk_x, chunk_y, chunk_data) in chunks:
        position = (
            int((chunk_x - min_x) * CHUNK_SIZE_PIX * scale),
            int((chunk_y - min_y) * CHUNK_SIZE_PIX * scale),
        )
        img.paste(chunk_to_image(chunk_data, scale), position)
    return img
def crop_image(image, pixel_xy1, pixel_xy2):
    '''
    Crop a chunk-aligned render down to the exact pixel range requested.

    Images are rendered a whole chunk at a time, so they are usually larger
    than the area that was asked for. pixel_xy1 and pixel_xy2 are the world
    coordinates originally used to produce the image (both inclusive), not
    coordinates within the picture.
    '''
    width = pixel_xy2[0] - pixel_xy1[0] + 1
    height = pixel_xy2[1] - pixel_xy1[1] + 1

    # World coordinate of the image's top-left pixel: the first pixel of the
    # chunk that contains pixel_xy1.
    (origin_x, origin_y) = chunk_to_pixel(*pixel_to_chunk(*pixel_xy1))

    left = pixel_xy1[0] - origin_x
    upper = pixel_xy1[1] - origin_y
    bbox = (left, upper, left + width, upper + height)
    log.debug('Cropping image down to %s', bbox)
    return image.crop(bbox)
# COMMAND LINE
################################################################################
from voussoirkit import betterhelp
# Top-level help text. The {update} and {render} placeholders are filled in
# by betterhelp.add_previews below.
# NOTE(review): SUB_DOCSTRINGS also has an 'overview' entry, but this text has
# no {overview} placeholder, so that command is not listed here.
DOCSTRING = '''
This tool is run from the command line, where you provide the coordinates you
want to download and render.
The format for typing coordinates is `UPPERLEFT--LOWERRIGHT`. The format for
each of those pieces is `X.Y`.
Sometimes, argparse gets confused by negative coordinates because it thinks
you're trying to provide another argument. Sorry.
If this happens, use a tilde `~` as the negative sign instead.
Remember, because this is an image, up and left are negative;
down and right are positive.
Commands:
{update}
{render}
So, for example:
> pixelcanvas.py update 0.0--100.100
> pixelcanvas.py update ~100.~100--100.100
> pixelcanvas.py update ~1200.300--~900.600
> pixelcanvas.py render 0.0--100.100
> pixelcanvas.py render ~100.~100--100.100 --scale 2
> pixelcanvas.py render ~1200.300--~900.600 --show
'''
# Per-command help text, keyed by subcommand name.
SUB_DOCSTRINGS = dict(
overview='''
overview:
Draw an ascii map representing the owned chunks.
'''.strip(),
update='''
update:
Download chunks into the database.
> pixelcanvas.py update ~100.~100--100.100
flags:
--chunks:
The coordinates which you provided are chunk coordinates instead of
pixel coordinates.
--shuffle:
Download chunks in a random order instead of from corner to corner.
--threads X:
Use X threads to download bigchunks.
'''.strip(),
render='''
render:
Export an image as PNG.
> pixelcanvas.py render 0.0--100.100 <flags>
flags:
--chunks:
The coordinates which you provided are chunk coordinates instead of
pixel coordinates.
--scale <float>:
Render the image at a different scale.
For best results, use powers of 2 like 0.5, 0.25, etc.
This will disable the autocropping.
--show:
Instead of saving the image, display it on the screen.
https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.show
'''.strip(),
)
# presumably substitutes a short preview of each sub docstring into the
# matching {name} placeholder of DOCSTRING -- confirm against
# voussoirkit.betterhelp.add_previews.
DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS)
def parse_coordinate_string(coordinates):
    '''
    Parse 'X.Y--X.Y' into ((x1, y1), (x2, y2)) with integer members, for
    example '~100.~100--100.100' becomes ((-100, -100), (100, 100)).

    A tilde stands for a minus sign and a comma is accepted in place of the
    dot. A lone 'X.Y' without '--' is used for both corners.
    Raises ValueError if a piece is not exactly two integers.
    '''
    def parse_xy(text):
        normalized = text.replace('~', '-').replace(',', '.')
        (x, y) = normalized.split('.')
        return (int(x), int(y))

    coordinates = coordinates.strip()
    (first, separator, second) = coordinates.partition('--')
    if not separator:
        # Only one coordinate was given, e.g. when updating a single chunk.
        second = first

    xy1 = parse_xy(first)
    xy2 = parse_xy(second)
    log.debug('Parsed coordinates %s into %s %s', coordinates, xy1, xy2)
    return (xy1, xy2)
def overview_argparse(args):
    '''
    Print an ASCII map of the chunks stored in the database.

    The map covers the bounding box of all stored chunk coordinates. Every
    stored chunk is drawn as '.', and each chunk whose x and y are both
    multiples of BIGCHUNK_SIZE_CHUNKS is labelled with its 'x,y' text,
    written rightwards from the chunk's own cell.

    args:
        The argparse namespace. Nothing is read from it.

    If the database holds no chunks, a message is logged and nothing is
    printed.
    '''
    cur.execute('SELECT x, y, updated_at FROM chunks GROUP BY x, y ORDER BY updated_at DESC')
    chunks = cur.fetchall()
    if not chunks:
        # min() and max() below would raise ValueError on an empty table.
        log.info('There are no chunks in the database.')
        return

    min_x = min(chunk[0] for chunk in chunks)
    max_x = max(chunk[0] for chunk in chunks)
    min_y = min(chunk[1] for chunk in chunks)
    max_y = max(chunk[1] for chunk in chunks)
    width = max_x - min_x + 1
    height = max_y - min_y + 1

    # The matrix spans min..max, so cells are addressed relative to the
    # minimum. Offsetting by abs(min(minimum, 0)) instead only matched this
    # size when the minimum was <= 0 and indexed out of range otherwise.
    matrix = [[' '] * width for _ in range(height)]
    for (x, y, updated_at) in chunks:
        matrix[y - min_y][x - min_x] = '.'

    for (x, y, updated_at) in chunks:
        if (x % BIGCHUNK_SIZE_CHUNKS != 0) or (y % BIGCHUNK_SIZE_CHUNKS != 0):
            continue
        row = matrix[y - min_y]
        start = x - min_x
        for (offset, character) in enumerate(f'{x},{y}'):
            column = start + offset
            if column >= width:
                # The label would run past the right edge, so truncate it.
                break
            row[column] = character

    for row in matrix:
        print(''.join(row))
def render_argparse(args):
    '''
    Handler for the `render` subcommand: build an image of the requested
    area, then either show it or save it as a PNG.

    Reads args.coordinates, args.is_chunks, args.scale and args.do_show.
    Chunks that are not in the database are downloaded by get_chunk.
    Cropping to the exact pixel range only happens at scale 1 and only when
    the coordinates were given in pixels.
    The output filename is built from the pixel coordinates, with the scale
    appended when it is not 1.
    '''
    coordinates = parse_coordinate_string(args.coordinates)
    if args.is_chunks:
        chunk_range = coordinates
        coordinates = chunk_range_to_pixel_range(*chunk_range)
    else:
        chunk_range = pixel_range_to_chunk_range(*coordinates)

    chunks = [get_chunk(*chunk_xy) for chunk_xy in chunk_range_iterator(*chunk_range)]

    scale = float(args.scale)
    image = chunks_to_image(chunks, scale=scale)
    should_crop = (scale == 1) and (not args.is_chunks)
    if should_crop:
        image = crop_image(image, *coordinates)

    if args.do_show:
        image.show()
        return

    ((p1x, p1y), (p2x, p2y)) = coordinates
    scale_s = '' if scale == 1 else f'_{scale}'
    filename = f'{p1x}.{p1y}--{p2x}.{p2y}{scale_s}.png'
    image.save(filename)
    log.debug('Wrote %s', filename)
def update_argparse(args):
    '''
    Handler for the `update` subcommand: download every bigchunk covering the
    requested area and insert its chunks into the database.

    Reads args.coordinates, args.is_chunks, args.shuffle and args.threads.
    On KeyboardInterrupt, whatever has been inserted so far is committed
    and the interrupt is swallowed.
    '''
    coordinates = parse_coordinate_string(args.coordinates)
    if args.is_chunks:
        to_bigchunk_range = chunk_range_to_bigchunk_range
    else:
        to_bigchunk_range = pixel_range_to_bigchunk_range
    (bigchunk_xy1, bigchunk_xy2) = to_bigchunk_range(*coordinates)

    # This is a generator, so the downloads happen while insert_chunks
    # consumes it.
    chunks = download_bigchunk_range(
        bigchunk_xy1,
        bigchunk_xy2,
        shuffle=args.shuffle,
        threads=args.threads,
    )
    try:
        insert_chunks(chunks)
    except KeyboardInterrupt:
        db_commit()
@vlogging.main_decorator
def main(argv):
    '''
    Build the argument parser with the `update`, `render` and `overview`
    subcommands and hand off to betterhelp.subparser_main.

    argv:
        The command line arguments, without the program name.

    Returns whatever betterhelp.subparser_main returns. The __main__ guard
    uses that value as the process exit status.
    '''
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    update_parser = subparsers.add_parser('update')
    update_parser.add_argument('coordinates')
    update_parser.add_argument('--chunks', dest='is_chunks', action='store_true')
    update_parser.add_argument('--shuffle', dest='shuffle', action='store_true')
    update_parser.add_argument('--threads', dest='threads', type=int, default=1)
    update_parser.set_defaults(func=update_argparse)

    render_parser = subparsers.add_parser('render')
    render_parser.add_argument('coordinates')
    render_parser.add_argument('--chunks', dest='is_chunks', action='store_true')
    render_parser.add_argument('--show', dest='do_show', action='store_true')
    # No type= here: render_argparse converts the value with float() itself.
    render_parser.add_argument('--scale', dest='scale', default=1)
    render_parser.set_defaults(func=render_argparse)

    overview_parser = subparsers.add_parser('overview')
    overview_parser.set_defaults(func=overview_argparse)

    return betterhelp.subparser_main(
        argv,
        parser,
        main_docstring=DOCSTRING,
        sub_docstrings=SUB_DOCSTRINGS,
    )
# Script entry point: run main with the arguments after the program name and
# exit with whatever it returns.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv[1:]))