Skip to content

Commit

Permalink
Feature implementation, docs, and tests for picklepete#467
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Edwards committed Nov 13, 2024
1 parent 622cd16 commit aefd5e4
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 3 deletions.
30 changes: 30 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,36 @@ The ``upload`` method can be used to send a file-like object to the iCloud Drive
It is strongly suggested to open file handles as binary rather than text to prevent decoding errors
further down the line.

You can also interact with files in the ``trash``:

.. code-block:: pycon
>>> delete_output = api.drive['Holiday Photos']['2013']['Sicily']['DSC08116.JPG'].delete()
>>> api.drive.trash.dir()
['DSC08116.JPG']
>>> delete_output = api.drive['Holiday Photos']['2013']['Sicily']['DSC08117.JPG'].delete()
>>> api.drive.trash.dir()
['DSC08116.JPG', 'DSC08117.JPG']
You can interact with the ``trash`` similar to a standard directory, with some restrictions. In addition, files in the ``trash`` can be recovered back to their original location, or deleted forever:

.. code-block:: pycon
>>> api.drive['Holiday Photos']['2013']['Sicily'].dir()
[]
>>> recover_output = api.drive.trash['DSC08116.JPG'].recover()
>>> api.drive['Holiday Photos']['2013']['Sicily'].dir()
['DSC08116.JPG']
>>> api.drive.trash.dir()
['DSC08117.JPG']
>>> purge_output = api.drive.trash['DSC08117.JPG'].delete_forever()
>>> api.drive.trash.dir()
[]
Photo Library
=======================

Expand Down
108 changes: 106 additions & 2 deletions pyicloud/services/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import mimetypes
import os
import time
import uuid
from re import search
from requests import Response

Expand All @@ -24,6 +25,7 @@ def __init__(self, service_root, document_root, session, params):
self.session = session
self.params = dict(params)
self._root = None
self._trash = None

def _get_token_from_cookie(self):
for cookie in self.session.cookies:
Expand Down Expand Up @@ -51,6 +53,17 @@ def get_node_data(self, node_id):
self._raise_if_error(request)
return request.json()[0]

def custom_request(self, method, path, data=None):
"""Raw function to allow for custom requests"""
request = self.session.request(
method,
self._service_root + f"/{path}",
params=self.params,
data=json.dumps(data) if data else None,
)
self._raise_if_error(request)
return request.json()

def get_file(self, file_id, **kwargs):
"""Returns iCloud Drive file."""
file_params = dict(self.params)
Expand Down Expand Up @@ -216,13 +229,68 @@ def move_items_to_trash(self, node_id, etag):
self._raise_if_error(request)
return request.json()

def recover_items_from_trash(self, node_id, etag):
"""Restores an iCloud Drive node from the trash bin"""
request = self.session.post(
self._service_root + "/putBackItemsFromTrash",
params=self.params,
data=json.dumps(
{
"items": [
{
"drivewsid": node_id,
"etag": etag
}
],
}
),
)
self._raise_if_error(request)
return request.json()

def delete_forever_from_trash(self, node_id, etag):
"""Permanently deletes an iCloud Drive node from the trash bin"""
request = self.session.post(
self._service_root + "/deleteItems",
params=self.params,
data=json.dumps(
{
"items": [
{
"drivewsid": node_id,
"etag": etag
}
],
}
),
)
self._raise_if_error(request)
return request.json()

@property
def root(self):
"""Returns the root node."""
if not self._root:
self._root = DriveNode(self, self.get_node_data("root"))
return self._root

@property
def trash(self):
"""Returns the trash node."""
if not self._trash:
self._trash = DriveNode(self, self.get_node_data("TRASH_ROOT"))
return self._trash

def refresh_root(self):
"""Refreshes and returns a fresh root node."""
self._root = DriveNode(self, self.get_node_data("root"))
return self._root

def refresh_trash(self):
"""Refreshes and returns a fresh trash node."""
self._trash = DriveNode(self, self.get_node_data("TRASH_ROOT"))
return self._trash

def __getattr__(self, attr):
return getattr(self.root, attr)

Expand All @@ -249,14 +317,29 @@ def __init__(self, conn, data):
@property
def name(self):
"""Gets the node name."""
# check if name is undefined, return drivewsid instead if so.
node_name = self.data.get("name")
if not node_name:
# use drivewsid as name if no name present.
node_name = self.data.get("drivewsid")
# Clean up well-known drivewsid names
if node_name == "FOLDER::com.apple.CloudDocs::root":
node_name = "root"
# if no name still, return unknown string.
if not node_name:
node_name = "<UNKNOWN>"

if "extension" in self.data:
return "{}.{}".format(self.data["name"], self.data["extension"])
return self.data["name"]
return "{}.{}".format(node_name, self.data["extension"])
return node_name

@property
def type(self):
"""Gets the node type."""
node_type = self.data.get("type")
# handle trash which has no node type
if not node_type and self.data.get("drivewsid") == "TRASH_ROOT":
node_type = "trash"
return node_type and node_type.lower()

def get_children(self):
Expand Down Expand Up @@ -330,6 +413,27 @@ def delete(self):
self.data["drivewsid"], self.data["etag"]
)

def recover(self):
"""Recovers an iCloud Drive item from trash."""
# check to ensure item is in the trash - it should have a "restorePath" property
if self.data.get("restorePath"):
return self.connection.recover_items_from_trash(
self.data["drivewsid"], self.data["etag"]
)
else:
raise ValueError(f"'{self.name}' does not appear to be in the Trash.")

def delete_forever(self):
"""Permanently deletes an iCloud Drive item from trash."""
# check to ensure item is in the trash - it should have a "restorePath" property
if self.data.get("restorePath"):
return self.connection.delete_forever_from_trash(
self.data["drivewsid"], self.data["etag"]
)
else:
raise ValueError(f"'{self.name}' does not appear to be in the Trash. Please 'delete()' it first before "
f"trying to 'delete_forever()'.")

def get(self, name):
"""Gets the node child."""
if self.type == "file":
Expand Down
26 changes: 26 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
DRIVE_SUBFOLDER_WORKING,
DRIVE_ROOT_WORKING,
DRIVE_FILE_DOWNLOAD_WORKING,
DRIVE_TRASH_WORKING,
DRIVE_TRASH_RECOVER_WORKING,
DRIVE_TRASH_DELETE_FOREVER_WORKING
)
from .const_findmyiphone import FMI_FAMILY_WORKING

Expand Down Expand Up @@ -133,6 +136,8 @@ def request(self, method, url, **kwargs):
return ResponseMock(DRIVE_ROOT_WORKING)
if data[0].get("drivewsid") == "FOLDER::com.apple.CloudDocs::documents":
return ResponseMock(DRIVE_ROOT_INVALID)
if data[0].get("drivewsid") == "FOLDER::com.apple.CloudDocs::TRASH_ROOT":
return ResponseMock(DRIVE_TRASH_WORKING)
if (
data[0].get("drivewsid")
== "FOLDER::com.apple.CloudDocs::1C7F1760-D940-480F-8C4F-005824A4E05B"
Expand All @@ -143,6 +148,27 @@ def request(self, method, url, **kwargs):
== "FOLDER::com.apple.CloudDocs::D5AA0425-E84F-4501-AF5D-60F1D92648CF"
):
return ResponseMock(DRIVE_SUBFOLDER_WORKING)

# Drive Trash Recover
if (
"putBackItemsFromTrash" in url
and method == "POST"
and data.get("items")[0].get("drivewsid")
):
if (data.get("items")[0].get("drivewsid") ==
"FOLDER::com.apple.CloudDocs::2BF8600B-5DCC-4421-805A-1C28D07197D5"):
return ResponseMock(DRIVE_TRASH_RECOVER_WORKING)

# Drive Trash Delete Forever
if (
"deleteItems" in url
and method == "POST"
and data.get("items")[0].get("drivewsid")
):
if (data.get("items")[0].get("drivewsid") ==
"FOLDER::com.apple.CloudDocs::478AEA23-42A2-468A-ABC1-1A04BC07F738"):
return ResponseMock(DRIVE_TRASH_DELETE_FOREVER_WORKING)

# Drive download
if "com.apple.CloudDocs/download/by_id" in url and method == "GET":
if params.get("document_id") == "516C896C-6AA5-4A30-B30E-5502C2333DAE":
Expand Down
Loading

0 comments on commit aefd5e4

Please sign in to comment.