Merge pull request #2 from richardARPANET/export

Adds export method, fixes login and more
This commit is contained in:
Richard 2019-10-31 22:23:04 +00:00 committed by GitHub
commit b44bb998c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 537 additions and 201 deletions

2
.gitignore vendored
View file

@ -5,7 +5,7 @@
#################
## Eclipse
#################
.env.fish
*.pydevproject
.project
.metadata

View file

@ -1,8 +1,9 @@
sudo: false
language: python
python:
- 2.7
- 3.6
- 3.7
- 3.8
env:
- TOXENV=py-normal

View file

@ -6,7 +6,12 @@ Release History
0.9.21 (unreleased)
+++++++++++++++++++
- Nothing changed yet.
- Removes broken method ``get_contacts()``.
- Adds support for login with a v2 Mega user account.
- Adds ``export()`` method to share a file or folder, returning public share URL with key.
- Adds code, message attrs to RequestError exception, makes message in raised exceptions include more details.
- Alters ``create_folder()`` to accept a path including multiple sub directories, adds support to create them all (similar to 'mkdir -p' on unix systems).
- Adds ``exclude_deleted=True`` optional arg to ``find()`` method, to exclude deleted nodes from results.
0.9.20 (2019-10-17)

View file

@ -1,14 +1,8 @@
**NOTICE**: If you're reading this on GitHub.com please be aware this is
a mirror of the primary remote located at
`https://code.richard.do/explore/projects`_. Please direct issues and
`https://code.richard.do/richardARPANET/mega.py`_. Please direct issues and
pull requests there.
Deprecated
==========
Mega.py is now deprecated, please use the official SDK
`https://github.com/meganz/sdk`_.
I aim to write a wrapper for the SDK when i have the time to do so.
--------------
@ -122,6 +116,24 @@ Upload a file, and get its public link
m.get_upload_link(file)
# see mega.py for destination and filename options
Export a file or folder
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
public_exported_web_link = m.export('myfile.doc')
public_exported_web_link = m.export('my_mega_folder/my_sub_folder_to_share')
# e.g. https://mega.nz/#F!WlVl1CbZ!M3wmhwZDENMNUJoBsdzFng
Fine a file or folder
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
folder = m.find('my_mega_folder')
# Excludes results which are in the Trash folder (i.e. deleted)
folder = m.find('my_mega_folder', exclude_deleted=True)
Upload a file to a destination folder
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -157,6 +169,17 @@ Create a folder
.. code:: python
m.create_folder('new_folder')
m.create_folder('new_folder/sub_folder/subsub_folder')
Returns a dict of folder node name and node_id, e.g.
.. code:: python
{
'new_folder': 'qpFhAYwA',
'sub_folder': '2pdlmY4Z',
'subsub_folder': 'GgMFCKLZ'
}
Rename a file or a folder
~~~~~~~~~~~~~~~~~~~~~~~~~
@ -166,7 +189,7 @@ Rename a file or a folder
file = m.find('myfile.doc')
m.rename(file, 'my_file.doc')
M
~
.. _`https://code.richard.do/explore/projects`: https://code.richard.do/explore/projects

View file

@ -13,3 +13,5 @@ zest.releaser
setuptools
twine
wheel
rope
pytest-mock

View file

@ -1,2 +1,4 @@
requests>=0.10
pycrypto
pathlib==1.0.1
python2-secrets==1.0.5

View file

@ -11,7 +11,7 @@ norecursedirs = .git
[flake8]
exclude = .git,__pycache__,legacy,build,dist,.tox
max-complexity = 15
ignore = E741
ignore = E741,W504
[yapf]
based_on_style = pep8

View file

@ -14,11 +14,11 @@ os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir)))
with open('requirements.txt') as f:
install_requires = f.read().splitlines()
# with open('README.rst', 'r', encoding='utf-8') as rm_file:
# readme = rm_file.read()
with open('README.rst', 'r', encoding='utf-8') as rm_file:
readme = rm_file.read()
# with open('HISTORY.rst', 'r', encoding='utf-8') as hist_file:
# history = hist_file.read()
with open('HISTORY.rst', 'r', encoding='utf-8') as hist_file:
history = hist_file.read()
setup(
name='mega.py',
@ -28,6 +28,7 @@ setup(
include_package_data=True,
zip_safe=False,
description='Python lib for the Mega.co.nz API',
long_description=readme + '\n\n' + history,
author='Richard O\'Dwyer',
author_email='richard@richard.do',
license='Creative Commons Attribution-Noncommercial-Share Alike license',
@ -36,13 +37,7 @@ setup(
'Intended Audience :: Developers',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.2',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',

View file

@ -5,9 +5,83 @@ class ValidationError(Exception):
pass
_CODE_TO_DESCRIPTIONS = {
-1: (
'EINTERNAL',
(
'An internal error has occurred. Please submit a bug report, '
'detailing the exact circumstances in which this error occurred'
)
),
-2: ('EARGS', 'You have passed invalid arguments to this command'),
-3: (
'EAGAIN',
(
'(always at the request level) A temporary congestion or server '
'malfunction prevented your request from being processed. '
'No data was altered. Retry. Retries must be spaced with '
'exponential backoff'
)
),
-4: (
'ERATELIMIT',
(
'You have exceeded your command weight per time quota. Please '
'wait a few seconds, then try again (this should never happen '
'in sane real-life applications)'
)
),
-5: ('EFAILED', 'The upload failed. Please restart it from scratch'),
-6: (
'ETOOMANY',
'Too many concurrent IP addresses are accessing this upload target URL'
),
-7: (
'ERANGE',
(
'The upload file packet is out of range or not starting and '
'ending on a chunk boundary'
)
),
-8: (
'EEXPIRED',
(
'The upload target URL you are trying to access has expired. '
'Please request a fresh one'
)
),
-9: ('ENOENT', 'Object (typically, node or user) not found'),
-10: ('ECIRCULAR', 'Circular linkage attempted'),
-11: (
'EACCESS',
'Access violation (e.g., trying to write to a read-only share)'
),
-12: ('EEXIST', 'Trying to create an object that already exists'),
-13: ('EINCOMPLETE', 'Trying to access an incomplete resource'),
-14: ('EKEY', 'A decryption operation failed (never returned by the API)'),
-15: ('ESID', 'Invalid or expired user session, please relogin'),
-16: ('EBLOCKED', 'User blocked'),
-17: ('EOVERQUOTA', 'Request over quota'),
-18: (
'ETEMPUNAVAIL',
'Resource temporarily not available, please try again later'
),
-19: ('ETOOMANYCONNECTIONS', 'many connections on this resource'),
-20: ('EWRITE', 'Write failed'),
-21: ('EREAD', 'Read failed'),
-22: ('EAPPKEY', 'Invalid application key; request not processed'),
}
class RequestError(Exception):
"""
Error in API request
"""
# TODO add error response messages
pass
def __init__(self, message):
code = message
self.code = code
code_desc, long_desc = _CODE_TO_DESCRIPTIONS[code]
self.message = f'{code_desc}, {long_desc}'
def __str__(self):
return self.message

View file

@ -1,5 +1,9 @@
import re
import time
import json
import secrets
from pathlib import Path
import hashlib
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Util import Counter
@ -28,6 +32,7 @@ class Mega(object):
self.sid = None
self.sequence_num = random.randint(0, 0xFFFFFFFF)
self.request_id = make_id(10)
self._trash_folder_node_id = None
if options is None:
options = {}
@ -38,13 +43,31 @@ class Mega(object):
self._login_user(email, password)
else:
self.login_anonymous()
self._trash_folder_node_id = self.get_node_by_type(4)[0]
return self
def _login_user(self, email, password):
password_aes = prepare_key(str_to_a32(password))
uh = stringhash(email, password_aes)
resp = self._api_request({'a': 'us', 'user': email, 'uh': uh})
# if numeric error code response
email = email.lower()
get_user_salt_resp = self._api_request({'a': 'us0', 'user': email})
user_salt = None
try:
user_salt = base64_to_a32(get_user_salt_resp['s'])
except KeyError:
# v1 user account
password_aes = prepare_key(str_to_a32(password))
user_hash = stringhash(email, password_aes)
else:
# v2 user account
pbkdf2_key = hashlib.pbkdf2_hmac(
hash_name='sha512',
password=password.encode(),
salt=a32_to_str(user_salt),
iterations=100000,
dklen=32
)
password_aes = str_to_a32(pbkdf2_key[:16])
user_hash = base64_url_encode(pbkdf2_key[-16:])
resp = self._api_request({'a': 'us', 'user': email, 'uh': user_hash})
if isinstance(resp, int):
raise RequestError(resp)
self._login_process(resp, password_aes)
@ -70,7 +93,6 @@ class Mega(object):
)
resp = self._api_request({'a': 'us', 'user': user})
# if numeric error code response
if isinstance(resp, int):
raise RequestError(resp)
self._login_process(resp, password_key)
@ -96,7 +118,8 @@ class Mega(object):
for i in range(4):
if PYTHON2:
l = (
(ord(private_key[0]) * 256 + ord(private_key[1]) + 7) / 8
(ord(private_key[0]) * 256 +
ord(private_key[1]) + 7) / 8
) + 2
else:
l = int(
@ -113,7 +136,6 @@ class Mega(object):
self.rsa_private_key[1]
)
)
sid = '%x' % rsa_decrypter.key._decrypt(encrypted_sid)
sid = binascii.unhexlify('0' + sid if len(sid) % 2 else sid)
self.sid = base64_url_encode(sid[:43])
@ -135,21 +157,12 @@ class Mega(object):
params=params,
data=json.dumps(data),
timeout=self.timeout,
headers={
'Origin':
'https://mega.nz',
'Referer':
'https://mega.nz/login',
'User-Agent': (
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:69.0) '
'Gecko/20100101 Firefox/69.0'
),
}
)
json_resp = json.loads(req.text)
# if numeric error code response
if isinstance(json_resp, int):
if json_resp == -3:
time.sleep(0.2)
return self._api_request(data=data)
raise RequestError(json_resp)
return json_resp[0]
@ -163,9 +176,6 @@ class Mega(object):
raise RequestError('Url key missing')
def _process_file(self, file, shared_keys):
"""
Process a file
"""
if file['t'] == 0 or file['t'] == 1:
keys = dict(
keypart.split(':', 1)
@ -194,6 +204,13 @@ class Mega(object):
key = keys[hkey]
key = decrypt_key(base64_to_a32(key), shared_key)
break
if file['h'] and file['h'] in shared_keys.get('EXP', ()):
shared_key = shared_keys['EXP'][file['h']]
encrypted_key = str_to_a32(
base64_url_decode(file['k'].split(':')[-1])
)
key = decrypt_key(encrypted_key, shared_key)
file['shared_folder_key'] = shared_key
if key is not None:
# file
if file['t'] == 0:
@ -244,11 +261,9 @@ class Mega(object):
shared_keys[s_item['u']] = {}
if s_item['h'] in ok_dict:
shared_keys[s_item['u']][s_item['h']] = ok_dict[s_item['h']]
self.shared_keys = shared_keys
##########################################################################
# GET
def find_path_descriptor(self, path):
def find_path_descriptor(self, path, files=()):
"""
Find descriptor of folder inside a path. i.e.: folder1/folder2/folder3
Params:
@ -258,14 +273,17 @@ class Mega(object):
"""
paths = path.split('/')
files = self.get_files()
files = files or self.get_files()
parent_desc = self.root_id
found = False
for foldername in paths:
if foldername != '':
for file in files.items():
if file[1]['a'] and file[1]['t'] and \
file[1]['a']['n'] == foldername:
if (
file[1]['a'] and
file[1]['t'] and
file[1]['a']['n'] == foldername
):
if parent_desc == file[1]['p']:
parent_desc = file[0]
found = True
@ -275,22 +293,49 @@ class Mega(object):
return None
return parent_desc
def find(self, filename):
def find(self, filename=None, handle=None, exclude_deleted=False):
"""
Return file object from given filename
"""
files = self.get_files()
if handle:
return files[handle]
path = Path(filename)
filename = path.name
parent_dir_name = path.parent.name
for file in list(files.items()):
if not isinstance(file[1]['a'], dict):
continue
if file[1]['a'] and file[1]['a']['n'] == filename:
parent_node_id = None
if parent_dir_name:
parent_node_id = self.find_path_descriptor(
parent_dir_name, files=files
)
if (
filename and parent_node_id and
file[1]['a'] and file[1]['a']['n'] == filename and
parent_node_id == file[1]['p']
):
if (
exclude_deleted and
self._trash_folder_node_id == file[1]['p']
):
continue
return file
if (
filename and
file[1]['a'] and file[1]['a']['n'] == filename
):
if (
exclude_deleted and
self._trash_folder_node_id == file[1]['p']
):
continue
return file
def get_files(self):
"""
Get all files in account
"""
files = self._api_request({'a': 'f', 'c': 1})
files = self._api_request({'a': 'f', 'c': 1, 'r': 1})
files_dict = {}
shared_keys = {}
self._init_shared_keys(files, shared_keys)
@ -341,6 +386,31 @@ class Mega(object):
else:
raise ValidationError('File id and key must be present')
def _node_data(self, node):
try:
return node[1]
except (IndexError, KeyError):
return node
def get_folder_link(self, file):
try:
file = file[1]
except (IndexError, KeyError):
pass
if 'h' in file and 'k' in file:
public_handle = self._api_request({'a': 'l', 'n': file['h']})
if public_handle == -11:
raise RequestError(
"Can't get a public link from that file "
"(is this a shared file?)"
)
decrypted_key = a32_to_base64(file['shared_folder_key'])
return '{0}://{1}/#F!{2}!{3}'.format(
self.schema, self.domain, public_handle, decrypted_key
)
else:
raise ValidationError('File id and key must be present')
def get_user(self):
user_data = self._api_request({'a': 'ug'})
return user_data
@ -442,8 +512,6 @@ class Mega(object):
if 'balance' in user_data:
return user_data['balance']
##########################################################################
# DELETE
def delete(self, public_handle):
"""
Delete a file by its public handle
@ -491,8 +559,6 @@ class Mega(object):
post_list.append({"a": "d", "n": file, "i": self.request_id})
return self._api_request(post_list)
##########################################################################
# DOWNLOAD
def download(self, file, dest_path=None, dest_filename=None):
"""
Download a file by it's file object
@ -506,6 +572,68 @@ class Mega(object):
is_public=False
)
def _export_file(self, node):
node_data = self._node_data(node)
self._api_request([
{
'a': 'l',
'n': node_data['h'],
'i': self.request_id
}
])
return self.get_link(node)
def export(self, path=None, node_id=None):
nodes = self.get_files()
if node_id:
node = nodes[node_id]
else:
node = self.find(path)
node_data = self._node_data(node)
is_file_node = node_data['t'] == 0
if is_file_node:
return self._export_file(node)
if node:
try:
# If already exported
return self.get_folder_link(node)
except (RequestError, KeyError):
pass
master_key_cipher = AES.new(a32_to_str(self.master_key), AES.MODE_ECB)
ha = base64_url_encode(
master_key_cipher.encrypt(node_data['h'] + node_data['h'])
)
share_key = secrets.token_bytes(16)
ok = base64_url_encode(master_key_cipher.encrypt(share_key))
share_key_cipher = AES.new(share_key, AES.MODE_ECB)
node_key = node_data['k']
encrypted_node_key = base64_url_encode(
share_key_cipher.encrypt(a32_to_str(node_key))
)
node_id = node_data['h']
request_body = [
{
'a': 's2',
'n': node_id,
's': [{
'u': 'EXP',
'r': 0
}],
'i': self.request_id,
'ok': ok,
'ha': ha,
'cr': [[node_id], [node_id], [0, 0, encrypted_node_key]]
}
]
self._api_request(request_body)
nodes = self.get_files()
return self.get_folder_link(nodes[node_id])
def download_url(self, url, dest_path=None, dest_filename=None):
"""
Download a file by it's public url
@ -631,8 +759,6 @@ class Mega(object):
shutil.move(temp_output_file.name, dest_path + file_name)
##########################################################################
# UPLOAD
def upload(self, filename, dest=None, dest_filename=None):
# determine storage node
if dest is None:
@ -683,18 +809,11 @@ class Mega(object):
# encrypt file and upload
chunk = aes.encrypt(chunk)
try:
output_file = requests.post(
ul_url + "/" + str(chunk_start),
data=chunk,
timeout=self.timeout
)
except:
output_file = requests.post(
ul_url + "/" + str(chunk_start),
data=chunk,
timeout=self.timeout
)
output_file = requests.post(
ul_url + "/" + str(chunk_start),
data=chunk,
timeout=self.timeout
)
completion_file_handle = output_file.text
if self.options.get('verbose') is True:
@ -748,14 +867,7 @@ class Mega(object):
input_file.close()
return data
def create_folder(self, name, dest=None):
# determine storage node
if dest is None:
# if none set, upload to cloud drive node
if not hasattr(self, 'root_id'):
self.get_files()
dest = self.root_id
def _mkdir(self, name, parent_node_id):
# generate random aes key (128) for folder
ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)]
@ -767,10 +879,8 @@ class Mega(object):
# update attributes
data = self._api_request(
{
'a':
'p',
't':
dest,
'a': 'p',
't': parent_node_id,
'n': [
{
'h': 'xxxxxxxx',
@ -779,12 +889,38 @@ class Mega(object):
'k': encrypted_key
}
],
'i':
self.request_id
'i': self.request_id
}
)
return data
def _root_node_id(self):
if not hasattr(self, 'root_id'):
self.get_files()
return self.root_id
def create_folder(self, name, dest=None):
dirs = tuple(dir_name for dir_name in str(name).split('/') if dir_name)
folder_node_ids = {}
for idx, directory_name in enumerate(dirs):
existing_node_id = self.find_path_descriptor(directory_name)
if existing_node_id:
folder_node_ids[idx] = existing_node_id
continue
if idx == 0:
if dest is None:
parent_node_id = self._root_node_id()
else:
parent_node_id = dest
else:
parent_node_id = folder_node_ids[idx - 1]
created_node = self._mkdir(
name=directory_name, parent_node_id=parent_node_id
)
node_id = created_node['f'][0]['h']
folder_node_ids[idx] = node_id
return dict(zip(dirs, folder_node_ids.values()))
def rename(self, file, new_name):
file = file[1]
# create new attribs
@ -881,21 +1017,6 @@ class Mega(object):
}
)
def get_contacts(self):
raise NotImplementedError()
# TODO implement this
# sn param below = maxaction var with function getsc() in mega.co.nz js
# seens to be the 'sn' attrib of the previous request response...
# requests goto /sc rather than
# req = requests.post(
# '{0}://g.api.{1}/sc'.format(self.schema, self.domain),
# params={'sn': 'ZMxcQ_DmHnM', 'ssl': '1'},
# data=json.dumps(None),
# timeout=self.timeout)
# json_resp = json.loads(req.text)
# print json_resp
def get_public_url_info(self, url):
"""
Get size and name from a public url, dict returned
@ -917,8 +1038,6 @@ class Mega(object):
Get size and name of a public file
"""
data = self._api_request({'a': 'g', 'p': file_handle, 'ssm': 1})
# if numeric error code response
if isinstance(data, int):
raise RequestError(data)

0
src/tests/__init__.py Normal file
View file

17
src/tests/test_errors.py Normal file
View file

@ -0,0 +1,17 @@
import pytest
from mega.errors import RequestError, _CODE_TO_DESCRIPTIONS
@pytest.mark.parametrize(
'code, exp_message', [
(code, f'{desc[0]}, {desc[1]}')
for code, desc in _CODE_TO_DESCRIPTIONS.items()
]
)
def test_request_error(code, exp_message):
exc = RequestError(code)
assert exc.code == code
assert exc.message == exp_message
assert str(exc) == exp_message

View file

@ -1,106 +1,202 @@
"""
These unit tests will upload a test file,a test folder and a test contact,
Perform api operations on them,
And them remove them from your account.
"""
import unittest
import random
from pathlib import Path
import os
import pytest
from mega import Mega
mega = Mega()
# anonymous login
m = mega.login()
# normal login
# m = mega.login(email, password)
FIND_RESP = None
TEST_CONTACT = 'test@mega.co.nz'
TEST_PUBLIC_URL = (
'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps'
)
TEST_FILE = os.path.basename(__file__)
TEST_FOLDER = 'mega.py_testfolder_{0}'.format(random.random())
class TestMega(unittest.TestCase):
def test_mega(self):
self.assertIsInstance(mega, Mega)
def test_login(self):
self.assertIsInstance(mega, Mega)
def test_get_user(self):
resp = m.get_user()
self.assertIsInstance(resp, dict)
def test_get_quota(self):
resp = m.get_quota()
self.assertIsInstance(int(resp), int)
def test_get_storage_space(self):
resp = m.get_storage_space(mega=True)
self.assertIsInstance(resp, dict)
def test_get_files(self):
files = m.get_files()
self.assertIsInstance(files, dict)
def test_get_link(self):
file = m.find(TEST_FILE)
if file:
link = m.get_link(file)
self.assertIsInstance(link, str)
def test_import_public_url(self):
resp = m.import_public_url(TEST_PUBLIC_URL)
file_handle = m.get_id_from_obj(resp)
resp = m.destroy(file_handle)
self.assertIsInstance(resp, int)
def test_create_folder(self):
resp = m.create_folder(TEST_FOLDER)
self.assertIsInstance(resp, dict)
def test_rename(self):
file = m.find(TEST_FOLDER)
if file:
resp = m.rename(file, TEST_FOLDER)
self.assertIsInstance(resp, int)
def test_delete_folder(self):
folder_node = m.find(TEST_FOLDER)[0]
resp = m.delete(folder_node)
self.assertIsInstance(resp, int)
def test_delete(self):
file = m.find(TEST_FILE)
if file:
resp = m.delete(file[0])
self.assertIsInstance(resp, int)
def test_destroy(self):
file = m.find(TEST_FILE)
if file:
resp = m.destroy(file[0])
self.assertIsInstance(resp, int)
def test_empty_trash(self):
# resp None if already empty, else int
resp = m.empty_trash()
if resp is not None:
self.assertIsInstance(resp, int)
def test_add_contact(self):
resp = m.add_contact(TEST_CONTACT)
self.assertIsInstance(resp, int)
def test_remove_contact(self):
resp = m.remove_contact(TEST_CONTACT)
self.assertIsInstance(resp, int)
@pytest.fixture
def folder_name():
return 'mega.py_testfolder_{0}'.format(random.random())
if __name__ == '__main__':
unittest.main()
@pytest.fixture
def mega(folder_name):
mega_ = Mega()
mega_.login(email=os.environ['EMAIL'], password=os.environ['PASS'])
created_nodes = mega_.create_folder(folder_name)
yield mega_
node_id = next(iter(created_nodes.values()))
mega_.destroy(node_id)
def test_mega(mega):
assert isinstance(mega, Mega)
def test_login(mega):
assert isinstance(mega, Mega)
def test_get_user(mega):
resp = mega.get_user()
assert isinstance(resp, dict)
def test_get_quota(mega):
resp = mega.get_quota()
assert isinstance(int(resp), int)
def test_get_storage_space(mega):
resp = mega.get_storage_space(mega=True)
assert isinstance(resp, dict)
def test_get_files(mega):
files = mega.get_files()
assert isinstance(files, dict)
def test_get_link(mega):
file = mega.find(TEST_FILE)
if file:
link = mega.get_link(file)
assert isinstance(link, str)
class TestExport:
def test_export_folder(self, mega, folder_name):
public_url = None
for _ in range(2):
result_public_share_url = mega.export(folder_name)
if not public_url:
public_url = result_public_share_url
assert result_public_share_url.startswith('https://mega.co.nz/#F!')
assert result_public_share_url == public_url
def test_export_folder_within_folder(self, mega, folder_name):
folder_path = Path(folder_name) / 'subdir' / 'anothersubdir'
mega.create_folder(name=folder_path)
result_public_share_url = mega.export(path=folder_path)
assert result_public_share_url.startswith('https://mega.co.nz/#F!')
def test_export_folder_using_node_id(self, mega, folder_name):
node_id = mega.find(folder_name)[0]
result_public_share_url = mega.export(node_id=node_id)
assert result_public_share_url.startswith('https://mega.co.nz/#F!')
def test_export_single_file(self, mega, folder_name):
# Upload a single file into a folder
folder = mega.find(folder_name)
dest_node_id = folder[1]['h']
mega.upload(
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{folder_name}/test.py'
assert mega.find(path)
for _ in range(2):
result_public_share_url = mega.export(path)
assert result_public_share_url.startswith('https://mega.co.nz/#!')
def test_import_public_url(mega):
resp = mega.import_public_url(TEST_PUBLIC_URL)
file_handle = mega.get_id_from_obj(resp)
resp = mega.destroy(file_handle)
assert isinstance(resp, int)
class TestCreateFolder:
def test_create_folder(self, mega, folder_name):
folder_names_and_node_ids = mega.create_folder(folder_name)
assert isinstance(folder_names_and_node_ids, dict)
assert len(folder_names_and_node_ids) == 1
def test_create_folder_with_sub_folders(self, mega, folder_name, mocker):
folder_names_and_node_ids = mega.create_folder(
name=(Path(folder_name) / 'subdir' / 'anothersubdir')
)
assert len(folder_names_and_node_ids) == 3
assert folder_names_and_node_ids == {
folder_name: mocker.ANY,
'subdir': mocker.ANY,
'anothersubdir': mocker.ANY,
}
class TestFind:
def test_find_file(self, mega, folder_name):
folder = mega.find(folder_name)
dest_node_id = folder[1]['h']
mega.upload(
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{folder_name}/test.py'
assert mega.find(path)
def test_path_not_found_returns_none(self, mega):
assert mega.find('not_found') is None
def test_exclude_deleted_files(self, mega, folder_name):
folder_node_id = mega.find(folder_name)[0]
assert mega.find(folder_name)
mega.delete(folder_node_id)
assert mega.find(folder_name)
assert not mega.find(folder_name, exclude_deleted=True)
def test_rename(mega, folder_name):
file = mega.find(folder_name)
if file:
resp = mega.rename(file, folder_name)
assert isinstance(resp, int)
def test_delete_folder(mega, folder_name):
folder_node = mega.find(folder_name)[0]
resp = mega.delete(folder_node)
assert isinstance(resp, int)
def test_delete(mega):
file = mega.find(TEST_FILE)
if file:
resp = mega.delete(file[0])
assert isinstance(resp, int)
def test_destroy(mega):
file = mega.find(TEST_FILE)
if file:
resp = mega.destroy(file[0])
assert isinstance(resp, int)
def test_empty_trash(mega):
# resp None if already empty, else int
resp = mega.empty_trash()
if resp is not None:
assert isinstance(resp, int)
def test_add_contact(mega):
resp = mega.add_contact(TEST_CONTACT)
assert isinstance(resp, int)
def test_remove_contact(mega):
resp = mega.remove_contact(TEST_CONTACT)
assert isinstance(resp, int)

View file

@ -1,5 +1,5 @@
[tox]
envlist = py{27,37}-normal,lint
envlist = py{36,37,38}-normal,lint
[testenv]
commands =
@ -7,7 +7,9 @@ commands =
coverage erase
python setup.py install
pytest {toxinidir}/src/tests/tests.py
passenv =
EMAIL
PASS
deps =
-rrequirements-dev.txt