Adds export method, fixes login for v2 accounts, rm get_contacts() method

This commit is contained in:
Richard O'Dwyer 2019-10-22 22:39:42 +01:00
parent d9a77e5fd9
commit 93f3e6d376
6 changed files with 301 additions and 153 deletions

2
.gitignore vendored
View file

@ -5,7 +5,7 @@
#################
## Eclipse
#################
.env.fish
*.pydevproject
.project
.metadata

View file

@ -6,7 +6,9 @@ Release History
0.9.21 (unreleased)
+++++++++++++++++++
- Nothing changed yet.
- Removes broken method ``get_contacts()``.
- Adds support for login with a v2 Mega user account.
- Adds ``export()`` method to share a file or folder, returning public share URL with key.
0.9.20 (2019-10-17)

View file

@ -13,3 +13,4 @@ zest.releaser
setuptools
twine
wheel
rope

View file

@ -0,0 +1,19 @@
[{
"a": "log",
"e": 99635
}, {
"a": "s2",
"n": "oUchHSzJ",
"s": [{
"u": "EXP",
"r": 0
}],
"i": "q6CpLxTDNV",
"ok": "uGuwXS80VfifU1hcLlKcrQ",
"ha": "n6dI3_qVste9XPma5fMIKQ",
"cr": [
["oUchHSzJ"],
["oUchHSzJ"],
[0, 0, "ff5m7sr6LZlYyRtiEE9EZA"]
]
}]

View file

@ -1,5 +1,7 @@
import re
import json
import secrets
import hashlib
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Util import Counter
@ -41,10 +43,27 @@ class Mega(object):
return self
def _login_user(self, email, password):
password_aes = prepare_key(str_to_a32(password))
uh = stringhash(email, password_aes)
resp = self._api_request({'a': 'us', 'user': email, 'uh': uh})
# if numeric error code response
email = email.lower()
get_user_salt_resp = self._api_request({'a': 'us0', 'user': email})
user_salt = None
try:
user_salt = base64_to_a32(get_user_salt_resp['s'])
except KeyError:
# v1 user account
password_aes = prepare_key(str_to_a32(password))
user_hash = stringhash(email, password_aes)
else:
# v2 user account
pbkdf2_key = hashlib.pbkdf2_hmac(
hash_name='sha512',
password=password.encode(),
salt=a32_to_str(user_salt),
iterations=100000,
dklen=32
)
password_aes = str_to_a32(pbkdf2_key[:16])
user_hash = base64_url_encode(pbkdf2_key[-16:])
resp = self._api_request({'a': 'us', 'user': email, 'uh': user_hash})
if isinstance(resp, int):
raise RequestError(resp)
self._login_process(resp, password_aes)
@ -70,7 +89,6 @@ class Mega(object):
)
resp = self._api_request({'a': 'us', 'user': user})
# if numeric error code response
if isinstance(resp, int):
raise RequestError(resp)
self._login_process(resp, password_key)
@ -113,7 +131,6 @@ class Mega(object):
self.rsa_private_key[1]
)
)
sid = '%x' % rsa_decrypter.key._decrypt(encrypted_sid)
sid = binascii.unhexlify('0' + sid if len(sid) % 2 else sid)
self.sid = base64_url_encode(sid[:43])
@ -135,20 +152,8 @@ class Mega(object):
params=params,
data=json.dumps(data),
timeout=self.timeout,
headers={
'Origin':
'https://mega.nz',
'Referer':
'https://mega.nz/login',
'User-Agent': (
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:69.0) '
'Gecko/20100101 Firefox/69.0'
),
}
)
json_resp = json.loads(req.text)
# if numeric error code response
if isinstance(json_resp, int):
raise RequestError(json_resp)
return json_resp[0]
@ -163,9 +168,6 @@ class Mega(object):
raise RequestError('Url key missing')
def _process_file(self, file, shared_keys):
"""
Process a file
"""
if file['t'] == 0 or file['t'] == 1:
keys = dict(
keypart.split(':', 1)
@ -194,6 +196,13 @@ class Mega(object):
key = keys[hkey]
key = decrypt_key(base64_to_a32(key), shared_key)
break
if file['h'] and file['h'] in shared_keys.get('EXP', ()):
shared_key = shared_keys['EXP'][file['h']]
encrypted_key = str_to_a32(
base64_url_decode(file['k'].split(':')[-1])
)
key = decrypt_key(encrypted_key, shared_key)
file['shared_folder_key'] = shared_key
if key is not None:
# file
if file['t'] == 0:
@ -244,9 +253,7 @@ class Mega(object):
shared_keys[s_item['u']] = {}
if s_item['h'] in ok_dict:
shared_keys[s_item['u']][s_item['h']] = ok_dict[s_item['h']]
##########################################################################
# GET
self.shared_keys = shared_keys
def find_path_descriptor(self, path):
"""
@ -264,8 +271,11 @@ class Mega(object):
for foldername in paths:
if foldername != '':
for file in files.items():
if file[1]['a'] and file[1]['t'] and \
file[1]['a']['n'] == foldername:
if (
file[1]['a'] and
file[1]['t'] and
file[1]['a']['n'] == foldername
):
if parent_desc == file[1]['p']:
parent_desc = file[0]
found = True
@ -275,22 +285,40 @@ class Mega(object):
return None
return parent_desc
def find(self, filename):
def find(self, filename=None, handle=None):
"""
Return file object from given filename
"""
from pathlib import Path
path = Path(filename)
filename = path.name
files = self.get_files()
parent_dir_name = path.parent.name
for file in list(files.items()):
if not isinstance(file[1]['a'], dict):
continue
if file[1]['a'] and file[1]['a']['n'] == filename:
parent_node_id = None
if parent_dir_name:
parent_node_id = self.find_path_descriptor(parent_dir_name)
if (
filename and parent_node_id and
file[1]['a'] and file[1]['a']['n'] == filename
and parent_node_id == file[1]['p']
):
return file
# if not isinstance(file[1]['a'], dict):
# continue
if (
filename and
file[1]['a'] and file[1]['a']['n'] == filename
):
return file
if handle and file[1]['h'] == handle:
return file
def get_files(self):
"""
Get all files in account
"""
files = self._api_request({'a': 'f', 'c': 1})
files = self._api_request({'a': 'f', 'c': 1, 'r': 1})
files_dict = {}
shared_keys = {}
self._init_shared_keys(files, shared_keys)
@ -341,6 +369,31 @@ class Mega(object):
else:
raise ValidationError('File id and key must be present')
def _node_data(self, node):
try:
return node[1]
except (IndexError, KeyError):
return node
def get_folder_link(self, file):
try:
file = file[1]
except (IndexError, KeyError):
pass
if 'h' in file and 'k' in file:
public_handle = self._api_request({'a': 'l', 'n': file['h']})
if public_handle == -11:
raise RequestError(
"Can't get a public link from that file "
"(is this a shared file?)"
)
decrypted_key = a32_to_base64(file['shared_folder_key'])
return '{0}://{1}/#F!{2}!{3}'.format(
self.schema, self.domain, public_handle, decrypted_key
)
else:
raise ValidationError('File id and key must be present')
def get_user(self):
user_data = self._api_request({'a': 'ug'})
return user_data
@ -442,8 +495,6 @@ class Mega(object):
if 'balance' in user_data:
return user_data['balance']
##########################################################################
# DELETE
def delete(self, public_handle):
"""
Delete a file by its public handle
@ -491,8 +542,6 @@ class Mega(object):
post_list.append({"a": "d", "n": file, "i": self.request_id})
return self._api_request(post_list)
##########################################################################
# DOWNLOAD
def download(self, file, dest_path=None, dest_filename=None):
"""
Download a file by it's file object
@ -506,6 +555,68 @@ class Mega(object):
is_public=False
)
def _export_file(self, node):
self._api_request([
{
'a': 'l',
'n': node[1]['h'],
'i': self.request_id
}
])
return self.get_link(node)
def export(self, path):
self.get_files()
folder = self.find(path)
if folder[1]['t'] == 0:
return self._export_file(folder)
if folder:
try:
# If already exported
return self.get_folder_link(folder)
except (RequestError, KeyError):
pass
user_id = folder[1]['u']
user_pub_key = self._api_request({'a': 'uk', 'u': user_id})['pubk']
node_key = folder[1]['k']
master_key_cipher = AES.new(a32_to_str(self.master_key), AES.MODE_ECB)
ha = base64_url_encode(
master_key_cipher.encrypt(folder[1]['h'] + folder[1]['h'])
)
share_key = secrets.token_bytes(16)
ok = base64_url_encode(master_key_cipher.encrypt(share_key))
share_key_cipher = AES.new(share_key, AES.MODE_ECB)
encrypted_node_key = base64_url_encode(
share_key_cipher.encrypt(a32_to_str(node_key))
)
node_id = folder[1]['h']
request_body = [
{
'a': 's2',
'n': node_id,
's': [{
'u': 'EXP',
'r': 0
}],
'i': self.request_id,
'ok': ok,
'ha': ha,
'cr': [
[node_id],
[node_id],
[0, 0, encrypted_node_key]
]
}]
self._api_request(request_body)
nodes = self.get_files()
link = self.get_folder_link(nodes[node_id])
return link
def download_url(self, url, dest_path=None, dest_filename=None):
"""
Download a file by it's public url
@ -631,8 +742,6 @@ class Mega(object):
shutil.move(temp_output_file.name, dest_path + file_name)
##########################################################################
# UPLOAD
def upload(self, filename, dest=None, dest_filename=None):
# determine storage node
if dest is None:
@ -767,10 +876,8 @@ class Mega(object):
# update attributes
data = self._api_request(
{
'a':
'p',
't':
dest,
'a':'p',
't': dest,
'n': [
{
'h': 'xxxxxxxx',
@ -779,8 +886,7 @@ class Mega(object):
'k': encrypted_key
}
],
'i':
self.request_id
'i': self.request_id
}
)
return data
@ -881,21 +987,6 @@ class Mega(object):
}
)
def get_contacts(self):
raise NotImplementedError()
# TODO implement this
# sn param below = maxaction var with function getsc() in mega.co.nz js
# seens to be the 'sn' attrib of the previous request response...
# requests goto /sc rather than
# req = requests.post(
# '{0}://g.api.{1}/sc'.format(self.schema, self.domain),
# params={'sn': 'ZMxcQ_DmHnM', 'ssl': '1'},
# data=json.dumps(None),
# timeout=self.timeout)
# json_resp = json.loads(req.text)
# print json_resp
def get_public_url_info(self, url):
"""
Get size and name from a public url, dict returned
@ -917,8 +1008,6 @@ class Mega(object):
Get size and name of a public file
"""
data = self._api_request({'a': 'g', 'p': file_handle, 'ssm': 1})
# if numeric error code response
if isinstance(data, int):
raise RequestError(data)

View file

@ -1,21 +1,10 @@
"""
These unit tests will upload a test file,a test folder and a test contact,
Perform api operations on them,
And them remove them from your account.
"""
import unittest
import random
import os
import pytest
from mega import Mega
mega = Mega()
# anonymous login
m = mega.login()
# normal login
# m = mega.login(email, password)
FIND_RESP = None
TEST_CONTACT = 'test@mega.co.nz'
TEST_PUBLIC_URL = (
'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps'
@ -24,83 +13,131 @@ TEST_FILE = os.path.basename(__file__)
TEST_FOLDER = 'mega.py_testfolder_{0}'.format(random.random())
class TestMega(unittest.TestCase):
def test_mega(self):
self.assertIsInstance(mega, Mega)
def test_login(self):
self.assertIsInstance(mega, Mega)
def test_get_user(self):
resp = m.get_user()
self.assertIsInstance(resp, dict)
def test_get_quota(self):
resp = m.get_quota()
self.assertIsInstance(int(resp), int)
def test_get_storage_space(self):
resp = m.get_storage_space(mega=True)
self.assertIsInstance(resp, dict)
def test_get_files(self):
files = m.get_files()
self.assertIsInstance(files, dict)
def test_get_link(self):
file = m.find(TEST_FILE)
if file:
link = m.get_link(file)
self.assertIsInstance(link, str)
def test_import_public_url(self):
resp = m.import_public_url(TEST_PUBLIC_URL)
file_handle = m.get_id_from_obj(resp)
resp = m.destroy(file_handle)
self.assertIsInstance(resp, int)
def test_create_folder(self):
resp = m.create_folder(TEST_FOLDER)
self.assertIsInstance(resp, dict)
def test_rename(self):
file = m.find(TEST_FOLDER)
if file:
resp = m.rename(file, TEST_FOLDER)
self.assertIsInstance(resp, int)
def test_delete_folder(self):
folder_node = m.find(TEST_FOLDER)[0]
resp = m.delete(folder_node)
self.assertIsInstance(resp, int)
def test_delete(self):
file = m.find(TEST_FILE)
if file:
resp = m.delete(file[0])
self.assertIsInstance(resp, int)
def test_destroy(self):
file = m.find(TEST_FILE)
if file:
resp = m.destroy(file[0])
self.assertIsInstance(resp, int)
def test_empty_trash(self):
# resp None if already empty, else int
resp = m.empty_trash()
if resp is not None:
self.assertIsInstance(resp, int)
def test_add_contact(self):
resp = m.add_contact(TEST_CONTACT)
self.assertIsInstance(resp, int)
def test_remove_contact(self):
resp = m.remove_contact(TEST_CONTACT)
self.assertIsInstance(resp, int)
@pytest.fixture
def mega():
mega_ = Mega()
mega_.login(email=os.environ['EMAIL'], password=os.environ['PASS'])
node = mega_.create_folder(TEST_FOLDER)
yield mega_
node_id = node['f'][0]['h']
mega_.destroy(node_id)
if __name__ == '__main__':
unittest.main()
def test_mega(mega):
assert isinstance(mega, Mega)
def test_login(mega):
assert isinstance(mega, Mega)
def test_get_user(mega):
resp = mega.get_user()
assert isinstance(resp, dict)
def test_get_quota(mega):
resp = mega.get_quota()
assert isinstance(int(resp), int)
def test_get_storage_space(mega):
resp = mega.get_storage_space(mega=True)
assert isinstance(resp, dict)
def test_get_files(mega):
files = mega.get_files()
assert isinstance(files, dict)
def test_get_link(mega):
file = mega.find(TEST_FILE)
if file:
link = mega.get_link(file)
assert isinstance(link, str)
class TestExport:
def test_export_folder(self, mega):
public_url = None
for _ in range(2):
result_public_share_url = mega.export(TEST_FOLDER)
if not public_url:
public_url = result_public_share_url
assert result_public_share_url.startswith('https://mega.co.nz/#F!')
assert result_public_share_url == public_url
def test_export_single_file(self, mega):
# Upload a single file into a folder
folder = mega.find(TEST_FOLDER)
dest_node_id = folder[1]['h']
result = mega.upload(
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{TEST_FOLDER}/test.py'
assert mega.find(path)
for _ in range(2):
result_public_share_url = mega.export(path)
assert result_public_share_url.startswith('https://mega.co.nz/#!')
def test_import_public_url(mega):
resp = mega.import_public_url(TEST_PUBLIC_URL)
file_handle = mega.get_id_from_obj(resp)
resp = mega.destroy(file_handle)
assert isinstance(resp, int)
def test_create_folder(mega):
resp = mega.create_folder(TEST_FOLDER)
assert isinstance(resp, dict)
def test_rename(mega):
file = mega.find(TEST_FOLDER)
if file:
resp = mega.rename(file, TEST_FOLDER)
assert isinstance(resp, int)
def test_delete_folder(mega):
folder_node = mega.find(TEST_FOLDER)[0]
resp = mega.delete(folder_node)
assert isinstance(resp, int)
def test_delete(mega):
file = mega.find(TEST_FILE)
if file:
resp = mega.delete(file[0])
assert isinstance(resp, int)
def test_destroy(mega):
file = mega.find(TEST_FILE)
if file:
resp = mega.destroy(file[0])
assert isinstance(resp, int)
def test_empty_trash(mega):
# resp None if already empty, else int
resp = mega.empty_trash()
if resp is not None:
assert isinstance(resp, int)
def test_add_contact(mega):
resp = mega.add_contact(TEST_CONTACT)
assert isinstance(resp, int)
def test_remove_contact(mega):
resp = mega.remove_contact(TEST_CONTACT)
assert isinstance(resp, int)