import argparse import datetime import gzip import logging import PIL.Image import random import requests import sqlite3 import sys import time from voussoirkit import threadpool logging.basicConfig(level=logging.DEBUG) log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) logging.getLogger('urllib3.connectionpool').setLevel(logging.CRITICAL) WHITE = (255, 255, 255) LIGHTGRAY = (228, 228, 228) DARKGRAY = (136, 136, 136) BLACK = (34, 34, 34) PINK = (255, 167, 209) RED = (229, 0, 0) ORANGE = (229, 149, 0) BROWN = (160, 106, 66) YELLOW = (229, 217, 0) LIGHTGREEN = (148, 224, 68) DARKGREEN = (2, 190, 1) LIGHTBLUE = (0, 211, 221) MEDIUMBLUE = (0, 131, 199) DARKBLUE = (0, 0, 234) LIGHTPURPLE = (207, 110, 228) DARKPURPLE = (130, 0, 128) COLOR_MAP = { 0: WHITE, 1: LIGHTGRAY, 2: DARKGRAY, 3: BLACK, 4: PINK, 5: RED, 6: ORANGE, 7: BROWN, 8: YELLOW, 9: LIGHTGREEN, 10: DARKGREEN, 11: LIGHTBLUE, 12: MEDIUMBLUE, 13: DARKBLUE, 14: LIGHTPURPLE, 15: DARKPURPLE, } # The width and height of a chunk, in pixels. CHUNK_SIZE_PIX = 64 # The number of bytes for a full chunk. # They are 32x64 because each byte represents two 4-bit pixels. CHUNK_SIZE_BYTES = int(CHUNK_SIZE_PIX * (CHUNK_SIZE_PIX / 2)) # The width and height of a bigchunk, in chunks. BIGCHUNK_SIZE_CHUNKS = 15 # The width and height of a bigchunk, in pixels. BIGCHUNK_SIZE_PIX = BIGCHUNK_SIZE_CHUNKS * CHUNK_SIZE_PIX # The number of bytes for a full bigchunk. BIGCHUNK_SIZE_BYTES = int(BIGCHUNK_SIZE_PIX * (BIGCHUNK_SIZE_PIX / 2)) # The chunk 0, 0 has a pixel coordinate of -448, -448 for some reason. ORIGIN_OFFSET_X = 448 ORIGIN_OFFSET_Y = 448 DB_INIT = ''' BEGIN; CREATE TABLE IF NOT EXISTS chunks (x INT, y INT, data BLOB, updated_at REAL); CREATE INDEX IF NOT EXISTS chunks_x_y ON chunks(x, y); COMMIT; ''' sql = sqlite3.connect('pixelcanvas.db') cur = sql.cursor() cur.executescript(DB_INIT) session = requests.Session() # HELPER FUNCTIONS ################################################################################ def now(): n = datetime.datetime.now(datetime.timezone.utc) return n.timestamp() # DB FUNCTIONS ################################################################################ def get_chunk_from_db(chunk_x, chunk_y, as_of=None): ''' Get the chunk from the database, and raise IndexError if it doesn't exist. ''' query = f''' SELECT x, y, data FROM chunks WHERE x == ? AND y == ? {'AND updated_at <= ?' if as_of is not None else ''} ORDER BY updated_at DESC LIMIT 1 ''' bindings = [chunk_x, chunk_y] if as_of is not None: bindings.append(as_of) cur.execute(query, bindings) fetch = cur.fetchone() if fetch is None: raise IndexError(f'{chunk_x}, {chunk_y}') (x, y, data) = fetch data = gzip.decompress(data) return (x, y, data) def get_chunk(chunk_x, chunk_y, *args, **kwargs): ''' Get the chunk from the database if it exists, or else download it. ''' try: return get_chunk_from_db(chunk_x, chunk_y, *args, **kwargs) except IndexError: (bigchunk_x, bigchunk_y) = chunk_to_bigchunk(chunk_x, chunk_y) chunks = download_bigchunk(bigchunk_x, bigchunk_y) insert_chunks(chunks) return get_chunk_from_db(chunk_x, chunk_y, *args, **kwargs) def insert_chunk(chunk_x, chunk_y, data, commit=True): try: existing_chunk = get_chunk_from_db(chunk_x, chunk_y) except IndexError: pass else: if data == existing_chunk[2]: return # log.debug('Updating chunk %s %s', chunk_x, chunk_y) data = gzip.compress(data) cur.execute('INSERT INTO chunks VALUES(?, ?, ?, ?)', [chunk_x, chunk_y, data, now()]) if commit: sql.commit() def insert_chunks(chunks, commit=True): for (index, chunk) in enumerate(chunks): if index % 25000 == 0 and commit: sql.commit() insert_chunk(*chunk, commit=False) if commit: sql.commit() # API FUNCTIONS ################################################################################ def url_for_bigchunk(bigchunk_x, bigchunk_y): return f'http://api.pixelcanvas.io/api/bigchunk/{bigchunk_x}.{bigchunk_y}.bmp' def request(url): response = session.get(url) response.raise_for_status() return response def download_bigchunk(bigchunk_x, bigchunk_y): ''' Download a bigchunk and return the list of chunks. ''' url = url_for_bigchunk(bigchunk_x, bigchunk_y) logging.info('Downloading %s', url) response = request(url) bigchunk_data = response.content if len(bigchunk_data) != BIGCHUNK_SIZE_BYTES: message = 'Received bigchunk does not matched the expected byte size!\n' message += 'Got %d instead of %d' % (len(bigchunk_data), BIGCHUNK_SIZE_BYTES) raise ValueError(message) chunks = split_bigchunk(bigchunk_x, bigchunk_y, bigchunk_data) return chunks def download_bigchunk_range(bigchunk_xy1, bigchunk_xy2, shuffle=False, threads=1): ''' Given (UPPERLEFT_X, UPPERLEFT_Y), (LOWERRIGHT_X, LOWERRIGHT_Y), download multiple bigchunks, and yield all of the small chunks. ''' bigchunks = bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2) if shuffle: bigchunks = list(bigchunks) random.shuffle(bigchunks) if threads < 1: raise ValueError(threads) if threads == 1: for (x, y) in bigchunks: chunks = download_bigchunk(x, y) yield from chunks else: pool = threadpool.ThreadPool(size=threads) kwargss = [ {'function': download_bigchunk, 'args': (x, y), 'name': (x, y),} for (x, y) in bigchunks ] jobs = pool.add_many(kwargss) while jobs: job = jobs.pop(0) job.join() if job.exception: raise job.exception yield from job.value # CHUNK FUNCTIONS ################################################################################ def chunk_range_iterator(chunk_xy1, chunk_xy2): ''' Given (UPPERLEFT_X, UPPERLEFT_Y), (LOWERRIGHT_X, LOWERRIGHT_Y), yield (x, y) pairs for chunks in this range, inclusive. ''' for x in range(chunk_xy1[0], chunk_xy2[0] + 1): for y in range(chunk_xy1[1], chunk_xy2[1] + 1): yield (x, y) def bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2): ''' Given (UPPERLEFT_X, UPPERLEFT_Y), (LOWERRIGHT_X, LOWERRIGHT_Y), yield (x, y) pairs for bigchunks in this range, inclusive. ''' for x in range(bigchunk_xy1[0], bigchunk_xy2[0] + BIGCHUNK_SIZE_CHUNKS, BIGCHUNK_SIZE_CHUNKS): for y in range(bigchunk_xy1[1], bigchunk_xy2[1] + BIGCHUNK_SIZE_CHUNKS, BIGCHUNK_SIZE_CHUNKS): yield (x, y) def chunk_to_bigchunk(chunk_x, chunk_y): bigchunk_x = (chunk_x // BIGCHUNK_SIZE_CHUNKS) * BIGCHUNK_SIZE_CHUNKS bigchunk_y = (chunk_y // BIGCHUNK_SIZE_CHUNKS) * BIGCHUNK_SIZE_CHUNKS # log.debug('Converted chunk %s, %s to bigchunk %s, %s', chunk_x, chunk_y, bigchunk_x, bigchunk_y) return (bigchunk_x, bigchunk_y) def chunk_range_to_bigchunk_range(chunk_xy1, chunk_xy2): bigchunk_range = (chunk_to_bigchunk(*chunk_xy1), chunk_to_bigchunk(*chunk_xy2)) return bigchunk_range def chunk_to_pixel(chunk_x, chunk_y): pixel_x = chunk_x * CHUNK_SIZE_PIX - ORIGIN_OFFSET_X pixel_y = chunk_y * CHUNK_SIZE_PIX - ORIGIN_OFFSET_Y # log.debug('Converted chunk %s, %s to pixel %s, %s', chunk_x, chunk_y, pixel_x, pixel_y) return (pixel_x, pixel_y) def chunk_range_to_pixel_range(chunk_xy1, chunk_xy2): pixel_range = (chunk_to_pixel(*chunk_xy1), chunk_to_pixel(*chunk_xy2)) return pixel_range def pixel_to_chunk(pixel_x, pixel_y): chunk_x = (pixel_x + ORIGIN_OFFSET_X) // CHUNK_SIZE_PIX chunk_y = (pixel_y + ORIGIN_OFFSET_Y) // CHUNK_SIZE_PIX # log.debug('Converted pixel %s, %s to chunk %s, %s', pixel_x, pixel_y, chunk_x, chunk_y) return (chunk_x, chunk_y) def pixel_range_to_chunk_range(pixel_xy1, pixel_xy2): chunk_range = (pixel_to_chunk(*pixel_xy1), pixel_to_chunk(*pixel_xy2)) # log.debug('Converted pixel range %s, %s to chunk range %s, %s', pixel_xy1, pixel_xy2, *chunk_range) return chunk_range def pixel_to_bigchunk(pixel_x, pixel_y): bigchunk_x = ((pixel_x + ORIGIN_OFFSET_X) // BIGCHUNK_SIZE_PIX) * BIGCHUNK_SIZE_CHUNKS bigchunk_y = ((pixel_y + ORIGIN_OFFSET_Y) // BIGCHUNK_SIZE_PIX) * BIGCHUNK_SIZE_CHUNKS # log.debug('Converted pixel %s, %s to bigchunk %s, %s', pixel_x, pixel_y, bigchunk_x, bigchunk_y) return (bigchunk_x, bigchunk_y) def pixel_range_to_bigchunk_range(pixel_xy1, pixel_xy2): bigchunk_range = (pixel_to_bigchunk(*pixel_xy1), pixel_to_bigchunk(*pixel_xy2)) # log.debug('Converted pixel range %s, %s to bigchunk range %s, %s', pixel_xy1, pixel_xy2, *bigchunk_range) return bigchunk_range def split_bigchunk(bigchunk_x, bigchunk_y, bigchunk_data): ''' Chunks are downloaded from the site as a "bigchunk" which is just 15x15 chunks stitched together. The chunks are arranged left to right, top to bottom. For example, the byte stream: 000011112222333344445555666677778888 represents the bitmap: 001122 001122 334455 334455 667788 667788 ''' chunks = [] chunk_count = int(len(bigchunk_data) / CHUNK_SIZE_BYTES) for chunk_index in range(chunk_count): chunk_x = (chunk_index % BIGCHUNK_SIZE_CHUNKS) + bigchunk_x chunk_y = (chunk_index // BIGCHUNK_SIZE_CHUNKS) + bigchunk_y start_index = chunk_index * CHUNK_SIZE_BYTES end_index = start_index + CHUNK_SIZE_BYTES chunk_data = bigchunk_data[start_index:end_index] chunk = (chunk_x, chunk_y, chunk_data) chunks.append(chunk) return chunks # IMAGE FUNCTIONS ################################################################################ def chunk_to_rgb(chunk_data): ''' Convert the data chunk into RGB tuples. PixelCanvas chunks are strings of bytes where every byte represents two horizontal pixels. Each pixel is 4 bits since there are 16 colors. Chunks are 32x64 bytes for a total of 64x64 pixels. ''' # Each byte actually represents two horizontal pixels. 8F is actually 8, F. # So create a generator that takes in the bytes and yields the pixel bits. pixels = ( pixel for byte in chunk_data for pixel in (byte >> 4, byte & 0xf) ) matrix = [None for x in range(len(chunk_data) * 2)] for (index, pixel) in enumerate(pixels): px = index % CHUNK_SIZE_PIX py = index // CHUNK_SIZE_PIX matrix[(py * CHUNK_SIZE_PIX) + px] = COLOR_MAP[pixel] return matrix def rgb_to_image(matrix): matrix = bytes([color for pixel in matrix for color in pixel]) i = PIL.Image.frombytes(mode='RGB', size=(CHUNK_SIZE_PIX, CHUNK_SIZE_PIX), data=matrix) return i def chunk_to_image(chunk_data, scale=1): image = rgb_to_image(chunk_to_rgb(chunk_data)) if scale is not None and scale != 1: new_size = (int(image.size[0] * scale), int(image.size[1] * scale)) image = image.resize(new_size, resample=PIL.Image.NEAREST) return image def chunks_to_image(chunks, scale=1): ''' Combine all of the given chunks into a single image. ''' log.debug('Creating image from %s chunks', len(chunks)) min_x = min(chunk[0] for chunk in chunks) max_x = max(chunk[0] for chunk in chunks) min_y = min(chunk[1] for chunk in chunks) max_y = max(chunk[1] for chunk in chunks) span_x = max_x - min_x + 1 span_y = max_y - min_y + 1 img_width = int(span_x * CHUNK_SIZE_PIX * scale) img_height = int(span_y * CHUNK_SIZE_PIX * scale) img = PIL.Image.new(mode='RGB', size=(img_width, img_height), color=WHITE) for (chunk_x, chunk_y, chunk_data) in chunks: paste_x = int((chunk_x - min_x) * CHUNK_SIZE_PIX * scale) paste_y = int((chunk_y - min_y) * CHUNK_SIZE_PIX * scale) chunk_image = chunk_to_image(chunk_data, scale) img.paste(chunk_image, (paste_x, paste_y)) return img def crop_image(image, pixel_xy1, pixel_xy2): ''' Because the images are rendered on a chunk basis, they are probably larger than the exact area that you want. Use this function to crop the image to the exact coordinates. pixel_xy1 and pixel_xy2 are the world coordinates that you used to get this image in the first place, not coordinates within this picture. ''' img_width = pixel_xy2[0] - pixel_xy1[0] + 1 img_height = pixel_xy2[1] - pixel_xy1[1] + 1 basis_xy = chunk_to_pixel(*pixel_to_chunk(*pixel_xy1)) xy1 = (pixel_xy1[0] - (basis_xy[0]), pixel_xy1[1] - (basis_xy[1])) xy2 = (xy1[0] + img_width, xy1[1] + img_height) bbox = (xy1[0], xy1[1], xy2[0], xy2[1]) log.debug('Cropping image down to %s', bbox) image = image.crop(bbox) return image # COMMAND LINE ################################################################################ from voussoirkit import betterhelp DOCSTRING = ''' This tool is run from the command line, where you provide the coordinates you want to download and render. The format for typing coordinates is `UPPERLEFT--LOWERRIGHT`. The format for each of those pieces is `X.Y`. Sometimes, argparse gets confused by negative coordinates because it thinks you're trying to provide another argument. Sorry. If this happens, use a tilde `~` as the negative sign instead. Remember, because this is an image, up and left are negative; down and right are positive. Commands: {update} {render} So, for example: > pixelcanvas.py update 0.0--100.100 > pixelcanvas.py update ~100.~100--100.100 > pixelcanvas.py update ~1200.300--~900.600 > pixelcanvas.py render 0.0--100.100 > pixelcanvas.py render ~100.~100--100.100 --scale 2 > pixelcanvas.py render ~1200.300--~900.600 --show ''' SUB_DOCSTRINGS = dict( overview=''' overview: Draw an ascii map representing the owned chunks. '''.strip(), update=''' update: Download chunks into the database. > pixelcanvas.py update ~100.~100--100.100 flags: --chunks: The coordinates which you provided are chunk coordinates instead of pixel coordinates. --shuffle: Download chunks in a random order instead of from corner to corner. --threads X: Use X threads to download bigchunks. '''.strip(), render=''' render: Export an image as PNG. > pixelcanvas.py render 0.0--100.100 flags: --chunks: The coordinates which you provided are chunk coordinates instead of pixel coordinates. --scale : Render the image at a different scale. For best results, use powers of 2 like 0.5, 0.25, etc. This will disable the autocropping. --show: Instead of saving the image, display it on the screen. https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.show '''.strip(), ) DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS) def parse_coordinate_string(coordinates): ''' Convert the given '~100.~100--100.100' to ((-100, -100), (100, 100)). ''' coordinates = coordinates.strip() if '--' in coordinates: (xy1, xy2) = coordinates.split('--', 1) else: # If you are only updating a single chunk. xy1 = coordinates xy2 = coordinates def split_xy(xy): xy = xy.replace('~', '-') xy = xy.replace(',', '.') (x, y) = xy.split('.') return (int(x), int(y)) (xy1, xy2) = (split_xy(xy1), split_xy(xy2)) # log.debug('Parsed coordinates %s into %s %s', coordinates, xy1, xy2) return (xy1, xy2) def overview_argparse(args): cur.execute('SELECT x, y, updated_at FROM chunks GROUP BY x, y ORDER BY updated_at DESC') chunks = cur.fetchall() min_x = min(chunk[0] for chunk in chunks) max_x = max(chunk[0] for chunk in chunks) min_y = min(chunk[1] for chunk in chunks) max_y = max(chunk[1] for chunk in chunks) width = max_x - min_x + 1 height = max_y - min_y + 1 x_offset = abs(min(min_x, 0)) y_offset = abs(min(min_y, 0)) matrix = [[' ' for x in range(width)] for y in range(height)] for (x, y, updated_at) in chunks: x += x_offset y += y_offset matrix[y][x] = '.' for (x, y, updated_at) in chunks: if (x % 15 == 0) and (y % 15 == 0): text = f'{x},{y}' x += x_offset y += y_offset for c in text: matrix[y][x] = c x+=1 for row in matrix: for column in row: print(column, end='') print() def render_argparse(args): coordinates = parse_coordinate_string(args.coordinates) if args.is_chunks: chunk_range = coordinates coordinates = chunk_range_to_pixel_range(*coordinates) else: chunk_range = pixel_range_to_chunk_range(*coordinates) chunks = [get_chunk(*chunk_xy) for chunk_xy in chunk_range_iterator(*chunk_range)] scale = float(args.scale) image = chunks_to_image(chunks, scale=scale) if scale == 1 and not args.is_chunks: image = crop_image(image, *coordinates) if args.do_show: image.show() else: ((p1x, p1y), (p2x, p2y)) = coordinates scale_s = f'_{scale}' if scale != 1 else '' filename = f'{p1x}.{p1y}--{p2x}.{p2y}{scale_s}.png' image.save(filename) log.debug('Wrote %s', filename) def update_argparse(args): coordinates = parse_coordinate_string(args.coordinates) if args.is_chunks: bigchunk_range = chunk_range_to_bigchunk_range(*coordinates) else: bigchunk_range = pixel_range_to_bigchunk_range(*coordinates) chunks = download_bigchunk_range(*bigchunk_range, shuffle=args.shuffle, threads=args.threads) try: insert_chunks(chunks) except KeyboardInterrupt: sql.commit() def main(argv): parser = argparse.ArgumentParser() subparsers = parser.add_subparsers() p_update = subparsers.add_parser('update') p_update.add_argument('coordinates') p_update.add_argument('--chunks', dest='is_chunks', action='store_true') p_update.add_argument('--shuffle', dest='shuffle', action='store_true') p_update.add_argument('--threads', dest='threads', type=int, default=1) p_update.set_defaults(func=update_argparse) p_render = subparsers.add_parser('render') p_render.add_argument('coordinates') p_render.add_argument('--chunks', dest='is_chunks', action='store_true') p_render.add_argument('--show', dest='do_show', action='store_true') p_render.add_argument('--scale', dest='scale', default=1) p_render.set_defaults(func=render_argparse) p_overview = subparsers.add_parser('overview') p_overview.set_defaults(func=overview_argparse) return betterhelp.subparser_main( argv, parser, main_docstring=DOCSTRING, sub_docstrings=SUB_DOCSTRINGS, ) if __name__ == '__main__': raise SystemExit(main(sys.argv[1:]))