various fixes

This commit is contained in:
Richard O'Dwyer 2019-10-16 21:20:22 +01:00
parent 52471551a8
commit 00c2fb2192
12 changed files with 390 additions and 203 deletions

1
.gitignore vendored
View file

@ -10,3 +10,4 @@ build/
dist/
.pypirc
MANIFEST
.vscode/

View file

@ -1,5 +1,5 @@
language: python
python:
- "2.7"
install: pip install -r requirements.txt
script: python tests/unit-tests.py
install: pip install -r requirements-dev.txt
script: python setup.py install && pytest tests/unit-tests.py -x -s -v

View file

@ -1,3 +1,4 @@
import os
from mega import Mega
@ -8,12 +9,12 @@ def test():
see readme.md for more information
"""
#user details
email = 'your@email.com'
password = 'password'
# user details
email = os.environ['EMAIL']
password = os.environ['PASS']
mega = Mega()
#mega = Mega({'verbose': True}) # verbose option for print output
# mega = Mega({'verbose': True}) # verbose option for print output
# login
m = mega.login(email, password)
@ -47,16 +48,17 @@ def test():
# download file. by file object or url
print m.download(file, '/tmp')
#m.download_url(link)
# m.download_url(link)
#delete or destroy file. by id or url
# delete or destroy file. by id or url
print(m.delete(file[0]))
#print(m.destroy(file[0]))
#print(m.delete_url(link))
#print(m.destroy_url(link))
# print(m.destroy(file[0]))
# print(m.delete_url(link))
# print(m.destroy_url(link))
# empty trash
print(m.empty_trash())
if __name__ == '__main__':
test()

View file

@ -1 +1 @@
from .mega import Mega
from .mega import Mega # noqa

View file

@ -48,14 +48,14 @@ def prepare_key(arr):
def encrypt_key(a, key):
return sum(
(aes_cbc_encrypt_a32(a[i:i + 4], key)
for i in range(0, len(a), 4)), ())
(aes_cbc_encrypt_a32(a[i:i + 4], key) for i in range(0, len(a), 4)), ()
)
def decrypt_key(a, key):
return sum(
(aes_cbc_decrypt_a32(a[i:i + 4], key)
for i in range(0, len(a), 4)), ())
(aes_cbc_decrypt_a32(a[i:i + 4], key) for i in range(0, len(a), 4)), ()
)
def encrypt_attr(attr, key):
@ -86,7 +86,7 @@ def mpi_to_int(s):
def base64_url_decode(data):
data += '=='[(2 - len(data) * 3) % 4:]
data += '==' [(2 - len(data) * 3) % 4:]
for search, replace in (('-', '+'), ('_', '/'), (',', '')):
data = data.replace(search, replace)
return base64.b64decode(data)
@ -110,12 +110,12 @@ def a32_to_base64(a):
def get_chunks(size):
p = 0
s = 0x20000
while p+s < size:
yield(p, s)
while p + s < size:
yield (p, s)
p += s
if s < 0x100000:
s += 0x20000
yield(p, size-p)
yield (p, size - p)
# more general functions

View file

@ -9,6 +9,5 @@ class RequestError(Exception):
"""
Error in API request
"""
#TODO add error response messages
# TODO add error response messages
pass

View file

@ -9,7 +9,11 @@ import binascii
import requests
import shutil
from .errors import ValidationError, RequestError
from .crypto import *
from .crypto import (
a32_to_base64, encrypt_key, base64_url_encode, encrypt_attr, base64_to_a32,
base64_url_decode, decrypt_attr, a32_to_str, get_chunks, str_to_a32,
decrypt_key, mpi_to_int, stringhash, prepare_key, make_id,
)
import tempfile
@ -17,7 +21,7 @@ class Mega(object):
def __init__(self, options=None):
self.schema = 'https'
self.domain = 'mega.co.nz'
self.timeout = 160 # max time (secs) to wait for resp from api requests
self.timeout = 160 # max secs to wait for resp from api requests
self.sid = None
self.sequence_num = random.randint(0, 0xFFFFFFFF)
self.request_id = make_id(10)
@ -37,7 +41,7 @@ class Mega(object):
password_aes = prepare_key(str_to_a32(password))
uh = stringhash(email, password_aes)
resp = self._api_request({'a': 'us', 'user': email, 'uh': uh})
#if numeric error code response
# if numeric error code response
if isinstance(resp, int):
raise RequestError(resp)
self._login_process(resp, password_aes)
@ -47,15 +51,23 @@ class Mega(object):
password_key = [random.randint(0, 0xFFFFFFFF)] * 4
session_self_challenge = [random.randint(0, 0xFFFFFFFF)] * 4
user = self._api_request({
'a': 'up',
'k': a32_to_base64(encrypt_key(master_key, password_key)),
'ts': base64_url_encode(a32_to_str(session_self_challenge) +
a32_to_str(encrypt_key(session_self_challenge, master_key)))
})
user = self._api_request(
{
'a':
'up',
'k':
a32_to_base64(encrypt_key(master_key, password_key)),
'ts':
base64_url_encode(
a32_to_str(session_self_challenge) + a32_to_str(
encrypt_key(session_self_challenge, master_key)
)
)
}
)
resp = self._api_request({'a': 'us', 'user': user})
#if numeric error code response
# if numeric error code response
if isinstance(resp, int):
raise RequestError(resp)
self._login_process(resp, password_key)
@ -66,27 +78,34 @@ class Mega(object):
if 'tsid' in resp:
tsid = base64_url_decode(resp['tsid'])
key_encrypted = a32_to_str(
encrypt_key(str_to_a32(tsid[:16]), self.master_key))
encrypt_key(str_to_a32(tsid[:16]), self.master_key)
)
if key_encrypted == tsid[-16:]:
self.sid = resp['tsid']
elif 'csid' in resp:
encrypted_rsa_private_key = base64_to_a32(resp['privk'])
rsa_private_key = decrypt_key(encrypted_rsa_private_key,
self.master_key)
rsa_private_key = decrypt_key(
encrypted_rsa_private_key, self.master_key
)
private_key = a32_to_str(rsa_private_key)
self.rsa_private_key = [0, 0, 0, 0]
for i in range(4):
l = ((ord(private_key[0]) * 256 + ord(private_key[1]) + 7) / 8) + 2
l = (
(ord(private_key[0]) * 256 + ord(private_key[1]) + 7) / 8
) + 2
self.rsa_private_key[i] = mpi_to_int(private_key[:l])
private_key = private_key[l:]
encrypted_sid = mpi_to_int(base64_url_decode(resp['csid']))
rsa_decrypter = RSA.construct(
(self.rsa_private_key[0] * self.rsa_private_key[1],
0L, self.rsa_private_key[2], self.rsa_private_key[0],
self.rsa_private_key[1]))
(
self.rsa_private_key[0] * self.rsa_private_key[1], 0L,
self.rsa_private_key[2], self.rsa_private_key[0],
self.rsa_private_key[1]
)
)
sid = '%x' % rsa_decrypter.key._decrypt(encrypted_sid)
sid = binascii.unhexlify('0' + sid if len(sid) % 2 else sid)
@ -99,24 +118,36 @@ class Mega(object):
if self.sid:
params.update({'sid': self.sid})
#ensure input data is a list
# ensure input data is a list
if not isinstance(data, list):
data = [data]
url = '{0}://g.api.{1}/cs'.format(self.schema, self.domain)
req = requests.post(
'{0}://g.api.{1}/cs'.format(self.schema, self.domain),
url,
params=params,
data=json.dumps(data),
timeout=self.timeout)
timeout=self.timeout,
headers={
'Origin':
'https://mega.nz',
'Referer':
'https://mega.nz/login',
'User-Agent': (
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:69.0) '
'Gecko/20100101 Firefox/69.0'
),
}
)
json_resp = json.loads(req.text)
#if numeric error code response
# if numeric error code response
if isinstance(json_resp, int):
raise RequestError(json_resp)
return json_resp[0]
def _parse_url(self, url):
#parse file id and key from url
# parse file id and key from url
if '!' in url:
match = re.findall(r'/#!(.*)', url)
path = match[0]
@ -129,7 +160,11 @@ class Mega(object):
Process a file
"""
if file['t'] == 0 or file['t'] == 1:
keys = dict(keypart.split(':', 1) for keypart in file['k'].split('/') if ':' in keypart)
keys = dict(
keypart.split(':', 1)
for keypart in file['k'].split('/')
if ':' in keypart
)
uid = file['u']
key = None
# my objects
@ -137,7 +172,9 @@ class Mega(object):
key = decrypt_key(base64_to_a32(keys[uid]), self.master_key)
# shared folders
elif 'su' in file and 'sk' in file and ':' in file['k']:
shared_key = decrypt_key(base64_to_a32(file['sk']), self.master_key)
shared_key = decrypt_key(
base64_to_a32(file['sk']), self.master_key
)
key = decrypt_key(base64_to_a32(keys[file['h']]), shared_key)
if file['su'] not in shared_keys:
shared_keys[file['su']] = {}
@ -153,8 +190,10 @@ class Mega(object):
if key is not None:
# file
if file['t'] == 0:
k = (key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6],
key[3] ^ key[7])
k = (
key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6],
key[3] ^ key[7]
)
file['iv'] = key[4:6] + (0, 0)
file['meta_mac'] = key[6:8]
# folder
@ -189,7 +228,9 @@ class Mega(object):
"""
ok_dict = {}
for ok_item in files['ok']:
shared_key = decrypt_key(base64_to_a32(ok_item['k']), self.master_key)
shared_key = decrypt_key(
base64_to_a32(ok_item['k']), self.master_key
)
ok_dict[ok_item['h']] = shared_key
for s_item in files['s']:
if s_item['u'] not in shared_keys:
@ -233,6 +274,8 @@ class Mega(object):
"""
files = self.get_files()
for file in files.items():
if not isinstance(file[1]['a'], dict):
continue
if file[1]['a'] and file[1]['a']['n'] == filename:
return file
@ -246,7 +289,7 @@ class Mega(object):
self._init_shared_keys(files, shared_keys)
for file in files['f']:
processed_file = self._process_file(file, shared_keys)
#ensure each file has a name before returning
# ensure each file has a name before returning
if processed_file['a']:
files_dict[file['h']] = processed_file
return files_dict
@ -260,15 +303,17 @@ class Mega(object):
file = file['f'][0]
public_handle = self._api_request({'a': 'l', 'n': file['h']})
file_key = file['k'][file['k'].index(':') + 1:]
decrypted_key = a32_to_base64(decrypt_key(base64_to_a32(file_key),
self.master_key))
return '{0}://{1}/#!{2}!{3}'.format(self.schema,
self.domain,
public_handle,
decrypted_key)
decrypted_key = a32_to_base64(
decrypt_key(base64_to_a32(file_key), self.master_key)
)
return '{0}://{1}/#!{2}!{3}'.format(
self.schema, self.domain, public_handle, decrypted_key
)
else:
raise ValueError('''Upload() response required as input,
use get_link() for regular file input''')
raise ValueError(
'''Upload() response required as input,
use get_link() for regular file input'''
)
def get_link(self, file):
"""
@ -278,12 +323,14 @@ class Mega(object):
if 'h' in file and 'k' in file:
public_handle = self._api_request({'a': 'l', 'n': file['h']})
if public_handle == -11:
raise RequestError("Can't get a public link from that file (is this a shared file?)")
raise RequestError(
"Can't get a public link from that file "
"(is this a shared file?)"
)
decrypted_key = a32_to_base64(file['key'])
return '{0}://{1}/#!{2}!{3}'.format(self.schema,
self.domain,
public_handle,
decrypted_key)
return '{0}://{1}/#!{2}!{3}'.format(
self.schema, self.domain, public_handle, decrypted_key
)
else:
raise ValidationError('File id and key must be present')
@ -326,7 +373,7 @@ class Mega(object):
return files_dict
def get_id_from_public_handle(self, public_handle):
#get node data
# get node data
node_data = self._api_request({'a': 'f', 'f': 1, 'p': public_handle})
node_id = self.get_id_from_obj(node_data)
return node_id
@ -338,7 +385,7 @@ class Mega(object):
node_id = None
for i in node_data['f']:
if i['h'] is not u'':
if i['h'] != u'':
node_id = i['h']
return node_id
@ -346,8 +393,15 @@ class Mega(object):
"""
Get current remaining disk quota in MegaBytes
"""
json_resp = self._api_request({'a': 'uq', 'xfer': 1})
#convert bytes to megabyes
json_resp = self._api_request(
{
'a': 'uq',
'xfer': 1,
'strg': 1,
'v': 1
}
)
# convert bytes to megabyes
return json_resp['mstrg'] / 1048576
def get_storage_space(self, giga=False, mega=False, kilo=False):
@ -402,9 +456,13 @@ class Mega(object):
"""
Destroy a file by its private id
"""
return self._api_request({'a': 'd',
'n': file_id,
'i': self.request_id})
return self._api_request(
{
'a': 'd',
'n': file_id,
'i': self.request_id
}
)
def destroy_url(self, url):
"""
@ -423,9 +481,7 @@ class Mega(object):
if files != {}:
post_list = []
for file in files:
post_list.append({"a": "d",
"n": file,
"i": self.request_id})
post_list.append({"a": "d", "n": file, "i": self.request_id})
return self._api_request(post_list)
##########################################################################
@ -434,7 +490,14 @@ class Mega(object):
"""
Download a file by it's file object
"""
self._download_file(None, None, file=file[1], dest_path=dest_path, dest_filename=dest_filename, is_public=False)
self._download_file(
None,
None,
file=file[1],
dest_path=dest_path,
dest_filename=dest_filename,
is_public=False
)
def download_url(self, url, dest_path=None, dest_filename=None):
"""
@ -443,18 +506,42 @@ class Mega(object):
path = self._parse_url(url).split('!')
file_id = path[0]
file_key = path[1]
self._download_file(file_id, file_key, dest_path, dest_filename, is_public=True)
self._download_file(
file_id, file_key, dest_path, dest_filename, is_public=True
)
def _download_file(self, file_handle, file_key, dest_path=None, dest_filename=None, is_public=False, file=None):
def _download_file(
self,
file_handle,
file_key,
dest_path=None,
dest_filename=None,
is_public=False,
file=None
):
if file is None:
if is_public:
file_key = base64_to_a32(file_key)
file_data = self._api_request({'a': 'g', 'g': 1, 'p': file_handle})
file_data = self._api_request(
{
'a': 'g',
'g': 1,
'p': file_handle
}
)
else:
file_data = self._api_request({'a': 'g', 'g': 1, 'n': file_handle})
file_data = self._api_request(
{
'a': 'g',
'g': 1,
'n': file_handle
}
)
k = (file_key[0] ^ file_key[4], file_key[1] ^ file_key[5],
file_key[2] ^ file_key[6], file_key[3] ^ file_key[7])
k = (
file_key[0] ^ file_key[4], file_key[1] ^ file_key[5],
file_key[2] ^ file_key[6], file_key[3] ^ file_key[7]
)
iv = file_key[4:6] + (0, 0)
meta_mac = file_key[6:8]
else:
@ -485,11 +572,12 @@ class Mega(object):
else:
dest_path += '/'
temp_output_file = tempfile.NamedTemporaryFile(mode='w+b', prefix='megapy_', delete=False)
temp_output_file = tempfile.NamedTemporaryFile(
mode='w+b', prefix='megapy_', delete=False
)
k_str = a32_to_str(k)
counter = Counter.new(
128, initial_value=((iv[0] << 32) + iv[1]) << 64)
counter = Counter.new(128, initial_value=((iv[0] << 32) + iv[1]) << 64)
aes = AES.new(k_str, AES.MODE_CTR, counter=counter)
mac_str = '\0' * 16
@ -502,11 +590,11 @@ class Mega(object):
temp_output_file.write(chunk)
encryptor = AES.new(k_str, AES.MODE_CBC, iv_str)
for i in range(0, len(chunk)-16, 16):
for i in range(0, len(chunk) - 16, 16):
block = chunk[i:i + 16]
encryptor.encrypt(block)
#fix for files under 16 bytes failing
# fix for files under 16 bytes failing
if file_size > 16:
i += 16
else:
@ -520,7 +608,11 @@ class Mega(object):
if self.options.get('verbose') is True:
# temp file size
file_info = os.stat(temp_output_file.name)
print('{0} of {1} downloaded'.format(file_info.st_size, file_size))
print(
'{0} of {1} downloaded'.format(
file_info.st_size, file_size
)
)
file_mac = str_to_a32(mac_str)
@ -535,22 +627,24 @@ class Mega(object):
##########################################################################
# UPLOAD
def upload(self, filename, dest=None, dest_filename=None):
#determine storage node
# determine storage node
if dest is None:
#if none set, upload to cloud drive node
# if none set, upload to cloud drive node
if not hasattr(self, 'root_id'):
self.get_files()
dest = self.root_id
#request upload url, call 'u' method
# request upload url, call 'u' method
input_file = open(filename, 'rb')
file_size = os.path.getsize(filename)
ul_url = self._api_request({'a': 'u', 's': file_size})['p']
#generate random aes key (128) for file
# generate random aes key (128) for file
ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)]
k_str = a32_to_str(ul_key[:4])
count = Counter.new(128, initial_value=((ul_key[4] << 32) + ul_key[5]) << 64)
count = Counter.new(
128, initial_value=((ul_key[4] << 32) + ul_key[5]) << 64
)
aes = AES.new(k_str, AES.MODE_CTR, counter=count)
upload_progress = 0
@ -565,11 +659,11 @@ class Mega(object):
upload_progress += len(chunk)
encryptor = AES.new(k_str, AES.MODE_CBC, iv_str)
for i in range(0, len(chunk)-16, 16):
for i in range(0, len(chunk) - 16, 16):
block = chunk[i:i + 16]
encryptor.encrypt(block)
#fix for files under 16 bytes failing
# fix for files under 16 bytes failing
if file_size > 16:
i += 16
else:
@ -580,23 +674,31 @@ class Mega(object):
block += '\0' * (16 - len(block) % 16)
mac_str = mac_encryptor.encrypt(encryptor.encrypt(block))
#encrypt file and upload
# encrypt file and upload
chunk = aes.encrypt(chunk)
output_file = requests.post(ul_url + "/" + str(chunk_start),
data=chunk, timeout=self.timeout)
output_file = requests.post(
ul_url + "/" + str(chunk_start),
data=chunk,
timeout=self.timeout
)
completion_file_handle = output_file.text
if self.options.get('verbose') is True:
# upload progress
print('{0} of {1} uploaded'.format(upload_progress, file_size))
print(
'{0} of {1} uploaded'.format(
upload_progress, file_size
)
)
else:
output_file = requests.post(ul_url + "/0",
data='', timeout=self.timeout)
output_file = requests.post(
ul_url + "/0", data='', timeout=self.timeout
)
completion_file_handle = output_file.text
file_mac = str_to_a32(mac_str)
#determine meta mac
# determine meta mac
meta_mac = (file_mac[0] ^ file_mac[1], file_mac[2] ^ file_mac[3])
if dest_filename is not None:
@ -605,68 +707,92 @@ class Mega(object):
attribs = {'n': os.path.basename(filename)}
encrypt_attribs = base64_url_encode(encrypt_attr(attribs, ul_key[:4]))
key = [ul_key[0] ^ ul_key[4], ul_key[1] ^ ul_key[5],
ul_key[2] ^ meta_mac[0], ul_key[3] ^ meta_mac[1],
ul_key[4], ul_key[5], meta_mac[0], meta_mac[1]]
key = [
ul_key[0] ^ ul_key[4], ul_key[1] ^ ul_key[5],
ul_key[2] ^ meta_mac[0], ul_key[3] ^ meta_mac[1], ul_key[4],
ul_key[5], meta_mac[0], meta_mac[1]
]
encrypted_key = a32_to_base64(encrypt_key(key, self.master_key))
#update attributes
data = self._api_request({'a': 'p', 't': dest, 'n': [{
'h': completion_file_handle,
't': 0,
'a': encrypt_attribs,
'k': encrypted_key}]})
#close input file and return API msg
# update attributes
data = self._api_request(
{
'a':
'p',
't':
dest,
'n': [
{
'h': completion_file_handle,
't': 0,
'a': encrypt_attribs,
'k': encrypted_key
}
]
}
)
# close input file and return API msg
input_file.close()
return data
##########################################################################
# OTHER OPERATIONS
def create_folder(self, name, dest=None):
#determine storage node
# determine storage node
if dest is None:
#if none set, upload to cloud drive node
# if none set, upload to cloud drive node
if not hasattr(self, 'root_id'):
self.get_files()
dest = self.root_id
#generate random aes key (128) for folder
# generate random aes key (128) for folder
ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)]
#encrypt attribs
# encrypt attribs
attribs = {'n': name}
encrypt_attribs = base64_url_encode(encrypt_attr(attribs, ul_key[:4]))
encrypted_key = a32_to_base64(encrypt_key(ul_key[:4], self.master_key))
#update attributes
data = self._api_request({'a': 'p',
't': dest,
'n': [{
'h': 'xxxxxxxx',
't': 1,
'a': encrypt_attribs,
'k': encrypted_key}
],
'i': self.request_id})
#return API msg
# update attributes
data = self._api_request(
{
'a':
'p',
't':
dest,
'n': [
{
'h': 'xxxxxxxx',
't': 1,
'a': encrypt_attribs,
'k': encrypted_key
}
],
'i':
self.request_id
}
)
return data
def rename(self, file, new_name):
file = file[1]
#create new attribs
# create new attribs
attribs = {'n': new_name}
#encrypt attribs
# encrypt attribs
encrypt_attribs = base64_url_encode(encrypt_attr(attribs, file['k']))
encrypted_key = a32_to_base64(encrypt_key(file['key'], self.master_key))
encrypted_key = a32_to_base64(
encrypt_key(file['key'], self.master_key)
)
#update attributes
data = self._api_request([{
'a': 'a',
'attr': encrypt_attribs,
'key': encrypted_key,
'n': file['h'],
'i': self.request_id}])
#return API msg
# update attributes
data = self._api_request(
[
{
'a': 'a',
'attr': encrypt_attribs,
'key': encrypted_key,
'n': file['h'],
'i': self.request_id
}
]
)
return data
def move(self, file_id, target):
@ -689,7 +815,7 @@ class Mega(object):
target's structure returned by find()
"""
#determine target_node_id
# determine target_node_id
if type(target) == int:
target_node_id = str(self.get_node_by_type(target)[0])
elif type(target) in (str, unicode):
@ -697,10 +823,14 @@ class Mega(object):
else:
file = target[1]
target_node_id = file['h']
return self._api_request({'a': 'm',
'n': file_id,
't': target_node_id,
'i': self.request_id})
return self._api_request(
{
'a': 'm',
'n': file_id,
't': target_node_id,
'i': self.request_id
}
)
def add_contact(self, email):
"""
@ -728,26 +858,29 @@ class Mega(object):
if not re.match(r"[^@]+@[^@]+\.[^@]+", email):
ValidationError('add_contact requires a valid email address')
else:
return self._api_request({'a': 'ur',
'u': email,
'l': l,
'i': self.request_id})
return self._api_request(
{
'a': 'ur',
'u': email,
'l': l,
'i': self.request_id
}
)
def get_contacts(self):
raise NotImplementedError()
# TODO implement this
# sn param below = maxaction var with function getsc() in mega.co.nz js
# seens to be the 'sn' attrib of the previous request response...
# mega.co.nz js full source @ http://homepages.shu.ac.uk/~rjodwyer/mega-scripts-all.js
# requests goto /sc rather than
#req = requests.post(
#'{0}://g.api.{1}/sc'.format(self.schema, self.domain),
# params={'sn': 'ZMxcQ_DmHnM', 'ssl': '1'},
# data=json.dumps(None),
# timeout=self.timeout)
#json_resp = json.loads(req.text)
#print json_resp
# req = requests.post(
# '{0}://g.api.{1}/sc'.format(self.schema, self.domain),
# params={'sn': 'ZMxcQ_DmHnM', 'ssl': '1'},
# data=json.dumps(None),
# timeout=self.timeout)
# json_resp = json.loads(req.text)
# print json_resp
def get_public_url_info(self, url):
"""
@ -761,18 +894,17 @@ class Mega(object):
Import the public url into user account
"""
file_handle, file_key = self._parse_url(url).split('!')
return self.import_public_file(file_handle, file_key, dest_node=dest_node, dest_name=dest_name)
return self.import_public_file(
file_handle, file_key, dest_node=dest_node, dest_name=dest_name
)
def get_public_file_info(self, file_handle, file_key):
"""
Get size and name of a public file
"""
data = self._api_request({
'a': 'g',
'p': file_handle,
'ssm': 1})
data = self._api_request({'a': 'g', 'p': file_handle, 'ssm': 1})
#if numeric error code response
# if numeric error code response
if isinstance(data, int):
raise RequestError(data)
@ -780,20 +912,22 @@ class Mega(object):
raise ValueError("Unexpected result", data)
key = base64_to_a32(file_key)
k = (key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], key[3] ^ key[7])
k = (
key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], key[3] ^ key[7]
)
size = data['s']
unencrypted_attrs = decrypt_attr(base64_url_decode(data['at']), k)
if not unencrypted_attrs:
return None
result = {
'size': size,
'name': unencrypted_attrs['n']}
result = {'size': size, 'name': unencrypted_attrs['n']}
return result
def import_public_file(self, file_handle, file_key, dest_node=None, dest_name=None):
def import_public_file(
self, file_handle, file_key, dest_node=None, dest_name=None
):
"""
Import the public file into user account
"""
@ -809,19 +943,27 @@ class Mega(object):
dest_name = pl_info['name']
key = base64_to_a32(file_key)
k = (key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], key[3] ^ key[7])
k = (
key[0] ^ key[4], key[1] ^ key[5], key[2] ^ key[6], key[3] ^ key[7]
)
encrypted_key = a32_to_base64(encrypt_key(key, self.master_key))
encrypted_name = base64_url_encode(encrypt_attr({'n': dest_name}, k))
data = self._api_request({
'a': 'p',
't': dest_node['h'],
'n': [{
'ph': file_handle,
't': 0,
'a': encrypted_name,
'k': encrypted_key}]})
#return API msg
data = self._api_request(
{
'a':
'p',
't':
dest_node['h'],
'n': [
{
'ph': file_handle,
't': 0,
'a': encrypted_name,
'k': encrypted_key
}
]
}
)
return data

8
requirements-dev.txt Normal file
View file

@ -0,0 +1,8 @@
-r requirements.txt
pytest
ipdb
flake8
pep8-naming
autoflake
mccabe
yapf

View file

@ -1,3 +1,2 @@
requests>=0.10
pycrypto
mega.py

28
setup.cfg Normal file
View file

@ -0,0 +1,28 @@
[bdist_wheel]
universal = 1
[zest.releaser]
create-wheel = yes
[tool:pytest]
addopts = -x -s -v
norecursedirs = .git
[flake8]
exclude = .git,__pycache__,legacy,build,dist,.tox
max-complexity = 15
ignore = E741
[yapf]
based_on_style = pep8
spaces_before_comment = 2
split_before_logical_operator = true
indent_width = 4
split_complex_comprehension = true
column_limit = 79
dedent_closing_brackets = true
spaces_around_power_operator = true
no_spaces_around_selected_binary_operators = false
split_penalty_import_names = 500
join_multiple_lines = true

View file

@ -9,25 +9,31 @@ def get_packages(package):
"""
Return root package & all sub-packages.
"""
return [dirpath
for dirpath, dirnames, filenames in os.walk(package)
if os.path.exists(os.path.join(dirpath, '__init__.py'))]
return [
dirpath for dirpath, dirnames, filenames in os.walk(package)
if os.path.exists(os.path.join(dirpath, '__init__.py'))
]
def get_package_data(package):
"""
Return all files under the root package, that are not in a
package themselves.
"""
walk = [(dirpath.replace(package + os.sep, '', 1), filenames)
for dirpath, dirnames, filenames in os.walk(package)
if not os.path.exists(os.path.join(dirpath, '__init__.py'))]
walk = [
(dirpath.replace(package + os.sep, '', 1), filenames)
for dirpath, dirnames, filenames in os.walk(package)
if not os.path.exists(os.path.join(dirpath, '__init__.py'))
]
filepaths = []
for base, filenames in walk:
filepaths.extend([os.path.join(base, filename)
for filename in filenames])
filepaths.extend(
[os.path.join(base, filename) for filename in filenames]
)
return {package: filepaths}
setup(
name='mega.py',
version='0.9.17',
@ -41,8 +47,7 @@ setup(
install_requires=['pycrypto', 'requests'],
classifiers=[
'Intended Audience :: Developers',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Operating System :: OS Independent', 'Programming Language :: Python',
'Topic :: Internet :: WWW/HTTP'
]
)

View file

@ -3,23 +3,26 @@ These unit tests will upload a test file,a test folder and a test contact,
Perform api operations on them,
And them remove them from your account.
"""
from mega import Mega
import unittest
import random
import os
email = 'your@email.com'
password = 'password'
from mega import Mega
email = os.environ['EMAIL']
password = os.environ['PASS']
mega = Mega()
# anonymous login
m = mega.login()
# normal login
#m = mega.login(email, password)
# m = mega.login(email, password)
FIND_RESP = None
TEST_CONTACT = 'test@mega.co.nz'
TEST_PUBLIC_URL = 'https://mega.co.nz/#!EYI2VagT!Ic1yblki8oM4v6XHquCe4gu84kxc4glFchj8OvcT5lw'
TEST_PUBLIC_URL = (
'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps'
)
TEST_FILE = os.path.basename(__file__)
TEST_FOLDER = 'mega.py_testfolder_{0}'.format(random.random())
@ -88,7 +91,7 @@ class TestMega(unittest.TestCase):
self.assertIsInstance(resp, int)
def test_empty_trash(self):
#resp None if already empty, else int
# resp None if already empty, else int
resp = m.empty_trash()
if resp is not None:
self.assertIsInstance(resp, int)