Fixes download, updated download methods to return downloded path instead of None

This commit is contained in:
Richard O'Dwyer 2019-11-12 09:49:57 +00:00
parent c0916b77bb
commit 92a86ff439
3 changed files with 83 additions and 57 deletions

View file

@ -6,7 +6,9 @@ Release History
1.0.3 (unreleased) 1.0.3 (unreleased)
------------------ ------------------
- Nothing changed yet. - Fixes broken ``download`` method.
- Changes ``download`` and ``download_url`` methods to return the path to the downloaded file, previously returned ``None``.
- Added LICENSE.
1.0.2 (2019-11-07) 1.0.2 (2019-11-07)

View file

@ -568,7 +568,7 @@ class Mega:
""" """
Download a file by it's file object Download a file by it's file object
""" """
self._download_file( return self._download_file(
file_handle=None, file_handle=None,
file_key=None, file_key=None,
file=file[1], file=file[1],
@ -646,7 +646,7 @@ class Mega:
path = self._parse_url(url).split('!') path = self._parse_url(url).split('!')
file_id = path[0] file_id = path[0]
file_key = path[1] file_key = path[1]
self._download_file( return self._download_file(
file_handle=file_id, file_handle=file_id,
file_key=file_key, file_key=file_key,
dest_path=dest_path, dest_path=dest_path,
@ -694,7 +694,7 @@ class Mega:
iv = file['iv'] iv = file['iv']
meta_mac = file['meta_mac'] meta_mac = file['meta_mac']
# Seems to happens sometime... When this occurs, files are # Seems to happens sometime... When this occurs, files are
# inaccessible also in the official also in the official web app. # inaccessible also in the official also in the official web app.
# Strangely, files can come back later. # Strangely, files can come back later.
if 'g' not in file_data: if 'g' not in file_data:
@ -716,51 +716,53 @@ class Mega:
else: else:
dest_path += '/' dest_path += '/'
temp_output_file = tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(
mode='w+b', prefix='megapy_', delete=False mode='w+b', prefix='megapy_', delete=False
) ) as temp_output_file:
k_str = a32_to_str(k)
counter = Counter.new(
128, initial_value=((iv[0] << 32) + iv[1]) << 64
)
aes = AES.new(k_str, AES.MODE_CTR, counter=counter)
k_str = a32_to_str(k) mac_str = '\0' * 16
counter = Counter.new(128, initial_value=((iv[0] << 32) + iv[1]) << 64) mac_encryptor = AES.new(k_str, AES.MODE_CBC, mac_str)
aes = AES.new(k_str, AES.MODE_CTR, counter=counter) iv_str = a32_to_str([iv[0], iv[1], iv[0], iv[1]])
mac_str = '\0' * 16 for chunk_start, chunk_size in get_chunks(file_size):
mac_encryptor = AES.new(k_str, AES.MODE_CBC, mac_str) chunk = input_file.read(chunk_size)
iv_str = a32_to_str([iv[0], iv[1], iv[0], iv[1]]) chunk = aes.decrypt(chunk)
temp_output_file.write(chunk)
for chunk_start, chunk_size in get_chunks(file_size): encryptor = AES.new(k_str, AES.MODE_CBC, iv_str)
chunk = input_file.read(chunk_size) for i in range(0, len(chunk) - 16, 16):
chunk = aes.decrypt(chunk) block = chunk[i:i + 16]
temp_output_file.write(chunk) encryptor.encrypt(block)
# fix for files under 16 bytes failing
if file_size > 16:
i += 16
else:
i = 0
encryptor = AES.new(k_str, AES.MODE_CBC, iv_str)
for i in range(0, len(chunk) - 16, 16):
block = chunk[i:i + 16] block = chunk[i:i + 16]
encryptor.encrypt(block) if len(block) % 16:
block += b'\0' * (16 - (len(block) % 16))
mac_str = mac_encryptor.encrypt(encryptor.encrypt(block))
# fix for files under 16 bytes failing file_info = os.stat(temp_output_file.name)
if file_size > 16: logger.info(
i += 16 '%s of %s downloaded', file_info.st_size, file_size
else: )
i = 0 file_mac = str_to_a32(mac_str)
# check mac integrity
block = chunk[i:i + 16] if (
if len(block) % 16: file_mac[0] ^ file_mac[1], file_mac[2] ^ file_mac[3]
block += '\0' * (16 - (len(block) % 16)) ) != meta_mac:
mac_str = mac_encryptor.encrypt(encryptor.encrypt(block)) raise ValueError('Mismatched mac')
output_path = Path(dest_path + file_name)
file_info = os.stat(temp_output_file.name) shutil.move(temp_output_file.name, output_path)
logger.info('%s of %s downloaded', file_info.st_size, file_size) return output_path
file_mac = str_to_a32(mac_str)
temp_output_file.close()
# check mac integrity
if (file_mac[0] ^ file_mac[1], file_mac[2] ^ file_mac[3]) != meta_mac:
raise ValueError('Mismatched mac')
shutil.move(temp_output_file.name, dest_path + file_name)
def upload(self, filename, dest=None, dest_filename=None): def upload(self, filename, dest=None, dest_filename=None):
# determine storage node # determine storage node

View file

@ -28,6 +28,17 @@ def mega(folder_name):
mega_.destroy(node_id) mega_.destroy(node_id)
@pytest.fixture
def uploaded_file(mega, folder_name):
folder = mega.find(folder_name)
dest_node_id = folder[1]['h']
mega.upload(
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{folder_name}/test.py'
return mega.find(path)
def test_mega(mega): def test_mega(mega):
assert isinstance(mega, Mega) assert isinstance(mega, Mega)
@ -56,11 +67,9 @@ def test_get_files(mega):
assert isinstance(files, dict) assert isinstance(files, dict)
def test_get_link(mega): def test_get_link(mega, uploaded_file):
file = mega.find(TEST_FILE) link = mega.get_link(uploaded_file)
if file: assert isinstance(link, str)
link = mega.get_link(file)
assert isinstance(link, str)
class TestExport: class TestExport:
@ -171,18 +180,31 @@ def test_delete_folder(mega, folder_name):
assert isinstance(resp, int) assert isinstance(resp, int)
def test_delete(mega): def test_delete(mega, uploaded_file):
file = mega.find(TEST_FILE) resp = mega.delete(uploaded_file[0])
if file: assert isinstance(resp, int)
resp = mega.delete(file[0])
assert isinstance(resp, int)
def test_destroy(mega): def test_destroy(mega, uploaded_file):
file = mega.find(TEST_FILE) resp = mega.destroy(uploaded_file[0])
if file: assert isinstance(resp, int)
resp = mega.destroy(file[0])
assert isinstance(resp, int)
def test_download(mega, tmpdir, folder_name):
# Upload a single file into a folder
folder = mega.find(folder_name)
dest_node_id = folder[1]['h']
mega.upload(
__file__, dest=dest_node_id, dest_filename='test.py'
)
path = f'{folder_name}/test.py'
file = mega.find(path)
output_path = mega.download(
file=file, dest_path=tmpdir, dest_filename='test.py'
)
assert output_path.exists()
def test_empty_trash(mega): def test_empty_trash(mega):