-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
20 changed files
with
422 additions
and
857 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from email.parser import BytesParser | ||
from email.policy import default | ||
from .payload import Payload | ||
|
||
class MultipartParser: | ||
def __init__(self, multipart_bytes, content_type): | ||
self.multipart_bytes = multipart_bytes | ||
self.content_type = content_type | ||
self.parts = {} | ||
self.parse() | ||
|
||
def parse(self): | ||
# Create a message object | ||
headers = f'Content-Type: {self.content_type}\r\n\r\n'.encode('ascii') | ||
msg = BytesParser(policy=default).parsebytes(headers + self.multipart_bytes) | ||
|
||
# Process each part | ||
for part in msg.walk(): | ||
if part.is_multipart(): | ||
continue | ||
|
||
# Get the name from Content-Disposition | ||
content_disposition = part.get("Content-Disposition", "") | ||
name = part.get_param("name", header="content-disposition") | ||
if not name: | ||
name = f"unnamed_part_{len(self.parts)}" | ||
|
||
# Store the parsed data | ||
self.parts[name] = { | ||
"contents": part.get_payload(decode=True), | ||
"headers": dict(part.items()) | ||
} | ||
|
||
def to_dict(self): | ||
result = {} | ||
for name, part in self.parts.items(): | ||
if name == "responseBody": | ||
result[name] = Payload.from_binary(part["contents"]) | ||
elif name == "responseHeaders": | ||
headers_str = part["contents"].decode('utf-8', errors='replace') | ||
result[name] = dict(line.split(": ", 1) for line in headers_str.split("\r\n") if line) | ||
elif name == "responseStatusCode": | ||
result[name] = int(part["contents"]) | ||
elif name == "duration": | ||
result[name] = float(part["contents"]) | ||
else: | ||
result[name] = part["contents"].decode('utf-8', errors='replace') | ||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
from typing import Optional, Dict, Any | ||
import os, json | ||
|
||
class Payload: | ||
size: int | ||
filename: Optional[str] = None | ||
|
||
_path: Optional[str] = None | ||
_data: Optional[bytes] = None | ||
|
||
def __init__(self, path: Optional[str] = None, data: Optional[bytes] = None, filename: Optional[str] = None): | ||
if not path and not data: | ||
raise ValueError("One of path or data must be provided") | ||
|
||
self._path = path | ||
self._data = data | ||
|
||
self.filename = filename | ||
if not self._data: | ||
self.size = os.path.getsize(self._path) | ||
else: | ||
self.size = len(self._data) | ||
|
||
def to_binary(self, offset: Optional[int] = 0, length: Optional[int] = None) -> bytes: | ||
if not length: | ||
length = self.size | ||
|
||
if not self._data: | ||
with open(self._path, 'rb') as f: | ||
f.seek(offset) | ||
return f.read(length) | ||
|
||
return self._data[offset:offset + length] | ||
|
||
def to_string(self) -> str: | ||
return str(self.to_binary()) | ||
|
||
def to_json(self) -> Dict[str, Any]: | ||
return json.loads(self.to_string()) | ||
|
||
def to_file(self, path: str) -> None: # in the client SDKs, this is def to_file() -> File: | ||
with open(path, 'wb') as f: | ||
return f.write(self.to_binary()) | ||
|
||
@classmethod | ||
def from_binary(cls, data: bytes, filename: Optional[str] = None) -> 'Payload': | ||
return cls(data=data, filename=filename) | ||
|
||
@classmethod | ||
def from_string(cls, data: str) -> 'Payload': | ||
return cls(data=data.encode()) | ||
|
||
@classmethod | ||
def from_file(cls, path: str, filename: Optional[str] = None) -> 'Payload': | ||
if not os.path.exists(path): | ||
raise FileNotFoundError(f"File {path} not found") | ||
if not filename: | ||
filename = os.path.basename(path) | ||
return cls(path=path, filename=filename) | ||
|
||
@classmethod | ||
def from_json(cls, json: Dict[str, Any]) -> 'Payload': | ||
return cls(data=json.dumps(json)) |
Oops, something went wrong.