Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3595][Core] add ability to download files from daemon #443

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 87 additions & 15 deletions deluge/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import glob
import logging
import os
import random
import shutil
import string
import tempfile
from base64 import b64decode, b64encode
from typing import Any, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -46,6 +48,7 @@
InvalidTorrentError,
)
from deluge.event import (
CallbackHandlingEvent,
NewVersionAvailableEvent,
SessionPausedEvent,
SessionResumedEvent,
Expand Down Expand Up @@ -508,6 +511,84 @@ async def add_torrents():

return task.deferLater(reactor, 0, add_torrents)

@maybe_coroutine
async def _download_file(
self,
url,
callback=None,
headers=None,
allow_compression=True,
handle_redirects=True,
) -> bytes:
tmp_fd, tmp_file = tempfile.mkstemp(prefix='deluge_url.')
try:
filename = await download_file(
url=url,
filename=tmp_file,
callback=callback,
headers=headers,
force_filename=True,
allow_compression=allow_compression,
handle_redirects=handle_redirects,
)
except Exception:
raise
else:
with open(filename, 'rb') as _file:
data = _file.read()
return data
finally:
try:
os.close(tmp_fd)
os.remove(tmp_file)
except OSError as ex:
log.warning(f'Unable to delete temp file {tmp_file}: , {ex}')

@export
@maybe_coroutine
async def download_file(
self,
url,
callback=None,
headers=None,
allow_compression=True,
handle_redirects=True,
) -> 'defer.Deferred[Optional[bytes]]':
"""Downloads a file from a URL and returns the content as bytes.
Use this method to download from the daemon itself (like a proxy).
Args:
url (str): The url to download from.
callback (func, str): A function to be called when partial data is received,
it's signature should be: func(data, current_length, total_length).
headers (dict): Any optional headers to send.
allow_compression (bool): Allows gzip & deflate decoding.
handle_redirects (bool): HTTP redirects handled automatically or not.
Returns:
a Deferred which returns the content as bytes or None
"""
log.info(f'Attempting to download URL {url}')

if isinstance(callback, str):
original_callback = callback

def emit(*args, **kwargs):
component.get('EventManager').emit(
CallbackHandlingEvent(original_callback, *args, **kwargs)
)

callback = emit

try:
return await self._download_file(
url, callback, headers, allow_compression, handle_redirects
)
except Exception:
log.error(f'Failed to download file from URL {url}')
raise

@export
@maybe_coroutine
async def add_torrent_url(
Expand All @@ -524,26 +605,17 @@ async def add_torrent_url(
Returns:
a Deferred which returns the torrent_id as a str or None
"""
log.info('Attempting to add URL %s', url)
log.info(f'Attempting to add URL {url}')

tmp_fd, tmp_file = tempfile.mkstemp(prefix='deluge_url.', suffix='.torrent')
try:
filename = await download_file(
url, tmp_file, headers=headers, force_filename=True
)
data = await self._download_file(url, headers=headers)
except Exception:
log.error('Failed to add torrent from URL %s', url)
log.error(f'Failed to add torrent from URL {url}')
raise
else:
with open(filename, 'rb') as _file:
data = _file.read()
return self.add_torrent_file(filename, b64encode(data), options)
finally:
try:
os.close(tmp_fd)
os.remove(tmp_file)
except OSError as ex:
log.warning(f'Unable to delete temp file {tmp_file}: , {ex}')
chars = string.ascii_letters + string.digits
tmp_file_name = ''.join(random.choices(chars, k=7))
return self.add_torrent_file(tmp_file_name, b64encode(data), options)

@export
def add_torrent_magnet(self, uri: str, options: dict) -> str:
Expand Down
5 changes: 5 additions & 0 deletions deluge/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,8 @@ def __init__(self, external_ip):
external_ip (str): The IP address.
"""
self._args = [external_ip]


class CallbackHandlingEvent(DelugeEvent):
def __init__(self, callback, *args, **kwargs):
self._args = [callback, args, kwargs]
3 changes: 1 addition & 2 deletions deluge/plugins/Blocklist/deluge_blocklist/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import deluge.configmanager
from deluge.common import is_url
from deluge.core.rpcserver import export
from deluge.httpdownloader import download_file
from deluge.plugins.pluginbase import CorePluginBase

from .common import IP, BadIP
Expand Down Expand Up @@ -326,7 +325,7 @@ def on_retrieve_data(data, current_length, total_length):
log.debug('Attempting to download blocklist %s', url)
log.debug('Sending headers: %s', headers)
self.is_downloading = True
return download_file(
return self.core.download_file(
url,
deluge.configmanager.get_config_dir('blocklist.download'),
on_retrieve_data,
Expand Down
54 changes: 54 additions & 0 deletions deluge/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,57 @@ def test_create_torrent(self, path, tmp_path, piece_length):
assert f.read() == filecontent

lt.torrent_info(filecontent)

@pytest.fixture
def _download_file_content(self):
with open(
common.get_test_data_file('ubuntu-9.04-desktop-i386.iso.torrent'), 'rb'
) as _file:
data = _file.read()
return data

@pytest_twisted.inlineCallbacks
def test_download_file(self, mock_mkstemp, _download_file_content):
url = (
f'http://localhost:{self.listen_port}/ubuntu-9.04-desktop-i386.iso.torrent'
)

file_content = yield self.core.download_file(url)
assert file_content == _download_file_content
assert not os.path.isfile(mock_mkstemp[1])

async def test_download_file_with_cookie(self, _download_file_content):
url = f'http://localhost:{self.listen_port}/cookie'
headers = {'Cookie': 'password=deluge'}

with pytest.raises(Exception):
await self.core.download_file(url)

file_content = await self.core.download_file(url, headers=headers)
assert file_content == _download_file_content

async def test_download_file_with_redirect(self, _download_file_content):
url = f'http://localhost:{self.listen_port}/redirect'

with pytest.raises(Exception):
await self.core.download_file(url, handle_redirects=False)

file_content = await self.core.download_file(url)
assert file_content == _download_file_content

async def test_download_file_with_callback(self, _download_file_content):
url = (
f'http://localhost:{self.listen_port}/ubuntu-9.04-desktop-i386.iso.torrent'
)
called_callback = False
data_valid = False

def on_retrieve_data(data, current_length, total_length):
nonlocal called_callback, data_valid
data_valid |= data in _download_file_content
called_callback = True

file_content = await self.core.download_file(url, callback=on_retrieve_data)
assert file_content == _download_file_content
assert data_valid
assert called_callback
12 changes: 12 additions & 0 deletions deluge/tests/test_metafile.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ def test_save_multifile(self):
os.close(tmp_fd)
os.remove(tmp_file)

def test_save_empty_file(self):
with tempfile.TemporaryDirectory() as tmp_dir:
with open(tmp_dir + '/empty', 'wb') as tmp_file:
pass
with open(tmp_dir + '/file', 'wb') as tmp_file:
tmp_file.write(b'c' * (11 * 1024))

tmp_torrent = tmp_dir + '/test.torrent'
metafile.make_meta_file(tmp_dir, '', 32768, target=tmp_torrent)

check_torrent(tmp_torrent)

def test_save_singlefile(self):
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_data = tmp_dir + '/testdata'
Expand Down
39 changes: 34 additions & 5 deletions deluge/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,45 @@ def _handle_complete_message(self, data):
:param data: a zlib compressed string encoded with rencode.
"""

def log_exception(exception):
log.warning(
'Failed to decompress (%d bytes) and load serialized data with rencode: %s',
len(data),
exception,
)

def build_part(part):
if isinstance(part, bytes):
try:
new_part = part.decode('UTF-8')
except UnicodeDecodeError:
new_part = part
elif isinstance(part, dict):
new_part = {}
for k, v in part.items():
new_part[build_part(k)] = build_part(v)
elif isinstance(part, list):
new_part = [build_part(i) for i in part]
elif isinstance(part, tuple):
new_part = [build_part(i) for i in part]
new_part = tuple(new_part)
else:
new_part = part
return new_part

try:
self.message_received(
rencode.loads(zlib.decompress(data), decode_utf8=True)
)
except UnicodeDecodeError:
try:
decoded_data = rencode.loads(zlib.decompress(data))
self.message_received(build_part(decoded_data))
except Exception as ex:
log_exception(ex)
except Exception as ex:
log.warning(
'Failed to decompress (%d bytes) and load serialized data with rencode: %s',
len(data),
ex,
)
log_exception(ex)

def get_bytes_recv(self):
"""
Expand Down
36 changes: 35 additions & 1 deletion deluge/ui/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def message_received(self, request):
return
if len(request) < 3:
log.debug(
'Received invalid message: number of items in ' 'response is %s',
'Received invalid message: number of items in response is %s',
len(request),
)
return
Expand Down Expand Up @@ -556,6 +556,7 @@ class Client:
"""

__event_handlers = {}
__callback_handlers = {}

def __init__(self):
self._daemon_proxy = None
Expand Down Expand Up @@ -598,6 +599,9 @@ def on_connect_fail(reason):

def on_authenticate(result, daemon_info):
log.debug('Authentication successful: %s', result)
self.register_event_handler(
'CallbackHandlingEvent', self._handle_callback_event
)
return result

def on_authenticate_fail(reason):
Expand All @@ -624,6 +628,10 @@ def disconnect(self):
"""
Disconnects from the daemon.
"""
self.deregister_event_handler(
'CallbackHandlingEvent', self._handle_callback_event
)

if self.is_standalone():
self._daemon_proxy.disconnect()
self.stop_standalone()
Expand Down Expand Up @@ -790,6 +798,32 @@ def deregister_event_handler(self, event, handler):
if self._daemon_proxy:
self._daemon_proxy.deregister_event_handler(event, handler)

@staticmethod
def register_callback_handler(callback_id: str, callback: callable):
"""
Registers a callback handler for supported methods on the daemon.
Args:
callback_id: a unique identifier for the callback
callback: the callback function to call
"""
Client.__callback_handlers[callback_id] = callback

@staticmethod
def deregister_callback_handler(callback_id: str):
"""
Deregisters a callback handler
Args:
callback_id: the identifier to remove
"""
if callback_id in Client.__callback_handlers:
Client.__callback_handlers.pop(callback_id)

def _handle_callback_event(self, callback_id, *args):
if callback_id in self.__callback_handlers:
self.__callback_handlers[callback_id](*(args[0]), **(args[1]))

def force_call(self, block=False):
# no-op for now.. we'll see if we need this in the future
pass
Expand Down
Loading
Loading