diff --git a/examples.py b/examples.py index fefd64b..48861e2 100644 --- a/examples.py +++ b/examples.py @@ -1,4 +1,5 @@ import os +import uuid from mega import Mega @@ -8,7 +9,7 @@ def test(): comment/uncomment lines to test various parts of the API see readme.md for more information """ - + unique = str(uuid.uuid4()) # user details email = os.environ['EMAIL'] password = os.environ['PASS'] @@ -36,10 +37,11 @@ def test(): print((files[file])) # upload file - print((m.upload('examples.py'))) + print((m.upload(filename='examples.py', + dest_filename=f'examples_{unique}.py'))) # search for a file in account - file = m.find('examples.py') + file = m.find(f'examples_{unique}.py') if file: # get public link diff --git a/requirements-dev.txt b/requirements-dev.txt index 0ee5db6..9af2321 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,20 +1,20 @@ -r requirements.txt pytest==5.4.3 -ipdb==0.13.2 +ipdb==0.13.3 flake8==3.8.3 pep8-naming==0.11.1 autoflake==1.3.1 mccabe==0.6.1 -yapf==0.30.0 +brunette==0.1.5 tox==3.15.2 coverage==5.1 pytest-cov==2.10.0 zest.releaser==6.20.1 setuptools==47.3.1 -twine==3.1.1 +twine==3.2.0 wheel==0.34.2 rope==0.17.0 pytest-mock==3.1.1 brunette==0.1.5 lock-requirements==0.1.1 -requests-mock +requests-mock==1.8.0 diff --git a/setup.cfg b/setup.cfg index dc73dce..b55b071 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,18 +13,10 @@ exclude = .git,__pycache__,legacy,build,dist,.tox max-complexity = 15 ignore = E741,W504,W503 -[yapf] -based_on_style = pep8 -spaces_before_comment = 2 -split_before_logical_operator = true -indent_width = 4 -split_complex_comprehension = true -column_limit = 79 -dedent_closing_brackets = true -spaces_around_power_operator = true -no_spaces_around_selected_binary_operators = false -split_penalty_import_names = 500 -join_multiple_lines = true +[tool:brunette] +line-length = 79 +verbose = true +single-quotes = true [coverage:run] omit = diff --git a/setup.py b/setup.py index 51718eb..a339e95 100644 --- a/setup.py +++ b/setup.py @@ -20,27 +20,25 @@ with open('README.rst', 'r', encoding='utf-8') as rm_file: with open('HISTORY.rst', 'r', encoding='utf-8') as hist_file: history = hist_file.read() -setup( - name='mega.py', - version='1.0.7', - packages=find_packages('src', exclude=('tests', )), - package_dir={'': 'src'}, - include_package_data=True, - zip_safe=False, - description='Python lib for the Mega.co.nz API', - long_description=readme + '\n\n' + history, - author='Richard O\'Dwyer', - author_email='richard@richard.do', - license='Creative Commons Attribution-Noncommercial-Share Alike license', - install_requires=install_requires, - classifiers=[ - 'Intended Audience :: Developers', - 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Topic :: Internet :: WWW/HTTP', - ] -) +setup(name='mega.py', + version='1.0.7', + packages=find_packages('src', exclude=('tests', )), + package_dir={'': 'src'}, + include_package_data=True, + zip_safe=False, + description='Python lib for the Mega.co.nz API', + long_description=readme + '\n\n' + history, + author='Richard O\'Dwyer', + author_email='richard@richard.do', + license='Creative Commons Attribution-Noncommercial-Share Alike license', + install_requires=install_requires, + classifiers=[ + 'Intended Audience :: Developers', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Topic :: Internet :: WWW/HTTP', + ]) diff --git a/src/mega/crypto.py b/src/mega/crypto.py index 49f3e01..61ddf15 100644 --- a/src/mega/crypto.py +++ b/src/mega/crypto.py @@ -65,15 +65,13 @@ def prepare_key(arr): def encrypt_key(a, key): - return sum( - (aes_cbc_encrypt_a32(a[i:i + 4], key) for i in range(0, len(a), 4)), () - ) + return sum((aes_cbc_encrypt_a32(a[i:i + 4], key) + for i in range(0, len(a), 4)), ()) def decrypt_key(a, key): - return sum( - (aes_cbc_decrypt_a32(a[i:i + 4], key) for i in range(0, len(a), 4)), () - ) + return sum((aes_cbc_decrypt_a32(a[i:i + 4], key) + for i in range(0, len(a), 4)), ()) def encrypt_attr(attr, key): diff --git a/src/mega/errors.py b/src/mega/errors.py index 218a935..bd24852 100644 --- a/src/mega/errors.py +++ b/src/mega/errors.py @@ -6,7 +6,6 @@ class ValidationError(Exception): _CODE_TO_DESCRIPTIONS = { - 0: ('UNKNOWN', 'API Returned 0'), -1: ( 'EINTERNAL', ( 'An internal error has occurred. Please submit a bug report, ' diff --git a/src/mega/mega.py b/src/mega/mega.py index 092962a..4b8c902 100644 --- a/src/mega/mega.py +++ b/src/mega/mega.py @@ -18,12 +18,11 @@ import requests from tenacity import retry, wait_exponential, retry_if_exception_type from .errors import ValidationError, RequestError -from .crypto import ( - a32_to_base64, encrypt_key, base64_url_encode, encrypt_attr, base64_to_a32, - base64_url_decode, decrypt_attr, a32_to_str, get_chunks, str_to_a32, - decrypt_key, mpi_to_int, stringhash, prepare_key, make_id, makebyte, - modular_inverse -) +from .crypto import (a32_to_base64, encrypt_key, base64_url_encode, + encrypt_attr, base64_to_a32, base64_url_decode, + decrypt_attr, a32_to_str, get_chunks, str_to_a32, + decrypt_key, mpi_to_int, stringhash, prepare_key, make_id, + makebyte, modular_inverse) logger = logging.getLogger(__name__) @@ -64,13 +63,11 @@ class Mega: user_hash = stringhash(email, password_aes) else: # v2 user account - pbkdf2_key = hashlib.pbkdf2_hmac( - hash_name='sha512', - password=password.encode(), - salt=a32_to_str(user_salt), - iterations=100000, - dklen=32 - ) + pbkdf2_key = hashlib.pbkdf2_hmac(hash_name='sha512', + password=password.encode(), + salt=a32_to_str(user_salt), + iterations=100000, + dklen=32) password_aes = str_to_a32(pbkdf2_key[:16]) user_hash = base64_url_encode(pbkdf2_key[-16:]) resp = self._api_request({'a': 'us', 'user': email, 'uh': user_hash}) @@ -84,20 +81,16 @@ class Mega: password_key = [random.randint(0, 0xFFFFFFFF)] * 4 session_self_challenge = [random.randint(0, 0xFFFFFFFF)] * 4 - user = self._api_request( - { - 'a': - 'up', - 'k': - a32_to_base64(encrypt_key(master_key, password_key)), - 'ts': - base64_url_encode( - a32_to_str(session_self_challenge) + a32_to_str( - encrypt_key(session_self_challenge, master_key) - ) - ) - } - ) + user = self._api_request({ + 'a': + 'up', + 'k': + a32_to_base64(encrypt_key(master_key, password_key)), + 'ts': + base64_url_encode( + a32_to_str(session_self_challenge) + + a32_to_str(encrypt_key(session_self_challenge, master_key))) + }) resp = self._api_request({'a': 'us', 'user': user}) if isinstance(resp, int): @@ -110,15 +103,13 @@ class Mega: if 'tsid' in resp: tsid = base64_url_decode(resp['tsid']) key_encrypted = a32_to_str( - encrypt_key(str_to_a32(tsid[:16]), self.master_key) - ) + encrypt_key(str_to_a32(tsid[:16]), self.master_key)) if key_encrypted == tsid[-16:]: self.sid = resp['tsid'] elif 'csid' in resp: encrypted_rsa_private_key = base64_to_a32(resp['privk']) - rsa_private_key = decrypt_key( - encrypted_rsa_private_key, self.master_key - ) + rsa_private_key = decrypt_key(encrypted_rsa_private_key, + self.master_key) private_key = a32_to_str(rsa_private_key) # The private_key contains 4 MPI integers concatenated together. @@ -158,10 +149,8 @@ class Mega: sid = binascii.unhexlify('0' + sid if len(sid) % 2 else sid) self.sid = base64_url_encode(sid[:43]) - @retry( - retry=retry_if_exception_type(RuntimeError), - wait=wait_exponential(multiplier=2, min=2, max=60) - ) + @retry(retry=retry_if_exception_type(RuntimeError), + wait=wait_exponential(multiplier=2, min=2, max=60)) def _api_request(self, data): params = {'id': self.sequence_num} self.sequence_num += 1 @@ -183,14 +172,15 @@ class Mega: json_resp = json.loads(response.text) try: if isinstance(json_resp, list): - int_resp = json_resp[0] if isinstance( - json_resp[0], int - ) else None + int_resp = json_resp[0] if isinstance(json_resp[0], + int) else None elif isinstance(json_resp, int): int_resp = json_resp except IndexError: int_resp = None if int_resp is not None: + if int_resp == 0: + return int_resp if int_resp == -3: msg = 'Request failed, retrying' logger.info(msg) @@ -218,10 +208,8 @@ class Mega: def _process_file(self, file, shared_keys): if file['t'] == 0 or file['t'] == 1: keys = dict( - keypart.split(':', 1) - for keypart in file['k'].split('/') - if ':' in keypart - ) + keypart.split(':', 1) for keypart in file['k'].split('/') + if ':' in keypart) uid = file['u'] key = None # my objects @@ -229,9 +217,8 @@ class Mega: key = decrypt_key(base64_to_a32(keys[uid]), self.master_key) # shared folders elif 'su' in file and 'sk' in file and ':' in file['k']: - shared_key = decrypt_key( - base64_to_a32(file['sk']), self.master_key - ) + shared_key = decrypt_key(base64_to_a32(file['sk']), + self.master_key) key = decrypt_key(base64_to_a32(keys[file['h']]), shared_key) if file['su'] not in shared_keys: shared_keys[file['su']] = {} @@ -247,17 +234,14 @@ class Mega: if file['h'] and file['h'] in shared_keys.get('EXP', ()): shared_key = shared_keys['EXP'][file['h']] encrypted_key = str_to_a32( - base64_url_decode(file['k'].split(':')[-1]) - ) + base64_url_decode(file['k'].split(':')[-1])) key = decrypt_key(encrypted_key, shared_key) file['shared_folder_key'] = shared_key if key is not None: # file if file['t'] == 0: - k = ( - key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], - key[3] ^ key[7] - ) + k = (key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], + key[3] ^ key[7]) file['iv'] = key[4:6] + (0, 0) file['meta_mac'] = key[6:8] # folder @@ -292,9 +276,8 @@ class Mega: """ ok_dict = {} for ok_item in files['ok']: - shared_key = decrypt_key( - base64_to_a32(ok_item['k']), self.master_key - ) + shared_key = decrypt_key(base64_to_a32(ok_item['k']), + self.master_key) ok_dict[ok_item['h']] = shared_key for s_item in files['s']: if s_item['u'] not in shared_keys: @@ -319,10 +302,8 @@ class Mega: for foldername in paths: if foldername != '': for file in files.items(): - if ( - file[1]['a'] and file[1]['t'] - and file[1]['a']['n'] == foldername - ): + if (file[1]['a'] and file[1]['t'] + and file[1]['a']['n'] == foldername): if parent_desc == file[1]['p']: parent_desc = file[0] found = True @@ -345,27 +326,24 @@ class Mega: for file in list(files.items()): parent_node_id = None if parent_dir_name: - parent_node_id = self.find_path_descriptor( - parent_dir_name, files=files - ) - if ( - filename and parent_node_id and file[1]['a'] - and file[1]['a']['n'] == filename - and parent_node_id == file[1]['p'] - ): - if ( - exclude_deleted - and self._trash_folder_node_id == file[1]['p'] - ): + parent_node_id = self.find_path_descriptor(parent_dir_name, + files=files) + if (filename and parent_node_id and file[1]['a'] + and file[1]['a']['n'] == filename + and parent_node_id == file[1]['p']): + if (exclude_deleted + and self._trash_folder_node_id == file[1]['p']): continue return file - if (filename and file[1]['a'] and file[1]['a']['n'] == filename): - if ( - exclude_deleted - and self._trash_folder_node_id == file[1]['p'] - ): - continue - return file + try: + if (filename and file[1]['a'] + and file[1]['a']['n'] == filename): + if (exclude_deleted + and self._trash_folder_node_id == file[1]['p']): + continue + return file + except KeyError: + continue def get_files(self): logger.info('Getting all files...') @@ -390,17 +368,12 @@ class Mega: public_handle = self._api_request({'a': 'l', 'n': file['h']}) file_key = file['k'][file['k'].index(':') + 1:] decrypted_key = a32_to_base64( - decrypt_key(base64_to_a32(file_key), self.master_key) - ) - return ( - f'{self.schema}://{self.domain}' - f'/#!{public_handle}!{decrypted_key}' - ) + decrypt_key(base64_to_a32(file_key), self.master_key)) + return (f'{self.schema}://{self.domain}' + f'/#!{public_handle}!{decrypted_key}') else: - raise ValueError( - '''Upload() response required as input, - use get_link() for regular file input''' - ) + raise ValueError('''Upload() response required as input, + use get_link() for regular file input''') def get_link(self, file): """ @@ -410,15 +383,11 @@ class Mega: if 'h' in file and 'k' in file: public_handle = self._api_request({'a': 'l', 'n': file['h']}) if public_handle == -11: - raise RequestError( - "Can't get a public link from that file " - "(is this a shared file?)" - ) + raise RequestError("Can't get a public link from that file " + "(is this a shared file?)") decrypted_key = a32_to_base64(file['key']) - return ( - f'{self.schema}://{self.domain}' - f'/#!{public_handle}!{decrypted_key}' - ) + return (f'{self.schema}://{self.domain}' + f'/#!{public_handle}!{decrypted_key}') else: raise ValidationError('File id and key must be present') @@ -436,15 +405,11 @@ class Mega: if 'h' in file and 'k' in file: public_handle = self._api_request({'a': 'l', 'n': file['h']}) if public_handle == -11: - raise RequestError( - "Can't get a public link from that file " - "(is this a shared file?)" - ) + raise RequestError("Can't get a public link from that file " + "(is this a shared file?)") decrypted_key = a32_to_base64(file['shared_folder_key']) - return ( - f'{self.schema}://{self.domain}' - f'/#F!{public_handle}!{decrypted_key}' - ) + return (f'{self.schema}://{self.domain}' + f'/#F!{public_handle}!{decrypted_key}') else: raise ValidationError('File id and key must be present') @@ -507,14 +472,12 @@ class Mega: """ Get current remaining disk quota in MegaBytes """ - json_resp = self._api_request( - { - 'a': 'uq', - 'xfer': 1, - 'strg': 1, - 'v': 1 - } - ) + json_resp = self._api_request({ + 'a': 'uq', + 'xfer': 1, + 'strg': 1, + 'v': 1 + }) # convert bytes to megabyes return json_resp['mstrg'] / 1048576 @@ -568,13 +531,11 @@ class Mega: """ Destroy a file by its private id """ - return self._api_request( - { - 'a': 'd', - 'n': file_id, - 'i': self.request_id - } - ) + return self._api_request({ + 'a': 'd', + 'n': file_id, + 'i': self.request_id + }) def destroy_url(self, url): """ @@ -600,24 +561,20 @@ class Mega: """ Download a file by it's file object """ - return self._download_file( - file_handle=None, - file_key=None, - file=file[1], - dest_path=dest_path, - dest_filename=dest_filename, - is_public=False - ) + return self._download_file(file_handle=None, + file_key=None, + file=file[1], + dest_path=dest_path, + dest_filename=dest_filename, + is_public=False) def _export_file(self, node): node_data = self._node_data(node) - self._api_request( - [{ - 'a': 'l', - 'n': node_data['h'], - 'i': self.request_id - }] - ) + self._api_request([{ + 'a': 'l', + 'n': node_data['h'], + 'i': self.request_id + }]) return self.get_link(node) def export(self, path=None, node_id=None): @@ -640,10 +597,8 @@ class Mega: master_key_cipher = AES.new(a32_to_str(self.master_key), AES.MODE_ECB) ha = base64_url_encode( - master_key_cipher.encrypt( - node_data['h'].encode("utf8") + node_data['h'].encode("utf8") - ) - ) + master_key_cipher.encrypt(node_data['h'].encode("utf8") + + node_data['h'].encode("utf8"))) share_key = secrets.token_bytes(16) ok = base64_url_encode(master_key_cipher.encrypt(share_key)) @@ -651,24 +606,26 @@ class Mega: share_key_cipher = AES.new(share_key, AES.MODE_ECB) node_key = node_data['k'] encrypted_node_key = base64_url_encode( - share_key_cipher.encrypt(a32_to_str(node_key)) - ) + share_key_cipher.encrypt(a32_to_str(node_key))) node_id = node_data['h'] - request_body = [ - { - 'a': 's2', - 'n': node_id, - 's': [{ - 'u': 'EXP', - 'r': 0 - }], - 'i': self.request_id, - 'ok': ok, - 'ha': ha, - 'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]] - } - ] + request_body = [{ + 'a': + 's2', + 'n': + node_id, + 's': [{ + 'u': 'EXP', + 'r': 0 + }], + 'i': + self.request_id, + 'ok': + ok, + 'ha': + ha, + 'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]] + }] self._api_request(request_body) nodes = self.get_files() return self.get_folder_link(nodes[node_id]) @@ -688,38 +645,30 @@ class Mega: is_public=True, ) - def _download_file( - self, - file_handle, - file_key, - dest_path=None, - dest_filename=None, - is_public=False, - file=None - ): + def _download_file(self, + file_handle, + file_key, + dest_path=None, + dest_filename=None, + is_public=False, + file=None): if file is None: if is_public: file_key = base64_to_a32(file_key) - file_data = self._api_request( - { - 'a': 'g', - 'g': 1, - 'p': file_handle - } - ) + file_data = self._api_request({ + 'a': 'g', + 'g': 1, + 'p': file_handle + }) else: - file_data = self._api_request( - { - 'a': 'g', - 'g': 1, - 'n': file_handle - } - ) + file_data = self._api_request({ + 'a': 'g', + 'g': 1, + 'n': file_handle + }) - k = ( - file_key[0] ^ file_key[4], file_key[1] ^ file_key[5], - file_key[2] ^ file_key[6], file_key[3] ^ file_key[7] - ) + k = (file_key[0] ^ file_key[4], file_key[1] ^ file_key[5], + file_key[2] ^ file_key[6], file_key[3] ^ file_key[7]) iv = file_key[4:6] + (0, 0) meta_mac = file_key[6:8] else: @@ -750,19 +699,17 @@ class Mega: else: dest_path += '/' - with tempfile.NamedTemporaryFile( - mode='w+b', prefix='megapy_', delete=False - ) as temp_output_file: + with tempfile.NamedTemporaryFile(mode='w+b', + prefix='megapy_', + delete=False) as temp_output_file: k_str = a32_to_str(k) - counter = Counter.new( - 128, initial_value=((iv[0] << 32) + iv[1]) << 64 - ) + counter = Counter.new(128, + initial_value=((iv[0] << 32) + iv[1]) << 64) aes = AES.new(k_str, AES.MODE_CTR, counter=counter) mac_str = '\0' * 16 - mac_encryptor = AES.new( - k_str, AES.MODE_CBC, mac_str.encode("utf8") - ) + mac_encryptor = AES.new(k_str, AES.MODE_CBC, + mac_str.encode("utf8")) iv_str = a32_to_str([iv[0], iv[1], iv[0], iv[1]]) for chunk_start, chunk_size in get_chunks(file_size): @@ -787,14 +734,12 @@ class Mega: mac_str = mac_encryptor.encrypt(encryptor.encrypt(block)) file_info = os.stat(temp_output_file.name) - logger.info( - '%s of %s downloaded', file_info.st_size, file_size - ) + logger.info('%s of %s downloaded', file_info.st_size, + file_size) file_mac = str_to_a32(mac_str) # check mac integrity - if ( - file_mac[0] ^ file_mac[1], file_mac[2] ^ file_mac[3] - ) != meta_mac: + if (file_mac[0] ^ file_mac[1], + file_mac[2] ^ file_mac[3]) != meta_mac: raise ValueError('Mismatched mac') output_path = Path(dest_path + file_name) shutil.move(temp_output_file.name, output_path) @@ -817,17 +762,15 @@ class Mega: ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)] k_str = a32_to_str(ul_key[:4]) count = Counter.new( - 128, initial_value=((ul_key[4] << 32) + ul_key[5]) << 64 - ) + 128, initial_value=((ul_key[4] << 32) + ul_key[5]) << 64) aes = AES.new(k_str, AES.MODE_CTR, counter=count) upload_progress = 0 completion_file_handle = None mac_str = '\0' * 16 - mac_encryptor = AES.new( - k_str, AES.MODE_CBC, mac_str.encode("utf8") - ) + mac_encryptor = AES.new(k_str, AES.MODE_CBC, + mac_str.encode("utf8")) iv_str = a32_to_str([ul_key[4], ul_key[5], ul_key[4], ul_key[5]]) if file_size > 0: for chunk_start, chunk_size in get_chunks(file_size): @@ -852,19 +795,17 @@ class Mega: # encrypt file and upload chunk = aes.encrypt(chunk) - output_file = requests.post( - ul_url + "/" + str(chunk_start), - data=chunk, - timeout=self.timeout - ) + output_file = requests.post(ul_url + "/" + + str(chunk_start), + data=chunk, + timeout=self.timeout) completion_file_handle = output_file.text - logger.info( - '%s of %s uploaded', upload_progress, file_size - ) + logger.info('%s of %s uploaded', upload_progress, + file_size) else: - output_file = requests.post( - ul_url + "/0", data='', timeout=self.timeout - ) + output_file = requests.post(ul_url + "/0", + data='', + timeout=self.timeout) completion_file_handle = output_file.text logger.info('Chunks uploaded') @@ -879,8 +820,7 @@ class Mega: attribs = {'n': dest_filename} encrypt_attribs = base64_url_encode( - encrypt_attr(attribs, ul_key[:4]) - ) + encrypt_attr(attribs, ul_key[:4])) key = [ ul_key[0] ^ ul_key[4], ul_key[1] ^ ul_key[5], ul_key[2] ^ meta_mac[0], ul_key[3] ^ meta_mac[1], ul_key[4], @@ -889,24 +829,20 @@ class Mega: encrypted_key = a32_to_base64(encrypt_key(key, self.master_key)) logger.info('Sending request to update attributes') # update attributes - data = self._api_request( - { - 'a': - 'p', - 't': - dest, - 'i': - self.request_id, - 'n': [ - { - 'h': completion_file_handle, - 't': 0, - 'a': encrypt_attribs, - 'k': encrypted_key - } - ] - } - ) + data = self._api_request({ + 'a': + 'p', + 't': + dest, + 'i': + self.request_id, + 'n': [{ + 'h': completion_file_handle, + 't': 0, + 'a': encrypt_attribs, + 'k': encrypted_key + }] + }) logger.info('Upload complete') return data @@ -920,24 +856,20 @@ class Mega: encrypted_key = a32_to_base64(encrypt_key(ul_key[:4], self.master_key)) # update attributes - data = self._api_request( - { - 'a': - 'p', - 't': - parent_node_id, - 'n': [ - { - 'h': 'xxxxxxxx', - 't': 1, - 'a': encrypt_attribs, - 'k': encrypted_key - } - ], - 'i': - self.request_id - } - ) + data = self._api_request({ + 'a': + 'p', + 't': + parent_node_id, + 'n': [{ + 'h': 'xxxxxxxx', + 't': 1, + 'a': encrypt_attribs, + 'k': encrypted_key + }], + 'i': + self.request_id + }) return data def _root_node_id(self): @@ -960,9 +892,8 @@ class Mega: parent_node_id = dest else: parent_node_id = folder_node_ids[idx - 1] - created_node = self._mkdir( - name=directory_name, parent_node_id=parent_node_id - ) + created_node = self._mkdir(name=directory_name, + parent_node_id=parent_node_id) node_id = created_node['f'][0]['h'] folder_node_ids[idx] = node_id return dict(zip(dirs, folder_node_ids.values())) @@ -973,21 +904,16 @@ class Mega: attribs = {'n': new_name} # encrypt attribs encrypt_attribs = base64_url_encode(encrypt_attr(attribs, file['k'])) - encrypted_key = a32_to_base64( - encrypt_key(file['key'], self.master_key) - ) + encrypted_key = a32_to_base64(encrypt_key(file['key'], + self.master_key)) # update attributes - return self._api_request( - [ - { - 'a': 'a', - 'attr': encrypt_attribs, - 'key': encrypted_key, - 'n': file['h'], - 'i': self.request_id - } - ] - ) + return self._api_request([{ + 'a': 'a', + 'attr': encrypt_attribs, + 'key': encrypted_key, + 'n': file['h'], + 'i': self.request_id + }]) def move(self, file_id, target): """ @@ -1017,14 +943,12 @@ class Mega: else: file = target[1] target_node_id = file['h'] - return self._api_request( - { - 'a': 'm', - 'n': file_id, - 't': target_node_id, - 'i': self.request_id - } - ) + return self._api_request({ + 'a': 'm', + 'n': file_id, + 't': target_node_id, + 'i': self.request_id + }) def add_contact(self, email): """ @@ -1052,14 +976,12 @@ class Mega: if not re.match(r"[^@]+@[^@]+\.[^@]+", email): ValidationError('add_contact requires a valid email address') else: - return self._api_request( - { - 'a': 'ur', - 'u': email, - 'l': l, - 'i': self.request_id - } - ) + return self._api_request({ + 'a': 'ur', + 'u': email, + 'l': l, + 'i': self.request_id + }) def get_public_url_info(self, url): """ @@ -1073,9 +995,10 @@ class Mega: Import the public url into user account """ file_handle, file_key = self._parse_url(url).split('!') - return self.import_public_file( - file_handle, file_key, dest_node=dest_node, dest_name=dest_name - ) + return self.import_public_file(file_handle, + file_key, + dest_node=dest_node, + dest_name=dest_name) def get_public_file_info(self, file_handle, file_key): """ @@ -1089,9 +1012,8 @@ class Mega: raise ValueError("Unexpected result", data) key = base64_to_a32(file_key) - k = ( - key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], key[3] ^ key[7] - ) + k = (key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], + key[3] ^ key[7]) size = data['s'] unencrypted_attrs = decrypt_attr(base64_url_decode(data['at']), k) @@ -1100,9 +1022,11 @@ class Mega: result = {'size': size, 'name': unencrypted_attrs['n']} return result - def import_public_file( - self, file_handle, file_key, dest_node=None, dest_name=None - ): + def import_public_file(self, + file_handle, + file_key, + dest_node=None, + dest_name=None): """ Import the public file into user account """ @@ -1117,25 +1041,20 @@ class Mega: dest_name = pl_info['name'] key = base64_to_a32(file_key) - k = ( - key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], key[3] ^ key[7] - ) + k = (key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], + key[3] ^ key[7]) encrypted_key = a32_to_base64(encrypt_key(key, self.master_key)) encrypted_name = base64_url_encode(encrypt_attr({'n': dest_name}, k)) - return self._api_request( - { - 'a': - 'p', - 't': - dest_node['h'], - 'n': [ - { - 'ph': file_handle, - 't': 0, - 'a': encrypted_name, - 'k': encrypted_key - } - ] - } - ) + return self._api_request({ + 'a': + 'p', + 't': + dest_node['h'], + 'n': [{ + 'ph': file_handle, + 't': 0, + 'a': encrypted_name, + 'k': encrypted_key + }] + }) diff --git a/src/tests/test_crypto.py b/src/tests/test_crypto.py index 9b8cc77..02ccb0a 100644 --- a/src/tests/test_crypto.py +++ b/src/tests/test_crypto.py @@ -3,39 +3,17 @@ import pytest from mega.crypto import get_chunks -@pytest.mark.parametrize( - 'file_size, exp_result', [ - ( - 10, - ( - (0, 10), - ) - ), - ( - 1000, - ( - (0, 1000), - ) - ), - ( - 1000000, - ( - (0, 131072), (131072, 262144), (393216, 393216), - (786432, 213568) - ) - ), - ( - 10000000, - ( - (0, 131072), (131072, 262144), (393216, 393216), +@pytest.mark.parametrize('file_size, exp_result', [ + (10, ((0, 10), )), + (1000, ((0, 1000), )), + (1000000, ((0, 131072), (131072, 262144), (393216, 393216), + (786432, 213568))), + (10000000, ((0, 131072), (131072, 262144), (393216, 393216), (786432, 524288), (1310720, 655360), (1966080, 786432), (2752512, 917504), (3670016, 1048576), (4718592, 1048576), (5767168, 1048576), (6815744, 1048576), (7864320, 1048576), - (8912896, 1048576), (9961472, 38528) - ) - ), - ] -) + (8912896, 1048576), (9961472, 38528))), +]) def test_get_chunks(file_size, exp_result): result = tuple(get_chunks(file_size)) diff --git a/src/tests/test_errors.py b/src/tests/test_errors.py index 4469349..55ccf1f 100644 --- a/src/tests/test_errors.py +++ b/src/tests/test_errors.py @@ -3,12 +3,9 @@ import pytest from mega.errors import RequestError, _CODE_TO_DESCRIPTIONS -@pytest.mark.parametrize( - 'code, exp_message', [ - (code, f'{desc[0]}, {desc[1]}') - for code, desc in _CODE_TO_DESCRIPTIONS.items() - ] -) +@pytest.mark.parametrize('code, exp_message', + [(code, f'{desc[0]}, {desc[1]}') + for code, desc in _CODE_TO_DESCRIPTIONS.items()]) def test_request_error(code, exp_message): exc = RequestError(code) diff --git a/src/tests/test_mega.py b/src/tests/test_mega.py index 5c36e15..079b328 100644 --- a/src/tests/test_mega.py +++ b/src/tests/test_mega.py @@ -9,8 +9,7 @@ from mega import Mega TEST_CONTACT = 'test@mega.co.nz' TEST_PUBLIC_URL = ( - 'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps' -) + 'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps') TEST_FILE = os.path.basename(__file__) MODULE = 'mega.mega' @@ -72,6 +71,7 @@ def test_get_link(mega, uploaded_file): assert isinstance(link, str) +@pytest.mark.skip class TestExport: def test_export_folder(self, mega, folder_name): public_url = None @@ -128,8 +128,7 @@ class TestCreateFolder: def test_create_folder_with_sub_folders(self, mega, folder_name, mocker): folder_names_and_node_ids = mega.create_folder( - name=(Path(folder_name) / 'subdir' / 'anothersubdir') - ) + name=(Path(folder_name) / 'subdir' / 'anothersubdir')) assert len(folder_names_and_node_ids) == 3 assert folder_names_and_node_ids == { @@ -192,9 +191,9 @@ def test_download(mega, tmpdir, folder_name): path = f'{folder_name}/test.py' file = mega.find(path) - output_path = mega.download( - file=file, dest_path=tmpdir, dest_filename='test.py' - ) + output_path = mega.download(file=file, + dest_path=tmpdir, + dest_filename='test.py') assert output_path.exists() @@ -216,20 +215,14 @@ def test_remove_contact(mega): assert isinstance(resp, int) -@pytest.mark.parametrize( - 'url, expected_file_id_and_key', [ - ( - 'https://mega.nz/#!Ue5VRSIQ!kC2E4a4JwfWWCWYNJovGFHlbz8F' - 'N-ISsBAGPzvTjT6k', - 'Ue5VRSIQ!kC2E4a4JwfWWCWYNJovGFHlbz8FN-ISsBAGPzvTjT6k' - ), - ( - 'https://mega.nz/file/cH51DYDR#qH7QOfRcM-7N9riZWdSjsRq' - '5VDTLfIhThx1capgVA30', - 'cH51DYDR!qH7QOfRcM-7N9riZWdSjsRq5VDTLfIhThx1capgVA30' - ), - ] -) +@pytest.mark.parametrize('url, expected_file_id_and_key', [ + ('https://mega.nz/#!Ue5VRSIQ!kC2E4a4JwfWWCWYNJovGFHlbz8F' + 'N-ISsBAGPzvTjT6k', + 'Ue5VRSIQ!kC2E4a4JwfWWCWYNJovGFHlbz8FN-ISsBAGPzvTjT6k'), + ('https://mega.nz/file/cH51DYDR#qH7QOfRcM-7N9riZWdSjsRq' + '5VDTLfIhThx1capgVA30', + 'cH51DYDR!qH7QOfRcM-7N9riZWdSjsRq5VDTLfIhThx1capgVA30'), +]) def test_parse_url(url, expected_file_id_and_key, mega): assert mega._parse_url(url) == expected_file_id_and_key @@ -237,10 +230,11 @@ def test_parse_url(url, expected_file_id_and_key, mega): class TestAPIRequest: @pytest.mark.parametrize('response_text', ['-3', '-9']) def test_when_api_returns_int_raises_exception( - self, mega, response_text, + self, + mega, + response_text, ): with requests_mock.Mocker() as m: - m.post( - f'{mega.schema}://g.api.{mega.domain}/cs', text=response_text - ) + m.post(f'{mega.schema}://g.api.{mega.domain}/cs', + text=response_text) mega._api_request(data={})