diff --git a/.gitignore b/.gitignore index 57e41db..1090913 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ ################# ## Eclipse ################# - +.env.fish *.pydevproject .project .metadata diff --git a/.travis.yml b/.travis.yml index 0bae214..4cc2fcd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,9 @@ sudo: false language: python python: - - 2.7 + - 3.6 - 3.7 + - 3.8 env: - TOXENV=py-normal diff --git a/HISTORY.rst b/HISTORY.rst index c0ddccc..04fe363 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,7 +6,12 @@ Release History 0.9.21 (unreleased) +++++++++++++++++++ -- Nothing changed yet. +- Removes broken method ``get_contacts()``. +- Adds support for login with a v2 Mega user account. +- Adds ``export()`` method to share a file or folder, returning public share URL with key. +- Adds code, message attrs to RequestError exception, makes message in raised exceptions include more details. +- Alters ``create_folder()`` to accept a path including multiple sub directories, adds support to create them all (similar to 'mkdir -p' on unix systems). +- Adds ``exclude_deleted=True`` optional arg to ``find()`` method, to exclude deleted nodes from results. 0.9.20 (2019-10-17) diff --git a/README.rst b/README.rst index dd191af..e2e0360 100644 --- a/README.rst +++ b/README.rst @@ -1,14 +1,8 @@ **NOTICE**: If you're reading this on GitHub.com please be aware this is a mirror of the primary remote located at -`https://code.richard.do/explore/projects`_. Please direct issues and +`https://code.richard.do/richardARPANET/mega.py`_. Please direct issues and pull requests there. -Deprecated -========== - -Mega.py is now deprecated, please use the official SDK -`https://github.com/meganz/sdk`_. - I aim to write a wrapper for the SDK when i have the time to do so. -------------- @@ -122,6 +116,24 @@ Upload a file, and get its public link m.get_upload_link(file) # see mega.py for destination and filename options +Export a file or folder +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code:: python + + public_exported_web_link = m.export('myfile.doc') + public_exported_web_link = m.export('my_mega_folder/my_sub_folder_to_share') + # e.g. https://mega.nz/#F!WlVl1CbZ!M3wmhwZDENMNUJoBsdzFng + +Fine a file or folder +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code:: python + + folder = m.find('my_mega_folder') + # Excludes results which are in the Trash folder (i.e. deleted) + folder = m.find('my_mega_folder', exclude_deleted=True) + Upload a file to a destination folder ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -157,6 +169,17 @@ Create a folder .. code:: python m.create_folder('new_folder') + m.create_folder('new_folder/sub_folder/subsub_folder') + +Returns a dict of folder node name and node_id, e.g. + +.. code:: python + + { + 'new_folder': 'qpFhAYwA', + 'sub_folder': '2pdlmY4Z', + 'subsub_folder': 'GgMFCKLZ' + } Rename a file or a folder ~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -166,7 +189,7 @@ Rename a file or a folder file = m.find('myfile.doc') m.rename(file, 'my_file.doc') -M + ~ .. _`https://code.richard.do/explore/projects`: https://code.richard.do/explore/projects diff --git a/requirements-dev.txt b/requirements-dev.txt index 1b820f6..a03b408 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -13,3 +13,5 @@ zest.releaser setuptools twine wheel +rope +pytest-mock diff --git a/requirements.txt b/requirements.txt index c48d0a3..1bb8c2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ requests>=0.10 pycrypto +pathlib==1.0.1 +python2-secrets==1.0.5 diff --git a/setup.cfg b/setup.cfg index b56a48d..876d1db 100644 --- a/setup.cfg +++ b/setup.cfg @@ -11,7 +11,7 @@ norecursedirs = .git [flake8] exclude = .git,__pycache__,legacy,build,dist,.tox max-complexity = 15 -ignore = E741 +ignore = E741,W504 [yapf] based_on_style = pep8 diff --git a/setup.py b/setup.py index ed3190f..2e0beaa 100644 --- a/setup.py +++ b/setup.py @@ -14,11 +14,11 @@ os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir))) with open('requirements.txt') as f: install_requires = f.read().splitlines() -# with open('README.rst', 'r', encoding='utf-8') as rm_file: -# readme = rm_file.read() +with open('README.rst', 'r', encoding='utf-8') as rm_file: + readme = rm_file.read() -# with open('HISTORY.rst', 'r', encoding='utf-8') as hist_file: -# history = hist_file.read() +with open('HISTORY.rst', 'r', encoding='utf-8') as hist_file: + history = hist_file.read() setup( name='mega.py', @@ -28,6 +28,7 @@ setup( include_package_data=True, zip_safe=False, description='Python lib for the Mega.co.nz API', + long_description=readme + '\n\n' + history, author='Richard O\'Dwyer', author_email='richard@richard.do', license='Creative Commons Attribution-Noncommercial-Share Alike license', @@ -36,13 +37,7 @@ setup( 'Intended Audience :: Developers', 'Operating System :: OS Independent', 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', diff --git a/src/mega/errors.py b/src/mega/errors.py index d231381..4df7f90 100644 --- a/src/mega/errors.py +++ b/src/mega/errors.py @@ -5,9 +5,83 @@ class ValidationError(Exception): pass +_CODE_TO_DESCRIPTIONS = { + -1: ( + 'EINTERNAL', + ( + 'An internal error has occurred. Please submit a bug report, ' + 'detailing the exact circumstances in which this error occurred' + ) + ), + -2: ('EARGS', 'You have passed invalid arguments to this command'), + -3: ( + 'EAGAIN', + ( + '(always at the request level) A temporary congestion or server ' + 'malfunction prevented your request from being processed. ' + 'No data was altered. Retry. Retries must be spaced with ' + 'exponential backoff' + ) + ), + -4: ( + 'ERATELIMIT', + ( + 'You have exceeded your command weight per time quota. Please ' + 'wait a few seconds, then try again (this should never happen ' + 'in sane real-life applications)' + ) + ), + -5: ('EFAILED', 'The upload failed. Please restart it from scratch'), + -6: ( + 'ETOOMANY', + 'Too many concurrent IP addresses are accessing this upload target URL' + ), + -7: ( + 'ERANGE', + ( + 'The upload file packet is out of range or not starting and ' + 'ending on a chunk boundary' + ) + ), + -8: ( + 'EEXPIRED', + ( + 'The upload target URL you are trying to access has expired. ' + 'Please request a fresh one' + ) + ), + -9: ('ENOENT', 'Object (typically, node or user) not found'), + -10: ('ECIRCULAR', 'Circular linkage attempted'), + -11: ( + 'EACCESS', + 'Access violation (e.g., trying to write to a read-only share)' + ), + -12: ('EEXIST', 'Trying to create an object that already exists'), + -13: ('EINCOMPLETE', 'Trying to access an incomplete resource'), + -14: ('EKEY', 'A decryption operation failed (never returned by the API)'), + -15: ('ESID', 'Invalid or expired user session, please relogin'), + -16: ('EBLOCKED', 'User blocked'), + -17: ('EOVERQUOTA', 'Request over quota'), + -18: ( + 'ETEMPUNAVAIL', + 'Resource temporarily not available, please try again later' + ), + -19: ('ETOOMANYCONNECTIONS', 'many connections on this resource'), + -20: ('EWRITE', 'Write failed'), + -21: ('EREAD', 'Read failed'), + -22: ('EAPPKEY', 'Invalid application key; request not processed'), +} + + class RequestError(Exception): """ Error in API request """ - # TODO add error response messages - pass + def __init__(self, message): + code = message + self.code = code + code_desc, long_desc = _CODE_TO_DESCRIPTIONS[code] + self.message = f'{code_desc}, {long_desc}' + + def __str__(self): + return self.message diff --git a/src/mega/mega.py b/src/mega/mega.py index 6fb2557..d544c8e 100644 --- a/src/mega/mega.py +++ b/src/mega/mega.py @@ -1,5 +1,9 @@ import re +import time import json +import secrets +from pathlib import Path +import hashlib from Crypto.Cipher import AES from Crypto.PublicKey import RSA from Crypto.Util import Counter @@ -28,6 +32,7 @@ class Mega(object): self.sid = None self.sequence_num = random.randint(0, 0xFFFFFFFF) self.request_id = make_id(10) + self._trash_folder_node_id = None if options is None: options = {} @@ -38,13 +43,31 @@ class Mega(object): self._login_user(email, password) else: self.login_anonymous() + self._trash_folder_node_id = self.get_node_by_type(4)[0] return self def _login_user(self, email, password): - password_aes = prepare_key(str_to_a32(password)) - uh = stringhash(email, password_aes) - resp = self._api_request({'a': 'us', 'user': email, 'uh': uh}) - # if numeric error code response + email = email.lower() + get_user_salt_resp = self._api_request({'a': 'us0', 'user': email}) + user_salt = None + try: + user_salt = base64_to_a32(get_user_salt_resp['s']) + except KeyError: + # v1 user account + password_aes = prepare_key(str_to_a32(password)) + user_hash = stringhash(email, password_aes) + else: + # v2 user account + pbkdf2_key = hashlib.pbkdf2_hmac( + hash_name='sha512', + password=password.encode(), + salt=a32_to_str(user_salt), + iterations=100000, + dklen=32 + ) + password_aes = str_to_a32(pbkdf2_key[:16]) + user_hash = base64_url_encode(pbkdf2_key[-16:]) + resp = self._api_request({'a': 'us', 'user': email, 'uh': user_hash}) if isinstance(resp, int): raise RequestError(resp) self._login_process(resp, password_aes) @@ -70,7 +93,6 @@ class Mega(object): ) resp = self._api_request({'a': 'us', 'user': user}) - # if numeric error code response if isinstance(resp, int): raise RequestError(resp) self._login_process(resp, password_key) @@ -96,7 +118,8 @@ class Mega(object): for i in range(4): if PYTHON2: l = ( - (ord(private_key[0]) * 256 + ord(private_key[1]) + 7) / 8 + (ord(private_key[0]) * 256 + + ord(private_key[1]) + 7) / 8 ) + 2 else: l = int( @@ -113,7 +136,6 @@ class Mega(object): self.rsa_private_key[1] ) ) - sid = '%x' % rsa_decrypter.key._decrypt(encrypted_sid) sid = binascii.unhexlify('0' + sid if len(sid) % 2 else sid) self.sid = base64_url_encode(sid[:43]) @@ -135,21 +157,12 @@ class Mega(object): params=params, data=json.dumps(data), timeout=self.timeout, - headers={ - 'Origin': - 'https://mega.nz', - 'Referer': - 'https://mega.nz/login', - 'User-Agent': ( - 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:69.0) ' - 'Gecko/20100101 Firefox/69.0' - ), - } ) json_resp = json.loads(req.text) - - # if numeric error code response if isinstance(json_resp, int): + if json_resp == -3: + time.sleep(0.2) + return self._api_request(data=data) raise RequestError(json_resp) return json_resp[0] @@ -163,9 +176,6 @@ class Mega(object): raise RequestError('Url key missing') def _process_file(self, file, shared_keys): - """ - Process a file - """ if file['t'] == 0 or file['t'] == 1: keys = dict( keypart.split(':', 1) @@ -194,6 +204,13 @@ class Mega(object): key = keys[hkey] key = decrypt_key(base64_to_a32(key), shared_key) break + if file['h'] and file['h'] in shared_keys.get('EXP', ()): + shared_key = shared_keys['EXP'][file['h']] + encrypted_key = str_to_a32( + base64_url_decode(file['k'].split(':')[-1]) + ) + key = decrypt_key(encrypted_key, shared_key) + file['shared_folder_key'] = shared_key if key is not None: # file if file['t'] == 0: @@ -244,11 +261,9 @@ class Mega(object): shared_keys[s_item['u']] = {} if s_item['h'] in ok_dict: shared_keys[s_item['u']][s_item['h']] = ok_dict[s_item['h']] + self.shared_keys = shared_keys - ########################################################################## - # GET - - def find_path_descriptor(self, path): + def find_path_descriptor(self, path, files=()): """ Find descriptor of folder inside a path. i.e.: folder1/folder2/folder3 Params: @@ -258,14 +273,17 @@ class Mega(object): """ paths = path.split('/') - files = self.get_files() + files = files or self.get_files() parent_desc = self.root_id found = False for foldername in paths: if foldername != '': for file in files.items(): - if file[1]['a'] and file[1]['t'] and \ - file[1]['a']['n'] == foldername: + if ( + file[1]['a'] and + file[1]['t'] and + file[1]['a']['n'] == foldername + ): if parent_desc == file[1]['p']: parent_desc = file[0] found = True @@ -275,22 +293,49 @@ class Mega(object): return None return parent_desc - def find(self, filename): + def find(self, filename=None, handle=None, exclude_deleted=False): """ Return file object from given filename """ files = self.get_files() + if handle: + return files[handle] + path = Path(filename) + filename = path.name + parent_dir_name = path.parent.name for file in list(files.items()): - if not isinstance(file[1]['a'], dict): - continue - if file[1]['a'] and file[1]['a']['n'] == filename: + parent_node_id = None + if parent_dir_name: + parent_node_id = self.find_path_descriptor( + parent_dir_name, files=files + ) + if ( + filename and parent_node_id and + file[1]['a'] and file[1]['a']['n'] == filename and + parent_node_id == file[1]['p'] + ): + if ( + exclude_deleted and + self._trash_folder_node_id == file[1]['p'] + ): + continue + return file + if ( + filename and + file[1]['a'] and file[1]['a']['n'] == filename + ): + if ( + exclude_deleted and + self._trash_folder_node_id == file[1]['p'] + ): + continue return file def get_files(self): """ Get all files in account """ - files = self._api_request({'a': 'f', 'c': 1}) + files = self._api_request({'a': 'f', 'c': 1, 'r': 1}) files_dict = {} shared_keys = {} self._init_shared_keys(files, shared_keys) @@ -341,6 +386,31 @@ class Mega(object): else: raise ValidationError('File id and key must be present') + def _node_data(self, node): + try: + return node[1] + except (IndexError, KeyError): + return node + + def get_folder_link(self, file): + try: + file = file[1] + except (IndexError, KeyError): + pass + if 'h' in file and 'k' in file: + public_handle = self._api_request({'a': 'l', 'n': file['h']}) + if public_handle == -11: + raise RequestError( + "Can't get a public link from that file " + "(is this a shared file?)" + ) + decrypted_key = a32_to_base64(file['shared_folder_key']) + return '{0}://{1}/#F!{2}!{3}'.format( + self.schema, self.domain, public_handle, decrypted_key + ) + else: + raise ValidationError('File id and key must be present') + def get_user(self): user_data = self._api_request({'a': 'ug'}) return user_data @@ -442,8 +512,6 @@ class Mega(object): if 'balance' in user_data: return user_data['balance'] - ########################################################################## - # DELETE def delete(self, public_handle): """ Delete a file by its public handle @@ -491,8 +559,6 @@ class Mega(object): post_list.append({"a": "d", "n": file, "i": self.request_id}) return self._api_request(post_list) - ########################################################################## - # DOWNLOAD def download(self, file, dest_path=None, dest_filename=None): """ Download a file by it's file object @@ -506,6 +572,68 @@ class Mega(object): is_public=False ) + def _export_file(self, node): + node_data = self._node_data(node) + self._api_request([ + { + 'a': 'l', + 'n': node_data['h'], + 'i': self.request_id + } + ]) + return self.get_link(node) + + def export(self, path=None, node_id=None): + nodes = self.get_files() + if node_id: + node = nodes[node_id] + else: + node = self.find(path) + + node_data = self._node_data(node) + is_file_node = node_data['t'] == 0 + if is_file_node: + return self._export_file(node) + if node: + try: + # If already exported + return self.get_folder_link(node) + except (RequestError, KeyError): + pass + + master_key_cipher = AES.new(a32_to_str(self.master_key), AES.MODE_ECB) + ha = base64_url_encode( + master_key_cipher.encrypt(node_data['h'] + node_data['h']) + ) + + share_key = secrets.token_bytes(16) + ok = base64_url_encode(master_key_cipher.encrypt(share_key)) + + share_key_cipher = AES.new(share_key, AES.MODE_ECB) + node_key = node_data['k'] + encrypted_node_key = base64_url_encode( + share_key_cipher.encrypt(a32_to_str(node_key)) + ) + + node_id = node_data['h'] + request_body = [ + { + 'a': 's2', + 'n': node_id, + 's': [{ + 'u': 'EXP', + 'r': 0 + }], + 'i': self.request_id, + 'ok': ok, + 'ha': ha, + 'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]] + } + ] + self._api_request(request_body) + nodes = self.get_files() + return self.get_folder_link(nodes[node_id]) + def download_url(self, url, dest_path=None, dest_filename=None): """ Download a file by it's public url @@ -631,8 +759,6 @@ class Mega(object): shutil.move(temp_output_file.name, dest_path + file_name) - ########################################################################## - # UPLOAD def upload(self, filename, dest=None, dest_filename=None): # determine storage node if dest is None: @@ -683,18 +809,11 @@ class Mega(object): # encrypt file and upload chunk = aes.encrypt(chunk) - try: - output_file = requests.post( - ul_url + "/" + str(chunk_start), - data=chunk, - timeout=self.timeout - ) - except: - output_file = requests.post( - ul_url + "/" + str(chunk_start), - data=chunk, - timeout=self.timeout - ) + output_file = requests.post( + ul_url + "/" + str(chunk_start), + data=chunk, + timeout=self.timeout + ) completion_file_handle = output_file.text if self.options.get('verbose') is True: @@ -748,14 +867,7 @@ class Mega(object): input_file.close() return data - def create_folder(self, name, dest=None): - # determine storage node - if dest is None: - # if none set, upload to cloud drive node - if not hasattr(self, 'root_id'): - self.get_files() - dest = self.root_id - + def _mkdir(self, name, parent_node_id): # generate random aes key (128) for folder ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)] @@ -767,10 +879,8 @@ class Mega(object): # update attributes data = self._api_request( { - 'a': - 'p', - 't': - dest, + 'a': 'p', + 't': parent_node_id, 'n': [ { 'h': 'xxxxxxxx', @@ -779,12 +889,38 @@ class Mega(object): 'k': encrypted_key } ], - 'i': - self.request_id + 'i': self.request_id } ) return data + def _root_node_id(self): + if not hasattr(self, 'root_id'): + self.get_files() + return self.root_id + + def create_folder(self, name, dest=None): + dirs = tuple(dir_name for dir_name in str(name).split('/') if dir_name) + folder_node_ids = {} + for idx, directory_name in enumerate(dirs): + existing_node_id = self.find_path_descriptor(directory_name) + if existing_node_id: + folder_node_ids[idx] = existing_node_id + continue + if idx == 0: + if dest is None: + parent_node_id = self._root_node_id() + else: + parent_node_id = dest + else: + parent_node_id = folder_node_ids[idx - 1] + created_node = self._mkdir( + name=directory_name, parent_node_id=parent_node_id + ) + node_id = created_node['f'][0]['h'] + folder_node_ids[idx] = node_id + return dict(zip(dirs, folder_node_ids.values())) + def rename(self, file, new_name): file = file[1] # create new attribs @@ -881,21 +1017,6 @@ class Mega(object): } ) - def get_contacts(self): - raise NotImplementedError() - # TODO implement this - # sn param below = maxaction var with function getsc() in mega.co.nz js - # seens to be the 'sn' attrib of the previous request response... - # requests goto /sc rather than - - # req = requests.post( - # '{0}://g.api.{1}/sc'.format(self.schema, self.domain), - # params={'sn': 'ZMxcQ_DmHnM', 'ssl': '1'}, - # data=json.dumps(None), - # timeout=self.timeout) - # json_resp = json.loads(req.text) - # print json_resp - def get_public_url_info(self, url): """ Get size and name from a public url, dict returned @@ -917,8 +1038,6 @@ class Mega(object): Get size and name of a public file """ data = self._api_request({'a': 'g', 'p': file_handle, 'ssm': 1}) - - # if numeric error code response if isinstance(data, int): raise RequestError(data) diff --git a/src/tests/__init__.py b/src/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tests/test_errors.py b/src/tests/test_errors.py new file mode 100644 index 0000000..4469349 --- /dev/null +++ b/src/tests/test_errors.py @@ -0,0 +1,17 @@ +import pytest + +from mega.errors import RequestError, _CODE_TO_DESCRIPTIONS + + +@pytest.mark.parametrize( + 'code, exp_message', [ + (code, f'{desc[0]}, {desc[1]}') + for code, desc in _CODE_TO_DESCRIPTIONS.items() + ] +) +def test_request_error(code, exp_message): + exc = RequestError(code) + + assert exc.code == code + assert exc.message == exp_message + assert str(exc) == exp_message diff --git a/src/tests/tests.py b/src/tests/tests.py index 41f6fe8..a31f608 100644 --- a/src/tests/tests.py +++ b/src/tests/tests.py @@ -1,106 +1,202 @@ -""" -These unit tests will upload a test file,a test folder and a test contact, -Perform api operations on them, -And them remove them from your account. -""" -import unittest import random +from pathlib import Path import os +import pytest + from mega import Mega -mega = Mega() -# anonymous login -m = mega.login() -# normal login -# m = mega.login(email, password) - -FIND_RESP = None TEST_CONTACT = 'test@mega.co.nz' TEST_PUBLIC_URL = ( 'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps' ) TEST_FILE = os.path.basename(__file__) -TEST_FOLDER = 'mega.py_testfolder_{0}'.format(random.random()) -class TestMega(unittest.TestCase): - - def test_mega(self): - self.assertIsInstance(mega, Mega) - - def test_login(self): - self.assertIsInstance(mega, Mega) - - def test_get_user(self): - resp = m.get_user() - self.assertIsInstance(resp, dict) - - def test_get_quota(self): - resp = m.get_quota() - self.assertIsInstance(int(resp), int) - - def test_get_storage_space(self): - resp = m.get_storage_space(mega=True) - self.assertIsInstance(resp, dict) - - def test_get_files(self): - files = m.get_files() - self.assertIsInstance(files, dict) - - def test_get_link(self): - file = m.find(TEST_FILE) - if file: - link = m.get_link(file) - self.assertIsInstance(link, str) - - def test_import_public_url(self): - resp = m.import_public_url(TEST_PUBLIC_URL) - file_handle = m.get_id_from_obj(resp) - resp = m.destroy(file_handle) - self.assertIsInstance(resp, int) - - def test_create_folder(self): - resp = m.create_folder(TEST_FOLDER) - self.assertIsInstance(resp, dict) - - def test_rename(self): - file = m.find(TEST_FOLDER) - if file: - resp = m.rename(file, TEST_FOLDER) - self.assertIsInstance(resp, int) - - def test_delete_folder(self): - folder_node = m.find(TEST_FOLDER)[0] - resp = m.delete(folder_node) - self.assertIsInstance(resp, int) - - def test_delete(self): - file = m.find(TEST_FILE) - if file: - resp = m.delete(file[0]) - self.assertIsInstance(resp, int) - - def test_destroy(self): - file = m.find(TEST_FILE) - if file: - resp = m.destroy(file[0]) - self.assertIsInstance(resp, int) - - def test_empty_trash(self): - # resp None if already empty, else int - resp = m.empty_trash() - if resp is not None: - self.assertIsInstance(resp, int) - - def test_add_contact(self): - resp = m.add_contact(TEST_CONTACT) - self.assertIsInstance(resp, int) - - def test_remove_contact(self): - resp = m.remove_contact(TEST_CONTACT) - self.assertIsInstance(resp, int) +@pytest.fixture +def folder_name(): + return 'mega.py_testfolder_{0}'.format(random.random()) -if __name__ == '__main__': - unittest.main() +@pytest.fixture +def mega(folder_name): + mega_ = Mega() + mega_.login(email=os.environ['EMAIL'], password=os.environ['PASS']) + created_nodes = mega_.create_folder(folder_name) + yield mega_ + node_id = next(iter(created_nodes.values())) + mega_.destroy(node_id) + + +def test_mega(mega): + assert isinstance(mega, Mega) + + +def test_login(mega): + assert isinstance(mega, Mega) + + +def test_get_user(mega): + resp = mega.get_user() + assert isinstance(resp, dict) + + +def test_get_quota(mega): + resp = mega.get_quota() + assert isinstance(int(resp), int) + + +def test_get_storage_space(mega): + resp = mega.get_storage_space(mega=True) + assert isinstance(resp, dict) + + +def test_get_files(mega): + files = mega.get_files() + assert isinstance(files, dict) + + +def test_get_link(mega): + file = mega.find(TEST_FILE) + if file: + link = mega.get_link(file) + assert isinstance(link, str) + + +class TestExport: + + def test_export_folder(self, mega, folder_name): + public_url = None + for _ in range(2): + result_public_share_url = mega.export(folder_name) + + if not public_url: + public_url = result_public_share_url + assert result_public_share_url.startswith('https://mega.co.nz/#F!') + assert result_public_share_url == public_url + + def test_export_folder_within_folder(self, mega, folder_name): + folder_path = Path(folder_name) / 'subdir' / 'anothersubdir' + mega.create_folder(name=folder_path) + + result_public_share_url = mega.export(path=folder_path) + + assert result_public_share_url.startswith('https://mega.co.nz/#F!') + + def test_export_folder_using_node_id(self, mega, folder_name): + node_id = mega.find(folder_name)[0] + + result_public_share_url = mega.export(node_id=node_id) + + assert result_public_share_url.startswith('https://mega.co.nz/#F!') + + def test_export_single_file(self, mega, folder_name): + # Upload a single file into a folder + folder = mega.find(folder_name) + dest_node_id = folder[1]['h'] + mega.upload( + __file__, dest=dest_node_id, dest_filename='test.py' + ) + path = f'{folder_name}/test.py' + assert mega.find(path) + + for _ in range(2): + result_public_share_url = mega.export(path) + + assert result_public_share_url.startswith('https://mega.co.nz/#!') + + +def test_import_public_url(mega): + resp = mega.import_public_url(TEST_PUBLIC_URL) + file_handle = mega.get_id_from_obj(resp) + resp = mega.destroy(file_handle) + assert isinstance(resp, int) + + +class TestCreateFolder: + def test_create_folder(self, mega, folder_name): + folder_names_and_node_ids = mega.create_folder(folder_name) + + assert isinstance(folder_names_and_node_ids, dict) + assert len(folder_names_and_node_ids) == 1 + + def test_create_folder_with_sub_folders(self, mega, folder_name, mocker): + folder_names_and_node_ids = mega.create_folder( + name=(Path(folder_name) / 'subdir' / 'anothersubdir') + ) + + assert len(folder_names_and_node_ids) == 3 + assert folder_names_and_node_ids == { + folder_name: mocker.ANY, + 'subdir': mocker.ANY, + 'anothersubdir': mocker.ANY, + } + + +class TestFind: + + def test_find_file(self, mega, folder_name): + folder = mega.find(folder_name) + dest_node_id = folder[1]['h'] + mega.upload( + __file__, dest=dest_node_id, dest_filename='test.py' + ) + path = f'{folder_name}/test.py' + + assert mega.find(path) + + def test_path_not_found_returns_none(self, mega): + assert mega.find('not_found') is None + + def test_exclude_deleted_files(self, mega, folder_name): + folder_node_id = mega.find(folder_name)[0] + assert mega.find(folder_name) + + mega.delete(folder_node_id) + + assert mega.find(folder_name) + assert not mega.find(folder_name, exclude_deleted=True) + + +def test_rename(mega, folder_name): + file = mega.find(folder_name) + if file: + resp = mega.rename(file, folder_name) + assert isinstance(resp, int) + + +def test_delete_folder(mega, folder_name): + folder_node = mega.find(folder_name)[0] + resp = mega.delete(folder_node) + assert isinstance(resp, int) + + +def test_delete(mega): + file = mega.find(TEST_FILE) + if file: + resp = mega.delete(file[0]) + assert isinstance(resp, int) + + +def test_destroy(mega): + file = mega.find(TEST_FILE) + if file: + resp = mega.destroy(file[0]) + assert isinstance(resp, int) + + +def test_empty_trash(mega): + # resp None if already empty, else int + resp = mega.empty_trash() + if resp is not None: + assert isinstance(resp, int) + + +def test_add_contact(mega): + resp = mega.add_contact(TEST_CONTACT) + assert isinstance(resp, int) + + +def test_remove_contact(mega): + resp = mega.remove_contact(TEST_CONTACT) + assert isinstance(resp, int) diff --git a/tox.ini b/tox.ini index 82c3671..fd4b4c3 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{27,37}-normal,lint +envlist = py{36,37,38}-normal,lint [testenv] commands = @@ -7,7 +7,9 @@ commands = coverage erase python setup.py install pytest {toxinidir}/src/tests/tests.py - +passenv = + EMAIL + PASS deps = -rrequirements-dev.txt