How do I use the Java SDK to invoke a Bedrock Agent?

Hello,

I have created several Bedrock agents, and using the console I can chat with them to perform the actions I want. Now I want to invoke them from my RESTful application.

I'm using version 2.29.6 of the AWS SDK for Java 2.x, and my Maven pom.xml is as follows:

    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>bedrockagentruntime</artifactId>
        <version>2.29.6</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>bedrock</artifactId>
        <version>2.29.6</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>bedrockruntime</artifactId>
        <version>2.29.6</version>
    </dependency>

I see in the docs that only the async client can be used to invoke agents, so I'm calling its "invokeAgent" method, which returns a CompletableFuture<Void>. Below is the code I'm trying, but I can't get it to work and I'm looking for guidance.

The two scenarios I'm running into are that either the CompletableFuture blocks indefinitely when using "get", or the call fails with a connection pool error:

An Error Occurred While Invoking Agent. software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate.
Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate.
Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout.
If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests.

The code example is as follows:


import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.bedrockagentruntime.BedrockAgentRuntimeAsyncClient;
import software.amazon.awssdk.services.bedrockagentruntime.BedrockAgentRuntimeClient;
import software.amazon.awssdk.services.bedrockagentruntime.model.*;
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeClient;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;


@Log4j2
@Service
public class AWSBedRockAgentService {

    BedrockAgentRuntimeAsyncClient client;

    private AWSBedRockAgentService()
    {
        client = BedrockAgentRuntimeAsyncClient.builder()
                .region(Region.US_EAST_1) // Replace with your desired region
                .credentialsProvider(ProfileCredentialsProvider.create())
                .build();
    }

    public ChatResponse DoSomething(String prompt, String authToken)
    {
        ChatResponse chatResponse = new ChatResponse();

        // Prepare the request
        String agentId = "<myAgentId>";
        String agentAliasId = "<myAlias>";
        String sessionId = "sameSessionId"; 
        String inputText = "What customers do I have access to?";

        chatResponse.originalPrompt = inputText;
        chatResponse.totalTokensUsed = 9999;
        chatResponse.responses = new ArrayList<String>();

        var map = new HashMap<String, String>();
        map.put("Authorization", authToken);

        SessionState ss = SessionState.builder()
                .sessionAttributes(map)
                .build();

        InvokeAgentRequest request = InvokeAgentRequest.builder()
                .agentId(agentId)
                .agentAliasId(agentAliasId)
                .sessionId(sessionId)
                .inputText(inputText)
                .enableTrace(true)
                .sessionState(ss)
                //.endSession(true) //will need to determine when to do this and when not to.

                .build();

        try {

            InvokeAgentResponse[] resp = {null};

            InvokeAgentResponseHandler handler = new InvokeAgentResponseHandler() {
                @Override
                public void responseReceived(InvokeAgentResponse invokeAgentResponse) {
                    log.info("Response Received from Agent: {}", invokeAgentResponse);

                    resp[0] = invokeAgentResponse;

                    //this.complete();
                }

                @Override
                public void onEventStream(SdkPublisher<ResponseStream> sdkPublisher) {
                    log.info("Agent - On Event Stream: {}", sdkPublisher);
                }

                @Override
                public void exceptionOccurred(Throwable throwable) {
                    log.warn("An Error Occurred While Invoking Agent. {}", throwable.getMessage());

                    //this.complete();
                }

                @Override
                public void complete() {
                    log.info("Agent Processing Completed.");
                }
            };
            // Invoke the agent
            CompletableFuture<Void> agentInvocation = client.invokeAgent(request, handler);

            //Wait for it to complete
            agentInvocation.completeOnTimeout(null, 30000, TimeUnit.SECONDS);

            if(resp.length > 0)
            {
                var agentResponse = resp[0];
                log.info("*************Agent Response************");
                log.info(agentResponse);
            }

            chatResponse.responses.add("No Exception Happened");

        } catch (Exception e) {
            log.error("Error invoking Bedrock agent: {}", e.getMessage());

            chatResponse.responses.add(e.getMessage());

        } finally {
            // Close the client
            client.close();
        }

        return chatResponse;
    }
}


Any thoughts, directions, examples appreciated!

3 Answers

To invoke a Bedrock Agent using the Java SDK, you can follow these steps:

  1. Set up the BedrockAgentRuntimeAsyncClient:

    BedrockAgentRuntimeAsyncClient client = BedrockAgentRuntimeAsyncClient.builder()
        .region(Region.US_EAST_1)
        .credentialsProvider(DefaultCredentialsProvider.create())
        .build();
  2. Create the InvokeAgentRequest:

    InvokeAgentRequest request = InvokeAgentRequest.builder()
        .agentId(agentId)
        .agentAliasId(agentAliasId)
        .sessionId(sessionId)
        .inputText(inputText)
        .enableTrace(true)
        .sessionState(SessionState.builder()
            .sessionAttributes(Map.of("Authorization", authToken))
            .build())
        .build();
  3. Implement the InvokeAgentResponseHandler:

    InvokeAgentResponseHandler handler = InvokeAgentResponseHandler.builder()
        .onResponse(response -> {
            log.info("Response Received from Agent: {}", response);
            // Process the response here
        })
        .onEventStream(publisher -> {
            // The stream must be subscribed to; text arrives as PayloadPart chunks
            publisher.subscribe(event -> {
                if (event instanceof PayloadPart) {
                    String text = ((PayloadPart) event).bytes().asUtf8String();
                    log.info("Chunk: {}", text);
                    chatResponse.responses.add(text);
                }
            });
        })
        .onError(error -> {
            log.error("Error occurred: ", error);
        })
        .build();
  4. Invoke the agent and handle the response:

    CompletableFuture<Void> future = client.invokeAgent(request, handler);
    
    try {
        future.get(30, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        log.error("Error invoking Bedrock agent: ", e);
    }
  5. Close the client once the application no longer needs it (for example at shutdown), not after each request, because a closed client cannot be reused:

    client.close();
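
The subscribe-then-wait flow in steps 3 and 4 can be tried out without the SDK. The following is a minimal, JDK-only sketch and not SDK code: java.util.concurrent.Flow.Publisher stands in for the SDK's SdkPublisher, and StreamCollectSketch, publisherOf, neverCompletes, and collect are hypothetical names made up for illustration. It shows a bounded wait that either returns the joined chunks or fails after the timeout instead of blocking forever.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the streaming flow; none of these names come from the AWS SDK.
final class StreamCollectSketch {

    /** Simplified publisher: emits every chunk on the first request, then completes. */
    static Flow.Publisher<String> publisherOf(String... chunks) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;

            @Override
            public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                for (String chunk : chunks) {
                    subscriber.onNext(chunk);
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() { }
        });
    }

    /** Publisher that never emits and never completes, to model a stalled stream. */
    static Flow.Publisher<String> neverCompletes() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
    }

    /** Subscribes, waits up to the timeout for completion, and returns the joined chunks. */
    static String collect(Flow.Publisher<String> publisher, Duration timeout) {
        List<String> chunks = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String chunk) { chunks.add(chunk); }
            @Override public void onError(Throwable t) { done.completeExceptionally(t); }
            @Override public void onComplete() { done.complete(null); }
        });

        try {
            // Bounded wait: fails after the timeout instead of hanging.
            done.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Stream did not complete", e);
        }
        return String.join("", chunks);
    }

    public static void main(String[] args) {
        System.out.println(collect(publisherOf("Hel", "lo"), Duration.ofSeconds(1)));
    }
}
```

Completing the future from onComplete is what lets the waiting thread return; a stream that nobody subscribes to, or that stalls, only ends through the timeout.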

To address the connection pool issue, you can configure the HTTP client used by the SDK:

BedrockAgentRuntimeAsyncClient client = BedrockAgentRuntimeAsyncClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(DefaultCredentialsProvider.create())
    .httpClientBuilder(
        NettyNioAsyncHttpClient.builder()
            .maxConcurrency(100)
            .connectionTimeout(Duration.ofSeconds(5))
    )
    .build();

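This configuration raises the maximum number of concurrent connections and sets the timeout for establishing a connection. Note that the error in the question is about acquiring a connection from the pool, which is governed by a separate builder setting, connectionAcquisitionTimeout. Adjust these values based on your needs.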
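
The error text in the question notes that if connections do not free up, later requests still time out. Here is a toy model of that behavior using only the JDK. It is not how the SDK's Netty connection pool is implemented; AcquireTimeoutSketch and its methods are hypothetical names, and a Semaphore stands in for the pool.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical toy pool: a Semaphore models the connection limit and the acquire timeout.
final class AcquireTimeoutSketch {
    private final Semaphore permits;
    private final Duration acquireTimeout;

    AcquireTimeoutSketch(int maxConnections, Duration acquireTimeout) {
        this.permits = new Semaphore(maxConnections);
        this.acquireTimeout = acquireTimeout;
    }

    /** Tries to lease a connection; returns false if none frees up within the acquire timeout. */
    boolean acquire() {
        try {
            return permits.tryAcquire(acquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns a leased connection to the pool. */
    void release() {
        permits.release();
    }

    public static void main(String[] args) {
        AcquireTimeoutSketch pool = new AcquireTimeoutSketch(2, Duration.ofMillis(100));
        System.out.println(pool.acquire()); // true: first lease
        System.out.println(pool.acquire()); // true: second lease
        System.out.println(pool.acquire()); // false: pool exhausted, nothing released
        pool.release();
        System.out.println(pool.acquire()); // true: a connection was freed
    }
}
```

Raising the limit or the timeout only delays the third failure if leases are never returned, so it is worth checking that each invocation actually finishes before tuning the pool.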

Remember to handle the response stream properly, as Bedrock agents may return partial results as they become available. The onEventStream method in the response handler allows you to process these partial results as they arrive.

Also, make sure you're using the latest version of the AWS SDK for Java, as improvements and bug fixes are regularly released.

Sources
Community | A Java developer's guide to Bedrock's new Converse API
Solving 'Read Timed Out' Error and High Latencies in Amazon Bedrock with AWS Java SDK Client | AWS re:Post
Amazon Bedrock Runtime examples using SDK for Java 2.x - AWS SDK Code Examples
Amazon Bedrock Runtime examples using SDK for Java 2.x - AWS SDK for Java 2.x

answered 2 months ago

I finally figured out what I was looking for. If anyone else stumbles across this, maybe it will help. As I understand it, you have to handle the different types of events that come back from the agent. Typically (though not always) the response comes back as chunks of text.

InvokeAgentResponseHandler handler = InvokeAgentResponseHandler.builder()
    .onResponse(response -> {
        log.info("Response Received from Agent: {}", response);
        // Process the response here
    })
    .onEventStream(publisher -> {
        publisher.subscribe(event -> {
            log.info("On Event Stream: {}", event);
            log.info("On Event Stream SDK type: {}", event.sdkEventType());
            log.info("On Event Stream Class: {}", event.getClass());
            log.info("On Event Stream sdk Fields: {}", event.sdkFields());

            event.accept(new InvokeAgentResponseHandler.Visitor() {
                @Override
                public void visitDefault(ResponseStream event) {
                    InvokeAgentResponseHandler.Visitor.super.visitDefault(event);
                    log.info("[visitDefault] - {}", event.toString());
                }

                @Override
                public void visitChunk(PayloadPart event) {
                    InvokeAgentResponseHandler.Visitor.super.visitChunk(event);
                    log.info("[visitChunk] - {}", event.toString());
                    String payloadAsString = event.bytes().asUtf8String();
                    log.info("Chunked Data = {}", payloadAsString);
                    chatResponse.responses.add(payloadAsString);
                }

                @Override
                public void visitFiles(FilePart event) {
                    InvokeAgentResponseHandler.Visitor.super.visitFiles(event);
                    log.info("[visitFiles] - {}", event.toString());
                }

                @Override
                public void visitReturnControl(ReturnControlPayload event) {
                    InvokeAgentResponseHandler.Visitor.super.visitReturnControl(event);
                    log.info("[visitReturnControl] - {}", event.toString());
                }

                @Override
                public void visitTrace(TracePart event) {
                    InvokeAgentResponseHandler.Visitor.super.visitTrace(event);
                    log.info("[visitTrace] - {}", event.toString());
                }
            });
        });
    })
    .onError(error -> {
        log.error("Error occurred: ", error);
    })
    .build();

// Invoke the agent
CompletableFuture<Void> agentInvocation = client.invokeAgent(request, handler);

// Wait for it to complete
agentInvocation.get(agentTimeoutSeconds, TimeUnit.SECONDS);
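
For anyone who wants to see the dispatch idea in isolation, here is a small JDK-only sketch of the same visitor pattern. The types Event, Chunk, Trace, and Visitor are hypothetical stand-ins written for this example and are not the SDK's classes. It assumes only text chunks matter for the final answer, and it buffers the raw bytes and decodes them once at the end, so a multi-byte character split across two chunks would still decode correctly.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in types; they only mimic the shape of a visitor-based event stream.
final class EventVisitorSketch {

    interface Event {
        void accept(Visitor visitor);
    }

    /** Default methods let a caller override only the event types it cares about. */
    interface Visitor {
        default void visitDefault(Event event) { }
        default void visitChunk(Chunk chunk) { visitDefault(chunk); }
        default void visitTrace(Trace trace) { visitDefault(trace); }
    }

    static final class Chunk implements Event {
        final byte[] bytes;
        Chunk(String text) { this.bytes = text.getBytes(StandardCharsets.UTF_8); }
        @Override public void accept(Visitor visitor) { visitor.visitChunk(this); }
    }

    static final class Trace implements Event {
        final String detail;
        Trace(String detail) { this.detail = detail; }
        @Override public void accept(Visitor visitor) { visitor.visitTrace(this); }
    }

    /** Collects the bytes of every chunk event and decodes them once at the end. */
    static String answerText(List<Event> events) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        Visitor collector = new Visitor() {
            @Override
            public void visitChunk(Chunk chunk) {
                buffer.write(chunk.bytes, 0, chunk.bytes.length);
            }
        };
        for (Event event : events) {
            event.accept(collector); // non-chunk events fall through to visitDefault
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Chunk("Hel"), new Trace("orchestration step"), new Chunk("lo"));
        System.out.println(answerText(events));
    }
}
```

Only visitChunk is overridden here; the trace event is silently ignored through the default method, which is the part of the pattern that keeps a handler short.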
answered 2 months ago

You can find more Java samples, including an Agents example, at https://github.com/MichaelShapira/bedrock-java-samples/tree/main

AWS
answered 2 months ago
