Generalmente usamos DynamoDB para soluciones que necesitan un tiempo de respuesta muy bajo y no para BI o analítica de datos debido a su naturaleza.
En este post vamos a utilizar DynamoDB Streams con el objetivo de filtrar los datos y crear una tabla con información especifica para que sea consultada por una herramienta de visualización de datos cómo puede ser QuickSight.
Vamos a hacer uso de los siguientes servicios en la implementación de la solución.
- DynamoDB
- Lambda
- Cloud9
Suponiendo que tuviéramos almacenada la información de los partidos de futbol de todas las ligas de futbol del mundo desde 1970, vamos a manejar dos tablas una para los resultados de los partidos de futbol (tabla principal) y en otra tabla (consulta) vamos a almacenar y actualizar las tablas de posiciones para posteriormente hacer nuestras visualizaciones.
Iniciemos por crear el ambiente en Cloud9 en donde vamos ejecutar nuestros scripts para poblar las tablas de DynamoDB.
Buscamos el servicio Cloud9 y damos click en crear entorno.
Vamos a ver las opciones de configuración de la maquina EC2 qué queremos utilizar para nuestro entorno. Que en este caso vamos a dejar las opciones por defecto. Damos click en el botón siguiente.
Vemos la configuración que definimos para nuestro entorno y damos click en crear.
Veremos una pantalla de espera cómo la siguiente mientras se crea nuestro entorno de trabajo.
Una vez nuestro entorno esta listo, debemos crear el archivo .py (python) que contiene el código fuente de nuestra tabla de DynamoDB. Damos click en crear archivo.
Debemos copiar el siguiente código en el nuevo archivo y guardarlo con extension .py, en este caso le vamos a dar el nombre de creartabla.py
import boto3
from boto3.dynamodb.conditions import Key
dynamodb = boto3.resource("dynamodb")
dynamodb.create_table(
TableName='ResultadosPartidos',
KeySchema=[
{
'AttributeName': 'PK',
'KeyType': 'HASH' # Partition key
},
{
'AttributeName': 'SK',
'KeyType': 'RANGE' # Sort key
}
],
#Se deben crear las llaves nuevamente con su tipo de dato
AttributeDefinitions=[
{
'AttributeName': 'PK',
'AttributeType': 'S' # Se utiliza S para tipos de datos String y N para numeros
},
{
'AttributeName': 'SK',
'AttributeType': 'S'
}
],
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
dynamodb.create_table(
TableName='TablaPosiciones',
KeySchema=[
{
'AttributeName': 'Liga',
'KeyType': 'HASH' # Partition key
},
{
'AttributeName': 'Equipo',
'KeyType': 'RANGE' # Sort key
}
],
#Se deben crear las llaves nuevamente con su tipo de dato
AttributeDefinitions=[
{
'AttributeName': 'Liga',
'AttributeType': 'S' # Se utiliza S para tipos de datos String y N para numeros
},
{
'AttributeName': 'Equipo',
'AttributeType': 'S'
}
],
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
Una vez creado el archivo debemos instalar boto3 ejecutando en la terminal el siguiente comando:
pip install boto3
Instalado boto3 debemos ejecutar nuestro archivo, ejecutando en la terminal el siguiente comando:
python creartabla.py
Una vez ejecutado nuestro script vamos a buscar el servicio de DynamoDB y ver las tablas que acabamos de crear.
Ahora vamos a crear la función Lambda que va a filtrar los datos de la tabla principal a la tabla que vamos a utilizar para nuestras visualizaciones. Para esto buscamos Lambda en el buscador de servicios y damos click en Create Function.
Le damos un nombre a la función Lambda, seleccionamos el lenguaje de programación (Runtime = Python3.8)y damos click en Create Function.
El siguiente paso es darle los permisos necesarios al rol asignado a la función Lambda. Para esto damos click en la pestaña Configuration en el menu Permissions y en el nombre del rol.
Agregamos las siguientes políticas al rol:
CloudWatchFullAccess: Nos sirve para acceder a logs de la función Lambda
AmazonDynamoDBFullAccess: Nos sirve para poder guardar información en la tabla y para qué DynamoDB Streams pueda invocar a la función. En otro tipo de entorno recuerden ser mas granulares con sus políticas.
En caso que únicamente se necesitara hacer invocaciones a la función Lambda desde DynamoDB Streams, se debe usar la política: AWSLambdaInvocation-DynamoDB.
Creada la función Lambda y asignado los permisos necesarios para su ejecución, vamos a habilitar el Stream en DynamoDb para que llame a la función Lambda cuando se ingrese un nuevo registro a la tabla.Para esto vamos a la pantalla principal de DynamoDB damos click en Tables, seleccionamos la tabla ResultadosPartidos y damos click en la pestaña Exports and streams.
Los siguientes pasos son habilitar el Stream y asignarle la función Lambda que ya creamos.
Habilitemos el Stream:
Seleccionamos el tipo de información que va a ser enviada la la función Lambda, en este caso solo seleccionamos la información del nuevo registro que se ingreso en la tabla.
Validamos que el stream haya quedado habilitado.
Asignamos la función al trigger la función Lambda.
Seleccionamos la función Lambda que va a procesar el Stream y la cantidad de registros que va a procesar en cada llamado.
Por ultimo validamos que el trigger este habilitado y que la función Lambda que va a procesar el stream sea la correcta.
El diseño de tabla de DynamoDb que vamos a utilizar en este post es de una sola tabla, necesitamos definir un filtro sobre los mensajes que van a ser enviados a la función Lambda con el objetivo que se procesen solamente los registros que necesitamos para construir la tabla de posiciones.
Para esto vamos de nuevo a nuestra función Lambda y damos click Configuration - Triggers. Podemos observar qué se agrego el trigger que creamos desde DynamoDB. Damos click en checkbox del trigger y después en el botón editar.
La siguiente pantalla nos muestra la configuración actual del trigger. Damos click Additional settings
Buscamos la sección Filter criteria y copiamos el siguiente filtro en el cuadro de texto.
{"eventName":["INSERT"],"dynamodb":{"NewImage":{"Local":{"S":[{ "exists": true }]}}},"dynamodb":{"NewImage":{"Visitante":{"S":[{ "exists": true }]}}}}
Guardamos los cambios.
Con los pasos anteriores configuramos el filtro del stream para que solo sean procesados por la función Lambda los eventos del tipo Insert y que dentro de la datos enviados se encuentre el campo Local y Visitante, los demás datos que no cumplan con esta condición son ignorados y la función Lambda no es invocada, lo que ayuda a reducir costos debido a que solo se procesa la información necesaria.
El siguiente paso es ponerle código fuente a nuestra función Lambda, vamos a la pestaña Code, reemplazamos el código existente por el siguiente y damos click en el botón Deploy.
import json
import boto3
from boto3.dynamodb.conditions import Key
dynamodb = boto3.resource("dynamodb")
tabla = dynamodb.Table("TablaPosiciones")
local = "local"
visitante = "visitante"
class Partido:
def __init__(self,local,visitante,marcadorLocal,marcadorVisitante,liga):
self.local = local
self.visitante = visitante
self.marcadorLocal = marcadorLocal
self.marcadorVisitante = marcadorVisitante
self.liga = liga
def getLocal(self):
return self.local
def getVisitante(self):
return self.visitante
def getMarcadorLocal(self):
return self.marcadorLocal
def getMarcadorVisitante(self):
return self.marcadorVisitante
def getLiga(self):
return self.liga
def setLocal(self,local):
self.local = local
def setVisitantel(self,visitante):
self.visitante = visitante
def setMarcadorLocal(self,marcadorLocal):
self.marcadorLocal = marcadorLocal
def setMarcadorVisitante(self,marcadorVisitante):
self.marcadorVisitante = marcadorVisitante
def setLiga(self,liga):
self.liga = liga
def obtenerPartido(data):
local = data["Local"]["S"]
visitante = data["Visitante"]["S"]
marcadorLocal = data["MarcadorLocal"]["N"]
marcadorVisitante = data["MarcadorVisitante"]["N"]
liga = data["PK"]["S"].split("#")[0]
return Partido(local,visitante,marcadorLocal,marcadorVisitante,liga)
def lambda_handler(event, context):
data = event["Records"][0]["dynamodb"]["NewImage"]
partido = obtenerPartido(data)
datosLocal = tabla.query(KeyConditionExpression=Key('Liga').eq(partido.getLiga()) & Key("Equipo").eq(partido.getLocal()))
datosVisitante = tabla.query(KeyConditionExpression=Key('Liga').eq(partido.getLiga()) & Key("Equipo").eq(partido.getVisitante()))
actualizarDatosEquipo(datosLocal["Items"],partido,local)
actualizarDatosEquipo(datosVisitante["Items"],partido,visitante)
print("ejecuto")
def actualizarDatosEquipo(datos,partido,localidad):
puntos = getPuntos(partido,localidad)
if len(datos) == 0:
tabla.put_item(Item={
"Liga": partido.getLiga(),
"Equipo":partido.getLocal() if localidad == local else partido.getVisitante(),
"PartidosJugados":1,
"GolesFavor": partido.getMarcadorLocal() if localidad == local else partido.getMarcadorVisitante(),
"GolesContra":partido.getMarcadorLocal() if localidad != local else partido.getMarcadorVisitante(),
"Puntos": puntos
})
else:
tabla.update_item(
Key={
"Liga": partido.getLiga(),
"Equipo": partido.getLocal() if localidad == local else partido.getVisitante()
},
UpdateExpression="set PartidosJugados=:partidos_jugados,GolesFavor=:goles_favor,GolesContra=:goles_contra,Puntos=:puntos",
ExpressionAttributeValues={
":partidos_jugados":datos[0]["PartidosJugados"] + 1,
":goles_favor":int(datos[0]["GolesFavor"]) + int(partido.getMarcadorLocal()) if localidad == local else int(datos[0]["GolesFavor"]) +int(partido.getMarcadorVisitante()),
":goles_contra":int(datos[0]["GolesContra"]) + int(partido.getMarcadorVisitante()) if localidad == local else int(datos[0]["GolesContra"])+int(partido.getMarcadorLocal()),
":puntos":int(datos[0]["Puntos"]) + puntos
},
ReturnValues="UPDATED_NEW"
)
def getPuntos(partido,localidad):
puntos = 1
if local == localidad:
if partido.getMarcadorLocal() > partido.getMarcadorVisitante():
puntos = 3
elif partido.getMarcadorLocal() < partido.getMarcadorVisitante():
puntos = 0
else:
if partido.getMarcadorLocal() < partido.getMarcadorVisitante():
puntos = 3
elif partido.getMarcadorLocal() > partido.getMarcadorVisitante():
puntos = 0
return puntos
Debemos validar el funcionamiento tanto del filtro como el del código de la función Lambda. Por lo tanto debemos ingresar nuevos items en la tabla ResultadosPartidos.
Nos dirigimos nuevamente a Cloud9 y creamos un nuevo archivo llamado data.py y lo ejecutamos utilizando en la terminal el comando python data.py.
import boto3
from boto3.dynamodb.conditions import Key
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("ResultadosPartidos")
table.put_item(Item={
"PK": "Premier League#2022/23",
"SK":"Arsenal#Liverpool#2022/10/09",
"Local":"Arsenal",
"Visitante":"Liverpool",
"MarcadorLocal":3,
"MarcadorVisitante": 2,
"Fecha":"2022/10/09"
})
table.put_item(Item={
"PK": "Premier League#2022/23",
"SK":"Manchester City#Southampton#2022/10/08",
"Local":"Manchester City",
"Visitante":"Southampton",
"MarcadorLocal":4,
"MarcadorVisitante": 0,
"Fecha":"2022/10/08"
})
table.put_item(Item={
"PK": "Premier League#2022/23",
"SK":"Everton#Manchester Utd#2022/10/08",
"Local":"Everton",
"Visitante":"Manchester Utd",
"MarcadorLocal":1,
"MarcadorVisitante": 2,
"Fecha":"2022/10/09"
})
table.put_item(Item={
"PK": "Premier League#2022/23",
"SK":"Manchester City#Manchester Utd#2022/10/08",
"Local":"Manchester City",
"Visitante":"Manchester Utd",
"MarcadorLocal":6,
"MarcadorVisitante": 3,
"Fecha":"2022/10/02"
})
table.put_item(Item={
"PK": "Premier League#2022/23",
"SK":"Liverpool#Brighton#2022/10/09",
"Local":"Liverpool",
"Visitante":"Brighton",
"MarcadorLocal":3,
"MarcadorVisitante": 3,
"Fecha":"2022/10/01"
})
table.put_item(Item={
"PK": "Manchester City#2022/23",
"SK":"Jugador#Erling Haaland",
"Nombre":"Erling Haaland",
"Goles":15,
"Partidos Jugados": 9
})
table.put_item(Item={
"PK": "Liverpool#2022/23",
"SK":"Jugador#Luis Diaz",
"Nombre":"Luis Diaz",
"Goles":3,
"Partidos Jugados": 8
})
table.put_item(Item={
"PK": "Tottenham#2022/23",
"SK":"Jugador#Kane",
"Nombre":"Kane",
"Goles":8,
"Partidos Jugados": 9
})
Por ultimo vamos a validar la información almacenada en las dos tablas.
Cómo podemos observar en la tabla ResultadoPartidos se almaceno la información de los resultados de cada partido más la información de los jugadores de los equipos. Esto se realiza de esta forma por el tipo de diseño de tabla que elegimos para el ejercicio que es de única tabla.
En la tabla TablaPosiciones podemos observar que únicamente tenemos almacenado la información de los equipos de futbol y no la de los jugadores porque no cumplen con el filtro creado. Si queremos confirmar que la función Lambda solo se haya llamado las veces correctas (5 que son el numero de partidos que ingresamos), debemos ir a CloudWatch y buscar el log group que termina con el nombre de la función Lambda: lambda/lambda_stream_dynamodb y observar el numero de veces que la función fue invocada.
La Arquitectura de nuestro post es la siguiente:
Como actividad adicional pueden agregar mas registros al script de inserción, tratar de crear nuevos tipos de filtros o mejorar el código fuente de la función Lambda.
En el proximo post vamos a ver cómo crear gráficos de los datos de la tabla TablaPosiciones utilizando el servicio de QuickSight.
Me pueden encontrar en
Top comments (0)