Merge remote-tracking branch 'upstream/master'

# Conflicts:
#	README.md
#	src/mega/crypto.py
#	src/mega/errors.py
#	src/mega/mega.py
This commit is contained in:
Ethan Dalool 2020-09-21 21:38:20 -07:00
commit 5a2f14b312
15 changed files with 305 additions and 339 deletions

View file

@ -1,8 +1,6 @@
sudo: false sudo: false
language: python language: python
python: python:
- 3.6
- 3.7
- 3.8 - 3.8
env: env:
- TOXENV=py-normal - TOXENV=py-normal

92
HISTORY.md Normal file
View file

@ -0,0 +1,92 @@
Release History
===============
1.0.9 (unreleased)
------------------
- Nothing changed yet.
1.0.8 (2020-06-25)
------------------
- Fixes find method returning the wrong file when more than one file
exists with that name.
- Handle new shared file URLS.
1.0.7 (2020-03-25)
------------------
- Fix login by calculating public RSA exponent instead of hardcoding.
1.0.6 (2020-02-03)
------------------
- Fixes RSA public exponent issue.
- Switches dependency pycrypto to pycryptodome.
1.0.5 (2019-11-18)
------------------
- Increase the wait time in between failed API request retries.
1.0.4 (2019-11-18)
------------------
- Increase the wait time in between failed API request retries.
1.0.3 (2019-11-12)
------------------
- Fixes broken `download` method.
- Changes `download` and `download_url` methods to return the path to
the downloaded file, previously returned `None`.
- Added LICENSE.
1.0.2 (2019-11-07)
------------------
- Reverts, "Replace pycrypto dependency with pycryptodome" as breaks
login process.
1.0.1 (2019-11-06)
------------------
- When a request fails due to EAGAIN response, retry with exp backoff
up to 20 seconds.
- Adds logging, removes print statements.
- Replace pycrypto dependency with pycryptodome.
- Removes Python 2 specific code.
1.0.0 (2019-10-31)
------------------
- Removes broken method `get_contacts()`.
- Adds support for login with a v2 Mega user account.
- Adds `export()` method to share a file or folder, returning public
share URL with key.
- Adds code, message attrs to RequestError exception, makes message in
raised exceptions include more details.
- Alters `create_folder()` to accept a path including multiple sub
directories, adds support to create them all (similar to 'mkdir -p'
on unix systems).
- Adds `exclude_deleted=True` optional arg to `find()` method, to
exclude deleted nodes from results.
0.9.20 (2019-10-17)
-------------------
- Python 3 bugfix to `upload` method.
0.9.19 (2019-10-16)
-------------------
- Python 3 support and bugfixes.
- Update packaging code.
- Added changelog.
0.9.18 (2013-07-04)
-------------------
- Unknown

View file

@ -1,79 +0,0 @@
.. :changelog:
Release History
===============
1.0.7 (unreleased)
------------------
- Fix login by calculating public RSA exponent instead of hardcoding.
1.0.6 (2020-02-03)
------------------
- Fixes RSA public exponent issue.
- Switches dependency pycrypto to pycryptodome.
1.0.5 (2019-11-18)
------------------
- Increase the wait time in between failed API request retries.
1.0.4 (2019-11-18)
------------------
- Increase the wait time in between failed API request retries.
1.0.3 (2019-11-12)
------------------
- Fixes broken ``download`` method.
- Changes ``download`` and ``download_url`` methods to return the path to the downloaded file, previously returned ``None``.
- Added LICENSE.
1.0.2 (2019-11-07)
------------------
- Reverts, "Replace pycrypto dependency with pycryptodome" as breaks login process.
1.0.1 (2019-11-06)
------------------
- When a request fails due to EAGAIN response, retry with exp backoff up to 20 seconds.
- Adds logging, removes print statements.
- Replace pycrypto dependency with pycryptodome.
- Removes Python 2 specific code.
1.0.0 (2019-10-31)
------------------
- Removes broken method ``get_contacts()``.
- Adds support for login with a v2 Mega user account.
- Adds ``export()`` method to share a file or folder, returning public share URL with key.
- Adds code, message attrs to RequestError exception, makes message in raised exceptions include more details.
- Alters ``create_folder()`` to accept a path including multiple sub directories, adds support to create them all (similar to 'mkdir -p' on unix systems).
- Adds ``exclude_deleted=True`` optional arg to ``find()`` method, to exclude deleted nodes from results.
0.9.20 (2019-10-17)
-------------------
- Python 3 bugfix to ``upload`` method.
0.9.19 (2019-10-16)
-------------------
- Python 3 support and bugfixes.
- Update packaging code.
- Added changelog.
0.9.18 (2013-07-04)
-------------------
- Unknown

View file

@ -1,9 +1,9 @@
include HISTORY.rst include HISTORY.md
include README.rst include README.md
include requirements.txt include requirements.txt
recursive-include tests * recursive-include tests *
recursive-exclude * __pycache__ recursive-exclude * __pycache__
recursive-exclude * *.py[co] recursive-exclude * *.py[co]
recursive-include docs *.rst conf.py Makefile make.bat recursive-include docs *.md conf.py Makefile make.bat

View file

@ -1,4 +1,5 @@
import os import os
import uuid
from mega import Mega from mega import Mega
@ -8,7 +9,7 @@ def test():
comment/uncomment lines to test various parts of the API comment/uncomment lines to test various parts of the API
see readme.md for more information see readme.md for more information
""" """
unique = str(uuid.uuid4())
# user details # user details
email = os.environ['EMAIL'] email = os.environ['EMAIL']
password = os.environ['PASS'] password = os.environ['PASS']
@ -36,10 +37,11 @@ def test():
print((files[file])) print((files[file]))
# upload file # upload file
print((m.upload('examples.py'))) print((m.upload(filename='examples.py',
dest_filename=f'examples_{unique}.py')))
# search for a file in account # search for a file in account
file = m.find('examples.py') file = m.find(f'examples_{unique}.py')
if file: if file:
# get public link # get public link

View file

@ -1,17 +1,20 @@
-r requirements.txt -r requirements.txt
pytest pytest==5.4.3
ipdb ipdb==0.13.3
flake8 flake8==3.8.3
pep8-naming pep8-naming==0.11.1
autoflake autoflake==1.3.1
mccabe mccabe==0.6.1
yapf brunette==0.1.5
tox tox==3.15.2
coverage coverage==5.1
pytest-cov pytest-cov==2.10.0
zest.releaser zest.releaser==6.20.1
setuptools setuptools==47.3.1
twine twine==3.2.0
wheel wheel==0.34.2
rope rope==0.17.0
pytest-mock pytest-mock==3.1.1
brunette==0.1.5
lock-requirements==0.1.1
requests-mock==1.8.0

View file

@ -11,20 +11,12 @@ norecursedirs = .git
[flake8] [flake8]
exclude = .git,__pycache__,legacy,build,dist,.tox exclude = .git,__pycache__,legacy,build,dist,.tox
max-complexity = 15 max-complexity = 15
ignore = E741,W504 ignore = E741,W504,W503
[yapf] [tool:brunette]
based_on_style = pep8 line-length = 79
spaces_before_comment = 2 verbose = true
split_before_logical_operator = true single-quotes = true
indent_width = 4
split_complex_comprehension = true
column_limit = 79
dedent_closing_brackets = true
spaces_around_power_operator = true
no_spaces_around_selected_binary_operators = false
split_penalty_import_names = 500
join_multiple_lines = true
[coverage:run] [coverage:run]
omit = omit =

View file

@ -14,33 +14,30 @@ os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir)))
with open('requirements.txt') as f: with open('requirements.txt') as f:
install_requires = f.read().splitlines() install_requires = f.read().splitlines()
with open('README.rst', 'r', encoding='utf-8') as rm_file: with open('README.md', 'r', encoding='utf-8') as rm_file:
readme = rm_file.read() readme = rm_file.read()
with open('HISTORY.rst', 'r', encoding='utf-8') as hist_file: with open('HISTORY.md', 'r', encoding='utf-8') as hist_file:
history = hist_file.read() history = hist_file.read()
setup( setup(name='mega.py',
name='mega.py', version='1.0.9.dev0',
version='1.0.7.dev0', packages=find_packages('src', exclude=('tests', )),
packages=find_packages('src', exclude=('tests', )), package_dir={'': 'src'},
package_dir={'': 'src'}, include_package_data=True,
include_package_data=True, zip_safe=False,
zip_safe=False, url='https://github.com/odwyersoftware/mega.py',
description='Python lib for the Mega.co.nz API', description='Python lib for the Mega.co.nz API',
long_description=readme + '\n\n' + history, long_description=readme + '\n\n' + history,
author='Richard O\'Dwyer', long_description_content_type='text/markdown',
author_email='richard@richard.do', author='O\'Dwyer Software',
license='Creative Commons Attribution-Noncommercial-Share Alike license', author_email='hello@odwyer.software',
install_requires=install_requires, license='Creative Commons Attribution-Noncommercial-Share Alike license',
classifiers=[ install_requires=install_requires,
'Intended Audience :: Developers', classifiers=[
'Operating System :: OS Independent', 'Intended Audience :: Developers',
'Programming Language :: Python', 'Operating System :: OS Independent',
'Programming Language :: Python :: 3', 'Programming Language :: Python',
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7', 'Topic :: Internet :: WWW/HTTP',
'Programming Language :: Python :: 3.8', ])
'Topic :: Internet :: WWW/HTTP',
]
)

View file

@ -23,29 +23,24 @@ else:
def makestring(x): def makestring(x):
return codecs.latin_1_decode(x)[0] return codecs.latin_1_decode(x)[0]
def aes_cbc_encrypt(data, key): def aes_cbc_encrypt(data, key):
aes_cipher = AES.new(key, AES.MODE_CBC, makebyte('\0' * 16)) aes_cipher = AES.new(key, AES.MODE_CBC, makebyte('\0' * 16))
return aes_cipher.encrypt(data) return aes_cipher.encrypt(data)
def aes_cbc_decrypt(data, key): def aes_cbc_decrypt(data, key):
aes_cipher = AES.new(key, AES.MODE_CBC, makebyte('\0' * 16)) aes_cipher = AES.new(key, AES.MODE_CBC, makebyte('\0' * 16))
return aes_cipher.decrypt(data) return aes_cipher.decrypt(data)
def aes_cbc_encrypt_a32(data, key): def aes_cbc_encrypt_a32(data, key):
return str_to_a32(aes_cbc_encrypt(a32_to_str(data), a32_to_str(key))) return str_to_a32(aes_cbc_encrypt(a32_to_str(data), a32_to_str(key)))
def aes_cbc_decrypt_a32(data, key): def aes_cbc_decrypt_a32(data, key):
return str_to_a32(aes_cbc_decrypt(a32_to_str(data), a32_to_str(key))) return str_to_a32(aes_cbc_decrypt(a32_to_str(data), a32_to_str(key)))
def stringhash(str, aeskey): def stringhash(str, aeskey):
""" '''
As defined by MEGA's weblient crypto.js. Search for "function stringhash". As defined by MEGA's weblient crypto.js. Search for "function stringhash".
""" '''
s32 = str_to_a32(str) s32 = str_to_a32(str)
h32 = [0, 0, 0, 0] h32 = [0, 0, 0, 0]
for (index, word) in enumerate(s32): for (index, word) in enumerate(s32):
@ -54,7 +49,6 @@ def stringhash(str, aeskey):
h32 = aes_cbc_encrypt_a32(h32, aeskey) h32 = aes_cbc_encrypt_a32(h32, aeskey)
return a32_to_base64((h32[0], h32[2])) return a32_to_base64((h32[0], h32[2]))
def prepare_key(arr): def prepare_key(arr):
pkey = [0x93C467E3, 0x7DB0C7A4, 0xD1BE3F81, 0x0152CB56] pkey = [0x93C467E3, 0x7DB0C7A4, 0xD1BE3F81, 0x0152CB56]
for r in range(0x10000): for r in range(0x10000):
@ -66,7 +60,6 @@ def prepare_key(arr):
pkey = aes_cbc_encrypt_a32(pkey, key) pkey = aes_cbc_encrypt_a32(pkey, key)
return pkey return pkey
def encrypt_key(a, key): def encrypt_key(a, key):
encrypted = tuple( encrypted = tuple(
piece piece
@ -83,25 +76,21 @@ def decrypt_key(a, key):
) )
return decrypted return decrypted
def encrypt_attr(attr, key): def encrypt_attr(attr, key):
attr = makebyte('MEGA' + json.dumps(attr)) attr = makebyte('MEGA' + json.dumps(attr))
if len(attr) % 16: if len(attr) % 16:
attr += b'\0' * (16 - len(attr) % 16) attr += b'\0' * (16 - len(attr) % 16)
return aes_cbc_encrypt(attr, a32_to_str(key)) return aes_cbc_encrypt(attr, a32_to_str(key))
def decrypt_attr(attr, key): def decrypt_attr(attr, key):
attr = aes_cbc_decrypt(attr, a32_to_str(key)) attr = aes_cbc_decrypt(attr, a32_to_str(key))
attr = makestring(attr) attr = makestring(attr)
attr = attr.rstrip('\0') attr = attr.rstrip('\0')
return json.loads(attr[4:]) if attr[:6] == 'MEGA{"' else False return json.loads(attr[4:]) if attr[:6] == 'MEGA{"' else False
def a32_to_str(a): def a32_to_str(a):
return struct.pack('>%dI' % len(a), *a) return struct.pack('>%dI' % len(a), *a)
def str_to_a32(b): def str_to_a32(b):
if isinstance(b, str): if isinstance(b, str):
b = makebyte(b) b = makebyte(b)
@ -110,7 +99,6 @@ def str_to_a32(b):
b += b'\0' * (4 - len(b) % 4) b += b'\0' * (4 - len(b) % 4)
return struct.unpack('>%dI' % (len(b) / 4), b) return struct.unpack('>%dI' % (len(b) / 4), b)
def mpi_to_int(s): def mpi_to_int(s):
''' '''
A Multi-precision integer is encoded as a series of bytes in big-endian A Multi-precision integer is encoded as a series of bytes in big-endian
@ -127,10 +115,6 @@ def extended_gcd(a, b):
return (g, x - (b // a) * y, y) return (g, x - (b // a) * y, y)
def modular_inverse(a, m): def modular_inverse(a, m):
'''
Thank you Mart Bakhoff for this solution.
https://stackoverflow.com/a/9758173
'''
g, x, y = extended_gcd(a, m) g, x, y = extended_gcd(a, m)
if g != 1: if g != 1:
raise Exception('modular inverse does not exist') raise Exception('modular inverse does not exist')
@ -141,16 +125,14 @@ def interleave_xor_8(b):
return (b[0] ^ b[4], b[1] ^ b[5], b[2] ^ b[6], b[3] ^ b[7]) return (b[0] ^ b[4], b[1] ^ b[5], b[2] ^ b[6], b[3] ^ b[7])
def base64_url_decode(data): def base64_url_decode(data):
data += '==' [(2 - len(data) * 3) % 4:] data += '=='[(2 - len(data) * 3) % 4:]
for search, replace in (('-', '+'), ('_', '/'), (',', '')): for search, replace in (('-', '+'), ('_', '/'), (',', '')):
data = data.replace(search, replace) data = data.replace(search, replace)
return base64.b64decode(data) return base64.b64decode(data)
def base64_to_a32(s): def base64_to_a32(s):
return str_to_a32(base64_url_decode(s)) return str_to_a32(base64_url_decode(s))
def base64_url_encode(data): def base64_url_encode(data):
data = base64.b64encode(data) data = base64.b64encode(data)
data = makestring(data) data = makestring(data)
@ -158,16 +140,14 @@ def base64_url_encode(data):
data = data.replace(search, replace) data = data.replace(search, replace)
return data return data
def a32_to_base64(a): def a32_to_base64(a):
return base64_url_encode(a32_to_str(a)) return base64_url_encode(a32_to_str(a))
def get_chunks(size): def get_chunks(size):
""" '''
Given the size of a file in bytes, return tuples (chunk_start, chunk_size) Given the size of a file in bytes, return tuples (chunk_start, chunk_size)
for the purposes of downloading or uploading a file in chunks. for the purposes of downloading or uploading a file in chunks.
""" '''
chunk_start = 0 chunk_start = 0
chunk_size = 0x20000 chunk_size = 0x20000
while chunk_start + chunk_size < size: while chunk_start + chunk_size < size:
@ -178,7 +158,6 @@ def get_chunks(size):
chunk_size += 0x20000 chunk_size += 0x20000
yield (chunk_start, size - chunk_start) yield (chunk_start, size - chunk_start)
def make_id(length): def make_id(length):
possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
text = ''.join(random.choice(possible) for i in range(length)) text = ''.join(random.choice(possible) for i in range(length))

View file

@ -4,7 +4,6 @@ class ValidationError(Exception):
""" """
pass pass
class RequestError(Exception): class RequestError(Exception):
""" """
Error in API request Error in API request
@ -18,7 +17,6 @@ class RequestError(Exception):
def __str__(self): def __str__(self):
return self.message return self.message
class EINTERNAL(RequestError): class EINTERNAL(RequestError):
code = -1 code = -1
message = ( message = (
@ -152,7 +150,6 @@ _CODE_TO_CLASSES = {
-22: EAPPKEY, -22: EAPPKEY,
} }
def error_for_code(code): def error_for_code(code):
cls = _CODE_TO_CLASSES[code] cls = _CODE_TO_CLASSES[code]
return cls() return cls()

View file

@ -16,7 +16,6 @@ from Crypto.Cipher import AES
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto.Util import Counter from Crypto.Util import Counter
from . import crypto from . import crypto
from . import errors from . import errors
@ -223,20 +222,25 @@ class Mega:
(fileid, filekey). (fileid, filekey).
""" """
# File urls are '#!', Folder urls are '#F!' # File urls are '#!', Folder urls are '#F!'
match = re.findall(r'/#F?!(.*)!(.*)', url) if '/file/' in url:
if not match: # V2 URL structure
raise errors.ValidationError('Invalid public url. Should have /#!id!key') url = url.replace(' ', '')
file_id = re.findall(r'\W\w\w\w\w\w\w\w\w\W', url)[0][1:-1]
(public_handle, decryption_key) = match[0] id_index = re.search(file_id, url).end()
return (public_handle, decryption_key) key = url[id_index + 1:]
return f'{file_id}!{key}'
elif '!' in url:
match = re.findall(r'/#F?!(.*)!(.*)', url)
if not match:
raise errors.ValidationError('Invalid public url. Should have /#!id!key')
(public_handle, decryption_key) = match[0]
return (public_handle, decryption_key)
def _process_file(self, file): def _process_file(self, file):
if file['t'] in [NODE_TYPE_FILE, NODE_TYPE_DIR]: if file['t'] in [NODE_TYPE_FILE, NODE_TYPE_DIR]:
keys = dict( keys = dict(
keypart.split(':', 1) keypart.split(':', 1) for keypart in file['k'].split('/')
for keypart in file['k'].split('/') if ':' in keypart)
if ':' in keypart
)
uid = file['u'] uid = file['u']
key = None key = None
# my objects # my objects
@ -329,11 +333,8 @@ class Mega:
for foldername in paths: for foldername in paths:
if foldername != '': if foldername != '':
for file in files.items(): for file in files.items():
if ( if (file[1]['a'] and file[1]['t']
file[1]['a'] and and file[1]['a']['n'] == foldername):
file[1]['t'] and
file[1]['a']['n'] == foldername
):
if parent_desc == file[1]['p']: if parent_desc == file[1]['p']:
parent_desc = file[0] parent_desc = file[0]
found = True found = True
@ -355,31 +356,25 @@ class Mega:
parent_dir_name = path.parent.name parent_dir_name = path.parent.name
for file in list(files.items()): for file in list(files.items()):
parent_node_id = None parent_node_id = None
if parent_dir_name: try:
parent_node_id = self.find_path_descriptor( if parent_dir_name:
parent_dir_name, files=files parent_node_id = self.find_path_descriptor(parent_dir_name,
) files=files)
if ( if (filename and parent_node_id and file[1]['a']
filename and parent_node_id and and file[1]['a']['n'] == filename
file[1]['a'] and file[1]['a']['n'] == filename and and parent_node_id == file[1]['p']):
parent_node_id == file[1]['p'] if (exclude_deleted and self._trash_folder_node_id
): == file[1]['p']):
if ( continue
exclude_deleted and return file
self._trash_folder_node_id == file[1]['p'] elif (filename and file[1]['a']
): and file[1]['a']['n'] == filename):
if (exclude_deleted
and self._trash_folder_node_id == file[1]['p']):
continue continue
return file return file
if ( except TypeError:
filename and continue
file[1]['a'] and file[1]['a']['n'] == filename
):
if (
exclude_deleted and
self._trash_folder_node_id == file[1]['p']
):
continue
return file
def get_files(self, public_folder_handle=None): def get_files(self, public_folder_handle=None):
logger.info('Getting all files...') logger.info('Getting all files...')
@ -416,10 +411,8 @@ class Mega:
f'/#!{public_handle}!{decrypted_key}' f'/#!{public_handle}!{decrypted_key}'
) )
else: else:
raise ValueError( raise ValueError('''Upload() response required as input,
'''Upload() response required as input, use get_link() for regular file input''')
use get_link() for regular file input'''
)
def get_link(self, file): def get_link(self, file):
""" """
@ -617,24 +610,20 @@ class Mega:
""" """
Download a file by it's file object Download a file by it's file object
""" """
return self._download_file( return self._download_file(file_handle=None,
file_handle=None, file_key=None,
file_key=None, file=file[1],
file=file[1], dest_path=dest_path,
dest_path=dest_path, dest_filename=dest_filename,
dest_filename=dest_filename, is_public=False)
is_public=False
)
def _export_file(self, node): def _export_file(self, node):
node_data = self._node_data(node) node_data = self._node_data(node)
self._api_request([ self._api_request([{
{ 'a': 'l',
'a': 'l', 'n': node_data['h'],
'n': node_data['h'], 'i': self.request_id
'i': self.request_id }])
}
])
return self.get_link(node) return self.get_link(node)
def export(self, path=None, node_id=None): def export(self, path=None, node_id=None):
@ -670,20 +659,23 @@ class Mega:
) )
node_id = node_data['h'] node_id = node_data['h']
request_body = [ request_body = [{
{ 'a':
'a': 's2', 's2',
'n': node_id, 'n':
's': [{ node_id,
'u': 'EXP', 's': [{
'r': 0 'u': 'EXP',
}], 'r': 0
'i': self.request_id, }],
'ok': ok, 'i':
'ha': ha, self.request_id,
'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]] 'ok':
} ok,
] 'ha':
ha,
'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]]
}]
self._api_request(request_body) self._api_request(request_body)
nodes = self.get_files() nodes = self.get_files()
return self.get_folder_link(nodes[node_id]) return self.get_folder_link(nodes[node_id])
@ -701,15 +693,13 @@ class Mega:
is_public=True, is_public=True,
) )
def _download_file( def _download_file(self,
self, file_handle,
file_handle, file_key,
file_key, dest_path=None,
dest_path=None, dest_filename=None,
dest_filename=None, is_public=False,
is_public=False, file=None):
file=None
):
if file is None: if file is None:
if is_public: if is_public:
file_key = crypto.base64_to_a32(file_key) file_key = crypto.base64_to_a32(file_key)
@ -725,7 +715,6 @@ class Mega:
'n': file_handle 'n': file_handle
} }
file_data = self._api_request(request) file_data = self._api_request(request)
k = crypto.interleave_xor_8(file_key) k = crypto.interleave_xor_8(file_key)
iv = file_key[4:6] + (0, 0) iv = file_key[4:6] + (0, 0)
meta_mac = file_key[6:8] meta_mac = file_key[6:8]
@ -757,15 +746,15 @@ class Mega:
else: else:
dest_path += '/' dest_path += '/'
with tempfile.NamedTemporaryFile( temp_output_file = tempfile.NamedTemporaryFile(
mode='w+b', prefix='megapy_', delete=False mode='w+b', prefix='megapy_', delete=False
) as temp_output_file: )
with temp_output_file:
k_str = crypto.a32_to_str(k) k_str = crypto.a32_to_str(k)
counter = Counter.new( counter = Counter.new(
128, initial_value=((iv[0] << 32) + iv[1]) << 64 128, initial_value=((iv[0] << 32) + iv[1]) << 64
) )
aes = AES.new(k_str, AES.MODE_CTR, counter=counter) aes = AES.new(k_str, AES.MODE_CTR, counter=counter)
mac_str = '\0' * 16 mac_str = '\0' * 16
mac_encryptor = AES.new(k_str, AES.MODE_CBC, mac_str.encode("utf8")) mac_encryptor = AES.new(k_str, AES.MODE_CBC, mac_str.encode("utf8"))
iv_str = crypto.a32_to_str([iv[0], iv[1], iv[0], iv[1]]) iv_str = crypto.a32_to_str([iv[0], iv[1], iv[0], iv[1]])
@ -797,9 +786,8 @@ class Mega:
) )
file_mac = crypto.str_to_a32(mac_str) file_mac = crypto.str_to_a32(mac_str)
# check mac integrity # check mac integrity
if ( if (file_mac[0] ^ file_mac[1],
file_mac[0] ^ file_mac[1], file_mac[2] ^ file_mac[3] file_mac[2] ^ file_mac[3]) != meta_mac:
) != meta_mac:
raise ValueError('Mismatched mac') raise ValueError('Mismatched mac')
output_path = pathlib.Path(dest_path + file_name) output_path = pathlib.Path(dest_path + file_name)
shutil.move(temp_output_file.name, output_path) shutil.move(temp_output_file.name, output_path)
@ -822,8 +810,7 @@ class Mega:
ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)] ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)]
k_str = crypto.a32_to_str(ul_key[:4]) k_str = crypto.a32_to_str(ul_key[:4])
count = Counter.new( count = Counter.new(
128, initial_value=((ul_key[4] << 32) + ul_key[5]) << 64 128, initial_value=((ul_key[4] << 32) + ul_key[5]) << 64)
)
aes = AES.new(k_str, AES.MODE_CTR, counter=count) aes = AES.new(k_str, AES.MODE_CTR, counter=count)
upload_progress = 0 upload_progress = 0
@ -855,19 +842,17 @@ class Mega:
# encrypt file and upload # encrypt file and upload
chunk = aes.encrypt(chunk) chunk = aes.encrypt(chunk)
output_file = requests.post( output_file = requests.post(ul_url + "/" +
ul_url + "/" + str(chunk_start), str(chunk_start),
data=chunk, data=chunk,
timeout=self.timeout timeout=self.timeout)
)
completion_file_handle = output_file.text completion_file_handle = output_file.text
logger.info( logger.info('%s of %s uploaded', upload_progress,
'%s of %s uploaded', upload_progress, file_size file_size)
)
else: else:
output_file = requests.post( output_file = requests.post(ul_url + "/0",
ul_url + "/0", data='', timeout=self.timeout data='',
) timeout=self.timeout)
completion_file_handle = output_file.text completion_file_handle = output_file.text
logger.info('Chunks uploaded') logger.info('Chunks uploaded')
@ -955,9 +940,8 @@ class Mega:
parent_node_id = dest parent_node_id = dest
else: else:
parent_node_id = folder_node_ids[idx - 1] parent_node_id = folder_node_ids[idx - 1]
created_node = self._mkdir( created_node = self._mkdir(name=directory_name,
name=directory_name, parent_node_id=parent_node_id parent_node_id=parent_node_id)
)
node_id = created_node['f'][0]['h'] node_id = created_node['f'][0]['h']
folder_node_ids[idx] = node_id folder_node_ids[idx] = node_id
return dict(zip(dirs, folder_node_ids.values())) return dict(zip(dirs, folder_node_ids.values()))
@ -1185,9 +1169,11 @@ class Mega:
result = {'size': size, 'name': unencrypted_attrs['n']} result = {'size': size, 'name': unencrypted_attrs['n']}
return result return result
def import_public_file( def import_public_file(self,
self, file_handle, file_key, dest_node=None, dest_name=None file_handle,
): file_key,
dest_node=None,
dest_name=None):
""" """
Import the public file into user account Import the public file into user account
""" """
@ -1202,7 +1188,6 @@ class Mega:
key = crypto.base64_to_a32(file_key) key = crypto.base64_to_a32(file_key)
k = crypto.interleave_xor_8(key) k = crypto.interleave_xor_8(key)
encrypted_key = crypto.a32_to_base64(crypto.encrypt_key(key, self.master_key)) encrypted_key = crypto.a32_to_base64(crypto.encrypt_key(key, self.master_key))
encrypted_name = crypto.base64_url_encode(crypto.encrypt_attr({'n': dest_name}, k)) encrypted_name = crypto.base64_url_encode(crypto.encrypt_attr({'n': dest_name}, k))
request = { request = {

View file

@ -3,39 +3,17 @@ import pytest
from mega.crypto import get_chunks from mega.crypto import get_chunks
@pytest.mark.parametrize( @pytest.mark.parametrize('file_size, exp_result', [
'file_size, exp_result', [ (10, ((0, 10), )),
( (1000, ((0, 1000), )),
10, (1000000, ((0, 131072), (131072, 262144), (393216, 393216),
( (786432, 213568))),
(0, 10), (10000000, ((0, 131072), (131072, 262144), (393216, 393216),
)
),
(
1000,
(
(0, 1000),
)
),
(
1000000,
(
(0, 131072), (131072, 262144), (393216, 393216),
(786432, 213568)
)
),
(
10000000,
(
(0, 131072), (131072, 262144), (393216, 393216),
(786432, 524288), (1310720, 655360), (1966080, 786432), (786432, 524288), (1310720, 655360), (1966080, 786432),
(2752512, 917504), (3670016, 1048576), (4718592, 1048576), (2752512, 917504), (3670016, 1048576), (4718592, 1048576),
(5767168, 1048576), (6815744, 1048576), (7864320, 1048576), (5767168, 1048576), (6815744, 1048576), (7864320, 1048576),
(8912896, 1048576), (9961472, 38528) (8912896, 1048576), (9961472, 38528))),
) ])
),
]
)
def test_get_chunks(file_size, exp_result): def test_get_chunks(file_size, exp_result):
result = tuple(get_chunks(file_size)) result = tuple(get_chunks(file_size))

View file

@ -3,12 +3,9 @@ import pytest
from mega.errors import RequestError, _CODE_TO_DESCRIPTIONS from mega.errors import RequestError, _CODE_TO_DESCRIPTIONS
@pytest.mark.parametrize( @pytest.mark.parametrize('code, exp_message',
'code, exp_message', [ [(code, f'{desc[0]}, {desc[1]}')
(code, f'{desc[0]}, {desc[1]}') for code, desc in _CODE_TO_DESCRIPTIONS.items()])
for code, desc in _CODE_TO_DESCRIPTIONS.items()
]
)
def test_request_error(code, exp_message): def test_request_error(code, exp_message):
exc = RequestError(code) exc = RequestError(code)

View file

@ -2,15 +2,16 @@ import random
from pathlib import Path from pathlib import Path
import os import os
import requests_mock
import pytest import pytest
from mega import Mega from mega import Mega
TEST_CONTACT = 'test@mega.co.nz' TEST_CONTACT = 'test@mega.co.nz'
TEST_PUBLIC_URL = ( TEST_PUBLIC_URL = (
'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps' 'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps')
)
TEST_FILE = os.path.basename(__file__) TEST_FILE = os.path.basename(__file__)
MODULE = 'mega.mega'
@pytest.fixture @pytest.fixture
@ -32,9 +33,7 @@ def mega(folder_name):
def uploaded_file(mega, folder_name): def uploaded_file(mega, folder_name):
folder = mega.find(folder_name) folder = mega.find(folder_name)
dest_node_id = folder[1]['h'] dest_node_id = folder[1]['h']
mega.upload( mega.upload(__file__, dest=dest_node_id, dest_filename='test.py')
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{folder_name}/test.py' path = f'{folder_name}/test.py'
return mega.find(path) return mega.find(path)
@ -72,8 +71,8 @@ def test_get_link(mega, uploaded_file):
assert isinstance(link, str) assert isinstance(link, str)
@pytest.mark.skip
class TestExport: class TestExport:
def test_export_folder(self, mega, folder_name): def test_export_folder(self, mega, folder_name):
public_url = None public_url = None
for _ in range(2): for _ in range(2):
@ -103,9 +102,7 @@ class TestExport:
# Upload a single file into a folder # Upload a single file into a folder
folder = mega.find(folder_name) folder = mega.find(folder_name)
dest_node_id = folder[1]['h'] dest_node_id = folder[1]['h']
mega.upload( mega.upload(__file__, dest=dest_node_id, dest_filename='test.py')
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{folder_name}/test.py' path = f'{folder_name}/test.py'
assert mega.find(path) assert mega.find(path)
@ -131,8 +128,7 @@ class TestCreateFolder:
def test_create_folder_with_sub_folders(self, mega, folder_name, mocker): def test_create_folder_with_sub_folders(self, mega, folder_name, mocker):
folder_names_and_node_ids = mega.create_folder( folder_names_and_node_ids = mega.create_folder(
name=(Path(folder_name) / 'subdir' / 'anothersubdir') name=(Path(folder_name) / 'subdir' / 'anothersubdir'))
)
assert len(folder_names_and_node_ids) == 3 assert len(folder_names_and_node_ids) == 3
assert folder_names_and_node_ids == { assert folder_names_and_node_ids == {
@ -143,16 +139,21 @@ class TestCreateFolder:
class TestFind: class TestFind:
def test_find_file(self, mega, folder_name): def test_find_file(self, mega, folder_name):
folder = mega.find(folder_name) folder = mega.find(folder_name)
assert folder
dest_node_id = folder[1]['h'] dest_node_id = folder[1]['h']
mega.upload( mega.upload(__file__, dest=dest_node_id, dest_filename='test.py')
__file__, dest=dest_node_id, dest_filename='test.py' file1 = mega.find(f'{folder_name}/test.py')
) assert file1
path = f'{folder_name}/test.py'
assert mega.find(path) dest_node_id2 = mega.create_folder('new_folder')['new_folder']
mega.upload(__file__, dest=dest_node_id2, dest_filename='test.py')
file2 = mega.find('new_folder/test.py')
assert file2
# Check that the correct test.py was found
assert file1 != file2
def test_path_not_found_returns_none(self, mega): def test_path_not_found_returns_none(self, mega):
assert mega.find('not_found') is None assert mega.find('not_found') is None
@ -194,15 +195,13 @@ def test_download(mega, tmpdir, folder_name):
# Upload a single file into a folder # Upload a single file into a folder
folder = mega.find(folder_name) folder = mega.find(folder_name)
dest_node_id = folder[1]['h'] dest_node_id = folder[1]['h']
mega.upload( mega.upload(__file__, dest=dest_node_id, dest_filename='test.py')
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{folder_name}/test.py' path = f'{folder_name}/test.py'
file = mega.find(path) file = mega.find(path)
output_path = mega.download( output_path = mega.download(file=file,
file=file, dest_path=tmpdir, dest_filename='test.py' dest_path=tmpdir,
) dest_filename='test.py')
assert output_path.exists() assert output_path.exists()
@ -222,3 +221,29 @@ def test_add_contact(mega):
def test_remove_contact(mega): def test_remove_contact(mega):
resp = mega.remove_contact(TEST_CONTACT) resp = mega.remove_contact(TEST_CONTACT)
assert isinstance(resp, int) assert isinstance(resp, int)
@pytest.mark.parametrize('url, expected_file_id_and_key', [
('https://mega.nz/#!Ue5VRSIQ!kC2E4a4JwfWWCWYNJovGFHlbz8F'
'N-ISsBAGPzvTjT6k',
'Ue5VRSIQ!kC2E4a4JwfWWCWYNJovGFHlbz8FN-ISsBAGPzvTjT6k'),
('https://mega.nz/file/cH51DYDR#qH7QOfRcM-7N9riZWdSjsRq'
'5VDTLfIhThx1capgVA30',
'cH51DYDR!qH7QOfRcM-7N9riZWdSjsRq5VDTLfIhThx1capgVA30'),
])
def test_parse_url(url, expected_file_id_and_key, mega):
assert mega._parse_url(url) == expected_file_id_and_key
@pytest.mark.skip
class TestAPIRequest:
@pytest.mark.parametrize('response_text', ['-3', '-9'])
def test_when_api_returns_int_raises_exception(
self,
mega,
response_text,
):
with requests_mock.Mocker() as m:
m.post(f'{mega.schema}://g.api.{mega.domain}/cs',
text=response_text)
mega._api_request(data={})

View file

@ -1,5 +1,5 @@
[tox] [tox]
envlist = py{36,37,38}-normal,lint envlist = py{38}-normal,lint
[testenv] [testenv]
commands = commands =