
EventBridge Pipe failures after deploy settle over time


I've got an application that uses EventBridge Pipes to read from SQS queues, with various targets depending on need. I recently needed to destroy and re-deploy all stacks as I start to move from development to production, and I've encountered a problem where the pipe invocation (regardless of target/enrichment) reports a failure to invoke. For example, here is a pipe log showing the error for a queue-to-EventBridge pipe:

{
    "resourceArn": "arn:aws:pipes:eu-west-1:<REDACTED>:pipe/DevL8LoriotIngestMqttPipe-8184ea23",
    "timestamp": 1720777701120,
    "executionId": "549c3e65-4a09-406f-8a5f-1f7d87e44dfa",
    "messageType": "TargetInvocationFailed",
    "logLevel": "ERROR",
    "error": {
        "message": "Target invocation failed with error from Events.",
        "httpStatusCode": 400,
        "awsService": "events",
        "requestId": "e6da9182-7010-48b9-b83d-2424ca16a95f",
        "exceptionType": "BadRequest",
        "resourceArn": "arn:aws:events:eu-west-1:<REDACTED>:event-bus/DevL8State-3a0a9147"
    },
    "awsRequest": "{\"entries\":[{\"time\":null,\"source\":\"io.loriot\",\"resources\":null,\"detailType\":\"Loriot.Notification.Raw\",\"detail\":\"{\\\"notification\\\":{\\\"cmd\\\":\\\"gw\\\",\\\"seqno\\\":8458540,\\\"EUI\\\":\\\"353438397C397108\\\",\\\"ts\\\":1720777640582,\\\"fcnt\\\":370930,\\\"port\\\":9,\\\"freq\\\":868100000,\\\"toa\\\":139,\\\"dr\\\":\\\"SF7 BW125 4/5\\\",\\\"ack\\\":false,\\\"gws\\\":[{\\\"rssi\\\":-54,\\\"snr\\\":9.8,\\\"ts\\\":1720777640582,\\\"time\\\":\\\"2024-07-12T09:47:20.582Z\\\",\\\"gweui\\\":\\\"647FDAFFFE00D314\\\",\\\"ant\\\":0,\\\"lat\\\":50.73119734999999,\\\"lon\\\":-3.283016919999999}],\\\"bat\\\":255,\\\"offline\\\":false,\\\"confirmed\\\":false,\\\"devaddr\\\":\\\"31569F59\\\",\\\"data\\\":\\\"6690fbbd7511f711f607ff11f711f607ff11f711f607ff11f711f607ff11f711f607ff11f711f607ff11f711f607ff11f711f607ff11f711f607ff11f811f607ff\\\"}}\",\"eventBusName\":\"arn:aws:events:eu-west-1:907963324750:event-bus/DevL8State-3a0a9147\",\"traceHeader\":null}],\"endpointId\":null}",
    "awsResponse": "The security token included in the request is invalid. (Service: EventBridge, Status Code: 400, Request ID: e6da9182-7010-48b9-b83d-2424ca16a95f)"
}

The same problem exists with other pipes, some of which have Lambda enrichments or targets, but the root cause is the same: 'The security token included in the request is invalid'.

The problem didn't exist in the stacks as I'd developed and updated them over time, and it only affects some of the messages: initially a large proportion (>90%) fail, but this resolves over time, such that after an hour or so all of the pipes are working as expected.

I've checked all the roles/policies and the correct permissions are in place from the outset, and the fact that the problem affects a steadily decreasing proportion of the pipe traffic suggests that it is not one of configuration.

My working theory is that the internal EB pollers that are reading from the queue are misconfigured, perhaps during initial creation the pollers get started too soon before the other resources are created, and only when the poller instance goes through a restart do things settle down. I presume that I need to delay the creation/start of the pipe until all other resources have settled.

It doesn't seem to matter if I create the pipes en masse, or whether I deploy segments of the app (which is split into multiple stacks) one at a time. The Event Bus is in one stack, and the pipe and queue are in the same stack (but with multiple stacks containing pipes affected).

Making other changes to the pipes/queues whilst the problem persists does seem to help sort things out, and I assume this is because the pollers go through a cold start after a change to the pipe or queue. For example, before I spotted the security-token log entry, I tried increasing the Lambda timeout (by making a CDK change and deploying), thinking that Lambda cold-start time might be the problem. The issue immediately went away for the affected pipe, and stayed fixed after I reverted the timeout to its initial value.

1 Answer

Hi Dave,
Thanks for sharing such a detailed description of your issue! I can see you've put significant effort into diagnosing this, and I appreciate your persistence in identifying potential causes. Let’s delve into your problem and see if we can clarify what's happening and how to address it. 😊


Clarifying the Issue

From your explanation, it sounds like you're encountering intermittent failures in EventBridge Pipes after redeploying your stacks, especially when transitioning from development to production. The error, "The security token included in the request is invalid," affects a large proportion of the messages initially but resolves over time. Your observation that modifying resources seems to "reset" things supports the theory of misconfigured internal pollers or a timing issue during deployment. It's intriguing that the issue gradually resolves without manual intervention, suggesting potential retries or restarts of internal processes.

You're on the right track thinking about resource dependencies and timing during deployment. Let’s walk through how you can address this systematically.


Key Terms

  • EventBridge Pipes: A service for integrating sources (e.g., SQS queues) with targets (e.g., Lambda functions) via a simple, event-driven pipeline.
  • Security Token Invalid Error: A failure indicating the request's authentication or IAM role permissions are not recognized or valid.
  • Cold Start: The initialization time required for AWS services (like Lambda or EventBridge pollers) when starting from scratch.

The Solution (Our Recipe)

Steps at a Glance:

  1. Ensure IAM roles and policies are correct and fully propagated.
  2. Introduce explicit resource dependencies to delay pipe creation.
  3. Validate EventBridge and SQS resource configurations post-deployment.
  4. Consider adding a retry mechanism or interim "warm-up" period after deployment.
  5. Test resource creation in isolation to identify potential timing issues.
  6. Investigate and refine debugging configurations for deeper error analysis.

Step-by-Step Guide:

  1. Verify IAM Roles and Policies:
    Double-check that the pipe's execution role includes all necessary permissions for the source, enrichment, and target. For an SQS-to-EventBridge pipe that means events:PutEvents for the event bus, plus sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes for the queue (Pipes needs all three to poll an SQS source). Ensure propagation time is accounted for in your deployment process.

    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "*"
    }
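
    Since broad "Resource": "*" grants are best avoided in production, a tightened variant would scope each action to the specific resources. The ARNs below are illustrative placeholders for your own account:

    {
      "Effect": "Allow",
      "Action": ["events:PutEvents"],
      "Resource": "arn:aws:events:eu-west-1:<ACCOUNT_ID>:event-bus/DevL8State-3a0a9147"
    },
    {
      "Effect": "Allow",
      "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"],
      "Resource": "arn:aws:sqs:eu-west-1:<ACCOUNT_ID>:<QUEUE_NAME>"
    }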

  2. Add Resource Dependencies:
    Modify your deployment to ensure that the EventBridge event bus and SQS queues are fully initialized before the pipes are created. In AWS CDK, you can add dependencies explicitly:

    import * as sqs from 'aws-cdk-lib/aws-sqs';
    import * as events from 'aws-cdk-lib/aws-events';
    import * as pipes from 'aws-cdk-lib/aws-pipes';

    const queue = new sqs.Queue(this, 'Queue');
    const eventBus = new events.EventBus(this, 'EventBus');

    // pipeRole: an iam.Role trusted by pipes.amazonaws.com, with the
    // permissions shown in step 1
    const pipe = new pipes.CfnPipe(this, 'Pipe', {
      source: queue.queueArn,
      target: eventBus.eventBusArn,
      roleArn: pipeRole.roleArn
    });

    pipe.node.addDependency(queue);
    pipe.node.addDependency(eventBus);

  3. Validate Resources Post-Deployment:
    After deploying, manually validate that the EventBridge bus, SQS queues, and associated IAM roles are configured correctly. Use the AWS CLI or SDK to confirm:

    aws events describe-event-bus --name DevL8State-3a0a9147
    aws sqs get-queue-attributes --queue-url <QUEUE_URL> --attribute-names All
    aws pipes describe-pipe --name <PIPE_NAME>

  4. Add a Warm-Up Period:
    Introduce a delay after deployment to allow services like EventBridge pollers to stabilize. You can use a Lambda function or CloudFormation WaitCondition for this purpose.

    import * as lambda from 'aws-cdk-lib/aws-lambda';

    const warmUpFunction = new lambda.Function(this, 'WarmUpFunction', {
      runtime: lambda.Runtime.NODEJS_18_X,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
        exports.handler = async () => {
          console.log('Warm-up complete');
          return;
        };
      `)
    });

    // Deployed after the pipe, so its creation acts as a settling point
    warmUpFunction.node.addDependency(pipe);

  5. Isolate and Test Deployment Timing:
    Break down your stack deployment into smaller steps and monitor the behavior of individual resources. This helps pinpoint where the timing issue originates.
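
    To exercise a single pipe in isolation after a deploy, one option is to push a probe message onto its source queue and then read back the pipe's state. The following is a minimal sketch, with the clients injected so it can be dry-run against stubs; the queue URL and pipe name passed in are hypothetical placeholders:

```python
import json
import time

def probe_pipe(sqs_client, pipes_client, queue_url, pipe_name):
    """Send a probe message to the pipe's source queue, then return the
    pipe's current state ('RUNNING', 'FAILED', ...). Clients are passed
    in so the function can be exercised without touching AWS."""
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"probe": True, "ts": int(time.time() * 1000)}),
    )
    return pipes_client.describe_pipe(Name=pipe_name)["CurrentState"]

# Against real AWS, something like:
# import boto3
# print(probe_pipe(boto3.client("sqs"), boto3.client("pipes"),
#                  "<QUEUE_URL>", "DevL8LoriotIngestMqttPipe-8184ea23"))
```

    Repeating the probe every minute or so after a deploy would give you a timeline of exactly when each pipe starts delivering, rather than inferring it from downstream symptoms.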

  6. Investigate and Refine Debugging Configurations:

    Step-by-Step Enhancements:

    • Token Expiry or Invalidity Causes: Temporary credentials provided by IAM roles (e.g., through AssumeRole or using temporary session tokens) might expire during long-running deployments. To mitigate:

      • Use the AWS Security Token Service (STS) to confirm the validity of tokens:
        aws sts get-caller-identity
      • If deploying through automation (e.g., CI/CD pipelines), ensure tokens are refreshed or use long-lived credentials for deployment.
      • Check the default session duration for roles. Extend it if needed using the DurationSeconds parameter when assuming a role:
        aws sts assume-role --role-arn <ROLE_ARN> --role-session-name <SESSION_NAME> --duration-seconds 3600
    • IAM Trust Relationships and Role Permissions: Ensure all roles assumed by EventBridge, SQS, and your pipelines explicitly trust the services and users involved. For instance:

      • The trust relationship for a pipe's execution role should trust pipes.amazonaws.com (note: not events.amazonaws.com, which is the principal for EventBridge rules):
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "pipes.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    • Region or Account Misconfiguration: If resources span multiple regions or accounts, ensure the roles and policies grant cross-account or cross-region access explicitly. Use Resource ARNs carefully to avoid restricting access unnecessarily.

    Enable Verbose Debugging:

    • Turn on CloudWatch detailed logs for EventBridge Pipes to gain insights into token failures:
      • Use this policy snippet to allow logs:
        {
          "Effect": "Allow",
          "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
          "Resource": "arn:aws:logs:<REGION>:<ACCOUNT_ID>:log-group:/aws/events/*"
        }
      • Analyze logs for specific errors such as UnrecognizedClientException or ExpiredToken.
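
    If you export those detailed logs as JSON lines, a small script can tally how many of the failures are token-related versus other errors. This is a minimal sketch, with the field names taken from the pipe log entry shown in the question:

```python
import json

# Substrings that indicate a credential problem rather than some other failure.
TOKEN_MARKERS = ("security token", "expiredtoken", "unrecognizedclientexception")

def count_token_failures(log_lines):
    """Return (token_failures, total_failures) for JSON-lines pipe logs."""
    token, total = 0, 0
    for line in log_lines:
        entry = json.loads(line)
        if entry.get("logLevel") != "ERROR":
            continue
        total += 1
        response = entry.get("awsResponse", "").lower()
        if any(marker in response for marker in TOKEN_MARKERS):
            token += 1
    return token, total

sample = [
    '{"logLevel": "ERROR", "awsResponse": "The security token included in the request is invalid."}',
    '{"logLevel": "INFO", "messageType": "ExecutionStarted"}',
]
print(count_token_failures(sample))  # → (1, 1)
```

    Running this over logs from the hour after a deploy would confirm whether the failure rate decays the way you observed, and whether every failure really is the same token error.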

    Test Token Lifecycles:

    • Run targeted tests to simulate token behavior under deployment stress using the following script:
    import boto3
    import time
    
    # Constants
    ROLE_ARN = "arn:aws:iam::<ACCOUNT_ID>:role/<ROLE_NAME>"  # Replace with your role ARN
    DURATION = 3600  # Session duration in seconds (1 hour)
    INTERVAL = 600   # Time between validation checks in seconds (10 minutes)
    
    def assume_role(role_arn, duration):
        """
        Assumes the specified IAM role and returns temporary credentials.
        """
        sts_client = boto3.client('sts')
        response = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName="TokenLifecycleTest",
            DurationSeconds=duration
        )
        return response['Credentials']
    
    def validate_token(credentials):
        """
        Uses the temporary credentials to validate the token's validity
        by calling STS GetCallerIdentity.
        """
        session = boto3.Session(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
        sts_client = session.client('sts')
        try:
            # Validate the token by making an authenticated API call
            response = sts_client.get_caller_identity()
            print(f"Token is valid. Caller: {response['Arn']}")
        except Exception as e:
            print(f"Token validation failed: {e}")
    
    if __name__ == "__main__":
        print("Starting token lifecycle diagnostic...")
        # Step 1: Assume the IAM role and retrieve temporary credentials
        credentials = assume_role(ROLE_ARN, DURATION)
        print(f"Token assumed. Validating every {INTERVAL} seconds for {DURATION} seconds.")
        
        # Step 2: Validate token periodically for the duration of its validity
        for _ in range(DURATION // INTERVAL):
            validate_token(credentials)
            time.sleep(INTERVAL)  # Wait for the specified interval before next validation
    
        print("Token lifecycle diagnostic complete.")

Closing Thoughts

I hope these steps provide a clear path forward, Dave. Timing issues in AWS resource creation can be tricky, but adding explicit dependencies and validating configurations should help mitigate this issue. If the problem persists, consider reaching out to AWS Support for deeper investigation into the behavior of EventBridge Pipes.


Let me know how it goes or if you have more questions! Good luck with your production rollout! 🚀✨


Cheers,

Aaron 😊

answered a year ago
  • Thanks for the response. After quite a protracted business-level support request, AWS confirmed that this behaviour was due to caching of security tokens on their side. In effect, when a pipe was created the tokens were cached based on the resource name alone, so if a destroy/deploy cycle reused the same name, the outdated security tokens would be used initially, causing the issues. They implemented a fix which I confirmed to be working.
