Best practice: how to structure my AWS Lambda function with pandas manipulations


I'm new to AWS Lambda, so please take it easy on me :)

I'm getting a lot of errors in my code, and I'm not sure of a better way to troubleshoot than looking at the CloudWatch console and adjusting things as necessary. If anyone has any tips for troubleshooting, I'd appreciate it!

Here's my plan for what I want to do; please let me know if it makes sense:

1. Upload a file to an S3 bucket.
2. The upload triggers a Lambda function (inside this Lambda is a Python script that modifies the data source; the source data is a messy file).
3. Store the output in a separate folder of the same S3 bucket.
4. (Future state) Perform analysis on the new JSON file.

I have my S3 bucket created, and I have set up the Lambda to trigger when a new file is added. That part is working! I have added my Python script (which works on my local drive) to the Lambda function within the code section of Lambda.

The errors I am getting say that my six global variables (df_a1 through df_q1) are not defined. If I move them out of the functions, that part works; however, when I get to the merge portion of my code I get an error that says "cannot merge a series without a name". I gave them names using the name= argument and I'm still getting this issue.

try:
    import json
    import boto3
    import pandas as pd
    import time
    import io
    print("All Modules are ok ...")
except Exception as e:
    print("Error in Imports: " + str(e))

s3_client = boto3.client('s3')

#df_a1 = pd.Series(dtype='object', name='test1')
#df_g1 = pd.Series(dtype='object', name='test2')
#df_j1 = pd.Series(dtype='object', name='test3')
#df_p1 = pd.Series(dtype='object', name='test4')
#df_r1 = pd.Series(dtype='object', name='test5')
#df_q1 = pd.Series(dtype='object', name='test6')


def Add_A1 (xyz, RC, string):
    #DATA TO GRAB FROM STRING
    global df_a1
    IMG = boolCodeReturn(string[68:69].strip())
    roa = string[71:73].strip()
    #xyzName = string[71:73].strip()

    #ADD RECORD TO DATAFRAME
    series = pd.Series (data=[xyz, IMG, roa], index=['XYZ', 'IMG', 'Roa'])
    df_a1 = df_a1.append(series, ignore_index=True)

def Add_G1 (xyz, RC, string):
    
    global df_g1
    
    #DATA TO GRAB FROM STRING
    gcode = string[16:30].strip()
    ggname = string[35:95].strip()
    
    #ADD RECORD TO DATAFRAME
    series = pd.Series (data=[xyz, gcode, ggname], index=['XYZ', 'Gcode', 'Ggname'])
    df_g1 = df_g1.append(series, ignore_index=True)
       
def Add_J1 (xyz, RC, string):
    
    #DATA TO GRAB FROM STRING
    global df_j1
    xyzName = string[56:81].strip()
    
    #ADD RECORD TO DATAFRAME
    series = pd.Series (data=[xyz, xyzName], index=['XYZ', 'XYZName'])
    df_j1 = df_j1.append(series, ignore_index=True)
    
def Add_P01 (xyz, RC, string):
    global df_p1
    
    #DATA TO GRAB FROM STRING
    giname = string[50:90].strip()
        
    #ADD RECORD TO DATAFRAME
    series = pd.Series (data=[xyz, giname], index=['XYZ', 'Giname'])
    df_p1 = df_p1.append(series, ignore_index=True)

    
def Add_R01 (xyz, RC, string):
    global df_r1
    
    #DATA TO GRAB FROM STRING
    Awperr = boolCodeReturn(string[16:17].strip())
    #PPP= string[17:27].lstrip("0")
    AUPO = int(string[27:40].lstrip("0"))
    AUPO = AUPO / 100000
    AupoED = string[40:48]
          
    #ADD RECORD TO DATAFRAME
    series = pd.Series (data=[xyz, AUPO, Awperr, AupoED], index = ['XYZ', 'AUPO', 'Awperr', 'AupoED'])
    df_r1 = df_r1.append(series, ignore_index=True)

def Add_Q01 (xyz, RC, string):
    global df_q1

    #DATA TO GRAB FROM STRING
    #PPP= string[17:27].lstrip("0")
    UPPWA = int(string[27:40].lstrip("0"))
    UPPWA = UPPWA / 100000
    EDWAPPP = string[40:48]
            
    #ADD RECORD TO DATAFRAME
    series = pd.Series (data=[xyz, UPPWA, EDWAPPP], index = ['XYZ', 'UPPWA', 'EDWAPPPer'])
    df_q1 = df_q1.append(series, ignore_index=True)
  
  

def boolCodeReturn (code):
   
    if code == "X":
        return 1
    else:
        return 0

def errorHandler(xyz, RC, string): 

    pass


def lambda_handler(event, context):
    print(event)
    #Get Bucket Name
    bucket = event['Records'][0]['s3']['bucket']['name']

    #get the file/key name
    key = event['Records'][0]['s3']['object']['key']
    response = s3_client.get_object(Bucket=bucket, Key=key)
    print("Got Bucket! - pass")
    print("Got Name! - pass ")
    
    data = response['Body'].read().decode('utf-8')
    print('reading data')
    buf = io.StringIO(data)
    print(buf.readline())
    #data is the file uploaded
    fileRow = buf.readline()

    
    print('reading_row')
    while fileRow: 
            
        currentString = fileRow
        xyz = currentString[0:11].strip() 
        RC = currentString[12:15].strip() #this grabs the code the indicates what the data type is
    
        #controls which function to run based on the code
        switcher = {
        "A1": Add_A1,
        "J1": Add_J1,
        "G1": Add_G1,
        "P01": Add_P01,
        "R01": Add_R01,
        "Q01": Add_Q01        
        }
        runfunc = switcher.get(RC, errorHandler)
        runfunc (xyz, RC, currentString)
        fileRow = buf.readline()
        print(type(df_a1), "A1 FILE")
        print(type(df_g1), 'G1 FILE')
    buf.close()
    
    ##########STEP 3: JOIN THE DATA TOGETHER##########
    df_merge = pd.merge(df_a1, df_g1, how="left", on="XYZ")
    df_merge = pd.merge(df_merge, df_j1, how="left", on="XYZ")
    df_merge = pd.merge(df_merge, df_p1, how="left", on="XYZ")
    df_merge = pd.merge(df_merge, df_q1, how="left", on="XYZ")
    df_merge = pd.merge(df_merge, df_r1, how="left", on="XYZ")

    
    ##########STEP 4: SAVE THE DATASET TO A JSON FILE##########
    filename = 'Export-Records.json'
    json_buffer = io.StringIO()
    
    df_merge.to_json(json_buffer)
    
    s3_client.put_object(Bucket='file-etl', Key=filename, Body=json_buffer.getvalue())
    
    t = time.localtime()
    current_time = time.strftime("%H:%M:%S", t)
    print("Finished processing at " + current_time)
        
        
    response = {
        "statusCode": 200,
        'body': json.dumps("Code worked!")
    }
    
    return response
1 Answer

Hello,

Thank you for reaching out via the Forum.

I understand that you are seeing errors in the code that you are importing into the Lambda function. As code support is beyond the scope of AWS Support, I may not be able to share details about troubleshooting the code itself. However, as general guidance, kindly confirm that you are using the same versions of the libraries in both the Lambda function and your local system.

On closer inspection of the code, I can see that the following lines are commented out at the beginning of the code, which may be leading to the error you are seeing. Kindly uncomment these lines so the variables are defined before the functions use them, and confirm whether you then see the correct output.

#df_a1 = pd.Series(dtype='object', name='test1')
#df_g1 = pd.Series(dtype='object', name='test2')
#df_j1 = pd.Series(dtype='object', name='test3')
#df_p1 = pd.Series(dtype='object', name='test4')
#df_r1 = pd.Series(dtype='object', name='test5')
#df_q1 = pd.Series(dtype='object', name='test6')
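On the separate "cannot merge a series without a name" error: pd.merge needs column labels on both sides, so merging one-dimensional Series (even named ones) is fragile. A minimal sketch of an alternative pattern that side-steps both problems, using hypothetical sample values in place of your real records; note also that Series.append was removed in pandas 2.0, which is why rows are collected in plain lists here:

```python
import pandas as pd

# Accumulate rows as dicts, then build DataFrames once at the end;
# this avoids the global-Series pattern and the deprecated Series.append.
a1_rows, g1_rows = [], []

def add_a1(xyz, img, roa):
    a1_rows.append({"XYZ": xyz, "IMG": img, "Roa": roa})

def add_g1(xyz, gcode, ggname):
    g1_rows.append({"XYZ": xyz, "Gcode": gcode, "Ggname": ggname})

# Hypothetical sample records standing in for parsed file rows
add_a1("A001", 1, "US")
add_g1("A001", "G-123", "Example name")

df_a1 = pd.DataFrame(a1_rows)
df_g1 = pd.DataFrame(g1_rows)

# Merging two DataFrames on a shared column needs no Series names at all.
df_merge = pd.merge(df_a1, df_g1, how="left", on="XYZ")
```

Because each DataFrame now has proper column labels, the chain of left merges on "XYZ" in your STEP 3 should work the same way for the remaining frames.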

In addition to that, you can test the function execution locally using the SAM CLI [1] to see how the Lambda function will behave once the code is adapted for the Lambda environment.
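As a quick sketch of such a local invocation (the logical function ID EtlFunction and the bucket/key values below are placeholders; substitute whatever your template.yaml actually defines):

```shell
# Generate a sample S3 "put" event matching the trigger, then invoke locally.
sam local generate-event s3 put --bucket file-etl --key input/source.txt > event.json
sam local invoke EtlFunction --event event.json
```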

Hope this helps.

[1] Invoking Lambda functions locally - https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-using-invoke.html

AWS
SUPPORT ENGINEER
answered a year ago
