Programming Amazon DynamoDB with Python and Boto3

This guide provides an orientation to programmers wanting to use Amazon DynamoDB with Python. Learn about the different abstraction layers, configuration management, error handling, controlling retry policies, managing keep-alive, and more.

About Boto

You can access DynamoDB from Python by using the official AWS SDK for Python, commonly referred to as Boto3. The name Boto (pronounced boh-toh) comes from a freshwater dolphin native to the Amazon River. Boto3 is the third major version of the library, first released in 2015. The Boto3 library is quite large, as it supports all AWS services, not just DynamoDB. This orientation targets only the parts of Boto3 relevant to DynamoDB.

Boto is maintained and published by AWS as an open-source project hosted on GitHub. It's split into two packages: Botocore (which provides the low-level functionality) and Boto3 (which builds on Botocore to provide the higher-level, more Pythonic interfaces).

Because these projects are hosted on GitHub, you can view the source code, track open issues, or submit your own issues.

Using the Boto documentation

Get started with the Boto documentation using the following resources:

Understanding the client and resource abstraction layers

The two interfaces you'll be working with are the client interface, which provides low-level access to the underlying service API, and the resource interface, which provides a higher-level, more Pythonic abstraction on top of the client.

Here’s an example of inserting an item using the client interface. Notice how all values are passed as a map with the key indicating their type ('S' for string, 'N' for number) and their value as a string. This is known as DynamoDB JSON format.

import boto3

dynamodb = boto3.client('dynamodb')

dynamodb.put_item(
    TableName='YourTableName',
    Item={
        'pk': {'S': 'id#1'},
        'sk': {'S': 'cart#123'},
        'name': {'S': 'SomeName'},
        'inventory': {'N': '500'},
        # ... more attributes ...
    }
)

Here's the same PutItem operation using the resource interface. The data typing is implicit:

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('YourTableName')

table.put_item(
    Item={
        'pk': 'id#1',
        'sk': 'cart#123',
        'name': 'SomeName',
        'inventory': 500,
        # ... more attributes ...
    }
)

If needed, you can convert between regular JSON and DynamoDB JSON using the TypeSerializer and TypeDeserializer classes provided with boto3:

from boto3.dynamodb.types import TypeDeserializer, TypeSerializer

def dynamo_to_python(dynamo_object: dict) -> dict:
    deserializer = TypeDeserializer()
    return {
        k: deserializer.deserialize(v)
        for k, v in dynamo_object.items()
    }

def python_to_dynamo(python_object: dict) -> dict:
    serializer = TypeSerializer()
    return {
        k: serializer.serialize(v)
        for k, v in python_object.items()
    }
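For example, a quick round trip through these helpers with a hypothetical item looks like this (note that DynamoDB numbers deserialize as Python Decimal values):

item = {'pk': 'id#1', 'inventory': 500}

ddb_json = python_to_dynamo(item)
# {'pk': {'S': 'id#1'}, 'inventory': {'N': '500'}}

round_trip = dynamo_to_python(ddb_json)
# {'pk': 'id#1', 'inventory': Decimal('500')}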

Here is how to perform a query using the client interface. It expresses the query as a JSON construct. The KeyConditionExpression and FilterExpression strings use substitution variables for the attribute values, plus an ExpressionAttributeNames placeholder (#name) to avoid a conflict with the reserved word name:

import boto3

client = boto3.client('dynamodb')

# Construct the query
response = client.query(
    TableName='YourTableName',
    KeyConditionExpression='pk = :pk_val AND begins_with(sk, :sk_val)',
    FilterExpression='#name = :name_val',
    ExpressionAttributeValues={
        ':pk_val': {'S': 'id#1'},
        ':sk_val': {'S': 'cart#'},
        ':name_val': {'S': 'SomeName'},
    },
    ExpressionAttributeNames={
        '#name': 'name',
    }
)

The same query operation using the resource interface can be shortened and simplified:

import boto3
from boto3.dynamodb.conditions import Key, Attr

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('YourTableName')

response = table.query(
    KeyConditionExpression=Key('pk').eq('id#1') & Key('sk').begins_with('cart#'),
    FilterExpression=Attr('name').eq('SomeName')
)

As a final example, imagine you want to get the approximate size of a table (which is metadata kept on the table that is updated about every 6 hours). With the client interface, you do a describe_table() operation and pull the answer from the JSON structure returned:

import boto3

dynamodb = boto3.client('dynamodb')

response = dynamodb.describe_table(TableName='YourTableName')
size = response['Table']['TableSizeBytes']

With the resource interface, the table performs the describe operation implicitly and presents the data directly as an attribute:

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('YourTableName')

size = table.table_size_bytes
Note

When considering whether to develop using the client or resource interface, be aware that new features will not be added to the resource interface, per the resource documentation: “The AWS Python SDK team does not intend to add new features to the resources interface in boto3. Existing interfaces will continue to operate during boto3’s lifecycle. Customers can find access to newer service features through the client interface.”

Using the table resource batch_writer

One convenience available only with the higher-level table resource is the batch_writer. DynamoDB supports batch write operations that allow up to 25 put or delete operations in one network request. Batching like this improves efficiency by minimizing network round trips.

With the low-level client library, you use the client.batch_write_item() operation to run batches. You must manually split your work into batches of 25. After each operation, you also have to check the response for a list of unprocessed items (some of the write operations may succeed while others fail) and resubmit those unprocessed items in a later batch_write_item() call. This takes a significant amount of boilerplate code, sketched below.
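Here's a minimal sketch of that boilerplate, assuming items already in DynamoDB JSON format; the helper name is illustrative, and a production version would add exponential backoff between resubmissions:

import boto3

client = boto3.client('dynamodb')

def batch_write(table_name, items):
    # batch_write_item accepts at most 25 put/delete requests per call
    for i in range(0, len(items), 25):
        request_items = {
            table_name: [
                {'PutRequest': {'Item': item}} for item in items[i:i + 25]
            ]
        }
        # Resubmit whatever the service reports back as unprocessed
        while request_items:
            response = client.batch_write_item(RequestItems=request_items)
            request_items = response.get('UnprocessedItems', {})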

The Table.batch_writer method creates a context manager for writing objects in a batch. It presents an interface where it seems as if you're writing items one at a time, but internally it's buffering and sending the items in batches. It also handles unprocessed item retries implicitly.

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('YourTableName')

movies = [...]  # long list of movies in {'pk': 'val', 'sk': 'val', etc.} format

with table.batch_writer() as writer:
    for movie in movies:
        writer.put_item(Item=movie)

Additional code examples that explore the client and resource layers

You can also refer to the following code sample repositories that explore usage of the various functions, using both client and resource:

Understanding how the Client and Resource objects interact with sessions and threads

The Resource object is not thread safe and should not be shared across threads or processes. Refer to the guide on Resource for more details.

The Client object, in contrast, is generally thread safe, except for specific advanced features. Refer to the guide on Clients for more details.

The Session object is not thread safe, so each time you need a Client or Resource in a multi-threaded environment, you should create a new Session first and then build the Client or Resource from that Session. Refer to the guide on Sessions for more details.

When you call boto3.resource(), you’re implicitly using the default Session. This is convenient for writing single-threaded code. When writing multi-threaded code, you’ll want to first construct a new Session for each thread and then retrieve the resource from that Session:

# Explicitly create a new Session for this thread
session = boto3.Session()
dynamodb = session.resource('dynamodb')
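Here's a sketch of that pattern with worker threads; the table name and item shape are placeholders:

import threading
import boto3

def worker(thread_id):
    # Each thread builds its own Session, then its own Resource
    session = boto3.Session()
    table = session.resource('dynamodb').Table('YourTableName')
    table.put_item(Item={'pk': f'thread#{thread_id}', 'sk': 'demo'})

threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()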

Customizing the Config object

When constructing a Client or Resource object, you can pass optional named parameters to customize behavior. The parameter named config unlocks a variety of functionality. It’s an instance of botocore.client.Config and the reference documentation for Config shows everything it exposes for you to control. The guide to Configuration provides a good overview.

Note

You can modify many of these behavioral settings at the Session level, within the AWS configuration file, or as environment variables.
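For example, assuming you want the standard retry mode with three total attempts, you could set the standard AWS SDK environment variables instead of building a Config object; they're read when the client or resource is constructed:

import os
import boto3

# These must be set before the client or resource is created
os.environ['AWS_RETRY_MODE'] = 'standard'
os.environ['AWS_MAX_ATTEMPTS'] = '3'

dynamodb = boto3.client('dynamodb')  # picks up the settings above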

Config for timeouts

One use of a custom config is to adjust networking behaviors:

connect_timeout (float or int) -- The time in seconds before a timeout exception is thrown when attempting to make a connection. The default is 60 seconds.

read_timeout (float or int) -- The time in seconds before a timeout exception is thrown when attempting to read from a connection. The default is 60 seconds.

Timeouts of 60 seconds are excessive for DynamoDB. It means a transient network glitch will cause a minute’s delay for the client before it can try again. The following code shortens the timeouts to one second:

import boto3
from botocore.config import Config

my_config = Config(
    connect_timeout=1.0,
    read_timeout=1.0
)

dynamodb = boto3.resource('dynamodb', config=my_config)

For more discussion about timeouts, see Tuning AWS Java SDK HTTP request settings for latency-aware DynamoDB applications. Note that the Java SDK has more timeout configurations than the Python SDK.

Config for keep-alive

If you're using botocore 1.27.84 or later, you can also control TCP Keep-Alive:

tcp_keepalive (bool) -- Enables the TCP Keep-Alive socket option (the default is false).

Setting TCP Keep-Alive to true can reduce average latencies. Here's sample code that conditionally sets TCP Keep-Alive to true when you have the right botocore version:

import botocore
import boto3
from botocore.config import Config
from distutils.version import LooseVersion

required_version = "1.27.84"
current_version = botocore.__version__

my_config = Config(
    connect_timeout=0.5,
    read_timeout=0.5
)

# tcp_keepalive is available starting with botocore 1.27.84
if LooseVersion(current_version) >= LooseVersion(required_version):
    my_config = my_config.merge(Config(tcp_keepalive=True))

dynamodb = boto3.resource('dynamodb', config=my_config)
Note

TCP Keep-Alive is different from HTTP Keep-Alive. With TCP Keep-Alive, small packets are sent by the underlying operating system over the socket connection to keep the connection alive and immediately detect any drops. With HTTP Keep-Alive, the web connection built on the underlying socket gets reused. HTTP Keep-Alive is always enabled with boto3.

There's a limit to how long an idle connection can be kept alive. Consider sending periodic requests (say every minute) if you have an idle connection but want the next request to use an already-established connection.
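As a sketch, a background timer can issue a trivial request roughly every minute; here a GetItem against a placeholder table and key serves as the keep-warm request:

import threading
import boto3

dynamodb = boto3.client('dynamodb')

def keep_warm():
    # A cheap request that exercises the pooled connection; the key
    # doesn't need to exist for the request to succeed
    dynamodb.get_item(
        TableName='YourTableName',
        Key={'pk': {'S': 'warmup'}, 'sk': {'S': 'warmup'}}
    )
    threading.Timer(60.0, keep_warm).start()  # schedule the next ping

keep_warm()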

Config for retries

The config also accepts a dictionary called retries where you can specify your desired retry behavior. Retries happen within the SDK when the SDK receives an error and the error is of a transient type. If an error is retried internally (and a retry eventually produces a successful response), there's no error seen from the calling code's perspective, just a slightly elevated latency. Here are the values you can specify:

max_attempts -- The number of retry attempts made after the initial request. Setting this to 0 disables retries.

total_max_attempts -- The total number of requests made, including the initial one. This supersedes max_attempts.

mode -- The retry mode: 'legacy' (the default), 'standard', or 'adaptive'.

Note
With exponential backoff, the last attempt will wait almost 13 seconds.

An expanded definition of these retry modes can be found in the guide to retries as well as in the Retry behavior topic in the SDK reference.

Here’s an example that explicitly uses the legacy retry policy with a maximum of 3 total requests (2 retries):

import boto3
from botocore.config import Config

my_config = Config(
    connect_timeout=1.0,
    read_timeout=1.0,
    retries={
        'mode': 'legacy',
        'total_max_attempts': 3
    }
)

dynamodb = boto3.resource('dynamodb', config=my_config)

Because DynamoDB is a highly available, low-latency system, you may want to be more aggressive with the speed of retries than the built-in retry policies allow. You can implement your own retry policy by setting max_attempts to 0, catching the exceptions yourself, and retrying as appropriate from your own code instead of relying on boto3 to do implicit retries.

If you manage your own retry policy, you'll want to differentiate between throttles and errors. Throttles (such as ProvisionedThroughputExceededException or ThrottlingException) indicate a healthy service that's telling you you've exceeded your read or write capacity; these can be retried quickly. Errors (such as InternalServerError) indicate a transient issue within the service and are often better retried after a short pause.
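Here's a minimal sketch of such a policy with SDK retries disabled; the table name, helper name, and backoff constants are illustrative:

import random
import time
import boto3
import botocore
from botocore.config import Config

# Disable the SDK's implicit retries so this code is in full control
client = boto3.client('dynamodb', config=Config(retries={'max_attempts': 0}))

THROTTLE_CODES = ('ProvisionedThroughputExceededException', 'ThrottlingException')

def put_with_retries(item, max_attempts=5):
    for attempt in range(max_attempts):
        try:
            return client.put_item(TableName='YourTableName', Item=item)
        except botocore.exceptions.ClientError as err:
            code = err.response['Error']['Code']
            if code in THROTTLE_CODES:
                # Throttled: back off with jitter before trying again
                time.sleep(random.uniform(0, 0.05 * (2 ** attempt)))
            elif code == 'InternalServerError':
                # Transient server error: a quick retry is usually fine
                time.sleep(0.01)
            else:
                raise
    raise RuntimeError('put_item failed after {} attempts'.format(max_attempts))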

Config for max pool connections

Lastly, the config lets you control the connection pool size:

max_pool_connections -- The maximum number of HTTP connections to keep pooled for reuse (the default is 10).

A different pool is kept per Session. If you anticipate more than 10 threads making requests against clients or resources built from the same Session, consider raising this value so threads don't have to wait on other threads still using a pooled connection.

import boto3
from botocore.config import Config

my_config = Config(
    max_pool_connections=20
)

# Set up a single session; boto3.Session() doesn't accept a Config
# directly, so apply the config when building each resource
session = boto3.Session()

# Create resources against that session for handing to threads, each
# allowing up to 20 pooled connections.
# Notice the single-threaded access to the Session and each Resource.
resource1 = session.resource('dynamodb', config=my_config)
resource2 = session.resource('dynamodb', config=my_config)
# etc

Error handling

AWS service exceptions aren’t all statically defined in Boto3. This is because errors and exceptions from AWS services vary widely and are subject to change. Boto3 wraps all service exceptions as a ClientError and exposes the details as structured JSON. For example, an error response might be structured like this:

{
    'Error': {
        'Code': 'SomeServiceException',
        'Message': 'Details/context around the exception or error'
    },
    'ResponseMetadata': {
        'RequestId': '1234567890ABCDEF',
        'HostId': 'host ID data will appear here as a hash',
        'HTTPStatusCode': 400,
        'HTTPHeaders': {'header metadata key/values will appear here'},
        'RetryAttempts': 0
    }
}

The following code catches any ClientError exceptions and looks at the string value of the Code within the Error to determine what action to take:

import botocore
import boto3

dynamodb = boto3.client('dynamodb')

try:
    response = dynamodb.put_item(...)
except botocore.exceptions.ClientError as err:
    print('Error Code: {}'.format(err.response['Error']['Code']))
    print('Error Message: {}'.format(err.response['Error']['Message']))
    print('Http Code: {}'.format(err.response['ResponseMetadata']['HTTPStatusCode']))
    print('Request ID: {}'.format(err.response['ResponseMetadata']['RequestId']))
    if err.response['Error']['Code'] in ('ProvisionedThroughputExceededException', 'ThrottlingException'):
        print("Received a throttle")
    elif err.response['Error']['Code'] == 'InternalServerError':
        print("Received a server error")
    else:
        raise err

Some (but not all) exception codes have been materialized as top-level classes. You can choose to handle these directly. When using the Client interface, these exceptions are dynamically populated on your client and you catch these exceptions using your client instance, like this:

except ddb_client.exceptions.ProvisionedThroughputExceededException:

When using the Resource interface, you have to use .meta.client to traverse from the resource to the underlying Client to access the exceptions, like this:

except ddb_resource.meta.client.exceptions.ProvisionedThroughputExceededException:
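Putting it together, here's a sketch that catches materialized exceptions on the client interface; the table name and key are placeholders:

import boto3

ddb_client = boto3.client('dynamodb')

try:
    ddb_client.get_item(
        TableName='YourTableName',
        Key={'pk': {'S': 'id#1'}, 'sk': {'S': 'cart#123'}}
    )
except ddb_client.exceptions.ProvisionedThroughputExceededException:
    print('Throttled; back off and retry')
except ddb_client.exceptions.ResourceNotFoundException:
    print('Table does not exist')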

To review the list of materialized exception types, you can generate the list dynamically:

ddb = boto3.client("dynamodb")
print([e for e in dir(ddb.exceptions) if e.endswith('Exception') or e.endswith('Error')])

When doing a write operation with a condition expression, you can request that, if the condition fails, the current value of the item be returned in the error response.

try:
    response = table.put_item(
        Item=item,
        ConditionExpression='attribute_not_exists(pk)',
        ReturnValuesOnConditionCheckFailure='ALL_OLD'
    )
except table.meta.client.exceptions.ConditionalCheckFailedException as e:
    print('Item already exists:', e.response['Item'])

For further reading on error handling and exceptions:

Logging

The boto3 library integrates with Python's built-in logging module for tracking what happens during a session. To control logging levels, you can configure the logging module:

import logging

logging.basicConfig(level=logging.INFO)

This configures the root logger to log messages at the INFO level and above. Logging messages that are less severe than the configured level will be ignored. Logging levels include DEBUG , INFO , WARNING , ERROR , and CRITICAL . The default is WARNING .

Loggers in boto3 are hierarchical. The library uses a few different loggers, each corresponding to a different part of the library (for example, boto3 , botocore , and botocore.endpoint ), and you can control the behavior of each separately.

Other libraries log as well. Internally, boto3 uses the third-party urllib3 library for HTTP connection handling. When latency is important, you can watch its logs to ensure your pool is being well utilized, by seeing when urllib3 establishes a new connection or closes down an idle one.

The following code snippet sets most logging to INFO with DEBUG logging for endpoint and connection pool activity:

import logging

logging.getLogger('boto3').setLevel(logging.INFO)
logging.getLogger('botocore').setLevel(logging.INFO)
logging.getLogger('botocore.endpoint').setLevel(logging.DEBUG)
logging.getLogger('urllib3.connectionpool').setLevel(logging.DEBUG)

Event hooks

Botocore emits events during various parts of its execution. You can register handlers for these events so that whenever an event is emitted, your handler will be called. This lets you extend the behavior of botocore without having to modify the internals.

For instance, let's say you want to keep track of every time a PutItem operation is called on any DynamoDB table in your application. You might register on the 'provide-client-params.dynamodb.PutItem' event to catch and log every time a PutItem operation is invoked on the associated Session. Here's an example:

import boto3
import logging

def log_put_params(params, **kwargs):
    if 'TableName' in params and 'Item' in params:
        logging.info(f"PutItem on table {params['TableName']}: {params['Item']}")

logging.basicConfig(level=logging.INFO)
session = boto3.Session()
event_system = session.events

# Register our interest in hooking in when the parameters are provided to PutItem
event_system.register('provide-client-params.dynamodb.PutItem', log_put_params)

# Now, every time you use this session to put an item in DynamoDB,
# it will log the table name and item data.
dynamodb = session.resource('dynamodb')
table = dynamodb.Table('YourTableName')
table.put_item(
    Item={
        'pk': '123',
        'sk': 'cart#123',
        'item_data': 'YourItemData',
        # ... more attributes ...
    }
)

Within the handler, you can even manipulate the params programmatically to change behavior:

params['TableName'] = "NewTableName"

Pagination and the Paginator

Some requests, such as Query and Scan, limit the size of data returned on a single request and require you to make repeated requests to pull subsequent pages.

You can control the maximum number of items to be read for each page with the Limit parameter. For example, if you want the last 10 items, you can use Limit to retrieve only 10 (see the sketch after this paragraph). Note that the limit is how many items are read from the table before any filtering is applied. There's no way to specify you want exactly 10 after filtering; you can only control the pre-filtered count and check client-side when you've actually retrieved 10. Regardless of the limit, every response always has a maximum size of 1 MB.
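Here's a sketch of that last-10 pattern: assuming the sort key orders items chronologically, query the index in descending order with a limit of 10 (the table name and key values are placeholders):

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource('dynamodb').Table('YourTableName')

response = table.query(
    KeyConditionExpression=Key('pk').eq('id#1'),
    ScanIndexForward=False,  # read in descending sort key order (newest first)
    Limit=10                 # read at most 10 items, before any filtering
)
latest_items = response['Items']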

If the response includes a LastEvaluatedKey , it indicates that the response ended because it hit a count or size limit. The key is the last key evaluated for the response. You can retrieve this LastEvaluatedKey and pass it to a follow-up call as ExclusiveStartKey to read the next chunk from that starting point. When no LastEvaluatedKey is returned, there are no more items matching the Query or Scan.

Here's a simple example (using the Resource interface, but the Client interface has the same pattern) that reads at most 100 items per page and loops until all items have been read.

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('YourTableName')

query_params = {
    'KeyConditionExpression': Key('pk').eq('123') & Key('sk').gt(1000),
    'Limit': 100
}

while True:
    response = table.query(**query_params)

    # Process the items however you like
    for item in response['Items']:
        print(item)

    # No LastEvaluatedKey means no more items to retrieve
    if 'LastEvaluatedKey' not in response:
        break

    # If there are possibly more items, update the start key for the next page
    query_params['ExclusiveStartKey'] = response['LastEvaluatedKey']

For convenience, boto3 can do this for you with Paginators. However, it only works with the Client interface. Here's the code rewritten to use Paginators:

import boto3

dynamodb = boto3.client('dynamodb')
paginator = dynamodb.get_paginator('query')

query_params = {
    'TableName': 'YourTableName',
    'KeyConditionExpression': 'pk = :pk_val AND sk > :sk_val',
    'ExpressionAttributeValues': {
        ':pk_val': {'S': '123'},
        ':sk_val': {'N': '1000'},
    },
    'Limit': 100
}

page_iterator = paginator.paginate(**query_params)
for page in page_iterator:
    # Process the items however you like
    for item in page['Items']:
        print(item)
Note

Paginators also have their own configuration settings named MaxItems , StartingToken , and PageSize . For paginating with DynamoDB, you should ignore these settings.

Waiters

Waiters provide the ability to wait for something to complete before proceeding. At present, they only support waiting for a table to be created or deleted. In the background, the waiter operation does a check for you every 20 seconds up to 25 times. You could do this yourself, but using a waiter is elegant when writing automation.

This code shows how to wait for a particular table to have been created:

# Create a table, wait until it exists, and print its ARN
response = client.create_table(...)

waiter = client.get_waiter('table_exists')
waiter.wait(TableName='YourTableName')

print('Table created:', response['TableDescription']['TableArn'])
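If the default cadence of 20-second checks and 25 attempts doesn't suit you, waiters accept an optional WaiterConfig parameter that overrides it:

# Check every 5 seconds instead, for up to 60 attempts
waiter.wait(
    TableName='YourTableName',
    WaiterConfig={'Delay': 5, 'MaxAttempts': 60}
)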