Create folder and export, support sub folders

This commit is contained in:
Richard O'Dwyer 2019-10-23 21:06:16 +01:00
parent a7aa99bfe5
commit 5bef10b4cb
5 changed files with 117 additions and 44 deletions

View file

@ -10,6 +10,7 @@ Release History
- Adds support for login with a v2 Mega user account.
- Adds ``export()`` method to share a file or folder, returning public share URL with key.
- Adds code, message attrs to RequestError exception, makes message in raised exceptions include more details.
- Alters ``create_folder()`` to accept a path including multiple sub directories, adds support to create them all (similar to 'mkdir -p' on unix systems).
0.9.20 (2019-10-17)

View file

@ -151,6 +151,17 @@ Create a folder
.. code:: python
m.create_folder('new_folder')
m.create_folder('new_folder/sub_folder/subsub_folder')
Returns a dict of folder node name and node_id, e.g.
.. code:: python
{
'new_folder': 'qpFhAYwA',
'sub_folder': '2pdlmY4Z',
'subsub_folder': 'GgMFCKLZ'
}
Rename a file or a folder
~~~~~~~~~~~~~~~~~~~~~~~~~
@ -160,7 +171,7 @@ Rename a file or a folder
file = m.find('myfile.doc')
m.rename(file, 'my_file.doc')
M
~
.. _`https://code.richard.do/explore/projects`: https://code.richard.do/explore/projects

View file

@ -14,3 +14,4 @@ setuptools
twine
wheel
rope
pytest-mock

View file

@ -295,9 +295,11 @@ class Mega(object):
"""
Return file object from given filename
"""
files = self.get_files()
if handle:
return files[handle]
path = Path(filename)
filename = path.name
files = self.get_files()
parent_dir_name = path.parent.name
for file in list(files.items()):
parent_node_id = None
@ -314,8 +316,6 @@ class Mega(object):
file[1]['a'] and file[1]['a']['n'] == filename
):
return file
if handle and file[1]['h'] == handle:
return file
def get_files(self):
"""
@ -559,42 +559,49 @@ class Mega(object):
)
def _export_file(self, node):
node_data = self._node_data(node)
self._api_request([
{
'a': 'l',
'n': node[1]['h'],
'n': node_data['h'],
'i': self.request_id
}
])
return self.get_link(node)
def export(self, path):
self.get_files()
folder = self.find(path)
if folder[1]['t'] == 0:
return self._export_file(folder)
if folder:
def export(self, path=None, node_id=None):
nodes = self.get_files()
if node_id:
node = nodes[node_id]
else:
node = self.find(path)
node_data = self._node_data(node)
is_file_node = node_data['t'] == 0
if is_file_node:
return self._export_file(node)
if node:
try:
# If already exported
return self.get_folder_link(folder)
return self.get_folder_link(node)
except (RequestError, KeyError):
pass
master_key_cipher = AES.new(a32_to_str(self.master_key), AES.MODE_ECB)
ha = base64_url_encode(
master_key_cipher.encrypt(folder[1]['h'] + folder[1]['h'])
master_key_cipher.encrypt(node_data['h'] + node_data['h'])
)
share_key = secrets.token_bytes(16)
ok = base64_url_encode(master_key_cipher.encrypt(share_key))
share_key_cipher = AES.new(share_key, AES.MODE_ECB)
node_key = folder[1]['k']
node_key = node_data['k']
encrypted_node_key = base64_url_encode(
share_key_cipher.encrypt(a32_to_str(node_key))
)
node_id = folder[1]['h']
node_id = node_data['h']
request_body = [
{
'a': 's2',
@ -611,8 +618,7 @@ class Mega(object):
]
self._api_request(request_body)
nodes = self.get_files()
link = self.get_folder_link(nodes[node_id])
return link
return self.get_folder_link(nodes[node_id])
def download_url(self, url, dest_path=None, dest_filename=None):
"""
@ -847,14 +853,7 @@ class Mega(object):
input_file.close()
return data
def create_folder(self, name, dest=None):
# determine storage node
if dest is None:
# if none set, upload to cloud drive node
if not hasattr(self, 'root_id'):
self.get_files()
dest = self.root_id
def _mkdir(self, name, parent_node_id):
# generate random aes key (128) for folder
ul_key = [random.randint(0, 0xFFFFFFFF) for _ in range(6)]
@ -867,7 +866,7 @@ class Mega(object):
data = self._api_request(
{
'a': 'p',
't': dest,
't': parent_node_id,
'n': [
{
'h': 'xxxxxxxx',
@ -881,6 +880,33 @@ class Mega(object):
)
return data
def _root_node_id(self):
if not hasattr(self, 'root_id'):
self.get_files()
return self.root_id
def create_folder(self, name, dest=None):
dirs = tuple(dir_name for dir_name in str(name).split('/') if dir_name)
folder_node_ids = {}
for idx, directory_name in enumerate(dirs):
existing_node_id = self.find_path_descriptor(directory_name)
if existing_node_id:
folder_node_ids[idx] = existing_node_id
continue
if idx == 0:
if dest is None:
parent_node_id = self._root_node_id()
else:
parent_node_id = dest
else:
parent_node_id = folder_node_ids[idx - 1]
created_node = self._mkdir(
name=directory_name, parent_node_id=parent_node_id
)
node_id = created_node['f'][0]['h']
folder_node_ids[idx] = node_id
return dict(zip(dirs, folder_node_ids.values()))
def rename(self, file, new_name):
file = file[1]
# create new attribs

View file

@ -1,4 +1,5 @@
import random
from pathlib import Path
import os
import pytest
@ -10,16 +11,20 @@ TEST_PUBLIC_URL = (
'https://mega.nz/#!hYVmXKqL!r0d0-WRnFwulR_shhuEDwrY1Vo103-am1MyUy8oV6Ps'
)
TEST_FILE = os.path.basename(__file__)
TEST_FOLDER = 'mega.py_testfolder_{0}'.format(random.random())
@pytest.fixture
def mega():
def folder_name():
return 'mega.py_testfolder_{0}'.format(random.random())
@pytest.fixture
def mega(folder_name):
mega_ = Mega()
mega_.login(email=os.environ['EMAIL'], password=os.environ['PASS'])
node = mega_.create_folder(TEST_FOLDER)
created_nodes = mega_.create_folder(folder_name)
yield mega_
node_id = node['f'][0]['h']
node_id = next(iter(created_nodes.values()))
mega_.destroy(node_id)
@ -60,25 +65,39 @@ def test_get_link(mega):
class TestExport:
def test_export_folder(self, mega):
def test_export_folder(self, mega, folder_name):
public_url = None
for _ in range(2):
result_public_share_url = mega.export(TEST_FOLDER)
result_public_share_url = mega.export(folder_name)
if not public_url:
public_url = result_public_share_url
assert result_public_share_url.startswith('https://mega.co.nz/#F!')
assert result_public_share_url == public_url
def test_export_single_file(self, mega):
def test_export_folder_within_folder(self, mega, folder_name):
folder_path = Path(folder_name) / 'subdir' / 'anothersubdir'
mega.create_folder(name=folder_path)
result_public_share_url = mega.export(path=folder_path)
assert result_public_share_url.startswith('https://mega.co.nz/#F!')
def test_export_folder_using_node_id(self, mega, folder_name):
node_id = mega.find(folder_name)[0]
result_public_share_url = mega.export(node_id=node_id)
assert result_public_share_url.startswith('https://mega.co.nz/#F!')
def test_export_single_file(self, mega, folder_name):
# Upload a single file into a folder
folder = mega.find(TEST_FOLDER)
folder = mega.find(folder_name)
dest_node_id = folder[1]['h']
mega.upload(
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = '{}/test.py'.format(TEST_FOLDER)
path = '{}/test.py'.format(folder_name)
assert mega.find(path)
for _ in range(2):
@ -94,20 +113,35 @@ def test_import_public_url(mega):
assert isinstance(resp, int)
def test_create_folder(mega):
resp = mega.create_folder(TEST_FOLDER)
assert isinstance(resp, dict)
class TestCreateFolder:
def test_create_folder(self, mega, folder_name):
folder_names_and_node_ids = mega.create_folder(folder_name)
assert isinstance(folder_names_and_node_ids, dict)
assert len(folder_names_and_node_ids) == 1
def test_create_folder_with_sub_folders(self, mega, folder_name, mocker):
folder_names_and_node_ids = mega.create_folder(
name=(Path(folder_name) / 'subdir' / 'anothersubdir')
)
assert len(folder_names_and_node_ids) == 3
assert folder_names_and_node_ids == {
folder_name: mocker.ANY,
'subdir': mocker.ANY,
'anothersubdir': mocker.ANY,
}
def test_rename(mega):
file = mega.find(TEST_FOLDER)
def test_rename(mega, folder_name):
file = mega.find(folder_name)
if file:
resp = mega.rename(file, TEST_FOLDER)
resp = mega.rename(file, folder_name)
assert isinstance(resp, int)
def test_delete_folder(mega):
folder_node = mega.find(TEST_FOLDER)[0]
def test_delete_folder(mega, folder_name):
folder_node = mega.find(folder_name)[0]
resp = mega.delete(folder_node)
assert isinstance(resp, int)