Hands on Lab Learning DynamoDB with Python Library PynamoDB¶
Tutorials¶
- Very Quick Overview on DynamoDB in 8 Minutes
- https://www.youtube.com/watch?v=kxAOskUGuw8
- Learn AWS DynamoDB in a easy Way in 55 Minutes | Indexes | Partition | Boto3
- https://www.youtube.com/watch?v=doocp3IMOWA
PynamoDB¶
PynamoDB is a Pythonic interface to Amazon’s DynamoDB. By using simple, yet powerful abstractions over the DynamoDB API, PynamoDB allows you to start developing immediately.
Python 3 support
- Support for Unicode, Binary, JSON, Number, Set, and UTC Datetime attributes
- Support for DynamoDB Local
- Support for all of the DynamoDB API
- Support for Global and Local Secondary Indexes
- Batch operations with automatic pagination
- Iterators for working with Query and Scan operations
- Fully tested
import os
import boto3
import json
from faker import Faker
import random
import pynamodb.attributes as at
import datetime
from datetime import datetime
from pynamodb.models import Model
from pynamodb.attributes import *
AWS_ACCESS_KEY = "XXX"
AWS_SECRET_KEY = "XXXXX"
AWS_REGION_NAME = "us-east-1"
Why PynamoDB?¶
It all started when author needed to use Global Secondary Indexes, a new and powerful feature of DynamoDB. I quickly realized that my go to library, dynamodb-mapper, didn’t support them. In fact, it won’t be supporting them anytime soon because dynamodb-mapper relies on another library, boto.dynamodb, which itself won’t support them. In fact, boto doesn’t support Python 3 either. If you want to know more, I blogged about it.
Installation¶
pip install pynamodb
!pip install pynamodb
faker = Faker()
Creating Model¶
class UserModel(Model):
class Meta:
table_name = 'youtube_tut'
aws_access_key_id = AWS_ACCESS_KEY
aws_secret_access_key = AWS_SECRET_KEY
email = UnicodeAttribute(null=True)
first_name = UnicodeAttribute(range_key=True)
last_name = UnicodeAttribute(hash_key=True)
Creating DynamoDB tables with pynamodb¶
- PROVISIONED
UserModel.create_table(
read_capacity_units=4,
write_capacity_units=4,
billing_mode='PROVISIONED')
- PAY_PER_REQUEST
UserModel.create_table(billing_mode='PAY_PER_REQUEST')
Insert Item¶
UserModel(
email='test@gmail.com',
first_name='test',
last_name='test name'
).save()
average = []
for i in range(1, 20):
starttime = datetime.now()
UserModel(email=faker.email(), first_name=faker.first_name(), last_name=faker.last_name()).save()
endtime = datetime.now()
delta = endtime-starttime
elapsed_time = int((delta.seconds * 1000) + (delta.microseconds / 1000))
average.append(elapsed_time)
print("Exection Time: {} MS ".format(elapsed_time))
averagetime = sum(average)/ len(average)
print("\nAverage Time in MS: {} ".format(averagetime))
Batch Inserts¶
bulk_items = [UserModel(
email=faker.email(),
first_name=faker.first_name(),
last_name=faker.last_name()
)
for i in range(1, 20)]
with UserModel.batch_write() as batch:
for item in bulk_items:
batch.save(item)
Query Partition Key¶
for user in UserModel.query("Cooper"):
print(user.email)
Querying Partition key and then by Sort Key¶
for user in UserModel.query("Cooper", UserModel.first_name.startswith("C")):
print(user.email)
Scan query¶
for item in UserModel.scan(UserModel.email.startswith('j'), limit=2):
print(item.email)
Update based on partition key¶
for user in UserModel.query("Cooper"):
user.email = "change email address"
user.save()
for user in UserModel.query("Cooper"):
print(user.email)
Deleting based on Partition key¶
for user in UserModel.query("Cooper"):
user.delete()
for user in UserModel.query("Cooper"):
print(user.email)
Delete table¶
UserModel.delete_table()