The boto library provides the Python bindings for working with AWS, so much so that it even powers the AWS CLI itself. In this guide we'll take a look at some of the inner workings of boto.
boto and boto3
Originally boto was one big monolithic package. For historical purposes you can find the original source code on GitHub. The problem with the original was that it contained the logic for every service. Once AWS really started to expand, maintaining every service became too much of a challenge.
Then came boto3. One of the first changes was that much of the low-level logic moved over to the botocore project. Instead of declaring all the service logic in Python, services are abstracted into JSON data files. The boto3 library then deals with some exceptions that the service file abstraction doesn't handle well, such as batch writing in DynamoDB.
Service JSON
The actual service JSON files generally consist of:
- service JSON
- paginator JSON
- waiter JSON
To start off with the services: SQS is a pretty simple API, so I'll use it for this example, starting with the CreateQueue call:
"CreateQueue":{
"name":"CreateQueue",
"http":{
"method":"POST",
"requestUri":"/"
},
"input":{"shape":"CreateQueueRequest"},
"output":{
"shape":"CreateQueueResult",
"resultWrapper":"CreateQueueResult"
},
"errors":[
{"shape":"QueueDeletedRecently"},
{"shape":"QueueNameExists"}
],
"documentation":"<p>Creates a new standard or FIFO queue. You can pass one or more attributes in the request. Keep the following in mind:</p> <ul> <li> <p>If you don't specify the <code>FifoQueue</code> attribute, Amazon SQS creates a standard queue.</p> <note> <p>You can't change the queue type after you create it and you can't convert an existing standard queue into a FIFO queue. You must either create a new FIFO queue for your application or delete your <snip>"
},
Interestingly enough, if you check the documentation for create_queue in boto3's API reference page, you can see that the contents of the documentation key are shown. The errors also map to the exceptions shown at the bottom of the page. Essentially, a good majority of the boto documentation is generated by automation that parses the service files. The shape entries of input and output are the parameter and result structures for the API call. Looking at CreateQueueRequest:
"CreateQueueRequest":{
"type":"structure",
"required":["QueueName"],
"members":{
"QueueName":{
"shape":"String",
"documentation":"<snip>"
},
"Attributes":{
"shape":"QueueAttributeMap",
"documentation":"<snip>",
"locationName":"Attribute"
},
"tags":{
"shape":"TagMap",
"documentation":"<snip>",
"locationName":"Tag"
}
},
"documentation":"<snip>"
},
The documentation values are snipped because they make the paste really long. There's also a declaration for required parameters (you can't create a queue without a queue name). Attributes also has its own custom shape:
"QueueAttributeMap":{
"type":"map",
"key":{
"shape":"QueueAttributeName",
"locationName":"Name"
},
"value":{
"shape":"String",
"locationName":"Value"
},
"flattened":true,
"locationName":"Attribute"
},
"QueueAttributeName":{
"type":"string",
"enum":[
"All",
"Policy",
"VisibilityTimeout",
"MaximumMessageSize",
"MessageRetentionPeriod",
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
"CreatedTimestamp",
"LastModifiedTimestamp",
"QueueArn",
"ApproximateNumberOfMessagesDelayed",
"DelaySeconds",
"ReceiveMessageWaitTimeSeconds",
"RedrivePolicy",
"FifoQueue",
"ContentBasedDeduplication",
"KmsMasterKeyId",
"KmsDataKeyReusePeriodSeconds",
"DeduplicationScope",
"FifoThroughputLimit",
"RedriveAllowPolicy",
"SqsManagedSseEnabled"
]
},
These types generally drill down into a low-level type such as String or Integer. Results follow much the same structure:
"CreateQueueResult":{
"type":"structure",
"members":{
"QueueUrl":{
"shape":"String",
"documentation":"<p>The URL of the created Amazon SQS queue.</p>"
}
},
"documentation":"<p>Returns the <code>QueueUrl</code> attribute of the created queue.</p>"
},
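To make the "drill down" idea concrete, here is a minimal sketch of walking shapes until a primitive type is reached. The SHAPES dict is a hand-trimmed subset of the SQS excerpts above, and describe and missing_required are hypothetical helper names of my own, not botocore functions.

```python
# Hand-trimmed subset of the SQS shapes shown above (illustration only).
SHAPES = {
    "CreateQueueRequest": {
        "type": "structure",
        "required": ["QueueName"],
        "members": {
            "QueueName": {"shape": "String"},
            "Attributes": {"shape": "QueueAttributeMap"},
        },
    },
    "QueueAttributeMap": {
        "type": "map",
        "key": {"shape": "QueueAttributeName"},
        "value": {"shape": "String"},
    },
    "QueueAttributeName": {
        "type": "string",
        "enum": ["DelaySeconds", "FifoQueue"],
    },
    "String": {"type": "string"},
}


def describe(shape_name, shapes=SHAPES):
    """Recursively expand a shape name down to its primitive types."""
    shape = shapes[shape_name]
    kind = shape["type"]
    if kind == "structure":
        return {
            name: describe(member["shape"], shapes)
            for name, member in shape["members"].items()
        }
    if kind == "map":
        key_type = describe(shape["key"]["shape"], shapes)
        value_type = describe(shape["value"]["shape"], shapes)
        return {key_type: value_type}
    # Primitive types (string, integer, ...) end the recursion.
    return kind


def missing_required(shape_name, params, shapes=SHAPES):
    """Return the required member names that are absent from params."""
    required = shapes[shape_name].get("required", [])
    return [name for name in required if name not in params]


print(describe("CreateQueueRequest"))
print(missing_required("CreateQueueRequest", {"Attributes": {}}))
```

The same recursive walk is roughly what any validator or documentation generator built on these files has to do, since every member only names another shape.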
Paginators
This is essentially a collection of list-like calls that support pagination. The backend boto call handles token management and result iteration:
{
"pagination": {
"ListDeadLetterSourceQueues": {
"input_token": "NextToken",
"limit_key": "MaxResults",
"output_token": "NextToken",
"result_key": "queueUrls"
},
"ListQueues": {
"input_token": "NextToken",
"limit_key": "MaxResults",
"output_token": "NextToken",
"result_key": "QueueUrls"
}
}
}
boto handles this behind the scenes via the botocore pagination module.
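As a rough idea of what that token management looks like, here is a minimal sketch driven by the same three keys as the ListQueues entry. The fake_list_queues function and its canned pages are hypothetical stand-ins for a real API call, and paginate is my own simplified loop rather than the botocore implementation.

```python
# Same keys as the ListQueues paginator entry (limit_key left out for brevity).
PAGINATION = {
    "input_token": "NextToken",
    "output_token": "NextToken",
    "result_key": "QueueUrls",
}

# Hypothetical canned responses keyed by the token that requests them.
FAKE_PAGES = {
    None: {"QueueUrls": ["q1", "q2"], "NextToken": "t1"},
    "t1": {"QueueUrls": ["q3"], "NextToken": "t2"},
    "t2": {"QueueUrls": ["q4"]},  # no token: this is the last page
}


def fake_list_queues(**kwargs):
    """Stand-in for a real ListQueues call."""
    return FAKE_PAGES[kwargs.get("NextToken")]


def paginate(operation, config, **kwargs):
    """Yield every item under result_key, following tokens until none is left."""
    token = None
    while True:
        call_kwargs = dict(kwargs)
        if token is not None:
            call_kwargs[config["input_token"]] = token
        response = operation(**call_kwargs)
        yield from response.get(config["result_key"], [])
        token = response.get(config["output_token"])
        if not token:
            return


print(list(paginate(fake_list_queues, PAGINATION)))
```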
Waiters
As the name suggests, waiters are a collection of list-like API calls that check whether a resource is in a specific state. A waiter handles not only the polling but also the delay between polls and the maximum number of attempts. For example, the EC2 instance running waiter:
"InstanceRunning": {
"delay": 15,
"operation": "DescribeInstances",
"maxAttempts": 40,
"acceptors": [
{
"expected": "running",
"matcher": "pathAll",
"state": "success",
"argument": "Reservations[].Instances[].State.Name"
},
{
"expected": "shutting-down",
"matcher": "pathAny",
"state": "failure",
"argument": "Reservations[].Instances[].State.Name"
},
{
"expected": "terminated",
"matcher": "pathAny",
"state": "failure",
"argument": "Reservations[].Instances[].State.Name"
},
{
"expected": "stopping",
"matcher": "pathAny",
"state": "failure",
"argument": "Reservations[].Instances[].State.Name"
},
{
"matcher": "error",
"expected": "InvalidInstanceID.NotFound",
"state": "retry"
}
]
},
It runs DescribeInstances and uses a JMESPath-style query to drill down to the proper attribute in the resulting JSON.
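Here is a minimal sketch of how such a waiter definition could be evaluated. To stay within the standard library, instance_states is a hand-rolled stand-in for the JMESPath query, only two of the acceptors are reproduced, and fake_response, evaluate and wait are hypothetical names of my own.

```python
import time

# Two of the acceptors from the InstanceRunning waiter above.
ACCEPTORS = [
    {"expected": "running", "matcher": "pathAll", "state": "success"},
    {"expected": "terminated", "matcher": "pathAny", "state": "failure"},
]


def fake_response(*state_names):
    """Build a DescribeInstances-shaped dict for the given states."""
    instances = [{"State": {"Name": name}} for name in state_names]
    return {"Reservations": [{"Instances": instances}]}


def instance_states(response):
    """Hand-rolled stand-in for Reservations[].Instances[].State.Name."""
    return [
        instance["State"]["Name"]
        for reservation in response.get("Reservations", [])
        for instance in reservation.get("Instances", [])
    ]


def evaluate(response, acceptors=ACCEPTORS):
    """Return the state of the first matching acceptor, else 'retry'."""
    states = instance_states(response)
    for acceptor in acceptors:
        hits = [state == acceptor["expected"] for state in states]
        if acceptor["matcher"] == "pathAll":
            matched = bool(hits) and all(hits)
        else:  # pathAny
            matched = any(hits)
        if matched:
            return acceptor["state"]
    return "retry"


def wait(operation, delay=15, max_attempts=40, sleep=time.sleep):
    """Poll operation() until success; return the attempt count."""
    for attempt in range(1, max_attempts + 1):
        state = evaluate(operation())
        if state == "success":
            return attempt
        if state == "failure":
            raise RuntimeError("waiter reached a failure state")
        if attempt < max_attempts:
            sleep(delay)
    raise TimeoutError("max attempts exceeded")
```

Passing sleep as a parameter is just a convenience for trying the loop out without actually waiting 15 seconds between polls.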
Client
The client code is where a lot of the magic in making boto work happens. Of particular interest is the method mapping code:
def _create_methods(self, service_model):
    op_dict = {}
    for operation_name in service_model.operation_names:
        py_operation_name = xform_name(operation_name)
        op_dict[py_operation_name] = self._create_api_method(
            py_operation_name, operation_name, service_model
        )
    return op_dict

def _create_name_mapping(self, service_model):
    # py_name -> OperationName, for every operation available
    # for a service.
    mapping = {}
    for operation_name in service_model.operation_names:
        py_operation_name = xform_name(operation_name)
        mapping[py_operation_name] = operation_name
    return mapping

def _create_api_method(
    self, py_operation_name, operation_name, service_model
):
    def _api_call(self, *args, **kwargs):
        # We're accepting *args so that we can give a more helpful
        # error message than TypeError: _api_call takes exactly
        # 1 argument.
        if args:
            raise TypeError(
                f"{py_operation_name}() only accepts keyword arguments."
            )
        # The "self" in this scope is referring to the BaseClient.
        return self._make_api_call(operation_name, kwargs)

    _api_call.__name__ = str(py_operation_name)

    # Add the docstring to the client method
    operation_model = service_model.operation_model(operation_name)
    docstring = ClientMethodDocstring(
        operation_model=operation_model,
        method_name=operation_name,
        event_emitter=self._event_emitter,
        method_description=operation_model.documentation,
        example_prefix='response = client.%s' % py_operation_name,
        include_signature=False,
    )
    _api_call.__doc__ = docstring
    return _api_call
This creates the actual method mapping, which links back to the relevant API call. The more Pythonic method names are produced by the xform_name function:
def xform_name(name, sep='_', _xform_cache=_xform_cache):
    """Convert camel case to a "pythonic" name.

    If the name contains the ``sep`` character, then it is
    returned unchanged.

    """
    if sep in name:
        # If the sep is in the name, assume that it's already
        # transformed and return the string unchanged.
        return name
    key = (name, sep)
    if key not in _xform_cache:
        if _special_case_transform.search(name) is not None:
            is_special = _special_case_transform.search(name)
            matched = is_special.group()
            # Replace something like ARNs, ACLs with _arns, _acls.
            name = f"{name[: -len(matched)]}{sep}{matched.lower()}"
        s1 = _first_cap_regex.sub(r'\1' + sep + r'\2', name)
        transformed = _end_cap_regex.sub(r'\1' + sep + r'\2', s1).lower()
        _xform_cache[key] = transformed
    return _xform_cache[key]
This takes camel-case operation names such as DescribeInstances and turns them into lowercase, underscore-separated names such as describe_instances, with a few exceptions.
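Putting the two pieces together, here is a small self-contained sketch of building a client class from a list of operation names. The two regular expressions are my assumption of what the two-pass substitution needs, the cache and special cases are left out, and to_snake, make_method, build_client_class and the echoing _make_api_call are hypothetical names for illustration.

```python
import re

# Assumed patterns for the two-pass substitution (no cache, no special cases).
_FIRST_CAP = re.compile(r"(.)([A-Z][a-z]+)")
_END_CAP = re.compile(r"([a-z0-9])([A-Z])")


def to_snake(name, sep="_"):
    """Simplified camel-case to snake_case conversion."""
    if sep in name:
        return name  # assume it was already transformed
    partial = _FIRST_CAP.sub(r"\1" + sep + r"\2", name)
    return _END_CAP.sub(r"\1" + sep + r"\2", partial).lower()


def make_method(py_name, operation_name):
    """Create a keyword-only method bound to one operation name."""
    def _api_call(self, *args, **kwargs):
        if args:
            raise TypeError(f"{py_name}() only accepts keyword arguments.")
        return self._make_api_call(operation_name, kwargs)

    _api_call.__name__ = py_name
    return _api_call


def build_client_class(service_name, operation_names):
    """Create a class with one snake_case method per operation name."""
    def _make_api_call(self, operation_name, params):
        # Hypothetical transport: a real client would sign and send a request.
        return {"operation": operation_name, "params": params}

    namespace = {"_make_api_call": _make_api_call}
    for operation_name in operation_names:
        py_name = to_snake(operation_name)
        namespace[py_name] = make_method(py_name, operation_name)
    return type(service_name, (object,), namespace)


SQS = build_client_class("SQS", ["CreateQueue", "ListDeadLetterSourceQueues"])
client = SQS()
print(client.create_queue(QueueName="demo"))
```

Because the methods are generated from data, supporting a new operation in this sketch only means adding its name to the list.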
Request Sending
Despite all the mapping, the end result of boto's code still needs to be an AWS API call sent over the wire. The boto AWS request module handles this, along with some of the more interesting cases such as HTTP 100 Continue handling. Signing-related code can be found in the auth module, such as SigV2 auth (which, I will add, is deprecated):
def calc_signature(self, request, params):
    logger.debug("Calculating signature using v2 auth.")
    split = urlsplit(request.url)
    path = split.path
    if len(path) == 0:
        path = '/'
    string_to_sign = f"{request.method}\n{split.netloc}\n{path}\n"
    lhmac = hmac.new(
        self.credentials.secret_key.encode("utf-8"), digestmod=sha256
    )
    pairs = []
    for key in sorted(params):
        # Any previous signature should not be a part of this
        # one, so we skip that particular key. This prevents
        # issues during retries.
        if key == 'Signature':
            continue
        value = str(params[key])
        quoted_key = quote(key.encode('utf-8'), safe='')
        quoted_value = quote(value.encode('utf-8'), safe='-_~')
        pairs.append(f'{quoted_key}={quoted_value}')
    qs = '&'.join(pairs)
    string_to_sign += qs
    logger.debug('String to sign: %s', string_to_sign)
    lhmac.update(string_to_sign.encode('utf-8'))
    b64 = base64.b64encode(lhmac.digest()).strip().decode('utf-8')
    return (qs, b64)
Thankfully, this handles the fun parts like HMAC signing for us. The auth module also covers the even more complex SigV4 calls.
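For a feel of what that signing step boils down to, here is a standalone sketch of the same idea using only the standard library. The sign_query function, the host name and the secret are all made up for illustration, and since SigV2 is deprecated this is only meant to show the mechanics, not to sign real requests.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def sign_query(secret_key, method, host, path, params):
    """Return (query_string, signature) for a SigV2-style string to sign."""
    pairs = []
    for key in sorted(params):
        # A previous signature must never be part of the new one.
        if key == "Signature":
            continue
        quoted_key = quote(str(key), safe="")
        quoted_value = quote(str(params[key]), safe="-_~")
        pairs.append(f"{quoted_key}={quoted_value}")
    query_string = "&".join(pairs)
    string_to_sign = f"{method}\n{host}\n{path or '/'}\n{query_string}"
    digest = hmac.new(
        secret_key.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    return query_string, base64.b64encode(digest).decode("utf-8")


# Hypothetical values; the old Signature entry is skipped during signing.
params = {"b": "2", "a": "1 x", "Signature": "old"}
qs, sig = sign_query("not-a-real-secret", "POST", "sqs.example.invalid", "", params)
print(qs)
```

Sorting the keys matters because both sides have to build exactly the same string before hashing it.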
Credentials
With a few minor exceptions, you can't do much with the AWS API unless you're authenticated. The credentials module handles this against a variety of sources. It also handles the ever-important credentials refresh via STS in cases such as AssumeRole:
def _refresh(self):
    # In the common case where we don't need a refresh, we
    # can immediately exit and not require acquiring the
    # refresh lock.
    if not self.refresh_needed(self._advisory_refresh_timeout):
        return

    # acquire() doesn't accept kwargs, but False is indicating
    # that we should not block if we can't acquire the lock.
    # If we aren't able to acquire the lock, we'll trigger
    # the else clause.
    if self._refresh_lock.acquire(False):
        try:
            if not self.refresh_needed(self._advisory_refresh_timeout):
                return
            is_mandatory_refresh = self.refresh_needed(
                self._mandatory_refresh_timeout
            )
            self._protected_refresh(is_mandatory=is_mandatory_refresh)
            return
        finally:
            self._refresh_lock.release()
    elif self.refresh_needed(self._mandatory_refresh_timeout):
        # If we're within the mandatory refresh window,
        # we must block until we get refreshed credentials.
        with self._refresh_lock:
            if not self.refresh_needed(self._mandatory_refresh_timeout):
                return
            self._protected_refresh(is_mandatory=True)
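The advisory versus mandatory split is easier to see in a stripped-down form. The sketch below is my own simplification: RefreshableToken, its fetch callable, the injected clock and the two window lengths are assumptions for illustration, not the botocore class or its settings.

```python
import threading
import time


class RefreshableToken:
    """Toy credential holder with advisory and mandatory refresh windows."""

    ADVISORY_WINDOW = 15 * 60   # assumed value, in seconds
    MANDATORY_WINDOW = 10 * 60  # assumed value, in seconds

    def __init__(self, fetch, clock=time.time):
        # fetch is a hypothetical callable returning (token, expiry_epoch).
        self._fetch = fetch
        self._clock = clock
        self._lock = threading.Lock()
        self._token, self._expiry = fetch()

    def _needs_refresh(self, window):
        return self._expiry - self._clock() < window

    def get(self):
        # Common case: nothing is close to expiring, so skip the lock.
        if not self._needs_refresh(self.ADVISORY_WINDOW):
            return self._token
        if self._lock.acquire(False):
            # We won the lock without blocking, so we do the refresh.
            try:
                if self._needs_refresh(self.ADVISORY_WINDOW):
                    self._token, self._expiry = self._fetch()
            finally:
                self._lock.release()
        elif self._needs_refresh(self.MANDATORY_WINDOW):
            # Someone else is refreshing and we are too close to expiry
            # to keep using the old token, so block until they finish.
            with self._lock:
                if self._needs_refresh(self.MANDATORY_WINDOW):
                    self._token, self._expiry = self._fetch()
        return self._token
```

The point of the two windows is that threads inside the advisory window can keep using the old value while one thread refreshes, and only block once expiry is really close.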
Of particular interest is the credentials resolver:
env_provider = EnvProvider()
container_provider = ContainerProvider()
instance_metadata_provider = InstanceMetadataProvider(
    iam_role_fetcher=InstanceMetadataFetcher(
        timeout=metadata_timeout,
        num_attempts=num_attempts,
        user_agent=session.user_agent(),
        config=imds_config,
    )
)

profile_provider_builder = ProfileProviderBuilder(
    session, cache=cache, region_name=region_name
)
assume_role_provider = AssumeRoleProvider(
    load_config=lambda: session.full_config,
    client_creator=_get_client_creator(session, region_name),
    cache=cache,
    profile_name=profile_name,
    credential_sourcer=CanonicalNameCredentialSourcer(
        [env_provider, container_provider, instance_metadata_provider]
    ),
    profile_provider_builder=profile_provider_builder,
)
Boto tracks credentials from a number of sources, including profile-based credentials, environment variables, special container resolvers, instance metadata for EC2 instance profiles, and AssumeRole providers. As the order suggests, environment variables have the highest resolution priority.
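The resolution order can be pictured as a simple "first provider that answers wins" chain. This is a minimal sketch of that pattern, assuming plain functions as providers; make_static_provider and resolve are hypothetical names, and the static provider stands in for profile, container or metadata lookups.

```python
def env_provider(environ):
    """Look for credentials in the standard AWS environment variables."""
    key = environ.get("AWS_ACCESS_KEY_ID")
    secret = environ.get("AWS_SECRET_ACCESS_KEY")
    if key and secret:
        return {"access_key": key, "secret_key": secret, "method": "env"}
    return None


def make_static_provider(method, creds):
    """Hypothetical stand-in for profile, container, or metadata lookups."""
    def provider(environ):
        return dict(creds, method=method) if creds else None
    return provider


def resolve(providers, environ):
    """Return credentials from the first provider that finds any."""
    for provider in providers:
        creds = provider(environ)
        if creds is not None:
            return creds
    return None


# Placeholder values only; environment variables are checked first.
chain = [
    env_provider,
    make_static_provider("profile", {"access_key": "P", "secret_key": "S"}),
]
env = {"AWS_ACCESS_KEY_ID": "E", "AWS_SECRET_ACCESS_KEY": "T"}
print(resolve(chain, env)["method"])
print(resolve(chain, {})["method"])
```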
Resources
Resources are an abstraction around the lower-level API calls, encapsulating them in an easier-to-work-with form. This part is actually handled on the boto3 side rather than in botocore. Despite being an alternative to the client interface, a resource still uses the client on the backend:
if identifiers is None:
    identifiers = []
#: (``list``) List of identifier names
self.identifiers = identifiers

#: (:py:class:`~botocore.client.BaseClient`) Low-level Botocore client
self.client = client
#: (``dict``) Loaded resource data attributes
self.data = data

# The resource model for that resource
self.resource_model = resource_model
Resources also have their own service file layout:
"Queue": {
"identifiers": [
{ "name": "Url" }
],
"shape": "GetQueueAttributesResult",
"load": {
"request": {
"operation": "GetQueueAttributes",
"params": [
{ "target": "QueueUrl", "source": "identifier", "name": "Url" },
{ "target": "AttributeNames[]", "source": "string", "value": "All" }
]
},
"path": "@"
},
There are a few things going on here. First is the load method, which refreshes a queue's attributes via GetQueueAttributes. It also reads in the QueueUrl to make it available as the url property. There are also constructors for obtaining a resource type via a unique identifier. These are often shown as has declarations in the resource JSON:
"has": {
"Queue": {
"resource": {
"type": "Queue",
"identifiers": [
{ "target": "Url", "source": "input" }
]
}
}
},
In this case a Queue resource can be instantiated by providing the URL of the queue.
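To show how a resource wraps the client, here is a hand-written sketch of what the Queue definition roughly describes: an identifier supplied up front and attributes loaded on demand. FakeSQSClient and its canned response are hypothetical stand-ins, and the real boto3 classes are generated from the JSON rather than written out like this.

```python
class FakeSQSClient:
    """Hypothetical stand-in for the low-level client."""

    def __init__(self):
        self.calls = 0

    def get_queue_attributes(self, QueueUrl, AttributeNames):
        self.calls += 1
        return {"Attributes": {"DelaySeconds": "0", "QueueName": "demo"}}


class Queue:
    """Toy resource: one identifier (url) plus lazily loaded data."""

    def __init__(self, url, client):
        self.url = url        # the identifier, supplied by the caller
        self.client = client  # the low-level client used on the backend
        self.data = None      # filled in by load()

    def load(self):
        """Refresh the cached data with a GetQueueAttributes-style call."""
        self.data = self.client.get_queue_attributes(
            QueueUrl=self.url, AttributeNames=["All"]
        )

    @property
    def attributes(self):
        # Only hit the API the first time the attributes are needed.
        if self.data is None:
            self.load()
        return self.data["Attributes"]


sqs_client = FakeSQSClient()
queue = Queue("https://sqs.example.invalid/123/demo", sqs_client)
print(queue.attributes["DelaySeconds"])
```

Creating the Queue object makes no API call in this sketch; the call only happens on first attribute access or an explicit load().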
Collections
Collections are an abstraction around pagination logic. One of the more interesting things a collection does is provide iteration in the form of a Python generator:
def __iter__(self):
    """
    A generator which yields resource instances after doing the
    appropriate service operation calls and handling any pagination
    on your behalf.

    Page size, item limit, and filter parameters are applied
    if they have previously been set.

        >>> bucket = s3.Bucket('boto3')
        >>> for obj in bucket.objects.all():
        ...     print(obj.key)
        'key1'
        'key2'

    """
    limit = self._params.get('limit', None)

    count = 0
    for page in self.pages():
        for item in page:
            yield item

            # If the limit is set and has been reached, then
            # we stop processing items here.
            count += 1
            if limit is not None and count >= limit:
                return
This does mean you'll need to wrap the collection in list() to switch to greedy loading and operate on the results as a list. Collections appear in the boto resource JSON files as hasMany-style declarations:
"hasMany": {
"Queues": {
"request": { "operation": "ListQueues" },
"resource": {
"type": "Queue",
"identifiers": [
{ "target": "Url", "source": "response", "path": "QueueUrls[]" }
]
}
}
}
The identifier mapping also gives you the items at the level you generally need them, without having to reference the top-level return key ("QueueUrls" for example). This is especially handy for the EC2 resource, which has two layers of nesting:
"Instances": {
"request": { "operation": "DescribeInstances" },
"resource": {
"type": "Instance",
"identifiers": [
{ "target": "Id", "source": "response", "path": "Reservations[].Instances[].InstanceId" }
],
"path": "Reservations[].Instances[]"
}
},
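As a rough illustration of both points, the generator behavior and the flattening of nested results, here is a small sketch. PAGES is made-up data shaped like DescribeInstances output, and Instance and iter_instances are hypothetical names rather than boto3 classes.

```python
# Made-up pages shaped like DescribeInstances output (two nesting levels).
PAGES = [
    {"Reservations": [
        {"Instances": [{"InstanceId": "i-1"}, {"InstanceId": "i-2"}]},
    ]},
    {"Reservations": [
        {"Instances": [{"InstanceId": "i-3"}]},
        {"Instances": [{"InstanceId": "i-4"}]},
    ]},
]


class Instance:
    """Toy resource holding only its identifier."""

    def __init__(self, id):
        self.id = id


def iter_instances(pages, limit=None):
    """Lazily yield Instance objects, flattening Reservations[].Instances[]."""
    count = 0
    for page in pages:
        for reservation in page["Reservations"]:
            for raw in reservation["Instances"]:
                yield Instance(raw["InstanceId"])
                # Stop early once the optional limit is reached.
                count += 1
                if limit is not None and count >= limit:
                    return


# Nothing is consumed until the generator is iterated.
lazy = iter_instances(PAGES)
first = next(lazy)

# list() switches to greedy loading so indexing and len() work.
everything = list(iter_instances(PAGES))
print(first.id, len(everything))
```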
Conclusion
I hope you enjoyed this look at how boto operates on the inside. It's a rather fascinating use of Python and shows how a service-heavy cloud provider like AWS can have a scalable API interface via service model mapping. Given that the AWS CLI uses it, it's also a nice peek at how the CLI works.