Data Aggregation in AWS Lambda using Kumologica

Kumologica
4 min readOct 5, 2020

--

Data aggregation is a very common task found in the integration space. It is the process of gathering data from different datasource or systems in order to summarise and analyse it later. The popular enterprise integration pattern (EIP) followed for achieving this is Scatter-Gather.

Scatter Gather pattern

In this article we will see how to build a data aggregation service in AWS lambda using Kumologica flow. Kumologica is a free low-code development tool to build serverless integrations. You can learn more about Kumologica in this medium article or subscribe to our YouTube channel for the latest videos.

The flow will demonstrate how the service sends the request to multiple datasources and collating the responses from each datasource before processing it for the final response.

Use case

An electronics department store is having an online portal for the staff to check the inventory. The database used by the portal needs to be updated regularly by picking the current stock details from the 3 vendor systems. A service is required to invoke the 3 vendor systems simultaneously and send the aggregated response to a queue. The service that picks the message from the queue and update the portal database is not in the scope of this use case.

Flow Logic

Prerequisite

  1. Kumologica designer installed in your machine. https://kumologica.com/download.html
  2. Create an Amazon SQS queue with the name Warehousequeue

3. For simulating the vendor systems we need 3 vendor APIs. Import the following flow to your Kumologica designer and deploy. This flow will simulate the APIs endpoints from the 3 vendors.

Note: Keep a copy of the URL after the deployment of the flow. This URL will be used in our data aggregation service implementation.

Implementation

Steps:

  1. Open Kumologica Designer, click the Home button and choose Create New Kumologica Project.
  2. Enter name (for example DataAggregationFlow), select directory for project and switch Source into From Existing Flow …
  3. Copy and Paste the following flow
  4. press Create Button.

You should be seeing flow as given below on the designer canvas.

Data Aggregation flow implementation

Understanding the flow

  1. GET /inventory is the EventListener node is configured to have the EventSource as Amazon API gateway. The node will have the following configuration.
verb : GET
path : /inventory

2. Log is the Logger node to print the message in Amazon CloudWatch on entry of the flow.

Message : Request recieved

3. Scatter is the scatter node which will be sending the request to all the 3 vendors HTTP endpoints simultaneously.

4. VendorA, VendorB, VendorC are the HTTP request nodes for invoking the vendor APIs.

VendorA
Method : GET
Url : https://<<instanceid>>.execute-api.ap-southeast-2.amazonaws.com/test/vendorA/inventory
Return : a parsed JSON object
VendorB
Method : GET
Url : https://<<instanceid>>.execute-api.ap-southeast-2.amazonaws.com/test/vendorB/inventory
Return : a parsed JSON object
VendorC
Method : GET
Url : https://<<instanceid>>.execute-api.ap-southeast-2.amazonaws.com/test/vendorC/inventory
Return : a parsed JSON object

5. Gather is the gather node to aggregate response from all the 3 vendor APIs.The flow will wait till all the response are received from the 3 vendors.

6. JSON is the JSON node to convert JSON message payload to String in order to send to Amazon SQS queue.

Payload : msg.payload

7. SQS is the node to send the message to Amazon SQS queue.

Operation : SendMessage
QueueUrl : <<Provide the url for the created Warehousequeue>>
MessageBody : msg.payload

8. Success is the EventListener End node, the final node to complete the flow.

Status Code : 200
Content-Type : application/json
Payload : {"status" : "completed"}

Deployment

  1. Select CLOUD tab on the right panel of Kumologica designer, select your AWS Profile.
  2. Set the Memory to 512mb and Timeout as 20 seconds.
  3. Go to “Trigger” section under cloud tab and select the Amazon API Gateway trigger.
API trigger setting

Try it

  1. Invoke the following endpoint using the any REST client of your choice.
https://<<gateway instance id>>.execute-api.ap-southeast-2.amazonaws.com/test/inventory

You should see the following message in the Amazon SQS queue (Warehousequeue).

Aggregated payload in SQS queue

Conclusion

This article has shown how easy to develop a serverless data aggregation API using Kumologica Designer.

Remember Kumologica is totally free to download and use. Go ahead and give it a try, we would love to hear your feedback.

--

--

Kumologica
Kumologica

Written by Kumologica

Kumologica is the first low-code development solution that makes your integration services run on serverless compute regardless the cloud provider.

No responses yet