Step Functions intro: passing multiple inputs into a Map state

Sat, Apr 22, 2023

Read in 3 minutes

What are you going to learn in this blog?

An overview of Step Functions and its elements, how to pass multiple inputs to a Step Functions Map state (illustrated with a scenario), and how to read all the objects in an S3 bucket using a Lambda function written in Python.

What is Step Functions?

Step Functions is an orchestration service that helps connect various AWS services such as Lambda, SQS, DynamoDB, etc.

What are the elements of a step function?

States are the elements of your state machine. A state can perform one of the following functions:

  1. Pass
  2. Task
  3. Wait
  4. Succeed
  5. Fail
  6. Parallel
  7. Map

For more details on each state type, refer to the AWS documentation: https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html

Create a step function for the scenario below

The workflow reads the objects from an S3 bucket. Each S3 object is a file containing data, and each row in that file needs to be added to DynamoDB.

Pseudocode :

ReadS3ObjectsLambda :

import datetime
import logging
from os import listdir

import boto3

# Created once at import time so the client is shared by every invocation
# handled by this module.
s3 = boto3.client("s3")


def lambda_handler(event, context):
    """List every object key in the ``testdatabucket`` S3 bucket.

    Args:
        event: Lambda invocation payload (not read by this handler).
        context: Lambda context object (not read by this handler).

    Returns:
        A dict with two entries consumed by the Map state of the state
        machine: ``files_array`` (list of object keys) and ``bucket``
        (the bucket name).
    """
    bucket = "testdatabucket"
    keys = []

    # A single list_objects_v2 call returns at most 1,000 keys, so walk every
    # page instead of reading only the first response.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        # "Contents" is absent from the response when the bucket is empty.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))

    print("keys are")
    print(keys)
    return {
        "files_array": keys,
        "bucket": bucket,
    }

Stepfunction :


{
  "Comment": "Step function to read s3 objects and update dynamoDB",
  "StartAt": "ReadS3ObjectsLambda",
  "States": {
    "ReadS3ObjectsLambda": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ReadS3ObjectsLambda",
      "Next": "Iterating S3 objects"
    },
    "Iterating S3 objects": {
      "Type": "Map",
      "ItemsPath": "$.files_array",
      "Parameters": {
        "bucket.$": "$.bucket",
        "current_file.$": "$$.Map.Item.Value"
      },
      "MaxConcurrency": 1,
      "Iterator": {
        "StartAt": "Process file",
        "States": {
          "Process file": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ProcessFileAndUpdateDynamoDBLambda",
            "Next": "Wait",
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 1,
                "MaxAttempts": 2,
                "BackoffRate": 1
              }
            ],
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "ResultPath": null,
                "Next": "process file Failed"
              }
            ]
          },
          "Wait": {
            "Type": "Wait",
            "Seconds": 30,
            "End": true
          },
          "process file Failed": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "ResultPath": null,
      "End": true
    }
  }
}


ProcessFileAndUpdateDynamoDBLambda


import datetime
import json
import time

import boto3
from botocore.exceptions import ClientError

# Module-level clients: created once at import time and shared by every
# function below. The handler refers to the S3 client as lowercase ``s3``.
s3 = boto3.client("s3")
boto3_ddb_client = boto3.client("dynamodb")

# Batch/throttling bookkeeping. None of these values is read by the functions
# shown in this file.
batchStartTime = time.time()
batchCurrentSize = 0
# NOTE(review): the original listing gave no value for this setting; None is a
# placeholder - set it to the intended number of updates per second.
maxUpdatePerSec = None

# Lambda handler: receives the bucket and the current S3 file from the Map
# state, reads the file and pushes every line to DynamoDB.

def lambda_handler(event, context):
    """Process one S3 object and update one DynamoDB item per line.

    Args:
        event: Map-state input with ``bucket`` (bucket name) and
            ``current_file`` (object key), as built by the ``Parameters``
            block of the "Iterating S3 objects" state.
        context: Lambda context object (not read by this handler). Lambda
            passes it positionally, so the name does not affect invocation.

    Returns:
        The number of lines that were sent to ``update_ddb_row``.
    """
    response = s3.get_object(Bucket=event["bucket"], Key=event["current_file"])

    count = 0
    # read() must be *called*; it returns the whole object body as bytes.
    for raw_line in response["Body"].read().splitlines():
        row = raw_line.decode("utf-8").strip()
        if not row:
            # Blank lines (e.g. a trailing newline) carry no record and would
            # only make the JSON parser fail.
            continue
        print(row)
        update_ddb_row(row)
        count += 1

    return count


# Updating DynamoDB

def update_ddb_row(row_text):
    """Set the ``comment`` attribute on the ``testTable`` item named by a line.

    Args:
        row_text: One line of the S3 file; ``get_keys`` extracts the
            partition key and range key from it.

    Returns:
        The ``update_item`` response, which carries the updated attribute
        because ``ReturnValues`` is ``UPDATED_NEW``.

    Raises:
        botocore.exceptions.ClientError: re-raised after logging so the
            failure reaches the caller instead of being swallowed.
    """
    part_key, range_key = get_keys(row_text)
    try:
        return boto3_ddb_client.update_item(
            TableName="testTable",
            Key={
                "product_partition_key": {"S": part_key},
                "product_range_key": {"S": range_key},
            },
            # The attribute name is aliased through ExpressionAttributeNames
            # so it cannot clash with a DynamoDB reserved word.
            UpdateExpression="SET #comment = :comment",
            ExpressionAttributeNames={"#comment": "comment"},
            ExpressionAttributeValues={
                ":comment": {"S": "test comment updated  by lambda"},
            },
            ReturnValues="UPDATED_NEW",
        )
    except ClientError as err:
        print(f"Failed to update item {part_key}/{range_key}: {err}")
        raise


# The function was originally declared with a doubled underscore while its
# caller used ``update_ddb_row``; keep the old spelling as an alias.
update__ddb_row = update_ddb_row


### Parse one line of the file and return its partition key and range key.

def get_keys(raw_file_line):
    """Extract the partition key and range key from one JSON line.

    The line is expected to be a JSON object of the form
    ``{"product_partition_key": {"S": "..."}, "product_range_key": {"S": "..."}}``.
    The string type tag is accepted as ``"S"`` or as lowercase ``"s"``.

    Args:
        raw_file_line: One line of the input file; surrounding whitespace
            is ignored.

    Returns:
        A ``(partition_key, range_key)`` tuple.

    Raises:
        ValueError: if the line is not valid JSON or a key is missing.
    """
    try:
        item = json.loads(raw_file_line.strip())
        return (
            _string_attribute(item, "product_partition_key"),
            _string_attribute(item, "product_range_key"),
        )
    except (KeyError, TypeError, json.JSONDecodeError) as err:
        raise ValueError(
            f"Cannot extract keys from line: {raw_file_line!r}"
        ) from err


def _string_attribute(item, name):
    """Return the string stored under ``item[name]`` with an S/s type tag."""
    typed_value = item[name]
    return typed_value["S"] if "S" in typed_value else typed_value["s"]