From 3507b72eb5cb28edf86bd33b6b94cd2c2037c594 Mon Sep 17 00:00:00 2001 From: Ethan Dalool Date: Wed, 23 Dec 2020 20:16:07 -0800 Subject: [PATCH] Add RequestDraft which allows for batching of arbitrary API calls. --- src/mega/mega.py | 1842 ++++++++++++++++++++++++++-------------------- 1 file changed, 1034 insertions(+), 808 deletions(-) diff --git a/src/mega/mega.py b/src/mega/mega.py index 5820d50..7311b74 100644 --- a/src/mega/mega.py +++ b/src/mega/mega.py @@ -27,6 +27,11 @@ NODE_TYPE_ROOT = 2 NODE_TYPE_INBOX = 3 NODE_TYPE_TRASH = 4 +class RequestDraft: + def __init__(self, request, finalize): + self.request = request + self.finalize = finalize + class Mega: def __init__(self): self.schema = 'https' @@ -36,60 +41,108 @@ class Mega: self.sequence_num = random.randint(0, 0xFFFFFFFF) self.request_id = crypto.make_id(10) self._cached_trash_folder_node_id = None + self._cached_root_node_id = None self.shared_keys = {} self.requests_session = requests.Session() @tenacity.retry( retry=tenacity.retry_if_exception_type(errors.EAGAIN), - wait=tenacity.wait_exponential(multiplier=2, min=2, max=60) + wait=tenacity.wait_exponential(multiplier=2, min=2, max=60), ) - def _api_request(self, data, params={}): - req_params = {'id': self.sequence_num} + def _api_request(self, request_data, params={}): + request_params = {'id': self.sequence_num} self.sequence_num += 1 if self.sid: - req_params.update({'sid': self.sid}) + request_params['sid'] = self.sid - req_params.update(params) + request_params.update(params) # ensure input data is a list - if not isinstance(data, list): - data = [data] + if not isinstance(request_data, list): + request_data = [request_data] - url = f'{self.schema}://g.api.{self.domain}/cs' - req = self.requests_session.post( - url, - params=req_params, - data=json.dumps(data), + request_json = [d.request if isinstance(d, RequestDraft) else d for d in request_data] + response = self.requests_session.post( + url=f'{self.schema}://g.api.{self.domain}/cs', + params=request_params, + data=json.dumps(request_json), timeout=self.timeout, ) - json_resp = json.loads(req.text) - if isinstance(json_resp, list) and isinstance(json_resp[0], int): - json_resp = json_resp[0] - if isinstance(json_resp, int): + responses = json.loads(response.text) + + if isinstance(responses, int): # If this raises EAGAIN it'll be caught by tenacity retry. - raise errors.error_for_code(json_resp) - if len(json_resp) == 1: - return json_resp[0] - else: - return json_resp + raise errors.error_for_code(responses) + + if len(request_data) != len(responses): + message = 'Number of requests and responses don\'t match.' + message += f' {len(request_data)} != {len(responses)}.' + raise errors.RequestError(message) + + if len(responses) == 1: + request = request_data[0] + response = responses[0] + + if response == 0: + return response + + elif isinstance(response, int): + # If this raises EAGAIN it'll be caught by tenacity retry. + raise errors.error_for_code(response) + + elif isinstance(request, RequestDraft): + response = request.finalize(response) + + return response + + final_response = [] + for (request, response) in zip(request_data, responses): + if response == 0: + pass + + elif isinstance(responses, int): + response = errors.error_for_code(response) + + elif isinstance(request, RequestDraft): + response = request.finalize(response) + + final_response.append(response) + + return final_response + + # CACHED SPECIAL NODES ######################################################################### + + @property + def _root_node_id(self): + if self._cached_root_node_id is None: + self._cached_root_node_id = self.get_node_by_type(NODE_TYPE_ROOT)[0] + return self._cached_root_node_id + + @property + def _trash_folder_node_id(self): + if self._cached_trash_folder_node_id is None: + self._cached_trash_folder_node_id = self.get_node_by_type(NODE_TYPE_TRASH)[0] + return self._cached_trash_folder_node_id + + # LOGIN ######################################################################################## def _api_account_version_and_salt(self, email): - """ + ''' The `us0` request returns a dictionary like {'v': 1} if the account is a v1 account, or {'v': 2, 's': '*salt*'} if the account is v2 or higher. This function will return a tuple (version, salt) where salt is None if the version is 1. - """ + ''' resp = self._api_request({'a': 'us0', 'user': email}) account_version = resp['v'] user_salt = resp.get('s', None) return (account_version, user_salt) def _api_start_session(self, user, user_hash=None): - """ + ''' The `us` request returns a dictionary like { 'tsid': 'session' (if temporary session), @@ -99,28 +152,22 @@ class Mega: 'u': 'user id', 'ach': 1 (I don't know, it's always 1 for me) } - """ + ''' request = {'a': 'us', 'user': user} if user_hash is not None: request['uh'] = user_hash resp = self._api_request(request) return resp - @property - def _trash_folder_node_id(self): - if self._cached_trash_folder_node_id is None: - self._cached_trash_folder_node_id = self.get_node_by_type(NODE_TYPE_TRASH)[0] - return self._cached_trash_folder_node_id - def login(self, email=None, password=None): if email: - self._login_user(email, password) + self.login_user(email, password) else: self.login_anonymous() logger.info('Login complete') return self - def _login_user(self, email, password): + def login_user(self, email, password): logger.info('Logging in user...') email = email.lower() (account_version, user_salt) = self._api_account_version_and_salt(email) @@ -144,8 +191,6 @@ class Mega: user_hash = crypto.stringhash(email, password_aes) resp = self._api_start_session(email, user_hash) - if isinstance(resp, int): - raise errors.RequestError(resp) self._login_process(resp, password_aes) def login_anonymous(self): @@ -161,8 +206,6 @@ class Mega: user = self._api_request({'a': 'up', 'k': k, 'ts': ts}) resp = self._api_start_session(user) - if isinstance(resp, int): - raise errors.RequestError(resp) self._login_process(resp, password_key) def _login_process(self, resp, password): @@ -221,11 +264,42 @@ class Mega: sid = binascii.unhexlify('0' + sid if len(sid) % 2 else sid) self.sid = crypto.base64_url_encode(sid[:43]) - def _parse_url(self, url): - """ + # HELPER METHODS ############################################################################### + + def get_id_from_obj(self, node_data): + ''' + Get node id from a file object + ''' + node_id = None + + for i in node_data['f']: + if i['h'] != '': + node_id = i['h'] + return node_id + + def normalize_node(self, node): + if isinstance(node, dict): + return node + if isinstance(node, int): + return self.get_node_by_type(node)[1] + + def normalize_node_id(self, node): + if node is None: + return self._root_node_id + elif isinstance(node, int): + return self.get_node_by_type(node)[1]['h'] + elif isinstance(node, dict): + return node['h'] + elif isinstance(node, str): + return node + else: + raise TypeError(f'Invalid node {node}.') + + def parse_url(self, url): + ''' Given a url like 'https://mega.nz/#!fileid!filekey', return a tuple (fileid, filekey). - """ + ''' # File urls are '#!', Folder urls are '#F!' if '/file/' in url: # V2 URL structure @@ -241,487 +315,157 @@ class Mega: (public_handle, decryption_key) = match[0] return (public_handle, decryption_key) - def _process_file(self, file): - if file['t'] in [NODE_TYPE_FILE, NODE_TYPE_DIR]: - keys = dict( - keypart.split(':', 1) for keypart in file['k'].split('/') - if ':' in keypart) - uid = file['u'] - key = None - # my objects - if uid in keys: - key = crypto.decrypt_key(crypto.base64_to_a32(keys[uid]), self.master_key) - # shared folders - elif 'su' in file and 'sk' in file and ':' in file['k']: - shared_key = crypto.decrypt_key( - crypto.base64_to_a32(file['sk']), self.master_key - ) - key = crypto.decrypt_key(crypto.base64_to_a32(keys[file['h']]), shared_key) - if file['su'] not in self.shared_keys: - self.shared_keys[file['su']] = {} - self.shared_keys[file['su']][file['h']] = shared_key - # shared files - elif file['u'] and file['u'] in self.shared_keys: - for hkey in self.shared_keys[file['u']]: - shared_key = self.shared_keys[file['u']][hkey] - if hkey in keys: - key = keys[hkey] - key = crypto.decrypt_key(crypto.base64_to_a32(key), shared_key) - break - if file['h'] and file['h'] in self.shared_keys.get('EXP', ()): - shared_key = self.shared_keys['EXP'][file['h']] - encrypted_key = crypto.str_to_a32( - crypto.base64_url_decode(file['k'].split(':')[-1]) - ) - key = crypto.decrypt_key(encrypted_key, shared_key) - file['shared_folder_key'] = shared_key - if key is not None: - if file['t'] == NODE_TYPE_FILE: - k = crypto.interleave_xor_8(key) - file['iv'] = key[4:6] + (0, 0) - file['meta_mac'] = key[6:8] - else: - k = key - file['key'] = key - file['k'] = k - attributes = crypto.base64_url_decode(file['a']) - attributes = crypto.decrypt_attr(attributes, k) - file['a'] = attributes - # other => wrong object - elif file['k'] == '': - file['a'] = False - elif file['t'] == NODE_TYPE_ROOT: - self.root_id = file['h'] - file['a'] = {'n': 'Cloud Drive'} - elif file['t'] == NODE_TYPE_INBOX: - self.inbox_id = file['h'] - file['a'] = {'n': 'Inbox'} - elif file['t'] == NODE_TYPE_TRASH: - self.trashbin_id = file['h'] - file['a'] = {'n': 'Rubbish Bin'} - return file + # CONTACTS ##################################################################################### - def _init_shared_keys(self, files): - """ - Init shared key not associated with a user. - Seems to happen when a folder is shared, - some files are exchanged and then the - folder is un-shared. - Keys are stored in files['s'] and files['ok'] - """ - ok_dict = {} + def _draft_add_remove_contact(self, email, add): + if not isinstance(add, bool): + raise errors.ValidationError(f'`add` must be of type bool, not {type(add)}.') - for ok_item in files.get('ok', []): - shared_key = crypto.decrypt_key( - crypto.base64_to_a32(ok_item['k']), self.master_key - ) - ok_dict[ok_item['h']] = shared_key - for s_item in files.get('s', []): - if s_item['u'] not in self.shared_keys: - self.shared_keys[s_item['u']] = {} - if s_item['h'] in ok_dict: - self.shared_keys[s_item['u']][s_item['h']] = ok_dict[s_item['h']] + l = '1' if add else '0' - def find_path_descriptor(self, path, files=()): - """ - Find descriptor of folder inside a path. i.e.: folder1/folder2/folder3 - Params: - path: string like 'folder1/folder2/folder3' - Return: - Descriptor (str) of folder3 if exists, None otherwise - """ - paths = path.split('/') + if not re.match(r"[^@]+@[^@]+\.[^@]+", email): + raise errors.ValidationError('add_contact requires a valid email address') - files = files or self.get_files() - parent_desc = self.root_id - found = False - for foldername in paths: - if foldername != '': - for file in files.items(): - if (file[1]['a'] and file[1]['t'] - and file[1]['a']['n'] == foldername): - if parent_desc == file[1]['p']: - parent_desc = file[0] - found = True - if found: - found = False - else: - return None - return parent_desc + request = { + 'a': 'ur', + 'u': email, + 'l': l, + 'i': self.request_id + } + return request - def find(self, filename=None, handle=None, exclude_deleted=False): - """ - Return file object from given filename - """ - files = self.get_files() - if handle: - return files[handle] - path = pathlib.Path(filename) - filename = path.name - parent_dir_name = path.parent.name - for file in list(files.items()): - parent_node_id = None - try: - if parent_dir_name: - parent_node_id = self.find_path_descriptor(parent_dir_name, - files=files) - if (filename and parent_node_id and file[1]['a'] - and file[1]['a']['n'] == filename - and parent_node_id == file[1]['p']): - if (exclude_deleted and self._trash_folder_node_id - == file[1]['p']): - continue - return file - elif (filename and file[1]['a'] - and file[1]['a']['n'] == filename): - if (exclude_deleted - and self._trash_folder_node_id == file[1]['p']): - continue - return file - except TypeError: + def draft_add_contact(self, email): + ''' + Add a user to your mega contact list. + ''' + return self._draft_add_remove_contact(email, True) + + def final_add_contact(self, response): + return response + + def add_contact(self, *args, **kwargs): + request = self.draft_add_contact(*args, **kwargs) + draft = RequestDraft(request, self.final_add_contact) + return self._api_request(draft) + + def draft_remove_contact(self, email): + ''' + Remove a user from your mega contact list. + ''' + return self._draft_add_remove_contact(email, False) + + def final_remove_contact(self, response): + return response + + def remove_contact(self, *args, **kwargs): + request = self.draft_remove_contact(*args, **kwargs) + draft = RequestDraft(request, self.final_remove_contact) + return self._api_request(draft) + + # CREATE FOLDER ################################################################################ + + def _mkdir(self, name, parent_node_id): + # generate random aes key (128) for folder + ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)] + + # encrypt attribs + attribs = {'n': name} + encrypt_attribs = crypto.base64_url_encode(crypto.encrypt_attr(attribs, ul_key[:4])) + encrypted_key = crypto.a32_to_base64(crypto.encrypt_key(ul_key[:4], self.master_key)) + + request = { + 'a': 'p', + 't': parent_node_id, + 'n': [ + { + 'h': 'xxxxxxxx', + 't': NODE_TYPE_DIR, + 'a': encrypt_attribs, + 'k': encrypted_key + } + ], + 'i': self.request_id + } + data = self._api_request(request) + return data + + def create_folder(self, name, dest=None): + dirs = tuple(dir_name for dir_name in str(name).split('/') if dir_name) + folder_node_ids = {} + if dest is None: + folder_node_ids[-1] = dest + else: + folder_node_ids[-1] = self._root_node_id + + for (index, directory_name) in enumerate(dirs): + existing_node_id = self.find_path_descriptor(directory_name) + if existing_node_id: + folder_node_ids[index] = existing_node_id continue - - def get_files(self, public_folder_handle=None): - logger.info('Getting all files...') - - params = {} - if public_folder_handle is not None: - params['n'] = public_folder_handle - - files = self._api_request({'a': 'f', 'c': 1, 'r': 1}, params=params) - - files_dict = {} - self._init_shared_keys(files) - for file in files['f']: - processed_file = self._process_file(file) - # ensure each file has a name before returning - if processed_file['a']: - files_dict[file['h']] = processed_file - self._nodes = files_dict - return files_dict - - def get_upload_link(self, file): - """ - Get a file's public link including decryption key - Requires upload() response as input - """ - if 'f' in file: - file = file['f'][0] - public_handle = self._api_request({'a': 'l', 'n': file['h']}) - file_key = file['k'][file['k'].index(':') + 1:] - decrypted_key = crypto.a32_to_base64( - crypto.decrypt_key(crypto.base64_to_a32(file_key), self.master_key) + parent_node_id = folder_node_ids[index - 1] + created_node = self._mkdir( + name=directory_name, + parent_node_id=parent_node_id, ) - return ( - f'{self.schema}://{self.domain}' - f'/#!{public_handle}!{decrypted_key}' - ) - else: - raise ValueError('''Upload() response required as input, - use get_links() for regular file input''') + node_id = created_node['f'][0]['h'] + folder_node_ids[index] = node_id + folder_node_ids.pop(-1) - def get_links(self, files): - """ - Get a file public links from given file objects. - """ - if not isinstance(files, list): - files = [files] + return dict(zip(dirs, folder_node_ids.values())) - files = [self._node_data(file) for file in files] + # DESTROY ###################################################################################### - if not all('h' in file and 'k' in file for file in files): - raise errors.ValidationError('File id and key must be present') - - request = [{'a': 'l', 'n': file['h']} for file in files] - public_handles = self._api_request(request) - if public_handles == -11: - raise errors.RequestError( - "Can't get a public link from that file " - "(is this a shared file?)" - ) - urls = {} - for (file, public_handle) in zip(files, public_handles): - decrypted_key = crypto.a32_to_base64(file['key']) - url = f'{self.schema}://{self.domain}/#!{public_handle}!{decrypted_key}' - urls[file['h']] = url - return urls - - def _node_data(self, node): - if isinstance(node, dict): - return node - try: - return node[1] - except (IndexError, KeyError): - return node - - def get_folder_link(self, file): - try: - file = file[1] - except (IndexError, KeyError): - pass - if 'h' in file and 'k' in file: - public_handle = self._api_request({'a': 'l', 'n': file['h']}) - if public_handle == -11: - raise errors.RequestError( - "Can't get a public link from that file " - "(is this a shared file?)" - ) - decrypted_key = crypto.a32_to_base64(file['shared_folder_key']) - return ( - f'{self.schema}://{self.domain}' - f'/#F!{public_handle}!{decrypted_key}' - ) - else: - raise errors.ValidationError('File id and key must be present') - - def get_user(self): - user_data = self._api_request({'a': 'ug'}) - return user_data - - def get_node_by_type(self, type): - """ - Get a node by it's numeric type id, e.g: - 2: special: root cloud drive - 3: special: inbox - 4: special: trash bin - """ - # Should we also check for NODE_TYPE_FILE, NODE_TYPE_DIR here? - nodes = self.get_files() - for node in list(nodes.items()): - if node[1]['t'] == type: - return node - - def get_files_in_node(self, target): - """ - Get all files in a given target. - Params: - target: a node's id string, or one of the special nodes - e.g. NODE_TYPE_TRASH. - """ - if type(target) == int: - if target in [NODE_TYPE_FILE, NODE_TYPE_DIR]: - raise TypeError('Can\'t use file or dir node type.') - node_id = self.get_node_by_type(target)[0] - else: - node_id = target - - files = self._api_request({'a': 'f', 'c': 1}) - # MERGE COMMON CODE WITH GET_FILES - files_dict = {} - self._init_shared_keys(files) - for file in files['f']: - processed_file = self._process_file(file, self.shared_keys) - if processed_file['a'] and processed_file['p'] == node_id: - files_dict[file['h']] = processed_file - return files_dict - - def get_id_from_public_handle(self, public_handle): - node_data = self._api_request({'a': 'f', 'f': 1, 'p': public_handle}) - node_id = self.get_id_from_obj(node_data) - return node_id - - def get_id_from_obj(self, node_data): - """ - Get node id from a file object - """ - node_id = None - - for i in node_data['f']: - if i['h'] != '': - node_id = i['h'] - return node_id - - def get_transfer_quota(self): - """ - Get transfer quota usage and maximum. - """ - request = { - 'a': 'uq', - 'xfer': 1, - 'v': 1 - } - json_resp = self._api_request(request) - if json_resp['utype'] == 0: - # For free accounts, there is no specified limit and your bandwidth - # is measured in a 6-hour rolling window. - response = { - 'total': None, - 'used': sum(json_resp['tah']), - 'remaining': None, - } - else: - # For Pro users, bandwidth limits are clearly defined by the - # account and the response contains simple integers for total, used. - response = { - 'total': json_resp['mxfer'], - 'used': json_resp['caxfer'], - 'remaining': json_resp['mxfer'] - json_resp['caxfer'], - } - return response - - def get_storage_quota(self): - """ - Get disk quota usage and maximum. - """ - request = { - 'a': 'uq', - 'strg': 1, - 'v': 1 - } - json_resp = self._api_request(request) - response = { - 'total': json_resp['mstrg'], - 'used': json_resp['cstrg'], - 'remaining': json_resp['mstrg'] - json_resp['cstrg'], - } - return response - - def get_balance(self): - """ - Get account monetary balance, Pro accounts only - """ - user_data = self._api_request({"a": "uq", "pro": 1}) - if 'balance' in user_data: - return user_data['balance'] - - def delete(self, file_id): - """ - Delete a file by its file id. - """ - return self.move(file_id, self._trash_folder_node_id) - - def delete_url(self, url): - """ - Delete a file by its public url. - """ - (public_handle, decryption_key) = self._parse_url(url) - file_id = self.get_id_from_public_handle(public_handle) - return self.move(file_id, self._trash_folder_node_id) - - def destroy(self, file_id): - """ - Destroy a file by its file id. - """ + def draft_destroy_file(self, file_id): request = { 'a': 'd', 'n': file_id, 'i': self.request_id } - return self._api_request(request) + return RequestDraft(request, self.final_destroy) + + def final_destroy_file(self, response): + return response + + def destroy_file(self, *args, **kwargs): + ''' + Completely delete a file by its file id. + ''' + return self._api_request(self.draft_destroy(*args, **kwargs)) def destroy_url(self, url): - """ + ''' Destroy a file by its public url. - """ - (public_handle, decryption_key) = self._parse_url(url) + Because this relies on the get_id_from_public_handle endpoint to + work, this function does not offer drafts + ''' + (public_handle, decryption_key) = self.parse_url(url) file_id = self.get_id_from_public_handle(public_handle) return self.destroy(file_id) - def empty_trash(self): - # get list of files in rubbish out - files = self.get_files_in_node(self._trash_folder_node_id) + def destroy_urls(self, urls): + ''' + Destroy multiple files by their public urls. + Because this relies on the get_id_from_public_handle endpoint to + work, this function does not offer drafts and will take care of + batching by itself. + ''' + parseds = [self.parse_url(url) for url in urls] + requests = [self.draft_get_id_from_public_handle(handle) for (handle, key) in parseds] + file_ids = self._api_request(requests) + requests = [self.draft_destroy(file_id) for file_id in file_ids] + return self._api_request(requests) - # make a list of json - if files != {}: - post_list = [] - for file in files: - post_list.append({"a": "d", "n": file, "i": self.request_id}) - return self._api_request(post_list) + # DOWNLOAD ##################################################################################### - def download(self, file, dest_path=None, dest_filename=None): - """ - Download a file by it's file object - """ - return self._download_file(file_handle=None, - file_key=None, - file=file[1], - dest_path=dest_path, - dest_filename=dest_filename, - is_public=False) - - def export_files(self, nodes): - if not isinstance(nodes, list): - nodes = [nodes] - - node_datas = [self._node_data(node) for node in nodes] - requests = [{'a': 'l', 'n': node_data['h'], 'i': self.request_id} for node_data in node_datas] - self._api_request(requests) - return self.get_links(nodes) - - def export(self, path=None, node_id=None): - if node_id: - nodes = self.get_files() - node = nodes[node_id] - else: - node = self.find(path) - - node_data = self._node_data(node) - if node_data['t'] == NODE_TYPE_FILE: - return self.export_files(node) - - if node: - try: - # If already exported - return self.get_folder_link(node) - except (errors.RequestError, KeyError): - pass - - master_key_cipher = AES.new(crypto.a32_to_str(self.master_key), AES.MODE_ECB) - ha = crypto.base64_url_encode( - master_key_cipher.encrypt(node_data['h'].encode("utf8") + node_data['h'].encode("utf8")) - ) - - share_key = secrets.token_bytes(16) - ok = crypto.base64_url_encode(master_key_cipher.encrypt(share_key)) - - share_key_cipher = AES.new(share_key, AES.MODE_ECB) - node_key = node_data['k'] - encrypted_node_key = crypto.base64_url_encode( - share_key_cipher.encrypt(crypto.a32_to_str(node_key)) - ) - - node_id = node_data['h'] - request_body = [{ - 'a': - 's2', - 'n': - node_id, - 's': [{ - 'u': 'EXP', - 'r': 0 - }], - 'i': - self.request_id, - 'ok': - ok, - 'ha': - ha, - 'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]] - }] - self._api_request(request_body) - nodes = self.get_files() - return self.get_folder_link(nodes[node_id]) - - def download_url(self, url, dest_path=None, dest_filename=None): - """ - Download a file by it's public url - """ - (public_handle, decryption_key) = self._parse_url(url) - return self._download_file( - file_handle=public_handle, - file_key=decryption_key, - dest_path=dest_path, - dest_filename=dest_filename, - is_public=True, - ) - - def _download_file(self, - file_handle, - file_key, - dest_path=None, - dest_filename=None, - is_public=False, - file=None): + def _download_file( + self, + file_handle, + file_key, + dest_path=None, + dest_filename=None, + is_public=False, + file=None, + ): if file is None: if is_public: file_key = crypto.base64_to_a32(file_key) @@ -815,6 +559,798 @@ class Mega: shutil.move(temp_output_file.name, output_path) return output_path + def download_file(self, file, dest_path=None, dest_filename=None): + ''' + Download a file by its file object + ''' + return self._download_file( + file_handle=None, + file_key=None, + file=file[1], + dest_path=dest_path, + dest_filename=dest_filename, + is_public=False, + ) + + def download_url(self, url, dest_path=None, dest_filename=None): + ''' + Download a file by its public url + ''' + (public_handle, decryption_key) = self.parse_url(url) + return self._download_file( + file_handle=public_handle, + file_key=decryption_key, + dest_path=dest_path, + dest_filename=dest_filename, + is_public=True, + ) + + # EMPTY TRASH ################################################################################## + + def empty_trash(self): + ''' + Because this relies on get_files_in_node, this method does not offer + drafts. + ''' + files = self.get_files_in_node(self._trash_folder_node_id) + + if not files: + return + + drafts = [self.draft_destroy(file) for file in files] + return self._api_request(drafts) + + # EXPORT ####################################################################################### + + def export_file(self, node): + return self.export_files([node]) + + def export_files(self, nodes): + nodes = [self.normalize_node(node) for node in nodes] + request = [{'a': 'l', 'n': node['h'], 'i': self.request_id} for node in nodes] + self._api_request(request) + request = [self.draft_get_file_link(node) for node in nodes] + response = self._api_request(request) + url_map = {node['h']: url for (node, url) in zip(nodes, response)} + return url_map + + ################################################################################################ + + def export_folder(self, node): + ''' + Because this relies on get_files, this function does not offer drafts. + ''' + node = self.normalize_node(node) + + try: + # If already exported + return self.get_folder_link(node) + except (errors.RequestError, KeyError): + pass + + master_key_cipher = AES.new(crypto.a32_to_str(self.master_key), AES.MODE_ECB) + ha = crypto.base64_url_encode( + master_key_cipher.encrypt(node_data['h'].encode("utf8") + node_data['h'].encode("utf8")) + ) + + share_key = secrets.token_bytes(16) + ok = crypto.base64_url_encode(master_key_cipher.encrypt(share_key)) + + share_key_cipher = AES.new(share_key, AES.MODE_ECB) + node_key = node_data['k'] + encrypted_node_key = crypto.base64_url_encode( + share_key_cipher.encrypt(crypto.a32_to_str(node_key)) + ) + + node_id = node_data['h'] + request = { + 'a': 's2', + 'n': node_id, + 's': [{ + 'u': 'EXP', + 'r': 0 + }], + 'i': self.request_id, + 'ok': ok, + 'ha': ha, + 'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]] + } + node_id = self._api_request(request) + nodes = self.get_files() + return nodes[node_id] + + # FIND ######################################################################################### + + def find(self, filename=None, handle=None, exclude_deleted=False): + ''' + Return file object from given filename + ''' + files = self.get_files() + if handle: + return files[handle] + path = pathlib.Path(filename) + filename = path.name + parent_dir_name = path.parent.name + for file in list(files.items()): + parent_node_id = None + try: + if parent_dir_name: + parent_node_id = self.find_path_descriptor( + parent_dir_name, + files=files, + ) + if (filename and parent_node_id and file[1]['a'] + and file[1]['a']['n'] == filename + and parent_node_id == file[1]['p']): + if (exclude_deleted and self._trash_folder_node_id + == file[1]['p']): + continue + return file + elif (filename and file[1]['a'] + and file[1]['a']['n'] == filename): + if (exclude_deleted + and self._trash_folder_node_id == file[1]['p']): + continue + return file + except TypeError: + continue + + def find_path_descriptor(self, path, files=()): + ''' + Find descriptor of folder inside a path. i.e.: folder1/folder2/folder3 + Params: + path: string like 'folder1/folder2/folder3' + Return: + Descriptor (str) of folder3 if exists, None otherwise + ''' + paths = path.split('/') + + files = files or self.get_files() + parent_desc = self.root_id + found = False + for foldername in paths: + if foldername != '': + for file in files.items(): + if (file[1]['a'] and file[1]['t'] + and file[1]['a']['n'] == foldername): + if parent_desc == file[1]['p']: + parent_desc = file[0] + found = True + if found: + found = False + else: + return None + return parent_desc + + # GET BALANCE ################################################################################## + + def draft_get_balance(self): + ''' + Get account monetary balance, Pro accounts only + ''' + request = {"a": "uq", "pro": 1} + return RequestDraft(request, self.final_get_balance) + + def final_get_balance(self, response): + if 'balance' in response: + return response['balance'] + + def get_balance(self, *args, **kwargs): + return self._api_request(self.draft_get_balance(*args, **kwargs)) + + # GET FILE LINK ################################################################################ + + def draft_get_file_link(self, file): + ''' + Get public link from given file object. + ''' + file = self.normalize_node(file) + + if not ('h' in file and 'k' in file): + raise errors.ValidationError('File id and key must be present') + + request = {'a': 'l', 'n': file['h']} + return RequestDraft(request, lambda response: self.final_get_file_link(response, file)) + + def final_get_file_link(self, response, file): + if response == -11: + raise errors.RequestError( + "Can't get a public link from that file " + "(is this a shared file?)" + ) + + public_handle = response + decrypted_key = crypto.a32_to_base64(file['key']) + url = f'{self.schema}://{self.domain}/#!{public_handle}!{decrypted_key}' + return url + + def get_file_link(self, *args, **kwargs): + return self._api_request(self.draft_get_file_link(*args, **kwargs)) + + # GET FILES #################################################################################### + + def _init_shared_keys(self, files): + ''' + Init shared key not associated with a user. + Seems to happen when a folder is shared, + some files are exchanged and then the + folder is un-shared. + Keys are stored in files['s'] and files['ok'] + ''' + ok_dict = {} + + for ok_item in files.get('ok', []): + shared_key = crypto.decrypt_key( + crypto.base64_to_a32(ok_item['k']), self.master_key + ) + ok_dict[ok_item['h']] = shared_key + for s_item in files.get('s', []): + if s_item['u'] not in self.shared_keys: + self.shared_keys[s_item['u']] = {} + if s_item['h'] in ok_dict: + self.shared_keys[s_item['u']][s_item['h']] = ok_dict[s_item['h']] + + def _process_file(self, file): + if file['t'] in [NODE_TYPE_FILE, NODE_TYPE_DIR]: + keys = dict( + keypart.split(':', 1) for keypart in file['k'].split('/') + if ':' in keypart) + uid = file['u'] + key = None + # my objects + if uid in keys: + key = crypto.decrypt_key(crypto.base64_to_a32(keys[uid]), self.master_key) + # shared folders + elif 'su' in file and 'sk' in file and ':' in file['k']: + shared_key = crypto.decrypt_key( + crypto.base64_to_a32(file['sk']), self.master_key + ) + key = crypto.decrypt_key(crypto.base64_to_a32(keys[file['h']]), shared_key) + if file['su'] not in self.shared_keys: + self.shared_keys[file['su']] = {} + self.shared_keys[file['su']][file['h']] = shared_key + # shared files + elif file['u'] and file['u'] in self.shared_keys: + for hkey in self.shared_keys[file['u']]: + shared_key = self.shared_keys[file['u']][hkey] + if hkey in keys: + key = keys[hkey] + key = crypto.decrypt_key(crypto.base64_to_a32(key), shared_key) + break + if file['h'] and file['h'] in self.shared_keys.get('EXP', ()): + shared_key = self.shared_keys['EXP'][file['h']] + encrypted_key = crypto.str_to_a32( + crypto.base64_url_decode(file['k'].split(':')[-1]) + ) + key = crypto.decrypt_key(encrypted_key, shared_key) + file['shared_folder_key'] = shared_key + if key is not None: + if file['t'] == NODE_TYPE_FILE: + k = crypto.interleave_xor_8(key) + file['iv'] = key[4:6] + (0, 0) + file['meta_mac'] = key[6:8] + else: + k = key + file['key'] = key + file['k'] = k + attributes = crypto.base64_url_decode(file['a']) + attributes = crypto.decrypt_attr(attributes, k) + file['a'] = attributes + # other => wrong object + elif file['k'] == '': + file['a'] = False + elif file['t'] == NODE_TYPE_ROOT: + self.root_id = file['h'] + file['a'] = {'n': 'Cloud Drive'} + elif file['t'] == NODE_TYPE_INBOX: + self.inbox_id = file['h'] + file['a'] = {'n': 'Inbox'} + elif file['t'] == NODE_TYPE_TRASH: + self.trashbin_id = file['h'] + file['a'] = {'n': 'Rubbish Bin'} + return file + + def get_files(self, public_folder_handle=None): + logger.info('Getting all files...') + + params = {} + if public_folder_handle is not None: + params['n'] = public_folder_handle + + files = self._api_request({'a': 'f', 'c': 1, 'r': 1}, params=params) + + files_dict = {} + self._init_shared_keys(files) + for file in files['f']: + processed_file = self._process_file(file) + # ensure each file has a name before returning + if processed_file['a']: + files_dict[file['h']] = processed_file + self._nodes = files_dict + return files_dict + + def get_files_in_node(self, target): + ''' + Get all files in a given target. + Params: + target: a node's id string, or one of the special nodes + e.g. NODE_TYPE_TRASH. + ''' + if type(target) == int: + if target in [NODE_TYPE_FILE, NODE_TYPE_DIR]: + raise TypeError('Can\'t use file or dir node type.') + node_id = self.get_node_by_type(target)[0] + else: + node_id = target + + files = self._api_request({'a': 'f', 'c': 1}) + # MERGE COMMON CODE WITH GET_FILES + files_dict = {} + self._init_shared_keys(files) + for file in files['f']: + processed_file = self._process_file(file, self.shared_keys) + if processed_file['a'] and processed_file['p'] == node_id: + files_dict[file['h']] = processed_file + return files_dict + + # GET FOLDER LINK ############################################################################## + + def draft_get_folder_link(self, file): + file = self.normalize_node(file) + + if not ('h' in file and 'k' in file): + raise errors.ValidationError('File id and key must be present') + + request = {'a': 'l', 'n': file['h']} + return RequestDraft(request, lambda response: self.final_get_folder_link(response, file)) + + def final_get_folder_link(self, response, file): + # THIS WILL NEVER HAPPEN DUE TO RAISE BY CODE. + if response == -11: + raise errors.RequestError( + "Can't get a public link from that folder " + "(is this a shared folder?)" + ) + + public_handle = response + decrypted_key = crypto.a32_to_base64(file['shared_folder_key']) + return ( + f'{self.schema}://{self.domain}/#F!{public_handle}!{decrypted_key}' + ) + + def get_folder_link(self, *args, **kwargs): + return self._api_request(self.draft_get_folder_link(*args, **kwargs)) + + # GET ID FROM HANDLE ########################################################################### + + def draft_get_id_from_public_handle(self, public_handle): + request = {'a': 'f', 'f': 1, 'p': public_handle} + return RequestDraft(request, self.final_get_id_from_public_handle) + + def final_get_id_from_public_handle(self, response): + node_id = self.get_id_from_obj(response) + return node_id + + def get_id_from_public_handle(self, *args, **kwargs): + return self._api_request(self.draft_get_id_from_public_handle(*args, **kwargs)) + + # GET NODE BY TYPE ############################################################################# + + def get_node_by_type(self, type): + ''' + Get a node by it's numeric type id, e.g: + 2: special: root cloud drive + 3: special: inbox + 4: special: trash bin + ''' + # Should we also check for NODE_TYPE_FILE, NODE_TYPE_DIR here? + nodes = self.get_files() + for node in list(nodes.items()): + if node[1]['t'] == type: + return node + + # GET PUBLIC FILE INFO ######################################################################### + + def draft_get_public_file_info(self, file_handle, file_key): + request = {'a': 'g', 'p': file_handle, 'ssm': 1} + return RequestDraft(request, self.final_get_public_file_info) + + def final_get_public_file_info(self, response): + data = response + if isinstance(data, int): + raise errors.RequestError(data) + + if 'at' not in data or 's' not in data: + raise ValueError("Unexpected result", data) + + key = crypto.base64_to_a32(file_key) + k = crypto.interleave_xor_8(key) + + size = data['s'] + unencrypted_attrs = crypto.decrypt_attr(crypto.base64_url_decode(data['at']), k) + if not unencrypted_attrs: + return None + result = {'size': size, 'name': unencrypted_attrs['n']} + return result + + def get_public_file_info(self, *args, **kwargs): + ''' + Get size and name of a public file. + ''' + return self._api_request(self.draft_get_public_file_info(*args, **kwargs)) + + # GET PUBLIC FOLDER INFO ####################################################################### + + def get_public_folder_info(self, folder_handle, folder_key): + ''' + Get the total size of a public folder. + ''' + # At the moment, the key is not actually needed. However if we decide + # to extract more statistics, then we may need it and I'd rather not + # change the function interface when that happens. So let's just take + # the key now even though it does nothing. + files = self.get_public_folder_files(folder_handle).values() + size = sum(file['s'] for file in files if file['t'] == NODE_TYPE_FILE) + return {'size': size} + + def get_public_folder_files(self, folder_handle): + # At the moment, the returned files will not have a decrypted 'a'. + # TODO: cross-reference process_files code and figure out how to + # decrypt them + return self.get_files(public_folder_handle=folder_handle) + + # GET PUBLIC URL INFO ########################################################################## + + def get_public_url_info(self, url): + ''' + Dispatch to get_public_folder_info and get_public_file_info. + ''' + (public_handle, decryption_key) = self.parse_url(url) + if '/#F!' in url: + return self.get_public_folder_info(public_handle, decryption_key) + else: + return self.get_public_file_info(public_handle, decryption_key) + + # GET STORAGE QUOTA ############################################################################ + + def draft_get_storage_quota(self): + ''' + Get disk quota usage and maximum. + ''' + request = { + 'a': 'uq', + 'strg': 1, + 'v': 1 + } + return RequestDraft(request, self.final_get_storage_quota) + + def final_get_storage_quota(self, response): + response = { + 'total': response['mstrg'], + 'used': response['cstrg'], + 'remaining': response['mstrg'] - response['cstrg'], + } + return response + + def get_storage_quota(self, *args, **kwargs): + return self._api_request(self.draft_get_storage_quota(*args, **kwargs)) + + # GET TRANSFER QUOTA ########################################################################### + + def draft_get_transfer_quota(self): + ''' + Get transfer quota usage and maximum. + ''' + request = { + 'a': 'uq', + 'xfer': 1, + 'v': 1 + } + return RequestDraft(request, self.final_get_transfer_quota) + + def final_get_transfer_quota(self, response): + if response['utype'] == 0: + # For free accounts, there is no specified limit and your bandwidth + # is measured in a 6-hour rolling window. + response = { + 'total': None, + 'used': sum(response['tah']), + 'remaining': None, + } + else: + # For Pro users, bandwidth limits are clearly defined by the + # account and the response contains simple integers for total, used. + response = { + 'total': response['mxfer'], + 'used': response['caxfer'], + 'remaining': response['mxfer'] - response['caxfer'], + } + return response + + def get_transfer_quota(self, *args, **kwargs): + return self._api_request(self.draft_get_transfer_quota(*args, **kwargs)) + + # GET FILE LINK FROM UPLOAD #################################################################### + + def get_upload_link(self, file): + ''' + Get a file's public link including decryption key + Requires upload() response as input + ''' + if 'f' in file: + file = file['f'][0] + public_handle = self._api_request({'a': 'l', 'n': file['h']}) + file_key = file['k'][file['k'].index(':') + 1:] + decrypted_key = crypto.a32_to_base64( + crypto.decrypt_key(crypto.base64_to_a32(file_key), self.master_key) + ) + return f'{self.schema}://{self.domain}/#!{public_handle}!{decrypted_key}' + else: + raise ValueError('''Upload() response required as input, + use get_links() for regular file input''') + + # GET USER INFO ################################################################################ + + def draft_get_user(self): + request = {'a': 'ug'} + return RequestDraft(request, self.final_get_user) + + def final_get_user(self, response): + return response + + def get_user(self, *args, **kwargs): + return self._api_request(self.draft_get_user(*args, **kwargs)) + + # IMPORT PUBLIC FILE ########################################################################### + + def draft_import_public_file( + self, + file_handle, + file_key, + dest_node=None, + dest_name=None, + ): + # Providing dest_node spares an API call to retrieve it. + dest_node = self.normalize_node_id(dest_node) + + # Providing dest_name spares an API call to retrieve it. + if dest_name is None: + pl_info = self.get_public_file_info(file_handle, file_key) + dest_name = pl_info['name'] + + key = crypto.base64_to_a32(file_key) + k = crypto.interleave_xor_8(key) + encrypted_key = crypto.a32_to_base64(crypto.encrypt_key(key, self.master_key)) + encrypted_name = crypto.base64_url_encode(crypto.encrypt_attr({'n': dest_name}, k)) + request = { + 'a': 'p', + 't': dest_node, + 'n': [ + { + 'ph': file_handle, + 't': NODE_TYPE_FILE, + 'a': encrypted_name, + 'k': encrypted_key + } + ] + } + return RequestDraft(request, self.final_import_public_file) + + def final_import_public_file(self, response): + return response + + def import_public_file(self, *args, **kwargs): + ''' + Import the public file into user account + ''' + return self._api_request(self.draft_import_public_file(*args, **kwargs)) + + # IMPORT PUBLIC FOLDER ######################################################################### + + def draft_import_public_folder( + self, + folder_handle, + folder_key, + dest_node=None, + dest_name=None, + ): + dest_node = self.normalize_node_id(dest_node) + + folder_key = crypto.base64_to_a32(folder_key) + + nodes = self.get_public_folder_files(folder_handle) + + # For all files and folders in the public folder, their 'p' will + # correspond to the 'h' of either the public folder, or some nested + # folder within. But, the public folder itself will have a 'p' that + # does not correspond to any 'h'. In this first loop, we gather the + # 'h' of all folders, so that in the next loop we can tell if we are + # processing the root folder by checking that its 'p' is not a known + # folder's 'h'. + folder_ids = set() + for node in nodes: + if node['t'] == NODE_TYPE_DIR: + folder_ids.add(node['h']) + + import_list = [] + for node in nodes: + k = node['k'].split(':')[1] + k = crypto.decrypt_key(crypto.base64_to_a32(k), folder_key) + new_k = crypto.a32_to_base64(crypto.encrypt_key(k, self.master_key)) + + node_import_args = { + 'h': node['h'], + 'k': new_k, + 't': node['t'], + } + + if node['p'] not in folder_ids: + # This is the root public folder. + if dest_name is not None: + new_a = {'n': dest_name} + new_a = crypto.base64_url_encode(crypto.encrypt_attr(new_a, k)) + node_import_args['a'] = new_a + else: + node_import_args['a'] = node['a'] + + # The root should not have a 'p' argument. + + else: + node_import_args['a'] = node['a'] + node_import_args['p'] = node['p'] + + import_list.append(node_import_args) + + request = { + 'a': 'p', + 't': dest_node, + 'n': import_list, + 'v': 3, + 'i': self.request_id, + 'sm': 1, + } + return RequestDraft(request, self.final_import_public_folder) + + def final_import_public_folder(self, response): + return response + + def import_public_folder(self, *args, **kwargs): + return self._api_request(self.draft_import_public_folder(*args, **kwargs)) + + # IMPORT PUBLIC URL ############################################################################ + + def draft_import_public_url(self, url, dest_node=None, dest_name=None): + (public_handle, decryption_key) = self.parse_url(url) + if '/#F!' in url: + return self.draft_import_public_folder( + public_handle, + decryption_key, + dest_node=dest_node, + dest_name=dest_name + ) + else: + return self.draft_import_public_file( + public_handle, + decryption_key, + dest_node=dest_node, + dest_name=dest_name + ) + + def final_import_public_url(self, response): + return response + + def import_public_url(self, *args, **kwargs): + ''' + Import the public url into user account + ''' + return self._api_request(self.draft_import_public_url(*args, **kwargs)) + + # MOVE FILES ################################################################################### + + def draft_move(self, file_id, target): + if isinstance(target, int): + target_node_id = str(self.get_node_by_type(target)[0]) + + elif isinstance(target, str): + target_node_id = target + + elif isinstance(target, dict): + target_node_id = target['h'] + + elif isinstance(target, tuple): + target_node_id = target[1]['h'] + + else: + raise TypeError(target) + + request = { + 'a': 'm', + 'n': file_id, + 't': target_node_id, + 'i': self.request_id + } + return RequestDraft(request, self.final_move) + + def final_move(self, response): + return response + + def move(self, *args, **kwargs): + ''' + Move a file to another parent node + + Params: + file_id: the file to move. + target: a node's id string, or one of the special nodes + e.g. NODE_TYPE_TRASH, or the structure returned by find(). + ''' + return self._api_request(self.draft_move(*args, **kwargs)) + + # RECYCLE FILE ################################################################################# + + def draft_recycle_file(self, file_id): + return self.draft_move(file_id, self._trash_folder_node_id) + + def final_recycle_file(response): + return response + + def recycle_file(self, *args, **kwargs): + ''' + Move a file to the rubbish bin by its file id. + ''' + return self._api_request(self.draft_recycle_file(*args, **kwargs)) + + def recycle_url(self, url): + ''' + Move a file to the rubbish bin by its public url. + Because this relies on the get_id_from_public_handle endpoint to + work, this method does not offer drafts. + ''' + (public_handle, decryption_key) = self.parse_url(url) + file_id = self.get_id_from_public_handle(public_handle) + return self.move(file_id, self._trash_folder_node_id) + + def recycle_urls(self, urls): + ''' + Move multiple files to the rubbish bin by their public urls. + Because this relies on the get_id_from_public_handle endpoint to + work, this method does not offer drafts and will take care of + batching by itself. + ''' + parseds = [self.parse_url(url) for url in urls] + requests = [self.draft_get_id_from_public_handle(handle) for (handle, key) in parseds] + file_ids = self._api_request(requests) + requests = [self.draft_move(file_id) for file_id in file_ids] + return self._api_request(requests) + + # RENAME FILE ################################################################################## + + def draft_rename(self, file, new_name): + file = file[1] + # create new attribs + attribs = {'n': new_name} + # encrypt attribs + encrypt_attribs = crypto.base64_url_encode(crypto.encrypt_attr(attribs, file['k'])) + encrypted_key = crypto.a32_to_base64( + crypto.encrypt_key(file['key'], self.master_key) + ) + # update attributes + request = { + 'a': 'a', + 'attr': encrypt_attribs, + 'key': encrypted_key, + 'n': file['h'], + 'i': self.request_id + } + return RequestDraft(request, self.final_rename) + + def final_rename(self, response): + return response + + def rename(self, *args, **kwargs): + return self._api_request(self.draft_rename(*args, **kwargs)) + + # UPLOAD ####################################################################################### + def upload(self, filename, dest=None, dest_filename=None): # determine storage node if dest is None: @@ -918,313 +1454,3 @@ class Mega: data = self._api_request(request) logger.info('Upload complete') return data - - def _mkdir(self, name, parent_node_id): - # generate random aes key (128) for folder - ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)] - - # encrypt attribs - attribs = {'n': name} - encrypt_attribs = crypto.base64_url_encode(crypto.encrypt_attr(attribs, ul_key[:4])) - encrypted_key = crypto.a32_to_base64(crypto.encrypt_key(ul_key[:4], self.master_key)) - - # update attributes - request = { - 'a': 'p', - 't': parent_node_id, - 'n': [ - { - 'h': 'xxxxxxxx', - 't': NODE_TYPE_DIR, - 'a': encrypt_attribs, - 'k': encrypted_key - } - ], - 'i': self.request_id - } - data = self._api_request(request) - return data - - def _root_node_id(self): - if not hasattr(self, 'root_id'): - self.get_files() - return self.root_id - - def create_folder(self, name, dest=None): - dirs = tuple(dir_name for dir_name in str(name).split('/') if dir_name) - folder_node_ids = {} - for idx, directory_name in enumerate(dirs): - existing_node_id = self.find_path_descriptor(directory_name) - if existing_node_id: - folder_node_ids[idx] = existing_node_id - continue - if idx == 0: - if dest is None: - parent_node_id = self._root_node_id() - else: - parent_node_id = dest - else: - parent_node_id = folder_node_ids[idx - 1] - created_node = self._mkdir(name=directory_name, - parent_node_id=parent_node_id) - node_id = created_node['f'][0]['h'] - folder_node_ids[idx] = node_id - return dict(zip(dirs, folder_node_ids.values())) - - def rename(self, file, new_name): - file = file[1] - # create new attribs - attribs = {'n': new_name} - # encrypt attribs - encrypt_attribs = crypto.base64_url_encode(crypto.encrypt_attr(attribs, file['k'])) - encrypted_key = crypto.a32_to_base64( - crypto.encrypt_key(file['key'], self.master_key) - ) - # update attributes - request = { - 'a': 'a', - 'attr': encrypt_attribs, - 'key': encrypted_key, - 'n': file['h'], - 'i': self.request_id - } - return self._api_request(request) - - def move(self, file_id, target): - """ - Move a file to another parent node - - Params: - file_id: the file to move. - target: a node's id string, or one of the special nodes - e.g. NODE_TYPE_TRASH, or the structure returned by find(). - """ - if isinstance(target, int): - target_node_id = str(self.get_node_by_type(target)[0]) - - elif isinstance(target, str): - target_node_id = target - - elif isinstance(target, dict): - target_node_id = target['h'] - - elif isinstance(target, tuple): - target_node_id = target[1]['h'] - - else: - raise TypeError(target) - - request = { - 'a': 'm', - 'n': file_id, - 't': target_node_id, - 'i': self.request_id - } - return self._api_request(request) - - def add_contact(self, email): - """ - Add another user to your mega contact list - """ - return self._edit_contact(email, True) - - def remove_contact(self, email): - """ - Remove a user to your mega contact list - """ - return self._edit_contact(email, False) - - def _edit_contact(self, email, add): - """ - Editing contacts - """ - if add is True: - l = '1' # add command - elif add is False: - l = '0' # remove command - else: - raise errors.ValidationError('add parameter must be of type bool') - - if not re.match(r"[^@]+@[^@]+\.[^@]+", email): - raise errors.ValidationError('add_contact requires a valid email address') - else: - request = { - 'a': 'ur', - 'u': email, - 'l': l, - 'i': self.request_id - } - return self._api_request(request) - - def get_public_url_info(self, url): - """ - Dispatch to get_public_folder_info and get_public_file_info. - """ - (public_handle, decryption_key) = self._parse_url(url) - if '/#F!' in url: - return self.get_public_folder_info(public_handle, decryption_key) - else: - return self.get_public_file_info(public_handle, decryption_key) - - def import_public_url(self, url, dest_node=None, dest_name=None): - """ - Import the public url into user account - """ - (public_handle, decryption_key) = self._parse_url(url) - if '/#F!' in url: - return self.import_public_folder( - public_handle, - decryption_key, - dest_node=dest_node, - dest_name=dest_name - ) - else: - return self.import_public_file( - public_handle, - decryption_key, - dest_node=dest_node, - dest_name=dest_name - ) - - def get_public_folder_files(self, folder_handle): - # At the moment, the returned files will not have a decrypted 'a'. - # TODO: cross-reference process_files code and figure out how to - # decrypt them - return self.get_files(public_folder_handle=folder_handle) - - def get_public_folder_info(self, folder_handle, folder_key): - """ - Get the total size of a public folder. - """ - # At the moment, the key is not actually needed. However if we decide - # to extract more statistics, then we may need it and I'd rather not - # change the function interface when that happens. So let's just take - # the key now even though it does nothing. - files = self.get_public_folder_files(folder_handle).values() - size = sum(file['s'] for file in files if file['t'] == NODE_TYPE_FILE) - return {'size': size} - - def import_public_folder( - self, folder_handle, folder_key, dest_node=None, dest_name=None - ): - if dest_node is None: - dest_node = self.get_node_by_type(NODE_TYPE_ROOT)[1]['h'] - elif isinstance(dest_node, int): - dest_node = self.get_node_by_type(dest_node)[1] - elif isinstance(dest_node, dict): - dest_node = dest_node['h'] - elif isinstance(dest_node, str): - pass - else: - raise TypeError(f'Invalid dest_node {dest_node}.') - - folder_key = crypto.base64_to_a32(folder_key) - - nodes = self.get_public_folder_files(folder_handle) - - # For all files and folders in the public folder, their 'p' will - # correspond to the 'h' of either the public folder, or some nested - # folder within. But, the public folder itself will have a 'p' that - # does not correspond to any 'h'. In this first loop, we gather the - # 'h' of all folders, so that in the next loop we can tell if we are - # processing the root folder by checking that its 'p' is not a known - # folder's 'h'. - folder_ids = set() - for node in nodes: - if node['t'] == NODE_TYPE_DIR: - folder_ids.add(node['h']) - - import_list = [] - for node in nodes: - k = node['k'].split(':')[1] - k = crypto.decrypt_key(crypto.base64_to_a32(k), folder_key) - new_k = crypto.a32_to_base64(crypto.encrypt_key(k, self.master_key)) - - node_import_args = { - 'h': node['h'], - 'k': new_k, - 't': node['t'], - } - - if node['p'] not in folder_ids: - # This is the root public folder. - if dest_name is not None: - new_a = {'n': dest_name} - new_a = crypto.base64_url_encode(crypto.encrypt_attr(new_a, k)) - node_import_args['a'] = new_a - else: - node_import_args['a'] = node['a'] - - # The root should not have a 'p' argument. - - else: - node_import_args['a'] = node['a'] - node_import_args['p'] = node['p'] - - import_list.append(node_import_args) - - request = { - 'a': 'p', - 't': dest_node, - 'n': import_list, - 'v': 3, - 'i': self.request_id, - 'sm': 1, - } - return self._api_request(request) - - def get_public_file_info(self, file_handle, file_key): - """ - Get size and name of a public file. - """ - data = self._api_request({'a': 'g', 'p': file_handle, 'ssm': 1}) - if isinstance(data, int): - raise errors.RequestError(data) - - if 'at' not in data or 's' not in data: - raise ValueError("Unexpected result", data) - - key = crypto.base64_to_a32(file_key) - k = crypto.interleave_xor_8(key) - - size = data['s'] - unencrypted_attrs = crypto.decrypt_attr(crypto.base64_url_decode(data['at']), k) - if not unencrypted_attrs: - return None - result = {'size': size, 'name': unencrypted_attrs['n']} - return result - - def import_public_file(self, - file_handle, - file_key, - dest_node=None, - dest_name=None): - """ - Import the public file into user account - """ - # Providing dest_node spare an API call to retrieve it. - if dest_node is None: - dest_node = self.get_node_by_type(NODE_TYPE_ROOT)[1] - - # Providing dest_name spares an API call to retrieve it. - if dest_name is None: - pl_info = self.get_public_file_info(file_handle, file_key) - dest_name = pl_info['name'] - - key = crypto.base64_to_a32(file_key) - k = crypto.interleave_xor_8(key) - encrypted_key = crypto.a32_to_base64(crypto.encrypt_key(key, self.master_key)) - encrypted_name = crypto.base64_url_encode(crypto.encrypt_attr({'n': dest_name}, k)) - request = { - 'a': 'p', - 't': dest_node['h'], - 'n': [ - { - 'ph': file_handle, - 't': NODE_TYPE_FILE, - 'a': encrypted_name, - 'k': encrypted_key - } - ] - } - return self._api_request(request)