Automating AWS Step Functions: Programmatically Initiating Workflows for Seamless Orchestration
Recently, I came across a scenario that called for a programmatic approach, invoking Step Functions from a Lambda function, instead of triggering Step Functions directly from API Gateway.
While developing the Cloudysky portal, I needed to process Sanity CMS webhook data and start a state machine workflow that cross-posts and publishes my blogs to various platforms, such as Medium, Dev.to and Hashnode, and logs them to Amazon OpenSearch Service. My initial intent was to trigger the workflow from API Gateway and pass the data directly to the state machine. That didn't scale well, because I ended up repeating the same data transformation in each of the state machine's Lambda functions. So I added a Lambda function between API Gateway and the state machine that does the transformation once and then starts the workflow with the transformed data.
Here is how I built this solution. I used CloudFormation, with the AWS SAM (AWS::Serverless) resource types, to provision the required infrastructure.
The following snippet creates the API Gateway resource:
```yaml
SanityEventApiGateway:
  Type: AWS::Serverless::Api
  Properties:
    StageName: Prod
    GatewayResponses:
      DEFAULT_2XX:
        ResponseParameters:
          Headers:
            Access-Control-Allow-Origin: "'*'"
        StatusCode: 200
        ResponseTemplates:
          application/json: '{ "message": "Successfully processed the event" }'
```
Here is the Step Functions initiator Lambda function, which runs whenever Sanity CMS sends a webhook event (once the webhook is configured). Note that StepFunctionInitiatorFunction needs the StepFunctionsExecutionPolicy, which grants permission to start executions of the named state machine, so that it can kick off the workflow. I have also attached SecretsManagerReadWrite, which the function uses to fetch the credentials required for the data transformation (a narrower, read-only grant would also work, since the function only reads the secret).
```yaml
StepFunctionInitiatorFunction:
  Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
  Properties:
    CodeUri: stepfunction-initiator-lambda/
    Handler: app.lambdaHandler
    Runtime: nodejs16.x
    Architectures:
      - x86_64
    Policies:
      - SecretsManagerReadWrite
      - StepFunctionsExecutionPolicy:
          StateMachineName: !GetAtt StateMachine.Name
    Environment:
      Variables:
        StateMachineArn: !GetAtt StateMachine.Arn
    Events:
      HelloWorld:
        Type: Api # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
        Properties:
          RestApiId: !Ref "SanityEventApiGateway"
          Path: /webhook
          Method: post
```
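Because the `Api` event source above uses API Gateway's Lambda proxy integration, the webhook request reaches the function as an event whose `body` is a string (base64-encoded when `isBase64Encoded` is true), and the function's return value becomes the HTTP response, which needs a numeric `statusCode` and a string `body`. The helpers below are a minimal sketch of that contract; `parseWebhookBody`, `proxyResponse` and `handleSketch` are hypothetical names, not part of any AWS library.

```javascript
// Hypothetical helpers for API Gateway's Lambda proxy integration.

// Parse the JSON webhook payload from a proxy event.
// Returns null when the body is missing or is not valid JSON.
function parseWebhookBody(event) {
  if (!event || typeof event.body !== 'string') {
    return null;
  }
  // API Gateway sets isBase64Encoded when it base64-encodes the body
  const raw = event.isBase64Encoded
    ? Buffer.from(event.body, 'base64').toString('utf8')
    : event.body;
  try {
    return JSON.parse(raw);
  } catch (err) {
    return null;
  }
}

// Build a response in the shape the proxy integration expects:
// a numeric statusCode and a string body.
function proxyResponse(statusCode, payload) {
  return {
    statusCode,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  };
}

// Example: reject malformed webhook calls before doing any other work
function handleSketch(event) {
  const blog = parseWebhookBody(event);
  if (blog === null || !blog._id) {
    return proxyResponse(400, { message: 'Invalid webhook payload' });
  }
  return proxyResponse(202, { message: `Accepted ${blog._id}` });
}
```

Returning a 400 early keeps a malformed webhook call from ever reaching Secrets Manager, Sanity or Step Functions.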
Next, here are the state machine and the StatesExecutionRole from the CloudFormation template. The state machine references the Amazon States Language (ASL) file that defines the workflow, and it uses StatesExecutionRole, which allows lambda:InvokeFunction on each of the Lambda functions in the workflow.
```yaml
StateMachine:
  Type: AWS::Serverless::StateMachine # More info about State Machine Resource: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-resource-statemachine.html
  Properties:
    DefinitionUri: statemachine/stateMachine.asl.json
    DefinitionSubstitutions:
      MediumArticleLambdaFunctionArn: !GetAtt MediumArticleLambdaFunction.Arn
      DevtoArticleLambdaFunctionArn: !GetAtt DevtoArticleLambdaFunction.Arn
      HashnodeArticleLambdaFunctionArn: !GetAtt HashnodeArticleLambdaFunction.Arn
      OpenSearchLoadArticleFunctionArn: !GetAtt OpenSearchLoadArticleFunction.Arn
    Role:
      Fn::GetAtt: [ StatesExecutionRole, Arn ]
    Logging:
      Destinations:
        - CloudWatchLogsLogGroup:
            LogGroupArn: !GetAtt StateMachineLogGroup.Arn
      IncludeExecutionData: false
      Level: 'ALL'

StatesExecutionRole:
  Type: "AWS::IAM::Role"
  Properties:
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Principal:
            Service:
              - !Sub states.${AWS::Region}.amazonaws.com
          Action: "sts:AssumeRole"
    Path: "/"
    Policies:
      - PolicyName: LambdaExecute
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Action:
                - "lambda:InvokeFunction"
              Resource:
                - !GetAtt MediumArticleLambdaFunction.Arn
                - !GetAtt DevtoArticleLambdaFunction.Arn
                - !GetAtt HashnodeArticleLambdaFunction.Arn
                - !GetAtt OpenSearchLoadArticleFunction.Arn
            - Effect: Allow
              Action:
                - "cloudwatch:*"
                - "logs:*"
              Resource: "*"
```
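The template references statemachine/stateMachine.asl.json without showing its contents. Purely as an illustration, the sketch below builds one plausible definition in JavaScript and prints it as JSON: a `Parallel` state that fans the same input out to the four publishing functions. The state names and the parallel layout are assumptions, not the author's actual workflow; the `${...}` placeholders match the `DefinitionSubstitutions` keys in the template, which SAM replaces with the function ARNs at deploy time.

```javascript
// Hypothetical sketch of statemachine/stateMachine.asl.json.
// The "${...}" values are ordinary strings here (not template literals);
// SAM substitutes them using DefinitionSubstitutions from the template.
const publishers = {
  PublishToMedium: '${MediumArticleLambdaFunctionArn}',
  PublishToDevto: '${DevtoArticleLambdaFunctionArn}',
  PublishToHashnode: '${HashnodeArticleLambdaFunctionArn}',
  LoadIntoOpenSearch: '${OpenSearchLoadArticleFunctionArn}',
};

function buildDefinition() {
  // One single-task branch per Lambda function; every branch receives
  // the input that was passed to startExecution.
  const branches = Object.entries(publishers).map(([stateName, arn]) => ({
    StartAt: stateName,
    States: {
      [stateName]: { Type: 'Task', Resource: arn, End: true },
    },
  }));
  return {
    Comment: 'Cross-post a blog article (illustrative sketch)',
    StartAt: 'CrossPostArticle',
    States: {
      CrossPostArticle: { Type: 'Parallel', Branches: branches, End: true },
    },
  };
}

// Print the JSON that would go into stateMachine.asl.json
console.log(JSON.stringify(buildDefinition(), null, 2));
```

With this layout a failure in any branch fails the whole Parallel state; adding Retry or Catch fields to each task is the usual way to keep one platform's outage from blocking the others.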
Now, let's look at the code that makes the programmatic call to start the workflow. This is just a snippet and can be refactored as needed.
The Lambda function below receives the Sanity CMS webhook event data and trims it down to the fields the workflow needs.
For easier reference, I have left out the data transformation logic; the point is that any kind of data manipulation can be done at this step.
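Since the transformation itself is left out, here is a minimal sketch of what trimming the payload down could look like: pick only the fields the workflow needs, fail fast when a required one is missing, and attach the categories fetched from Sanity. The function name `toWorkflowInput` and the choice of required fields are assumptions for illustration; the output keys mirror the `data` object built in the handler.

```javascript
// Hypothetical transformation step: keep only what the workflow needs.
// Which fields count as required is an assumption for this sketch.
const REQUIRED_FIELDS = ['_id', 'title', 'body'];

function toWorkflowInput(blog, categories = []) {
  // Fail fast instead of starting a workflow with incomplete data
  const missing = REQUIRED_FIELDS.filter((field) => blog[field] == null);
  if (missing.length > 0) {
    throw new Error(`Webhook payload is missing: ${missing.join(', ')}`);
  }
  return {
    id: blog._id,
    title: blog.title.trim(),
    description: blog.description ?? '',
    body: blog.body,
    // Keep only the category fields the publishers use
    categories: categories.map(({ title, slug }) => ({ title, slug })),
    createdAt: blog._createdAt,
    updatedAt: blog._updatedAt,
    mainImage: blog.mainImage,
  };
}
```

Doing this once here is what lets each publishing Lambda in the state machine consume the same, already-shaped input.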
The function also gets the required credentials from Secrets Manager and uses the Sanity client to fetch additional data (the post's categories) from Sanity.
Finally, it creates a Step Functions client with the AWS SDK.
The last step of this flow is to call stepfunctions.startExecution with the data you want to pass to the Step Functions workflow.
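The handler expects the sanity-cms secret to be a JSON string containing SANITY_PROJECT_ID and SANITY_DATASET. A small helper, hypothetically named `sanityConfigFromSecret`, can turn that string into the Sanity client configuration and fail with a clear message when a key is missing, rather than passing `undefined` to the client. It takes the `SecretString` value as input, so it can be exercised without AWS credentials.

```javascript
// Hypothetical helper: Secrets Manager SecretString (JSON) -> Sanity client config.
function sanityConfigFromSecret(secretString) {
  const secret = JSON.parse(secretString);
  // Surface a configuration problem explicitly instead of failing later
  for (const key of ['SANITY_PROJECT_ID', 'SANITY_DATASET']) {
    if (!secret[key]) {
      throw new Error(`Secret is missing ${key}`);
    }
  }
  return {
    projectId: secret.SANITY_PROJECT_ID,
    dataset: secret.SANITY_DATASET,
    apiVersion: '2022-11-16', // same API version the handler pins
    useCdn: false, // query the API directly instead of the cached CDN
  };
}
```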
```javascript
// AWS SDK for JavaScript v2, which the nodejs16.x Lambda runtime includes
const AWS = require('aws-sdk');
const moment = require('moment');
const { createClient } = require('@sanity/client');

// Create the SDK clients once per container so warm invocations reuse them
const secretManagerClient = new AWS.SecretsManager();
const stepfunctions = new AWS.StepFunctions();

exports.lambdaHandler = async (event, context) => {
  // Step Function execution section
  const sfnArn = process.env.StateMachineArn;
  // HH is the 24-hour clock; hh (12-hour) could repeat a name within a day
  const rightNow = moment().format('YYYYMMDD-HHmmss');
  const blog = JSON.parse(event.body);
  const blogID = blog._id;

  // Write data transformation logic here

  // Get the secret value for Sanity; errors propagate and fail the invocation
  const secretName = 'sanity-cms';
  const secretResponse = await secretManagerClient
    .getSecretValue({ SecretId: secretName })
    .promise();
  const secret = JSON.parse(secretResponse.SecretString);

  // Create a Sanity client
  const client = createClient({
    dataset: secret.SANITY_DATASET,
    projectId: secret.SANITY_PROJECT_ID,
    apiVersion: '2022-11-16',
    useCdn: false,
  });

  // Get the categories; $id is passed as a query parameter rather than
  // being concatenated into the GROQ string
  const query = `*[_type == 'post' && _id == $id]{
    categories[] -> {
      title,
      slug
    }
  }`;
  const categories = await client.fetch(query, { id: blogID });

  const data = {
    id: blogID,
    title: blog.title,
    description: blog.description,
    body: blog.body,
    // Fall back to an empty list if the post is not found or has no categories
    categories: categories[0]?.categories ?? [],
    createdAt: blog._createdAt,
    updatedAt: blog._updatedAt,
    mainImage: blog.mainImage,
  };

  const params = {
    stateMachineArn: sfnArn,
    input: JSON.stringify(data),
    name: `${blogID}-${rightNow}`,
  };

  try {
    const execution = await stepfunctions.startExecution(params).promise();
    // The Lambda proxy integration expects a statusCode and a string body
    return {
      statusCode: 200,
      body: JSON.stringify({ executionArn: execution.executionArn }),
    };
  } catch (err) {
    console.error(err);
    return {
      statusCode: 500,
      body: JSON.stringify({ message: 'Failed to start the workflow' }),
    };
  }
};
```
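The `name` passed to startExecution has constraints worth handling explicitly: it must be unique for the account, Region and state machine for 90 days, it is limited to 80 characters, and it may not contain whitespace, brackets, wildcards (? and *), control characters, or special characters such as #, %, /, :, $ and &. The sketch below, using a hypothetical `buildExecutionName` helper, replaces disallowed characters with a dash, appends a 24-hour UTC timestamp (a 12-hour hh format could produce the same name for a morning and an evening run), and truncates the id part so the result never exceeds the limit.

```javascript
// Hypothetical helper: build a StartExecution name from the blog id and a timestamp.
// Characters that Step Functions rejects in names are replaced with '-'.
const INVALID_NAME_CHARS = /[\s<>{}\[\]?*"#%\\^|~`$&,;:\/\u0000-\u001f\u007f-\u009f]/g;
const MAX_NAME_LENGTH = 80;

function buildExecutionName(blogId, date = new Date()) {
  const pad = (n) => String(n).padStart(2, '0');
  // 24-hour UTC timestamp, e.g. 20230105-140309
  const stamp =
    `${date.getUTCFullYear()}${pad(date.getUTCMonth() + 1)}${pad(date.getUTCDate())}` +
    `-${pad(date.getUTCHours())}${pad(date.getUTCMinutes())}${pad(date.getUTCSeconds())}`;
  const safeId = String(blogId).replace(INVALID_NAME_CHARS, '-');
  // Keep the timestamp intact and truncate the id part if needed
  const maxIdLength = MAX_NAME_LENGTH - stamp.length - 1;
  return `${safeId.slice(0, maxIdLength)}-${stamp}`;
}
```

In the handler, the result would be passed as the `name` field of the startExecution params. Note that two webhook deliveries for the same post within the same second would still collide; Step Functions rejects the second one, which can act as a simple duplicate guard.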
There are several scenarios where a programmatic approach, such as invoking Step Functions from a Lambda function, may be preferable to triggering Step Functions directly from API Gateway:
Fine-grained control: When you need precise control over the invocation of your Step Functions workflow and want to perform custom logic or transformations on the input data before triggering the workflow, a programmatic approach provides more flexibility.
Complex data transformations: If you have complex data transformation or preprocessing requirements before triggering the workflow, a Lambda function lets you handle them in one place. You can perform format conversions or data validation within the Lambda function before passing the processed data to the state machine.
Orchestration with multiple services: If your workflow involves coordination and interaction with multiple AWS services or external systems, a programmatic approach can be beneficial. The Lambda function can act as the central orchestrator, integrating with various services and systems, making API calls, retrieving data, and applying business logic to decide when and how to trigger the Step Functions workflow.
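As an example of business logic that decides whether to trigger the workflow at all, the sketch below starts an execution only for published posts. It relies on the Sanity convention that draft documents have ids prefixed with "drafts.", and it assumes the webhook projection includes `_type`; the "post" type matches the GROQ query in the handler, and the name `shouldStartWorkflow` is hypothetical.

```javascript
// Hypothetical pre-trigger check: decide whether a webhook payload should
// start the cross-posting workflow.
function shouldStartWorkflow(payload) {
  if (!payload || typeof payload._id !== 'string') {
    return { start: false, reason: 'missing document id' };
  }
  // Sanity stores unpublished drafts under ids prefixed with "drafts."
  if (payload._id.startsWith('drafts.')) {
    return { start: false, reason: 'draft document' };
  }
  // Assumes the webhook projection includes _type
  if (payload._type !== 'post') {
    return { start: false, reason: `unsupported type: ${payload._type}` };
  }
  return { start: true, reason: 'published post' };
}
```

Returning a reason alongside the decision makes it easy to log why a webhook call was ignored.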
Stay tuned for more blogs in this space.
Written by Sandeep K Yaramchitti