diff --git a/.gitignore b/.gitignore index 57e41db..1090913 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ ################# ## Eclipse ################# - +.env.fish *.pydevproject .project .metadata diff --git a/HISTORY.rst b/HISTORY.rst index c0ddccc..0bc68d9 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,7 +6,9 @@ Release History 0.9.21 (unreleased) +++++++++++++++++++ -- Nothing changed yet. +- Removes broken method ``get_contacts()``. +- Adds support for login with a v2 Mega user account. +- Adds ``export()`` method to share a file or folder, returning public share URL with key. 0.9.20 (2019-10-17) diff --git a/requirements-dev.txt b/requirements-dev.txt index 1b820f6..547f6ca 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -13,3 +13,4 @@ zest.releaser setuptools twine wheel +rope diff --git a/src/mega/example_req_to_export_folder.json b/src/mega/example_req_to_export_folder.json new file mode 100644 index 0000000..59572db --- /dev/null +++ b/src/mega/example_req_to_export_folder.json @@ -0,0 +1,19 @@ +[{ + "a": "log", + "e": 99635 +}, { + "a": "s2", + "n": "oUchHSzJ", + "s": [{ + "u": "EXP", + "r": 0 + }], + "i": "q6CpLxTDNV", + "ok": "uGuwXS80VfifU1hcLlKcrQ", + "ha": "n6dI3_qVste9XPma5fMIKQ", + "cr": [ + ["oUchHSzJ"], + ["oUchHSzJ"], + [0, 0, "ff5m7sr6LZlYyRtiEE9EZA"] + ] +}] diff --git a/src/mega/mega.py b/src/mega/mega.py index 6fb2557..3569dc2 100644 --- a/src/mega/mega.py +++ b/src/mega/mega.py @@ -1,5 +1,7 @@ import re import json +import secrets +import hashlib from Crypto.Cipher import AES from Crypto.PublicKey import RSA from Crypto.Util import Counter @@ -41,10 +43,27 @@ class Mega(object): return self def _login_user(self, email, password): - password_aes = prepare_key(str_to_a32(password)) - uh = stringhash(email, password_aes) - resp = self._api_request({'a': 'us', 'user': email, 'uh': uh}) - # if numeric error code response + email = email.lower() + get_user_salt_resp = self._api_request({'a': 'us0', 'user': email}) + user_salt = None + try: + user_salt = base64_to_a32(get_user_salt_resp['s']) + except KeyError: + # v1 user account + password_aes = prepare_key(str_to_a32(password)) + user_hash = stringhash(email, password_aes) + else: + # v2 user account + pbkdf2_key = hashlib.pbkdf2_hmac( + hash_name='sha512', + password=password.encode(), + salt=a32_to_str(user_salt), + iterations=100000, + dklen=32 + ) + password_aes = str_to_a32(pbkdf2_key[:16]) + user_hash = base64_url_encode(pbkdf2_key[-16:]) + resp = self._api_request({'a': 'us', 'user': email, 'uh': user_hash}) if isinstance(resp, int): raise RequestError(resp) self._login_process(resp, password_aes) @@ -70,7 +89,6 @@ class Mega(object): ) resp = self._api_request({'a': 'us', 'user': user}) - # if numeric error code response if isinstance(resp, int): raise RequestError(resp) self._login_process(resp, password_key) @@ -113,7 +131,6 @@ class Mega(object): self.rsa_private_key[1] ) ) - sid = '%x' % rsa_decrypter.key._decrypt(encrypted_sid) sid = binascii.unhexlify('0' + sid if len(sid) % 2 else sid) self.sid = base64_url_encode(sid[:43]) @@ -135,20 +152,8 @@ class Mega(object): params=params, data=json.dumps(data), timeout=self.timeout, - headers={ - 'Origin': - 'https://mega.nz', - 'Referer': - 'https://mega.nz/login', - 'User-Agent': ( - 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:69.0) ' - 'Gecko/20100101 Firefox/69.0' - ), - } ) json_resp = json.loads(req.text) - - # if numeric error code response if isinstance(json_resp, int): raise RequestError(json_resp) return json_resp[0] @@ -163,9 +168,6 @@ class Mega(object): raise RequestError('Url key missing') def _process_file(self, file, shared_keys): - """ - Process a file - """ if file['t'] == 0 or file['t'] == 1: keys = dict( keypart.split(':', 1) @@ -194,6 +196,13 @@ class Mega(object): key = keys[hkey] key = decrypt_key(base64_to_a32(key), shared_key) break + if file['h'] and file['h'] in shared_keys.get('EXP', ()): + shared_key = shared_keys['EXP'][file['h']] + encrypted_key = str_to_a32( + base64_url_decode(file['k'].split(':')[-1]) + ) + key = decrypt_key(encrypted_key, shared_key) + file['shared_folder_key'] = shared_key if key is not None: # file if file['t'] == 0: @@ -244,9 +253,7 @@ class Mega(object): shared_keys[s_item['u']] = {} if s_item['h'] in ok_dict: shared_keys[s_item['u']][s_item['h']] = ok_dict[s_item['h']] - - ########################################################################## - # GET + self.shared_keys = shared_keys def find_path_descriptor(self, path): """ @@ -264,8 +271,11 @@ class Mega(object): for foldername in paths: if foldername != '': for file in files.items(): - if file[1]['a'] and file[1]['t'] and \ - file[1]['a']['n'] == foldername: + if ( + file[1]['a'] and + file[1]['t'] and + file[1]['a']['n'] == foldername + ): if parent_desc == file[1]['p']: parent_desc = file[0] found = True @@ -275,22 +285,40 @@ class Mega(object): return None return parent_desc - def find(self, filename): + def find(self, filename=None, handle=None): """ Return file object from given filename """ + from pathlib import Path + path = Path(filename) + filename = path.name files = self.get_files() + parent_dir_name = path.parent.name for file in list(files.items()): - if not isinstance(file[1]['a'], dict): - continue - if file[1]['a'] and file[1]['a']['n'] == filename: + parent_node_id = None + if parent_dir_name: + parent_node_id = self.find_path_descriptor(parent_dir_name) + if ( + filename and parent_node_id and + file[1]['a'] and file[1]['a']['n'] == filename + and parent_node_id == file[1]['p'] + ): + return file + # if not isinstance(file[1]['a'], dict): + # continue + if ( + filename and + file[1]['a'] and file[1]['a']['n'] == filename + ): + return file + if handle and file[1]['h'] == handle: return file def get_files(self): """ Get all files in account """ - files = self._api_request({'a': 'f', 'c': 1}) + files = self._api_request({'a': 'f', 'c': 1, 'r': 1}) files_dict = {} shared_keys = {} self._init_shared_keys(files, shared_keys) @@ -341,6 +369,31 @@ class Mega(object): else: raise ValidationError('File id and key must be present') + def _node_data(self, node): + try: + return node[1] + except (IndexError, KeyError): + return node + + def get_folder_link(self, file): + try: + file = file[1] + except (IndexError, KeyError): + pass + if 'h' in file and 'k' in file: + public_handle = self._api_request({'a': 'l', 'n': file['h']}) + if public_handle == -11: + raise RequestError( + "Can't get a public link from that file " + "(is this a shared file?)" + ) + decrypted_key = a32_to_base64(file['shared_folder_key']) + return '{0}://{1}/#F!{2}!{3}'.format( + self.schema, self.domain, public_handle, decrypted_key + ) + else: + raise ValidationError('File id and key must be present') + def get_user(self): user_data = self._api_request({'a': 'ug'}) return user_data @@ -442,8 +495,6 @@ class Mega(object): if 'balance' in user_data: return user_data['balance'] - ########################################################################## - # DELETE def delete(self, public_handle): """ Delete a file by its public handle @@ -491,8 +542,6 @@ class Mega(object): post_list.append({"a": "d", "n": file, "i": self.request_id}) return self._api_request(post_list) - ########################################################################## - # DOWNLOAD def download(self, file, dest_path=None, dest_filename=None): """ Download a file by it's file object @@ -506,6 +555,68 @@ class Mega(object): is_public=False ) + def _export_file(self, node): + self._api_request([ + { + 'a': 'l', + 'n': node[1]['h'], + 'i': self.request_id + } + ]) + return self.get_link(node) + + def export(self, path): + self.get_files() + folder = self.find(path) + if folder[1]['t'] == 0: + return self._export_file(folder) + if folder: + try: + # If already exported + return self.get_folder_link(folder) + except (RequestError, KeyError): + pass + + user_id = folder[1]['u'] + user_pub_key = self._api_request({'a': 'uk', 'u': user_id})['pubk'] + node_key = folder[1]['k'] + + master_key_cipher = AES.new(a32_to_str(self.master_key), AES.MODE_ECB) + ha = base64_url_encode( + master_key_cipher.encrypt(folder[1]['h'] + folder[1]['h']) + ) + + share_key = secrets.token_bytes(16) + ok = base64_url_encode(master_key_cipher.encrypt(share_key)) + + share_key_cipher = AES.new(share_key, AES.MODE_ECB) + encrypted_node_key = base64_url_encode( + share_key_cipher.encrypt(a32_to_str(node_key)) + ) + + node_id = folder[1]['h'] + request_body = [ + { + 'a': 's2', + 'n': node_id, + 's': [{ + 'u': 'EXP', + 'r': 0 + }], + 'i': self.request_id, + 'ok': ok, + 'ha': ha, + 'cr': [ + [node_id], + [node_id], + [0, 0, encrypted_node_key] + ] + }] + self._api_request(request_body) + nodes = self.get_files() + link = self.get_folder_link(nodes[node_id]) + return link + def download_url(self, url, dest_path=None, dest_filename=None): """ Download a file by it's public url @@ -631,8 +742,6 @@ class Mega(object): shutil.move(temp_output_file.name, dest_path + file_name) - ########################################################################## - # UPLOAD def upload(self, filename, dest=None, dest_filename=None): # determine storage node if dest is None: @@ -767,10 +876,8 @@ class Mega(object): # update attributes data = self._api_request( { - 'a': - 'p', - 't': - dest, + 'a':'p', + 't': dest, 'n': [ { 'h': 'xxxxxxxx', @@ -779,8 +886,7 @@ class Mega(object): 'k': encrypted_key } ], - 'i': - self.request_id + 'i': self.request_id } ) return data @@ -881,21 +987,6 @@ class Mega(object): } ) - def get_contacts(self): - raise NotImplementedError() - # TODO implement this - # sn param below = maxaction var with function getsc() in mega.co.nz js - # seens to be the 'sn' attrib of the previous request response... - # requests goto /sc rather than - - # req = requests.post( - # '{0}://g.api.{1}/sc'.format(self.schema, self.domain), - # params={'sn': 'ZMxcQ_DmHnM', 'ssl': '1'}, - # data=json.dumps(None), - # timeout=self.timeout) - # json_resp = json.loads(req.text) - # print json_resp - def get_public_url_info(self, url): """ Get size and name from a public url, dict returned @@ -917,8 +1008,6 @@ class Mega(object): Get size and name of a public file """ data = self._api_request({'a': 'g', 'p': file_handle, 'ssm': 1}) - - # if numeric error code response if isinstance(data, int): raise RequestError(data) diff --git a/src/tests/tests.py b/src/tests/tests.py index 41f6fe8..214cdea 100644 --- a/src/tests/tests.py +++ b/src/tests/tests.py @@ -1,21 +1,10 @@ -""" -These unit tests will upload a test file,a test folder and a test contact, -Perform api operations on them, -And them remove them from your account. -""" -import unittest import random import os +import pytest + from mega import Mega -mega = Mega() -# anonymous login -m = mega.login() -# normal login -# m = mega.login(email, password) - -FIND_RESP = None TEST_CONTACT = 'test@mega.co.nz' TEST_PUBLIC_URL = ( 'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps' @@ -24,83 +13,131 @@ TEST_FILE = os.path.basename(__file__) TEST_FOLDER = 'mega.py_testfolder_{0}'.format(random.random()) -class TestMega(unittest.TestCase): - - def test_mega(self): - self.assertIsInstance(mega, Mega) - - def test_login(self): - self.assertIsInstance(mega, Mega) - - def test_get_user(self): - resp = m.get_user() - self.assertIsInstance(resp, dict) - - def test_get_quota(self): - resp = m.get_quota() - self.assertIsInstance(int(resp), int) - - def test_get_storage_space(self): - resp = m.get_storage_space(mega=True) - self.assertIsInstance(resp, dict) - - def test_get_files(self): - files = m.get_files() - self.assertIsInstance(files, dict) - - def test_get_link(self): - file = m.find(TEST_FILE) - if file: - link = m.get_link(file) - self.assertIsInstance(link, str) - - def test_import_public_url(self): - resp = m.import_public_url(TEST_PUBLIC_URL) - file_handle = m.get_id_from_obj(resp) - resp = m.destroy(file_handle) - self.assertIsInstance(resp, int) - - def test_create_folder(self): - resp = m.create_folder(TEST_FOLDER) - self.assertIsInstance(resp, dict) - - def test_rename(self): - file = m.find(TEST_FOLDER) - if file: - resp = m.rename(file, TEST_FOLDER) - self.assertIsInstance(resp, int) - - def test_delete_folder(self): - folder_node = m.find(TEST_FOLDER)[0] - resp = m.delete(folder_node) - self.assertIsInstance(resp, int) - - def test_delete(self): - file = m.find(TEST_FILE) - if file: - resp = m.delete(file[0]) - self.assertIsInstance(resp, int) - - def test_destroy(self): - file = m.find(TEST_FILE) - if file: - resp = m.destroy(file[0]) - self.assertIsInstance(resp, int) - - def test_empty_trash(self): - # resp None if already empty, else int - resp = m.empty_trash() - if resp is not None: - self.assertIsInstance(resp, int) - - def test_add_contact(self): - resp = m.add_contact(TEST_CONTACT) - self.assertIsInstance(resp, int) - - def test_remove_contact(self): - resp = m.remove_contact(TEST_CONTACT) - self.assertIsInstance(resp, int) +@pytest.fixture +def mega(): + mega_ = Mega() + mega_.login(email=os.environ['EMAIL'], password=os.environ['PASS']) + node = mega_.create_folder(TEST_FOLDER) + yield mega_ + node_id = node['f'][0]['h'] + mega_.destroy(node_id) -if __name__ == '__main__': - unittest.main() +def test_mega(mega): + assert isinstance(mega, Mega) + + +def test_login(mega): + assert isinstance(mega, Mega) + + +def test_get_user(mega): + resp = mega.get_user() + assert isinstance(resp, dict) + + +def test_get_quota(mega): + resp = mega.get_quota() + assert isinstance(int(resp), int) + + +def test_get_storage_space(mega): + resp = mega.get_storage_space(mega=True) + assert isinstance(resp, dict) + + +def test_get_files(mega): + files = mega.get_files() + assert isinstance(files, dict) + + +def test_get_link(mega): + file = mega.find(TEST_FILE) + if file: + link = mega.get_link(file) + assert isinstance(link, str) + + +class TestExport: + + def test_export_folder(self, mega): + public_url = None + for _ in range(2): + result_public_share_url = mega.export(TEST_FOLDER) + + if not public_url: + public_url = result_public_share_url + + assert result_public_share_url.startswith('https://mega.co.nz/#F!') + assert result_public_share_url == public_url + + def test_export_single_file(self, mega): + # Upload a single file into a folder + folder = mega.find(TEST_FOLDER) + dest_node_id = folder[1]['h'] + result = mega.upload( + __file__, dest=dest_node_id, dest_filename='test.py' + ) + path = f'{TEST_FOLDER}/test.py' + assert mega.find(path) + + for _ in range(2): + result_public_share_url = mega.export(path) + + assert result_public_share_url.startswith('https://mega.co.nz/#!') + + +def test_import_public_url(mega): + resp = mega.import_public_url(TEST_PUBLIC_URL) + file_handle = mega.get_id_from_obj(resp) + resp = mega.destroy(file_handle) + assert isinstance(resp, int) + + +def test_create_folder(mega): + resp = mega.create_folder(TEST_FOLDER) + assert isinstance(resp, dict) + + +def test_rename(mega): + file = mega.find(TEST_FOLDER) + if file: + resp = mega.rename(file, TEST_FOLDER) + assert isinstance(resp, int) + + +def test_delete_folder(mega): + folder_node = mega.find(TEST_FOLDER)[0] + resp = mega.delete(folder_node) + assert isinstance(resp, int) + + +def test_delete(mega): + file = mega.find(TEST_FILE) + if file: + resp = mega.delete(file[0]) + assert isinstance(resp, int) + + +def test_destroy(mega): + file = mega.find(TEST_FILE) + if file: + resp = mega.destroy(file[0]) + assert isinstance(resp, int) + + +def test_empty_trash(mega): + # resp None if already empty, else int + resp = mega.empty_trash() + if resp is not None: + assert isinstance(resp, int) + + +def test_add_contact(mega): + resp = mega.add_contact(TEST_CONTACT) + assert isinstance(resp, int) + + +def test_remove_contact(mega): + resp = mega.remove_contact(TEST_CONTACT) + assert isinstance(resp, int)