Error with AWS step function input parameters

0

Hi Team,

I'm working on a step function and need to use a choice state to determine whether to return to a previous step. Below is my workflow configuration: { "Comment": "Placement data pipeline step function", "StartAt": "concurrency_check", "States": { "concurrency_check": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "ResultPath": "$.concurrency", "Parameters": { "FunctionName": ${concurrency_lambda}, "Payload": { "stepFunctionArn.$": "$$.StateMachine.Id", "otherStepFunctionArns.$": "$.concurrency.Payload.otherStepFunctionArns", "waitingInstanceDetails.$": "$.concurrency.Payload.waitingInstanceDetails", "parentExecutionArn.$": "$$.Execution.Id", "retryCount.$": "$.concurrency.Payload.retryCount" } }, "Retry": [ { "ErrorEquals": [ "SynchronizationError", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": ${step_retry_seconds}, "MaxAttempts": ${step_retry_attempts}, "BackoffRate": ${step_retry_backoff} } ], "Catch": [ { "ErrorEquals": ["States.ALL"], "Next": "FailureNotifier", "ResultPath": "$" } ], "Next": "is_execution_running" }, "is_execution_running": { "Type": "Choice", "Choices": [ { "Variable": "$.concurrency.Payload.action", "StringEquals": "PROCEED", "Next": "fivetran_sync" }, { "Not": { "Variable": "$.concurrency.Payload.action", "StringEquals": "PROCEED" }, "Next": "End" } ] }, "fivetran_sync": { "Type": "Wait", "Seconds": 10, "Next": "source_tests" }, "source_tests": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { "Cluster": "${ecs_cluster}", "TaskDefinition": "${ecs_task_name}", "LaunchType": "FARGATE", "PropagateTags": "TASK_DEFINITION", "NetworkConfiguration": { "AwsvpcConfiguration": { "Subnets": ${subnets}, "AssignPublicIp": "ENABLED", "SecurityGroups": [ "${security_group_1}" ] } }, "Overrides": { "ContainerOverrides": [ { "Name": "${ecs_task_name}", "Command.$": "$.source_tests", "Environment": [ { "Name":"PIPELINE_IDENTIFIER", "Value":"data-pipeline" }, { "Name": "ORIGINAL_INPUT", "Value": "$$.Execution.Input" } ] } ] } }, "Retry": [ { "ErrorEquals": ["States.ALL"], "IntervalSeconds": ${step_retry_seconds}, "BackoffRate": ${step_retry_backoff}, "MaxAttempts": ${step_retry_attempts} } ], "Catch": [ { "ErrorEquals": ["States.ALL"], "Next": "FailureNotifier", "ResultPath": "$" } ], "Next": "placement_transform", "ResultPath": "$.source_tests" }, "placement_transform": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { "Cluster": "${ecs_cluster}", "TaskDefinition": "${ecs_task_name}", "LaunchType": "FARGATE", "PropagateTags": "TASK_DEFINITION", "NetworkConfiguration": { "AwsvpcConfiguration": { "Subnets": ${subnets}, "AssignPublicIp": "ENABLED", "SecurityGroups": ["${security_group_1}"] } }, "Overrides": { "ContainerOverrides": [ { "Name": "${ecs_task_name}", "Command.$": "$.placement_transform", "Environment": [ { "Name":"PIPELINE_IDENTIFIER", "Value":"placement-pipeline" }, { "Name": "ORIGINAL_INPUT", "Value": "$$.Execution.Input" } ] } ] } }, "Retry": [ { "ErrorEquals": ["States.ALL"], "IntervalSeconds": ${step_retry_seconds}, "BackoffRate": ${step_retry_backoff}, "MaxAttempts": ${step_retry_attempts} } ], "Catch": [ { "ErrorEquals": ["States.ALL"], "Next": "FailureNotifier", "ResultPath": "$" } ], "Next": "placement_post_transformation_tests", "ResultPath": "$.placement_transform" }, "placement_post_transformation_tests": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { "Cluster": "${ecs_cluster}", 
"TaskDefinition": "${ecs_task_name}", "LaunchType": "FARGATE", "PropagateTags": "TASK_DEFINITION", "NetworkConfiguration": { "AwsvpcConfiguration": { "Subnets": ${subnets}, "AssignPublicIp": "ENABLED", "SecurityGroups": ["${security_group_1}"] } }, "Overrides": { "ContainerOverrides": [ { "Name": "${ecs_task_name}", "Command.$": "$.placement_post_transformation_tests", "Environment": [ { "Name":"PIPELINE_IDENTIFIER", "Value":"data-pipeline" }, { "Name": "ORIGINAL_INPUT", "Value": "$$.Execution.Input" } ] } ] } }, "Retry": [ { "ErrorEquals": ["States.ALL"], "IntervalSeconds": ${step_retry_seconds}, "BackoffRate": ${step_retry_backoff}, "MaxAttempts": ${step_retry_attempts} } ], "Catch": [ { "ErrorEquals": ["States.ALL"], "Next": "FailureNotifier", "ResultPath": "$" } ], "ResultPath": "$.placement_post_transformation_tests", "Next": "continue_loop" }, "continue_loop": { "Type": "Choice", "Choices": [ { "Variable": "$.continue_loop_flag", "StringEquals": "true", "Next": "fivetran_sync" } ], "Default": "End" }, "End": { "Type": "Succeed" }, "FailureNotifier": { "Type": "Pass", "Parameters": { "Cause.$": "States.StringToJson($.Cause)" }, "Next": "Failure" }, "Failure": { "Type": "Fail" } } } And below are my parameters: { "continue_loop_flag" : "${CONTINUE_LOOP}", "concurrency": { "Payload": { "retryCount": 0, "waitingInstanceDetails": [], "otherStepFunctionArns": ["${LINKED_STATE_MACHINE_ARN}"] } }, "source_tests": [ "./dbtWrapper.sh", "test", "--select", "${SOURCE_TEST_SELECT}", "--target", "${ENV}", "--vars", "{source_database: ${SOURCE_DATABASE}, content_source_database: ${CONTENT_SOURCE_DATABASE}, research_grd_database: ${RESEARCH_GRD_DATABASE}}" ], "placement_transform": [ "./dbtWrapper.sh", "run", "--select", "${RUN_SELECT}", "--target", "${ENV}", "--vars", "{source_database: ${SOURCE_DATABASE}, content_source_database: ${CONTENT_SOURCE_DATABASE}, research_grd_database: ${RESEARCH_GRD_DATABASE}}" ], "placement_post_transformation_tests": [ "./dbtWrapper.sh", "test", "--select", "${RUN_SELECT}", "--exclude", "${POST_TRANSFORM_TEST_EXCLUDE}", "--target", "${ENV}", "--vars", "{ source_database: ${SOURCE_DATABASE}, content_source_database: ${CONTENT_SOURCE_DATABASE}, research_grd_database: ${RESEARCH_GRD_DATABASE}, validate_since_synced_days_ago: 2, dbt_constraints_enabled: ${DBT_CONSTRAINTS_ENABLED} }" ] } When I run the step function, the first iteration executes as expected. However, the second iteration encounters an error at the source_tests step. 
Below is the error message: "Cpu":"1024","CreatedAt":1746656695733,"DesiredStatus":"STOPPED","EnableExecuteCommand":false,"EphemeralStorage":{"SizeInGiB":20},"ExecutionStoppedAt":1746656775389,"Group":"family:dev-chan-insights-data-dbt","InferenceAccelerators":[],"LastStatus":"STOPPED","LaunchType":"FARGATE","Memory":"2048","Overrides":{"ContainerOverrides":[{"Command":["./dbtWrapper.sh","test","--select","source:mongo_test_event_source","--target","dev","--vars","{source_database: DISTRICT_CHAN_RAW_DB, content_source_database: DISTRICT_DEV_RAW_CONTENT_SUPPLY, research_grd_database: RESEARCH_DEV_GRD_DB}"],"Environment":[{"Name":"PIPELINE_IDENTIFIER","Value":"data-pipeline"},{"Name":"ORIGINAL_INPUT","Value":"$$.Execution.Input"}],"EnvironmentFiles":[],"Name":"dev-chan-insights-data-dbt","ResourceRequirements":[]}],"InferenceAcceleratorOverrides":[]},"PlatformVersion":"1.4.0","PullStartedAt":1746656707370,"PullStoppedAt":1746656727744,"StartedAt":1746656731950,"StartedBy":"AWS Step Functions","StopCode":"EssentialContainerExited","StoppedAt":1746656798448,"StoppedReason":"Essential container in task exited","StoppingAt":1746656785420,"Tags":[],"TaskArn":"arn:aws:ecs:us-west-2:952817440625:task/dev-chan-insights-data-dbt/5cbd7ced8d7541fa8b0bf7d3937d067d","TaskDefinitionArn":"arn:aws:ecs:us-west-2:952817440625:task-definition/dev-chan-insights-data-dbt:629","Version":5}}]}}' could not be used to start the Task: [Cannot deserialize value of type java.util.ArrayList<java.lang.String> from Object value (token JsonToken.START_OBJECT)]

asked 6 days ago · 100 views
7 Answers
0

Yep, this issue happens a lot with Step Functions when you're trying to increment a counter and accidentally wrap it inside another object. What you're seeing: "iteration_count": { "iteration_count": 1 } is because the Pass state is nesting the result by creating a new object inside the existing iteration_count key.

The fix is simple. Instead of writing the result like this:

"ResultPath": "$.iteration_count", "Parameters": { "iteration_count.$": "States.MathAdd($.iteration_count, 1)" }

wrap the new value under a differently named key so it doesn't re-nest the original one. Do this instead (with the initial input providing "iteration_count": { "Value": 0 }):

"ResultPath": "$.iteration_count", "Parameters": { "Value.$": "States.MathAdd($.iteration_count.Value, 1)" }

That’ll give you:

"iteration_count": { "Value": 2 }

and your Choice state then compares against $.iteration_count.Value. If you'd rather keep the original key name, you can accept the nesting and read the inner value instead (with the initial input shaped as "iteration_count": { "iteration_count": 0 }):

"ResultPath": "$.iteration_count", "Parameters": { "iteration_count.$": "States.MathAdd($.iteration_count.iteration_count, 1)" }

===================================================

Putting it together, the increment step becomes:

"increment_counter": { "Type": "Pass", "ResultPath": "$.iteration_count", "Parameters": { "Value.$": "States.MathAdd($.iteration_count.Value, 1)" }, "Next": "continue_loop" }

This keeps the counter at $.iteration_count.Value as a plain number (e.g., 2, 3, etc.) instead of nesting a new iteration_count object on every pass.

======================================================
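Since the counter now lives under a Value key, the continue_loop Choice has to read the wrapped field as well. A minimal sketch, assuming the execution input starts with "iteration_count": { "Value": 0 } and keeping the NumericEquals condition from your snippet (swap in whatever threshold check you actually want):

"continue_loop": {
  "Type": "Choice",
  "Choices": [
    {
      "And": [
        { "Variable": "$.continue_loop_flag", "StringEquals": "true" },
        { "Variable": "$.iteration_count.Value", "NumericEquals": 2 }
      ],
      "Next": "fivetran_sync"
    }
  ],
  "Default": "End"
}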

answered 4 days ago
  • resource"aws_iam_role""step_function_role"{name="${var.prefix}-insights-data-pipeline"description="AllowStepFunctionstoaccessAWSresources"assume_role_policy=jsonencode({Version:"2012-10-17",Statement:[{Effect:"Allow",Principal:{Service:"states.amazonaws.com"},Action:"sts:AssumeRole"}]})}resource"aws_iam_policy""step_function_policy"{name="${var.prefix}-step-function-policy"policy=jsonencode({Version:"2012-10-17",Statement:[{Action:["ecs:DescribeTasks","ecs:RunTask","ecs:StopTask"],Effect:"Allow",Resource:""},{Action:["iam:PassRole"],Effect:"Allow",Resource:""},{Action:["events:DescribeRule","events:PutTargets","events:PutRule"],Effect:"Allow",Resource:""},{Action:["lambda:InvokeFunction",],Effect:"Allow",Resource:""}]})}resource"aws_iam_role_policy_attachment""attach_policy_step_function_role"{role=aws_iam_role.step_function_role.namepolicy_arn=aws_iam_policy.step_function_policy.arn}#Allowingstepfunctiontriggerstorunthestepfunctionsdata"aws_iam_policy_document""step_function_assume"{statement{actions=["sts:AssumeRole"]principals{type="Service"identifiers=["states.amazonaws.com","events.amazonaws.com"]}}}resource"aws_iam_role""allow-step-function-execution"{name="${var.prefix}-allow-step-function-execution-role"assume_role_policy=data.aws_iam_policy_document.step_function_assume.json}

  • resource"aws_iam_role_policy""state-execution"{name="${var.prefix}-allow-step-function-execution-policy"role=aws_iam_role.allow-step-function-execution.idpolicy=jsonencode({Version:"2012-10-17",Statement:[{Effect:"Allow",Action:["states:StartExecution"],Resource:["arn:aws:states:${var.aws_region}:${var.nwea_aws_account_id}:stateMachine:${aws_sfn_state_machine.data_pipeline_dbt_workflow.name}","arn:aws:states:${var.aws_region}:${var.nwea_aws_account_id}:stateMachine:${aws_sfn_state_machine.sfn_dbt_freshness_workflow.name}","arn:aws:states:${var.aws_region}:${var.nwea_aws_account_id}:stateMachine:${aws_sfn_state_machine.placement_pipeline_dbt_workflow.name}",]}]})}resource"aws_iam_role_policy_attachment""attach_state_execution_policy"{role=aws_iam_role.allow-step-function-execution.namepolicy_arn=aws_iam_role_policy.state-execution.policy}

  • I was able to work this out by passing and comparing against the Value field. Now I am trying to execute another instance of the Step Function when it reaches the counter limit and pass the same original input, as shown below: "start_new_exec": { "Type": "Task", "Resource": "arn:aws:states:::states:startExecution", "Parameters": { "StateMachineArn.$": "$.step_function_arn" }, "End": true } However, I am encountering an error indicating that the assumed role does not have access to execute my Step Function. I am using the step_function_execution_policy.tf shown in the other comments, but this has not resolved the issue. What changes do I need to make to get this working? Below is the error I was getting: User: arn:aws:sts::952817440625:assumed-role/dev-chan-insights-data-insights-data-pipeline/bHxKXeLerfFMOaueEsTeMeMhXWHYBWGc is not authorized to perform: states:StartExecution on resource: arn:aws:states:us-west-2:952817440625:stateMachine:dev-chan-insights-data-pipeline because no identity-based policy allows the states:StartExecution action (Service: AWSStepFunctions; Status Code: 400; Error Code: AccessDeniedException; Request ID: 4e923f8c-c171-478d-a45b-64cc16551d94; Proxy: null)

0

This is what I am thinking for now:

  1. Avoid indefinite looping in a single execution. Even though Standard Step Functions can run for up to 1 year, each execution's history is capped at 25,000 events.

Once that limit is hit, your workflow will fail with a HistoryLimitExceeded error, and debugging gets harder as the execution grows in size.

  2. Use loop batching + handoff after N iterations. Your idea of ending the current execution after, say, 100 loops, then starting a new Step Function with the same input, is a solid pattern.

This keeps the execution history manageable and gives you better observability and control.

  3. Trigger new executions cleanly. You can trigger the next batch using:

A Lambda inside your Step Function using StartExecution

Or use an EventBridge rule if you prefer separating orchestration logic

Track the iteration count in your state input (e.g., iterationCount) so you know when to restart.

Putting this together: add an iteration counter to your state input ("iterationCount": 0). In your loop logic, increment the count and include a Choice state (sketched below) to:

Continue loop if under threshold (e.g., < 100)

Trigger StartExecution with the same input (or updated input) if the threshold is reached.

Use "End" to gracefully close out the current execution.

answered 6 days ago
  • Thank you for the inputs. I will explore both options: using a Lambda inside the Step Function with StartExecution and using an EventBridge rule to separate orchestration logic. For now, I prefer to start with Lambda execution and pass the same input.

    Additionally, I am using a scheduler to trigger the Step Function daily to check whether it is already running, and if it is, I exit the loop. I wonder whether triggering a new Step Function could create a situation where both the scheduled trigger and the loop trigger stop executing, or whether it will simply start new instances. Or is using an EventBridge rule a better option here?

  • I added an increment step block after the placement_post_transformation_tests block to keep incrementing the loop after each iteration, like this: { "increment_counter": { "Type": "Pass", "Parameters": { "iteration_count.$": "States.MathAdd($.iteration_count, 1)" }, "ResultPath": "$.iteration_count", "Next": "continue_loop" }, "continue_loop": { "Type": "Choice", "Choices": [ { "And": [ { "Variable": "$.continue_loop_flag", "StringEquals": "true" }, { "Variable": "$.iteration_count", "NumericEquals": 2 } ], "Next": "fivetran_sync" } ], "Default": "End" } } This is passing the input for the next iteration as: { "continue_loop_flag": "true", "iteration_count": { "iteration_count": 1 } } instead of: { "continue_loop_flag": "true", "iteration_count": 1 } How can I pass the incremented iteration_count for each loop with the same JSON input?

0

Nice, sounds like your loop is coming together well now.

Regarding the states:StartExecution error, that one’s pretty common when chaining Step Function executions from within another Step Function.

The error means the IAM role that your current Step Function is running under (in this case, the role dev-chan-insights-data-insights-data-pipeline) doesn’t have permission to start another execution of the same Step Function, or any other, depending on your config.

In your IAM policy attached to that role, make sure you’ve added this permission: { "Effect": "Allow", "Action": "states:StartExecution", "Resource": "arn:aws:states:us-west-2:952817440625:stateMachine:dev-chan-insights-data-pipeline" } If you want to allow it to start any Step Function in your account (e.g., for chaining different workflows), you can loosen it slightly:

"Resource": "arn:aws:states:us-west-2:952817440625:stateMachine:*"

Where to add it: either in your role’s inline policy, or in the step_function_execution_policy.tf file (assuming you're using Terraform). Make sure the role assumed by the Step Function (the one shown in the error) actually has this policy attached; sometimes the role is created separately from the state machine.

One more thing to cross-check: if you're passing input, don't forget to add the Input.$ field in your startExecution step too, like: "Parameters": { "StateMachineArn.$": "$.step_function_arn", "Input.$": "$$.Execution.Input" } That’ll make sure the new execution gets the same starting payload.
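Put together with the state from your earlier comment, the hand-off task would look like this (this just combines the two snippets above; nothing else is assumed):

"start_new_exec": {
  "Type": "Task",
  "Resource": "arn:aws:states:::states:startExecution",
  "Parameters": {
    "StateMachineArn.$": "$.step_function_arn",
    "Input.$": "$$.Execution.Input"
  },
  "End": true
}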

answered 4 days ago
  • Hi again — thank you so much for your help earlier! I updated my step_function_execution_policy.tf with the following code, and it’s working as expected now:

    resource "aws_iam_policy" "step_function_start_execution_policy" {
      name        = "${var.prefix}-step-function-start-execution"
      description = "Allow starting Step Function executions"
      policy      = jsonencode({
        Version   = "2012-10-17"
        Statement = [{
          Effect   = "Allow"
          Action   = ["states:StartExecution"]
          Resource = ["arn:aws:states:${var.aws_region}:${var.nwea_aws_account_id}:stateMachine:${aws_sfn_state_machine.placement_pipeline_dbt_workflow.name}*"]
        }]
      })
    }

    resource "aws_iam_role_policy_attachment" "step_function_start_execution_attachment" {
      role       = aws_iam_role.step_function_role.name
      policy_arn = aws_iam_policy.step_function_start_execution_policy.arn
    }

    Quick question: once the loop counter reaches its limit, I'm triggering a new Step Function execution using startExecution. At the beginning of the Step Function, I have a concurrency check like this:

    "is_execution_running": { "Type": "Choice", "Choices": [ { "Variable": "$.concurrency.Payload.action", "StringEquals": "PROCEED", "Next": "increment_counter" }, { "Not": { "Variable": "$.concurrency.Payload.action", "StringEquals": "PROCEED" }, "Next": "End" } ] }

    I'm wondering what the chances are that the current Step Function execution reaches the End state immediately after triggering the new Step Function, before the new one fully starts running? In some cases, I notice this.

  • Also, when I pass input to the next execution, how can I set the iteration count to 0 instead of the incremented counter, i.e. "iteration_count": { "Value": 0 }, in the following state of the step function: "start_new_exec": { "Type": "Task", "Resource": "arn:aws:states:::states:startExecution", "Parameters": { "StateMachineArn.$": "$.step_function_arn", "Input.$": "$" }, "End": true }

0

The error you're encountering in the second iteration of your Step Function, specifically at the source_tests step, appears to be related to a data type mismatch in the input parameters. The error message suggests that the system is expecting an array of strings for a certain parameter, but it's receiving an object instead.

This issue likely stems from how the data is being passed between states in your Step Function. After the first iteration, it seems that the structure of your input data is changing, causing the source_tests step to receive an unexpected data type.

To resolve this, you should:

  1. Check the output of the previous step (placement_post_transformation_tests or continue_loop) to ensure it's maintaining the correct data structure.

  2. Verify that the source_tests step is receiving the expected input format. You might need to add a Pass state before source_tests to reshape the data if necessary (a sketch follows at the end of this answer).

  3. Ensure that the "Command.$" parameter in the source_tests task is correctly referencing an array of strings. You might need to adjust the JsonPath expression to select the correct part of your state data.

  4. Consider adding more robust error handling and data validation at each step to catch and handle unexpected data structures.

  5. Use the Step Functions console to examine the exact input and output of each state during execution. This can help pinpoint where the data structure is changing unexpectedly.

  6. If the data structure is intentionally changing between iterations, you may need to add logic to handle different input formats in your source_tests task.

Remember, when working with Step Functions, it's crucial to ensure that the output of each state matches the expected input of the next state, especially when dealing with loops or multiple iterations.
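For point 2, here is a minimal sketch of such a reshaping Pass state. It is only an illustration: the state name reset_commands is made up, it assumes the original command arrays are still present in the execution input (reachable through the Context Object), and it parks them under $.commands, so the ECS states would then reference, for example, "Command.$": "$.commands.source_tests":

"reset_commands": {
  "Type": "Pass",
  "Parameters": {
    "source_tests.$": "$$.Execution.Input.source_tests",
    "placement_transform.$": "$$.Execution.Input.placement_transform",
    "placement_post_transformation_tests.$": "$$.Execution.Input.placement_post_transformation_tests"
  },
  "ResultPath": "$.commands",
  "Next": "fivetran_sync"
}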

Sources
Empty payload in the Pass step of Step Function Step Machine when triggered by an EventBridge event | AWS re:Post
Step Function Error Handling | AWS re:Post

answered 6 days ago
  • Thank you for the input. I want to pass the original input to the next iteration of source_tests to ensure the loop continues. I see that the actual parameters are in JSON format. Could you please advise on the best way to capture the original input JSON and pass it in continue_loop as input to the next iteration of source_tests? Since the first iteration is passing successfully, I want to ensure that subsequent iterations receive the same input as the first iteration of source_tests so the loop continues smoothly.

0

The issue comes down to how Step Functions is handling your source_tests input after the first run.

From the error: Cannot deserialize value of type java.util.ArrayList&lt;java.lang.String&gt; from Object value (token JsonToken.START_OBJECT) …it looks like the ECS task is expecting a list of strings for the Command, but it’s receiving an object instead. That usually means your original array (the list of command arguments) got overwritten somewhere along the way, likely by the result of a previous step.

What’s likely happening: In your first run, source_tests is a clean list like: [ "./dbtWrapper.sh", "test", "--select", "source:mongo_test_event_source", "--target", "dev", "--vars", "{...}" ]

But after the ECS task finishes, if you're using something like "ResultPath": "$.source_tests", then Step Functions will replace source_tests with the ECS task result, which is a JSON object, and not valid for Command.

So in the second loop, it tries to use that object as the command, and ECS throws the deserialization error.

How to fix it: To avoid that conflict, you can:

  1. Change the ResultPath in your ECS task step to store the result somewhere else, like $.source_tests_result

  2. Or keep your command input inside a nested object like $.input.source_tests

For example: "ResultPath": "$.source_tests_result", "Command.$": "$.input.source_tests"

and update your JSON to

"input": { "source_tests": [ ... ] }

This keeps your original command array intact across multiple loops.
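For instance, with option 1 the source_tests state keeps its Command reference but writes its result somewhere else. A trimmed sketch of just the fields that change (NetworkConfiguration, Retry, Catch and the environment overrides stay exactly as in your current definition):

"source_tests": {
  "Type": "Task",
  "Resource": "arn:aws:states:::ecs:runTask.sync",
  "Parameters": {
    "Cluster": "${ecs_cluster}",
    "TaskDefinition": "${ecs_task_name}",
    "LaunchType": "FARGATE",
    "Overrides": {
      "ContainerOverrides": [
        { "Name": "${ecs_task_name}", "Command.$": "$.source_tests" }
      ]
    }
  },
  "ResultPath": "$.source_tests_result",
  "Next": "placement_transform"
}

With the ECS output parked under $.source_tests_result, $.source_tests stays an array of strings, so the second loop iteration passes a valid Command again.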

answered 6 days ago
  • Okay, thanks again. I took the approach of keeping the "ResultPath": "$.source_tests_result" for the following steps, and it is successful now.

    I have a question regarding continuously running this loop in production to serve data faster. Is it advisable to loop continuously this way, or are there any execution limits on Step Function instances or the history of executions that we can't see after 90 days in case of errors?

    Another approach I'm considering is running a separate instance after 100 loops and triggering a new Step Function instance with the same input, then ending the current instance. Are there any other ways to achieve this more efficiently? Or is it a good idea to spin up a different instance for each iteration, which may result in 50 or more instances per day?

    Thanks for your time and I appreciate your response.

0

Hey, glad to hear the StartExecution permissions are working now, and great question about chaining executions and resetting the counter.

  1. When the current Step Function ends immediately after triggering a new one, there’s a chance the new execution might not start before the old one exits. Step Functions' StartExecution is asynchronous, meaning the parent just triggers the new one and moves on.

If you're using a concurrency check at the start of the next execution, it's possible that the original execution hasn't ended yet when the new one checks.

In practice, this usually resolves itself quickly, but if strict ordering matters, you can add a short Wait state (e.g., 5–10 seconds) after the StartExecution to give the new run a buffer before doing the check.

  2. If you're passing the whole input (Input.$: "$"), and iteration_count is already incremented in the current run, that value will carry over. To override it, just use Parameters to rebuild the input with the fields you want: "start_new_exec": { "Type": "Task", "Resource": "arn:aws:states:::states:startExecution", "Parameters": { "StateMachineArn.$": "$.step_function_arn", "Input": { "iteration_count": 0, "continue_loop_flag": "true", "concurrency": { "Payload": { "retryCount": 0, "waitingInstanceDetails": [], "otherStepFunctionArns.$": "$.concurrency.Payload.otherStepFunctionArns" } }, "source_tests.$": "$.source_tests", "placement_transform.$": "$.placement_transform", "placement_post_transformation_tests.$": "$.placement_post_transformation_tests" } }, "End": true }

Basically: rebuild the Input payload manually and just reset the fields you want, like iteration_count = 0.
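If you also want the buffer mentioned in point 1, the tail of the workflow could look roughly like this. A sketch only: handoff_buffer is a made-up state name, and the Input below shows just the reset counter, with the other fields rebuilt exactly as in point 2:

"start_new_exec": {
  "Type": "Task",
  "Resource": "arn:aws:states:::states:startExecution",
  "Parameters": {
    "StateMachineArn.$": "$.step_function_arn",
    "Input": { "iteration_count": 0 }
  },
  "Next": "handoff_buffer"
},
"handoff_buffer": {
  "Type": "Wait",
  "Seconds": 10,
  "Next": "End"
}

The parent then stays alive for a few seconds after the hand-off instead of ending immediately.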

answered 2 days ago
  • Hi, thanks again. Yes, I have implemented all these steps and saved the input for the initial iteration. To handle parallel executions of step functions, I am considering using an EventBridge approach. This will allow me to manage the messages so that the next step function runs correctly. I have a scheduler that initially triggers the step function, but these executions are causing parallel processing issues. Therefore, regardless of the approach, I will need to address these parallel processing challenges. Could you suggest the best way to handle these scenarios?

0

Great to hear you've implemented all the core steps. When it comes to managing parallel Step Function executions (especially when triggered by a scheduler or chaining logic), you're right, it can cause race conditions or unexpected behavior if not controlled.

Here are a few practical options to manage and avoid unwanted parallel executions:

  1. Use a Concurrency Guard via DynamoDB or S3. Set a "lock" at the beginning of the Step Function using a DynamoDB item or a marker file in S3. Your flow would:

Check if an execution is already running (based on a timestamp or status flag)

If yes, exit early or wait

If no, write the lock and continue

Clear the lock at the end or upon failure (with a Catch block fallback)

This works well with EventBridge triggers or scheduled runs; a sketch of the DynamoDB variant follows at the end of this answer.

  2. Use a Custom EventBridge Rule with Input Filtering. Instead of triggering your Step Function directly, you can send an event to EventBridge with metadata (e.g., timestamp, job ID), and have a Lambda in between that:

Checks whether an execution is already running (e.g., by calling ListExecutions with a RUNNING status filter)

If nothing is active → trigger Step Function

If one is running → skip or reschedule (e.g., via SQS DLQ or another EventBridge rule)

This gives you more centralized control and decouples the trigger logic.

  3. Use Step Functions' Built-in Execution ID Control (if applicable). If you're triggering via StartExecution, you can provide a custom name for the execution. AWS will return an error if one with the same name is already running. You can catch that error and decide to skip or retry later.

Example: "ExecutionName": "my-daily-run-{{timestamp}}" Or use a static name like "daily-job" and enforce one-at-a-time runs.

  4. Move the Scheduler Outside and Use Conditional Triggering. If the scheduler is what's introducing duplicates, consider replacing it with:

An EventBridge rule that triggers based on a condition (e.g., CloudWatch metric, status check)

Or use a control Step Function that decides whether to launch the actual processing flow (this is sometimes called a "controller–worker" model)
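For option 1, here is a minimal sketch of the DynamoDB variant using the native dynamodb:putItem integration. Assumptions to call out: the table name pipeline-locks and the key name LockName are invented for the example, and the Catch assumes the conditional-check failure surfaces as DynamoDB.ConditionalCheckFailedException, so verify the exact error name against your own execution history before relying on it:

"acquire_lock": {
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:putItem",
  "Parameters": {
    "TableName": "pipeline-locks",
    "Item": {
      "LockName": { "S": "placement-pipeline" },
      "ExecutionArn": { "S.$": "$$.Execution.Id" }
    },
    "ConditionExpression": "attribute_not_exists(LockName)"
  },
  "ResultPath": null,
  "Catch": [
    { "ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"], "Next": "End" }
  ],
  "Next": "fivetran_sync"
}

A matching dynamodb:deleteItem state at the end of the workflow (and on the failure path) releases the lock so the next scheduled run can proceed.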

answered 2 days ago
  • Thank you for sharing these options. I’ll need some time to explore them further and better understand them. Are there any architectural documents or references that could help me proceed with a proof of concept (POC)? I’ll review the materials and follow up if I have any questions. I appreciate your continued support—thank you!
