En este post vamos a ver como procesar un stream datos con Kinesis y Lambda.
Kinesis es un servicio que nos sirve para procesar datos en tiempo real. Algunas de sus caracteristicas son:
Esta compuesto por Shard.Un Shard es la capacidad de datos que se pueden consumir, equivale a un 1 MB por segundo o mil registros por segundo.
Los mensajes pueden ser almacenados entre 1 y 365 días.
Dependiendo de la cantidad de data a consumir los shard se pueden de dividir (split) para tener más capacidad de consumo o se pueden unir (merge) para disminuir la capacidad de consumo.
Partition Key es el identificador que se utiliza para decir en qué Shard se quiere almacenar el registro. Se define cuando se envía un mensaje a Kinesis.
Para este ejercicio vamos a utilizar el stream de wikipedia:Stream
qué va a ser procesado por Kinesis.
Empecemos creando un Data Stream. Vamos a buscar Kinesis en la barra de búsqueda, en la pantalla principal de Kinesis seleccionamos Kinesis Data Streams y damos click en Create data stream.
Ahora configuremos nuestro data stream. Vamos a darle el nombre de: Wikipedia-Stream, seleccionamos el modo Provisioned para aprovisionar un shard y damos click en el botón crear.
Una vez creado nuestro data stream en la pestaña de configuración podemos modificar: la cantidad de shards aprovisionados, la encripción, el periodo de retención o si queremos tener métricas mejoradas en CloudWatch.
Ya que tenemos configurado nuestro data stream debemos enviarle la información.
Vamos a utilizar el siguiente programa de Python para enviar los datos a Kinesis utilizando la librería boto3. El programa esta leyendo el siguiente stream de datos: https://stream.wikimedia.org/v2/stream/recentchange
import json
from sseclient import SSEClient as EventSource
import boto3
from datetime import datetime
clientKinesis = boto3.client('kinesis')
clientLambda = boto3.client('lambda')
url = 'https://stream.wikimedia.org/v2/stream/recentchange'
def get_object(change):
return json.dumps({"id": change["id"],"bot":change["bot"],"length":change["length"]["new"],
"timestamp":str(datetime.fromtimestamp(change["timestamp"])),"type":change["type"],"user":change["user"],
"wiki":change["wiki"],"server_name":change["server_name"]})
def put_kinesis(data):
return clientKinesis.put_record(
StreamName='Wikipedia-Stream',
Data=data,
PartitionKey='1'
)
for event in EventSource(url):
if event.event == 'message':
try:
change = json.loads(event.data)
if ("id" in change and "bot" in change and "bot" in change and "length" in change
and "timestamp" in change and "type" in change
and "user" in change and "wiki" in change and "server_name" in change):
put_kinesis(get_object(change))
except ValueError:
pass
Es necesario contar con las siguientes librerías instaladas para ejecutar el programa:
- boto3 pip install boto3
- sseclient pip install sseclient
- tener instalada y configurada la CLI
Ya que hemos configurado el productor de datos debemos configurar el consumidor que en este caso va a ser una función Lambda.
Vamos a buscar el servicio Lambda en el buscador y en la pagina principal damos click en el botón Create function.
Debemos definir el nombre de la función: lambda-stream, el runtime: python3.7 y dejamos el rol por defecto por ahora.
Creada nuestra función Lamba debemos configurar el rol de la función para que pueda acceder a Kinesis. Para esto vamos a la pestaña configuración y damos click en el rol de la función.
Debemos añadir la políticas necesarias: AmazonKinesisFullAccess y la política de CloudWatchFullAccess para poder monitorear los llamados a nuestra función.
Una vez creada nuestra función y asignado los permisos necesarios para su ejecución debemos configurar un trigger para que procese los datos de nuestro data stream.
EL siguiente paso es configurar el trigger, en donde el tipo de trigger es Kinesis, seleccionamos el nombre del data stream que ya hemos creado, definimos la cantidad de registros que queremos leer (en este caso vamos a leer de un solo registro) y definimos la posición en donde debemos comenzar a leer (Latest).
Ya que terminamos la configuración vamos a modificar nuestra función Lambda para que imprima los datos que se reciben desde Kinesis.
import json
import base64
def lambda_handler(event, context):
#Obtiene toda la infomación enviada por Kinesis
print("EVENTO: " ,event)
#Obtiene solamente los datos
print("DATA: ",json.loads(base64.b64decode(event["Records"][0]["kinesis"]["data"])))
# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Kinesis!')
}
El siguiente paso es ejecutar (python3 nombrearchivo.py) el programa para que empiece a enviar los datos a Kinesis.
Para ver los datos que procesa la función Lambda pestaña Monitor
En la siguiente pantalla vemos los datos que enviamos a imprimir en el log de Cloud Watch
Ahora que sabemos como obtener los datos que enviamos a Kinesis los invito a guardar esta información en DynamoDB, RDS o procesarla con otro de servicio.
Me pueden encontrar en:
Top comments (0)