Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Http request #10

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build
*.pyc
.idea
4 changes: 2 additions & 2 deletions asyncdynamo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/env python
#
#
# Copyright 2010 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand All @@ -25,7 +25,7 @@
raise ImportError("tornado library not installed. Install tornado. https://github.com/facebook/tornado")
try:
import boto
assert tuple(map(int,boto.Version.split('.'))) >= (2,3,0), "Boto >= 2.3.0 required."
assert tuple(map(int,boto.Version.split('.'))) >= (2,39,0), "Boto >= 2.39.0 required."
except ImportError:
raise ImportError("boto library not installed. Install boto. https://github.com/boto/boto")

Expand Down
24 changes: 13 additions & 11 deletions asyncdynamo/async_aws_sts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/env python
#
#
# Copyright 2012 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand Down Expand Up @@ -38,12 +38,12 @@ class AsyncAwsSts(STSConnection):
'''
Class that manages session tokens. Users of AsyncDynamoDB should not
need to worry about what goes on here.

Usage: Keep an instance of this class (though it should be cheap to
re instantiate) and periodically call get_session_token to get a new
Credentials object when, say, your session token expires
'''

def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
proxy_user=None, proxy_pass=None, debug=0,
Expand All @@ -55,24 +55,24 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
proxy_user, proxy_pass, debug,
https_connection_factory, region, path, converter)
self.http_client = AsyncHTTPClient(io_loop=ioloop)

def get_session_token(self, callback):
'''
Gets a new Credentials object with a session token, using this
instance's aws keys. Callback should operate on the new Credentials obj,
or else a boto.exception.BotoServerError
'''
return self.get_object('GetSessionToken', {}, Credentials, verb='POST', callback=callback)

def get_object(self, action, params, cls, path="/", parent=None, verb="GET", callback=None):
'''
Get an instance of `cls` using `action`
'''
if not parent:
parent = self
self.make_request(action, params, path, verb,
self.make_request(action, params, path, verb,
functools.partial(self._finish_get_object, callback=callback, parent=parent, cls=cls))

def _finish_get_object(self, response_body, callback, cls=None, parent=None, error=None):
'''
Process the body returned by STS. If an error is present, convert from a tornado error
Expand All @@ -88,27 +88,29 @@ def _finish_get_object(self, response_body, callback, cls=None, parent=None, err
h = boto.handler.XmlHandler(obj, parent)
xml.sax.parseString(response_body, h)
return callback(obj)

def make_request(self, action, params={}, path='/', verb='GET', callback=None):
'''
Make an async request. This handles the logic of translating from boto params
to a tornado request obj, issuing the request, and passing back the body.

The callback should operate on the body of the response, and take an optional
error argument that will be a tornado error
'''
request = HTTPRequest('https://%s' % self.host,
request = HTTPRequest('https://%s' % self.host,
method=verb)
request.params = params
request.auth_path = '/' # need this for auth
request.host = self.host # need this for auth
request.port = 443
request.protocol = self.protocol
if action:
request.params['Action'] = action
if self.APIVersion:
request.params['Version'] = self.APIVersion
self._auth_handler.add_auth(request) # add signature
self.http_client.fetch(request, functools.partial(self._finish_make_request, callback=callback))

def _finish_make_request(self, response, callback):
if response.error:
return callback(response.body, error=response.error)
Expand Down
107 changes: 66 additions & 41 deletions asyncdynamo/asyncdynamo.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/env python
#
#
# Copyright 2012 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand Down Expand Up @@ -28,10 +28,11 @@
from collections import deque
import time
import logging
from urlparse import urlparse

from boto.connection import AWSAuthConnection
from boto.exception import DynamoDBResponseError
from boto.auth import HmacAuthV3HTTPHandler
from boto.auth import HmacAuthV4Handler
from boto.provider import Provider

from async_aws_sts import AsyncAwsSts, InvalidClientTokenIdError
Expand All @@ -41,12 +42,12 @@
class AsyncDynamoDB(AWSAuthConnection):
"""
The main class for asynchronous connections to DynamoDB.

The user should maintain one instance of this class (though more than one is ok),
parametrized with the user's access key and secret key. Make calls with make_request
or the helper methods, and AsyncDynamoDB will maintain session tokens in the background.


As in Boto Layer1:
"This is the lowest-level interface to DynamoDB. Methods at this
layer map directly to API requests and parameters to the methods
Expand All @@ -55,52 +56,72 @@ class AsyncDynamoDB(AWSAuthConnection):
All responses are direct decoding of the JSON response bodies to
Python data structures via the json or simplejson modules."
"""

DefaultHost = 'dynamodb.us-east-1.amazonaws.com'
"""The default DynamoDB API endpoint to connect to."""

ServiceName = 'DynamoDB'
"""The name of the Service"""
Version = '20111205'

Version = '20120810'
"""DynamoDB API version."""

ThruputError = "ProvisionedThroughputExceededException"
"""The error response returned when provisioned throughput is exceeded"""

ExpiredSessionError = 'com.amazon.coral.service#ExpiredTokenException'
"""The error response returned when session token has expired"""

UnrecognizedClientException = 'com.amazon.coral.service#UnrecognizedClientException'
'''Another error response that is possible with a bad session token'''

def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
host=None, debug=0, session_token=None,
host=None, debug=0, session_token=None, endpoint=None,
authenticate_requests=True, validate_cert=True, max_sts_attempts=3, ioloop=None):
if not host:
host = self.DefaultHost
if endpoint is not None:
self.url = endpoint
parse_url = urlparse(self.url)
self.host = parse_url.hostname
self.port = parse_url.port
self.protocol = parse_url.scheme
else:
self.protocol = 'https' if is_secure else 'http'
self.host = host
self.port = port

url = '{0}://{1}'.format(self.protocol, self.host)

if self.port:
url += ':{}'.format(self.port)

self.url = url
self.validate_cert = validate_cert
self.authenticate_requests = authenticate_requests
AWSAuthConnection.__init__(self, host,
self.authenticate_requests = authenticate_requests
AWSAuthConnection.__init__(self, self.host,
aws_access_key_id,
aws_secret_access_key,
is_secure, port, proxy, proxy_port,
debug=debug, security_token=session_token)
is_secure, self.port, proxy, proxy_port,
debug=debug, security_token=session_token,
validate_certs=self.validate_cert)
self.ioloop = ioloop or IOLoop.instance()
self.http_client = AsyncHTTPClient(io_loop=self.ioloop)
self.pending_requests = deque()
self.sts = AsyncAwsSts(aws_access_key_id, aws_secret_access_key, ioloop=self.ioloop)
self.sts = AsyncAwsSts(aws_access_key_id,
aws_secret_access_key,
is_secure, self.port, proxy, proxy_port)
assert (isinstance(max_sts_attempts, int) and max_sts_attempts >= 0)
self.max_sts_attempts = max_sts_attempts

def _init_session_token_cb(self, error=None):
if error:
logging.warn("Unable to get session token: %s" % error)

def _required_auth_capability(self):
return ['hmac-v3-http']
return ['hmac-v4']

def _update_session_token(self, callback, attempts=0, bypass_lock=False):
'''
Begins the logic to get a new session token. Performs checks to ensure
Expand All @@ -113,14 +134,14 @@ def _update_session_token(self, callback, attempts=0, bypass_lock=False):
self.provider.security_token = PENDING_SESSION_TOKEN_UPDATE # invalidate the current security token
return self.sts.get_session_token(
functools.partial(self._update_session_token_cb, callback=callback, attempts=attempts))

def _update_session_token_cb(self, creds, provider='aws', callback=None, error=None, attempts=0):
'''
Callback to use with `async_aws_sts`. The 'provider' arg is a bit misleading,
it is a relic from boto and should probably be left to its default. This will
take the new Credentials obj from `async_aws_sts.get_session_token()` and use
it to update self.provider, and then will clear the deque of pending requests.

A callback is optional. If provided, it must be callable without any arguments,
but also accept an optional error argument that will be an instance of BotoServerError.
'''
Expand Down Expand Up @@ -151,21 +172,21 @@ def raise_error():
creds.secret_key,
creds.session_token)
# force the correct auth, with the new provider
self._auth_handler = HmacAuthV3HTTPHandler(self.host, None, self.provider)
self._auth_handler = HmacAuthV4Handler(self.host, None, self.provider)
while self.pending_requests:
request = self.pending_requests.pop()
request()
if callable(callback):
return callback()

def make_request(self, action, body='', callback=None, object_hook=None):
'''
Make an asynchronous HTTP request to DynamoDB. Callback should operate on
the decoded json response (with object hook applied, of course). It should also
accept an error argument, which will be a boto.exception.DynamoDBResponseError.

If there is not a valid session token, this method will ensure that a new one is fetched
and cache the request when it is retrieved.
and cache the request when it is retrieved.
'''
this_request = functools.partial(self.make_request, action=action,
body=body, callback=callback,object_hook=object_hook)
Expand All @@ -187,18 +208,22 @@ def cb_for_update(error=None):
self.Version, action),
'Content-Type' : 'application/x-amz-json-1.0',
'Content-Length' : str(len(body))}
request = HTTPRequest('https://%s' % self.host,
request = HTTPRequest(self.url,
method='POST',
headers=headers,
body=body,
validate_cert=self.validate_cert)
request.path = '/' # Important! set the path variable for signing by boto (<2.7). '/' is the path for all dynamodb requests
request.auth_path = '/' # Important! set the auth_path variable for signing by boto(>2.7). '/' is the path for all dynamodb requests
request.params = {}
request.port = self.port
request.protocol = self.protocol
request.host = self.host
if self.authenticate_requests:
self._auth_handler.add_auth(request) # add signature to headers of the request
self.http_client.fetch(request, functools.partial(self._finish_make_request,
callback=callback, orig_request=this_request, token_used=self.provider.security_token, object_hook=object_hook)) # bam!

def _finish_make_request(self, response, callback, orig_request, token_used, object_hook=None):
'''
Check for errors and decode the json response (in the tornado response body), then pass on to orig callback.
Expand All @@ -219,7 +244,7 @@ def _finish_make_request(self, response, callback, orig_request, token_used, obj
return orig_request() # make_request will handle logic to get a new token if needed, and queue until it is fetched
else:
# because some errors are benign, include the response when an error is passed
return callback(json_response, error=DynamoDBResponseError(response.error.code,
return callback(json_response, error=DynamoDBResponseError(response.error.code,
response.error.message, json_response))

if json_response is None:
Expand All @@ -233,10 +258,10 @@ def get_item(self, table_name, key, callback, attributes_to_get=None,
'''
Return a set of attributes for an item that matches
the supplied key.

The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)

:type table_name: str
:param table_name: The name of the table to delete.

Expand All @@ -261,12 +286,12 @@ def get_item(self, table_name, key, callback, attributes_to_get=None,
data['ConsistentRead'] = True
return self.make_request('GetItem', body=json.dumps(data),
callback=callback, object_hook=object_hook)

def batch_get_item(self, request_items, callback):
"""
Return a set of attributes for a multiple items in
multiple tables using their primary keys.

The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)

Expand All @@ -277,7 +302,7 @@ def batch_get_item(self, request_items, callback):
data = {'RequestItems' : request_items}
json_input = json.dumps(data)
self.make_request('BatchGetItem', json_input, callback)

def put_item(self, table_name, item, callback, expected=None, return_values=None, object_hook=None):
'''
Create a new item or replace an old item with a new
Expand All @@ -286,7 +311,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None
key, the new item will completely replace the old item.
You can perform a conditional put by specifying an
expected rule.

The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)

Expand All @@ -306,7 +331,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None
name-value pairs before then were changed. Possible
values are: None or 'ALL_OLD'. If 'ALL_OLD' is
specified and the item is overwritten, the content
of the old item is returned.
of the old item is returned.
'''
data = {'TableName' : table_name,
'Item' : item}
Expand All @@ -317,7 +342,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None
json_input = json.dumps(data)
return self.make_request('PutItem', json_input, callback=callback,
object_hook=object_hook)

def query(self, table_name, hash_key_value, callback, range_key_conditions=None,
attributes_to_get=None, limit=None, consistent_read=False,
scan_index_forward=True, exclusive_start_key=None,
Expand All @@ -326,7 +351,7 @@ def query(self, table_name, hash_key_value, callback, range_key_conditions=None,
Perform a query of DynamoDB. This version is currently punting
and expecting you to provide a full and correct JSON body
which is passed as is to DynamoDB.

The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)

Expand Down
Loading