AWS neptune db documentation example lambda function raise an error

0

Hello, I referred the Neptune DB documentation and followed the steps in here carefully. I set all relevant env variables as well. But when I test the lambda function it raises an error.

{
  "errorMessage": "'GraphTraversal' object is not callable",
  "errorType": "TypeError",
  "requestId": "23f890ec-1ba6-4599-829e-ad7eb38e058d",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 111, in lambda_handler\n    return doQuery(event)\n",
    "  File \"/var/task/lambda_function.py\", line 108, in doQuery\n    return query(id=str(9))\n",
    "  File \"/var/task/backoff/_sync.py\", line 105, in retry\n    ret = target(*args, **kwargs)\n",
    "  File \"/var/task/lambda_function.py\", line 99, in query\n    return (g.V(id)\n"
  ]
}
LOGS	Name: cloudwatch_lambda_agent	State: Subscribed	Types: [Platform]
Creating remote connection
EXTENSION	Name: cloudwatch_lambda_agent	State: Ready	Events: [INVOKE,SHUTDOWN]
START RequestId: 23f890ec-1ba6-4599-829e-ad7eb38e058d Version: $LATEST
<class 'gremlin_python.driver.driver_remote_connection.DriverRemoteConnection'>
[ERROR] TypeError: 'GraphTraversal' object is not callable
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 111, in lambda_handler
    return doQuery(event)
  File "/var/task/lambda_function.py", line 108, in doQuery
    return query(id=str(9))
  File "/var/task/backoff/_sync.py", line 105, in retry
    ret = target(*args, **kwargs)
  File "/var/task/lambda_function.py", line 99, in query
    return (g.V(id)END RequestId: 23f890ec-1ba6-4599-829e-ad7eb38e058d
REPORT RequestId: 23f890ec-1ba6-4599-829e-ad7eb38e058d	Duration: 86.36 ms	Billed Duration: 87 ms	Memory Size: 128 MB	Max Memory Used: 92 MB	Init Duration: 1098.93 ms
  • Hello! As discussed in this StackOverflow post it is not Immediately obvious what is causing the error you are seeing. We tested both the code you posted and the code from the documentation and both worked fine. I think it is unlikely to matter but for completeness, which version of the Gremlin Python client are you using? We will continue to investigate but it's not clear yet why you are seeing the error and the exact same code is working in our tests.

  • One thing we did spot is that the example in the documentation uses the older 3.4.1 Gremlin Python client (which is Tornado based). It would need to be modified to work with the newer clients that are AIOHttp based (to use a library like requests for the HTTP operations rather than the Tornado equivalent. However that should not cause the error you are seeing. Also to clarify, in my comment above when I wrote g() I should have used something like g.V('a').out() as the example as the former is a GraphTraversalSource and the latter a GraphTraversal. We have. todo to update the documentation but I don't think that would cause this error (Python would complain sooner).

  • @AWS-KRL gremlin python version is 3.6.2

asked a year ago416 views
1 Answer
0

As @AWS-KRL pointed out, the example in the docs uses 3.4.1, which uses Tornado for websocket connections. The example below uses Gremlin Python 3.5.2, which replaces Tornado with AIOHTTP. There have been some changes to the connection error handling in the example to take account of this change. We'll be updating the docs soon.

import os, sys, backoff, math
from random import randint
from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver import serializer
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.process.traversal import T
from aiohttp.client_exceptions import ClientError
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import ReadOnlyCredentials
from types import SimpleNamespace

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


reconnectable_err_msgs = [ 
    'ReadOnlyViolationException',
    'Server disconnected',
    'Connection refused',
    'Connection was already closed'
]

retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs

network_errors = [OSError]

retriable_errors = [GremlinServerError, RuntimeError] + network_errors      

def prepare_iamdb_request(database_url):
        
    service = 'neptune-db'
    method = 'GET'

    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    region = os.environ['AWS_REGION']
    session_token = os.environ['AWS_SESSION_TOKEN']
    
    creds = SimpleNamespace(
        access_key=access_key, secret_key=secret_key, token=session_token, region=region,
    )

    request = AWSRequest(method=method, url=database_url, data=None)
    SigV4Auth(creds, service, region).add_auth(request)
    
    return (database_url, request.headers.items())
        
def is_retriable_error(e):

    is_retriable = False
    err_msg = str(e)
    
    if isinstance(e, tuple(network_errors)):
        is_retriable = True
    else:
        is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs)
    
    logger.error('error: [{}] {}'.format(type(e), err_msg))
    logger.info('is_retriable: {}'.format(is_retriable))
    
    return is_retriable

def is_non_retriable_error(e):      
    return not is_retriable_error(e)
        
def reset_connection_if_connection_issue(params):
    
    is_reconnectable = False

    e = sys.exc_info()[1]
    err_msg = str(e)
    
    if isinstance(e, tuple(network_errors)):
        is_reconnectable = True
    else:
        is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs)
        
    logger.info('is_reconnectable: {}'.format(is_reconnectable))
        
    if is_reconnectable:
        global conn
        global g
        conn.close()
        conn = create_remote_connection()
        g = create_graph_traversal_source(conn)
     
@backoff.on_exception(backoff.constant,
    tuple(retriable_errors),
    max_tries=5,
    jitter=None,
    giveup=is_non_retriable_error,
    on_backoff=reset_connection_if_connection_issue,
    interval=1)
def query(**kwargs):
    
    id = kwargs['id']
    
    return (g.V(id)
        .fold()
        .coalesce(
            __.unfold(), 
            __.addV('User').property(T.id, id)
        )
        .id().next())
        
def doQuery(event):
    return query(id=str(randint(0, 10000)))

def lambda_handler(event, context):
    result = doQuery(event)
    logger.info('result – {}'.format(result))
    return result
    
def create_graph_traversal_source(conn):
    return traversal().withRemote(conn)
    
def create_remote_connection():
    logger.info('Creating remote connection')
    
    (database_url, headers) = connection_info()
    
    return DriverRemoteConnection(
        database_url,
        'g',
        pool_size=1,
        message_serializer=serializer.GraphSONSerializersV2d0(),
        headers=headers)
    
def connection_info():
    
    database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort'])
    
    if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true':
        return prepare_iamdb_request(database_url)
    else:
        return (database_url, {})
    
conn = create_remote_connection()
g = create_graph_traversal_source(conn)
AWS-ISR
answered a year ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions