Blog de Amazon Web Services (AWS)

CQRS en AWS: Sincronizando los Servicios de Command y Query con el Estándar Transactional Outbox, la Técnica Transaction Log Tailing y el Amazon Database Migration Service

Por Roberto Perillo, arquitecto de soluciones empresariales en AWS Brasil.

Esta es la cuarta parte de la serie sobre como implementar CQRS en AWS. La primera parte fue una introducción al tema, así como el análisis de un caso de uso en el que tenemos una Amazon Aurora for PostgreSQL como base de datos del servicio de comandos y una Amazon Elasticache for Redis como base de datos del servicio de consultas. Para intercambiar información, se coloca un evento en una cola de Amazon SQS, que lee un componente de computación (una función de AWS Lambda, en el ejemplo presentado) que actualiza Redis con información precalculada que está lista para recuperarse.

En la segunda parte, analizamos el mismo caso de uso, pero empezamos a utilizar el estándar Transactional Outbox. Allí, exploramos la técnica de Polling Publisher, en la que buscábamos de tiempo en tiempo en la bandeja de salida y publicábamos los eventos que aún no se habían publicado. Para que esto funcione, después de publicar nuevos eventos, borramos la bandeja de salida para que el mismo evento no se publique dos veces (y, aun así, debemos estar preparados para gestionar los eventos publicados más de una vez de forma idempotente).

En la tercera parte, exploramos la técnica Transaction Log Tailing, que es otra forma de tratar el estándar Transactional Outbox. Persistimos, junto con el aggregate, en un evento que representa la situación que acaba de ocurrir. La diferencia es que ya no leemos los eventos de la bandeja de salida para publicarlos. En su lugar, tendremos un componente que observará el log de transacciones de la bandeja de salida de la base de datos para publicar los cambios.

En esta cuarta parte también se explora la técnica Transaction Log Tailing, pero ahora el componente que observará el log de transacciones desde la bandeja de salida será el Amazon Database Migration Service (Amazon DMS) y publicará los eventos en Amazon Kinesis Data Streams. El caso de uso es prácticamente el mismo, con las mismas bases de datos y el mismo dominio.

Introducción

La mayoría de las aplicaciones que creamos requieren una base de datos para almacenar los datos. Almacenamos el estado de nuestros objetos de dominio y los utilizamos para diversos fines, como procesar y generar informes. En ocasiones, es posible que nuestra base de datos no sea ideal para la recuperación de datos, ya sea por su naturaleza o debido a un modelo de dominio complejo, por ejemplo. Para estos casos, podemos usar el estándar arquitectural CQRS, que sugiere que, para un determinado bounded context (en español, algo así como «contexto delimitado») de nuestro dominio, podemos tener dos servicios, uno para recibir comandos (es decir, operaciones que cambian de estado) y otro para consultas (que solo recuperan datos). De este modo, cada servicio puede tener la base de datos que mejor se adapte. El desafío consiste en cómo mantener sincronizadas las dos bases de datos, lo que se puede hacer con los eventos publicados desde el servicio de comandos para que los consuma el servicio de consultas.

La publicación confiable de eventos relacionados con cosas que ya han sucedido en una aplicación puede ser un desafío. Tanto si utilizamos el estándar CQRS como si no, si no tenemos cuidado, es posible que acabemos publicando información que aún no existe o que no la publiquemos en cualquier momento, como ya comentamos en la primera parte de esta serie, y las fuentes de datos pueden perder la sincronía. De hecho, dado que publicamos eventos del servicio de comandos para que los consuma el servicio de consultas, ya existe cierta coherencia posible, pero si no tenemos cuidado, es posible que las fuentes de datos no estén sincronizadas durante más tiempo del esperado. En el contexto de una base de datos relacional, el estándar Transactional Outbox nos permite publicar eventos de forma fiable. Cuando se aplica, un evento se conserva en la bandeja de salida de la misma transacción en la que se conservan los datos aggregate principales y, en algún momento, se publica más adelante.

El estándar Transactional Outbox. En este ejemplo, un aggregate de pedidos se compone de la clase Pedido, que a su vez contiene una lista de ArticuloPedido. Al realizar un pedido, el aggregate se confirma junto con un registro que representa el evento de creación del pedido.

Figura 1. El estándar Transactional Outbox. En este ejemplo, un aggregate de pedidos se compone de la clase Pedido, que a su vez contiene una lista de ArticuloPedido. Al realizar un pedido, el aggregate se confirma junto con un registro que representa el evento de creación del pedido.

Caso de uso: Amazon Aurora PostgreSQL-Compatible Edition como base de datos del servicio de comandos y Amazon Elasticache for Redis como base de datos del servicio de consultas, utilizando el estándar Transactional Outbox y la técnica Transaction Log Tailing con el Amazon Database Migration Service

Como en el primer y segundo caso que analicé en esta serie de publicaciones, tenemos Aurora como base de datos del servicio de comando y Redis como base de datos del servicio de consultas. Aurora contendrá información relacionada con los clientes, los productos y los pedidos, y Redis contendrá información precalculada relacionada con los clientes. Para probar la arquitectura, utilizaremos dos endpoints, uno para guardar los pedidos y otro para recuperar la información relacionada con el cliente.

Para implementar esta técnica, además de Aurora y Redis, también necesitaremos un stream de datos de Amazon Kinesis Data Streams para recibir los eventos publicados en la bandeja de salida. El servicio responsable de llevar los eventos del log de transacciones de la bandeja de salida al stream de datos de Amazon Kinesis Data Streams es Amazon DMS. Además de realizar migraciones completas de bases de datos, también realiza replicaciones continuas e investiga el log de transacciones de una tabla. Y tal como se mostró en la publicación anterior, también utilizaremos un pipe de Amazon EventBridge para transferir los datos del flujo de datos de Amazon Kinesis Data Streams a un tema de Amazon SNS, de modo que los cambios puedan enviarse a varios consumidores.

Resumen de la Solución

En esta implementación, utilizaremos de una database migration task en Amazon DMS del tipo «Full load, ongoing replication». Esta task constará de una replication instance y dos endpoints: uno para leer los eventos de log de transacciones de la bandeja de salida y el otro para enviar los eventos a una stream de Amazon Kinesis Data Streams. Amazon DMS resume toda la complejidad necesaria para leer el log de transacciones de la bandeja de salida.

Es importante recordar que, al igual que hicimos con el Debezium Connector, para que la migration task de Amazon DMS funcione, en el servicio Aurora, necesitamos crear un parameter group de cluster, establecer el parámetro rds.logical_replication en 1 y asignar el parameter group a nuestra base de datos Aurora (si el cluster de base de datos se creó antes de asignar el parameter group al cluster, también necesitamos reiniciar las instancias del cluster). Para obtener instrucciones sobre cómo trabajar con la replicación lógica en Aurora, consulte la documentación Uso de la replicación lógica de PostgreSQL con Aurora.

Al igual que el Debezium Connector, nuestra migration task a Amazon DMS también publicará las inserciones de nuevos eventos en la bandeja de salida de una transmisión de Amazon Kinesis Data Streams siempre que se produzcan. También captura los cambios a nivel de registro en las tablas indicadas por el usuario en las bases de datos relacionales tan pronto como persisten. Cuando se conecta a la base de datos por primera vez, toma una instantánea del estado actual de esas tablas y la publica en uno stream de Amazon Kinesis Data Streams y, a continuación, continúa supervisando esas tablas y publicando los cambios a nivel de registro.

En nuestro ejemplo, se realizará una llamada POST al endpoint /orders, con la información de un pedido que realizará un cliente. Esta llamada será validada por un autorizador de Lambda (para simplificar, utilizaremos la autenticación básica) y, posteriormente, OrderReceiverLambda la procesará, que realizará la función de nuestro servicio de comandos. Esta función Lambda, en la misma transacción, introducirá datos en las tablas que contienen información relacionada con el aggregate del pedido y también en la tabla que corresponde a nuestra bandeja de salida. El log de transacciones de esa tabla se supervisará mediante la migration task de Amazon DMS, que publicará los eventos en un stream de Amazon Kinesis Data Streams.

El stream de Amazon Kinesis Data Streams, que recibe los cambios realizados en la bandeja de salida, será la fuente de un pipe de Amazon EventBridge. La segunda fase del pipe consistirá en una función Lambda, en fase de enrichment, cuya finalidad es limpiar el evento entregado por Amazon Kinesis Data y, además, decodificar el valor del evento, que está codificado en Base64. Esta función Lambda también desempeña la función de filtro. Este filtrado es importante porque, dado que estamos siguiendo el log de transacciones de la bandeja de salida, recibimos todos los eventos, incluidas las eliminaciones, y en el escenario que estamos estudiando, solo nos interesan las inserciones. Estas eliminaciones se producen porque la función OrderEventCleanerLambda Lambda limpia la bandeja de salida, que genera eventos de eliminación.

Es cierto que es posible definir un filtro en un pipe de EventBridge. El problema es que los datos del evento están codificados en Base64 y, para filtrarlos, necesitamos decodificar los datos y comprobar los metadatos del evento para ver si el campo de la operación es igual a «insert», y no podemos hacerlo en el filtro de un pipe de EventBridge, por lo que utilizamos la función Lambda de la fase de enrichment para ello. Para continuar procesando los eventos en la fase de enrichment, podríamos crear una lista, decodificar los eventos e incluir en la lista solo aquellos eventos cuyo campo de operación contenido en los metadatos de cada evento fuera igual a «insert» y devolver la lista en la función Lambda.

Finalmente, el evento se envía a un tema de SNS, que lo envía a dos colas de SQS: una para que la busque una función Lambda que envía un correo electrónico de notificación y la otra para que la lea una función Lambda que actualiza la clave del cliente en Redis (que, en nuestro ejemplo, es la base de datos de nuestro servicio de consultas). La función Lambda que envía correos electrónicos es solo un ejemplo que ilustra que es posible realizar diferentes acciones para cada evento.

A continuación, la función Lambda que actualiza Redis publica un evento en otra cola de SQS opcional, que es leída por una función Lambda que elimina los eventos de la bandeja de salida que ya se han procesado. El evento lo publica la función Lambda que actualiza Redis solo para garantizar que la función Lambda opcional que elimina un evento ya publicado de Aurora solo reciba un evento después de una actualización correcta de Redis. Eliminar los registros de la bandeja de salida después de procesar los eventos es una forma de mantenerla limpia. Otras formas incluyen usar la extensión pg_partman, disponible para PostgreSQL de Amazon RDS o Amazon Aurora para PostgreSQL en el caso de Postgres, o trabajar con el archivado de particiones en el caso de MySQL.

Un segundo endpoint, /clients/{clientId}, recupera la información relacionada con el cliente. La función Lambda que proporciona esta información representa nuestro servicio de consultas y recuperación de la clave de Redis que se actualizó anteriormente. La información devuelta contiene el nombre del cliente, su correo electrónico y el importe total que el cliente ya ha comprado.

Imagen que muestra la arquitectura propuesta, con Aurora como base de datos del servicio de comandos y Redis como base de datos del servicio de consultas. Tras insertar un evento en Aurora, el Amazon Database Migration Service lee el log de transacciones de la bandeja de salida y publica los eventos en uno stream de Amazon Kinesis Data Streams, que se lee a través de uno pipe de EventBridge que publica esos eventos en un tema de SNS. A continuación, el SNS distribuye los eventos en dos colas, una de las cuales es leída por una lambda que actualiza Redis con los datos de clientes más recientes relacionados con cada evento.

Figura 2. Arquitectura propuesta, con Amazon Aurora PostgreSQL-Compatible Edition como base de datos del servicio de comandos y Amazon Elasticache for Redis como base de datos del servicio de consultas.

La solución es casi exactamente la misma explorada en la tercera parte de esta serie, excepto que estamos utilizando una migration task de Amazon DMS en lugar del Debezium Connector para leer el log de transacciones de la bandeja de salida y estamos utilizando un stream de Amazon Kinesis Data Streams para recibir eventos en lugar de un tema de Amazon Managed Streaming for Apache Kafka.

Una de las ventajas de esta solución es que los cambios se extraen del log de transacciones de la bandeja de salida a un stream de Amazon Kinesis Data Streams, y la bandeja de salida puede ser limpiada en algún momento por un componente opcional que puede ejecutar cómputo (como una función Lambda) o combinando extensiones de Postgres, como pg_partman, con otras soluciones de AWS, como Amazon S3. Y Amazon DMS proporciona las herramientas necesarias para leer el log de transacciones de la bandeja de salida de forma transparente, para que no tengamos que instalar nada.

Otra ventaja es que, a diferencia de Kafka, el endpoint de destino que compone la migration task en Amazon DMS puede enviar datos a múltiples destinos. En este aspecto, esta solución es más flexible que la que utiliza Kafka, en la que necesitamos instalar diferentes connectors y sus plugins correspondientes para hacer esto. Estamos enviando datos a un stream de Amazon Kinesis Data Streams, pero podemos enviar datos a un bucket de S3, diferentes bases de datos NoSQL, etc.

Este es un punto a considerar al decidir entre un Kafka connector y Amazon DMS para leer el log de transacciones de la bandeja de salida. Podemos usar Debezium para transportar eventos desde la bandeja de salida a un tema de Kafka. Esta es la idea de llevar cambios de diferentes lugares a un tema de Kafka, y cada fuente de datos necesita un connector y un plugin correspondiente. De la misma manera, para llevar eventos del tema de Kafka a un destino, se necesita un nuevo connector y su plugin correspondiente. Para esto, primero necesitamos verificar si estos connectors están disponibles o incluso desarrollar los nuestros, lo que aumenta la complejidad general y el TCO de la solución. En este sentido, Amazon DMS es más flexible, ya que puede leer de varias fuentes de datos y también entregar cambios a muchos destinos diferentes.

Otro punto es que los eventos que se persistieron en la bandeja de salida se pueden reproducir, al igual que la solución presentada en la tercera parte de esta serie, que utiliza un tema de Kafka. Si queremos alimentar otra base de datos o reproducir eventos para intentar simular un bug, todos los eventos se pueden reproducir nuevamente.

La desventaja de esta arquitectura es que, aunque es más confiable, agrega más componentes y, por lo tanto, tenemos más complejidad y también es un poco más cara que las exploradas anteriormente.

Amazon DMS es más simple de usar que Debezium. Amazon DMS permite configurar fácilmente endpoints de varias fuentes y diversos destinos, mientras que Debezium requiere un connector y un plugin correspondiente para cada fuente y destino. Además, cuando se utiliza solo Amazon DMS, se utiliza un servicio administrado de AWS. Por otro lado, Debezium, al ser un proyecto open source, tiene más soporte de la comunidad y, por lo tanto, tiene más posibilidades de orígenes y destinos.

Ejecutando el Ejemplo

Para ejecutar el ejemplo, los lectores deben tener una cuenta de AWS y un usuario con permisos de administrador. A continuación, basta con ejecutar el paso a paso que se proporciona en el repositorio de código para esta serie de entradas de blog sobre CQRS, en AWS Samples, alojadas en Github. Al realizar el proceso paso a paso, los lectores dispondrán de la infraestructura que se presenta aquí en sus propias cuentas.

El ejemplo contiene dos endpoints, uno para recibir información relacionada con los pedidos (que representa nuestro servicio de comando) y el otro para recuperar información relacionada con los clientes (que representa nuestro servicio de consultas). Para comprobar que todo ha funcionado correctamente, ve a la API Gateway y, en la lista de API, introduce la API “OrdersAPI” y, a continuación, “Stages”. Solo habrá una stage llamada “prod”. Recupere el valor del campo Invoke URL y añada “/orders”. Este es el endpoint que recibe la información relacionada con los pedidos.

Hagamos una solicitud POST a ese endpoint. Podemos usar cualquier herramienta para realizar solicitudes, como cURL o Postman. Como este endpoint está protegido, también necesitamos añadir basic authentication. Si utilizas Postman, tendrás que recuperar el nombre de usuario y la contraseña generados al crear la infraestructura. En el API Gateway, vaya a “API Keys” y copie el valor de la columna “API Key” de “admin_key”. Este valor contiene el nombre de usuario y la contraseña separados por el carácter “:”, pero está codificado en Base64. Decodifique el valor con una herramienta en línea o con el comando “base64” de Linux. El nombre de usuario está a la izquierda del carácter “:” y la contraseña, a la derecha. Añada una “Authorization” del tipo “Basic Auth” y rellene los campos “Username” y “Password” con los valores recuperados. Añade también un header “Content-Type”, con el valor “application/json”.

Si está utilizando, por ejemplo, cURL, no será necesario decodificar el valor de la clave API. Simplemente agregue un header “Authorization” con el valor “Basic <valor de la API key copiado de la columna API key>”. También añade un header “Content-Type” con el valor “application/json”.

El payload para realizar solicitudes a este endpoint es el siguiente:

{
    "id_client": 1,
    "products": [{
        "id_product": 1,
        "quantity": 1
    }, {
        "id_product": 2,
        "quantity": 3
    }]
}

Esto representa un pedido realizado por el cliente con el identificador 1 y que contiene productos con los identificadores 1 y 2. El total de ese pedido es de $3000. Toda esta información se almacenará en Aurora. Al realizar esta solicitud POST, si todo funcionó según lo esperado, debería ver el siguiente resultado:

{
    "statusCode": 200,
    "body": "Order created successfully!"
}

Ahora, verifiquemos que la información relacionada con el cliente se haya enviado a Redis. Al endpoint de API Gateway, que se recuperó anteriormente, añada “/clients/1”. Este es el endpoint que recupera la información relacionada con el cliente. Hagamos una solicitud GET para ese endpoint. Al igual que hicimos con el endpoint “/orders”, necesitamos añadir basic authentication. Siga los pasos explicados anteriormente y realice la solicitud GET. Si todo ha funcionado según lo esperado, verás un resultado similar al siguiente:

{
    "name": "Bob",
    "email": "bob@anemailprovider.com",
    "total": 3000.0,
    "last_purchase": 1700836837
}

Esto significa que pudimos alimentar correctamente a Redis con información lista para ser leída, mientras la misma información está en Aurora, en otro formato.

Limpiando los Recursos

Para eliminar la infraestructura que se aprovisionó para no incurrir en costos, se necesitan algunos pasos. Como iniciar la migration task de Amazon DMS es un paso realizado después de la creación de la infraestructura, será necesario seguir algunos pasos. Para eliminar la infraestructura creada, en la consola, accede a la página de Amazon DMS. Luego, en el panel, debería haber una task activa. Navega hasta ella. En “Database migration tasks”, selecciona la task en ejecución y elige “Stop” en el menú “Actions” y confirma. Navega hasta la página de Amazon CloudFormation, elige la stack “cqrsOnAws”, elige “Delete” y confirma. Esto tomará aproximadamente 40 minutos. Luego, ve a la página de Amazon EC2 y, en “Network & Security”, ve a “Network Interfaces”. Elige la interfaz con la descripción “DMSNetworkInterface” y, en el menú “Actions”, elige “Delete” y confirma. Luego, accede a la página de Amazon VPC. Elige la VPC cuyo nombre comienza con “cqrsOnAws” y, en el menú “Actions”, elige “Delete” y confirma. Finalmente, regresa a la página de CloudFormation, elige la stack cuyo nombre comienza con “cqrsOnAws”, elige “Retry Delete”, luego “Retry delete this entire stack” y luego “Delete”.

Conclusión

En esta entrada de blog, exploramos otra forma de utilizar la técnica Transaction Log Tailing. La solución abordada aquí es casi la misma presentada en la entrada anterior de esta serie, excepto por la introducción de Amazon DMS, para leer el log de transacciones de la bandeja de salida, y también de un stream de Amazon Kinesis Data Streams, para recibir los eventos a ser publicados en otros servicios.

Para conectar el tema de SNS, que entrega el evento a dos colas SQS diferentes, utilizamos una canalización de EventBridge. La fuente de la canalización es el stream de Amazon Kinesis Data Streams, pasando por una función Lambda que limpia y filtra datos en la fase de enriquecimiento y, finalmente, se entrega al tema de SNS. A partir de ahí, el evento se entrega a dos colas SQS, que son leídas por funciones Lambda, siendo una de ellas la que actualiza Redis, que es la base de datos de nuestro servicio de consultas.

Con el estándar Transactional Outbox, cada vez que estamos a punto de cambiar el estado de un agregado en el servicio de comandos, persistimos en una bandeja de salida un evento que representa la situación que acaba de ocurrir con el aggregate, junto con el propio aggregate. Para publicar los cambios, utilizamos un componente que lee el log de transacciones de la bandeja de salida en la base de datos y publica los cambios a nivel de registro en otro componente. En el ejemplo explorado en esta publicación, utilizamos una migration task de Amazon DMS, que es el componente que lee el log de transacciones de una tabla y publica los cambios en un stream de Amazon Kinesis Data Streams.

Con el estándar Transactional Outbox, en algún momento todos los eventos serán publicados, por lo que no hay riesgo de no publicarlos. Es un enfoque relativamente simple, pero Transaction Log Tailing es un poco más complejo que los otros enfoques y también es más costoso. Sin embargo, es más resistente y ofrece la capacidad de reproducir eventos, ya sea para alimentar otra base de datos o para intentar reproducir un bug.

Ahora que hemos explorado diferentes formas de implementar CQRS en AWS, llevando cambios en una base de datos relacional a una base de datos NoSQL, abordaremos lo contrario en la próxima publicación: haremos los cambios de una base de datos NoSQL (Amazon DynamoDB, en nuestro caso) a una base de datos relacional (Aurora, en nuestro caso). ¡Ese es el caso que exploraremos en la próxima entrada de esta serie!

Este contenido és una traduccíon del blog original en Portugués (enlace acá).

Acerca del Autor

Roberto Perillo Roberto Perillo es un arquitecto de soluciones empresariales en AWS Brasil, especializado en sistemas serverless, que presta servicios a clientes del sector financiero y ha estado en la industria del software desde 2001. Trabajó durante casi 20 años como arquitecto de software y desarrollador Java antes de unirse a AWS en 2022. Es licenciado en Ciencia de la Computación, tiene una especialización en Ingeniería de Software y un máster también en Ciencia de la Computación. Un aprendiz eterno. En su tiempo libre, le gusta estudiar, tocar la guitarra y también ¡jugar a los bolos y al fútbol de mesa con su hijo, Lorenzo!

Acerca del Colaborador

Luiz Santos Luiz Santos trabaja actualmente como Technical Account Manager (TAM) en AWS y es un entusiasta de la tecnología, siempre busca nuevos conocimientos y tiene una mayor experiencia en el desarrollo de software, el análisis de datos, la seguridad, la tecnología serverless y DevOps. Anteriormente, tenía experiencia como arquitecto de soluciones de AWS y SDE.

Acerca de las Traductoras

Gabriela Guimarães Gabriela Guimarães es actualmente una de las 10 jóvenes que participan en el primer Tech Apprentice Program – Black Women Edition en AWS. Licenciado en Análisis y Desarrollo de Sistemas, estudia el mundo de la nube con enfoque en backend.
Maria Lucio Maria Lucio es Aprendiz de Tecnología, formando parte del primer Tech Apprentice Program – Black Women Edition en AWS. Licenciada en Técnico en Computación y Licenciada en Sistemas de Información, le apasiona la programación y orienta su trabajo como aprendiz hacia proyectos y estudios que le permitan evolucionar en el área del desarrollo de software.

Acerca de los Revisores

Daniel ABIB Daniel ABIB es un arquitecto de soluciones sénior en AWS y lleva más de 25 años trabajando en la gestión de proyectos, arquitecturas de soluciones escalables, desarrollo de sistemas y CI/CD, microservicios, arquitectura sin servidor y contenedores y seguridad. Trabaja apoyando a los clientes corporativos, ayudándolos en su transición a la nube.
Erika Nagamine Erika Nagamine es arquitecta de soluciones de datos en AWS. Tiene una sólida formación académica, con un título en Sistemas de Información, un posgrado en Administración de Bases de Datos, Ingeniería de Datos y una especialización en Minería de Datos Complejos de la Unicamp. Trabaja con clientes de varios segmentos y ha participado en más de 200 proyectos en Brasil y en todo el mundo. Actualmente cuenta con múltiples certificaciones en datos y computación en la nube, todas las certificaciones de AWS, y le encanta compartir conocimientos en las comunidades y dar conferencias en eventos técnicos destacados en Brasil y el mundo.
Gonzalo Vásquez Gonzalo Vásquez es Senior Solutions Architect de AWS Chile para clientes de los segmentos Independent software vendor (ISV) y Digital Native Business (DNB) de Argentina, Paraguay y Uruguay. Antes de sumarse a AWS, se desempeñó como desarrollador de software, arquitecto de sistemas, gerente de investigación y desarrollo y CTO en compañías basadas en Chile.