Im new to AWS Lambda so please take it easy on me :)
Im getting a lot of errors in my code and Im not sure the best way to troubleshoot other than look at the cloudwatch console and adjust things as necessary. If anyone has any tips for troubleshooting Id appreciate it!
Heres my plan for what I want to do and please let me know if this makes sense:
upload file to s3 bucket -> 2. upload triggers a lambda to run (inside this lambda is a python script that modifies the data source. The source data is a messy file) -> 3. store the output to the same s3 bucket in a separate folder - > 4. (future state) perform analysis on the new json file.
I have my s3 bucket created and I have setup the lambda to trigger when a new file is added. That part is working! I have added my python script (which works on my local drive) portion to the lambda function w/in the code section of lambda.
The errors am getting errors consist of saying that my 6 global variables (df_a1-df_aq) are not defined. If I move them out of the function then it works, however when I get to the merge portion of my code I am getting an error saying that says "cannot merge a series without a name" I gave them a name using the name= object and Im still getting this issue.
import json
import boto3
import pandas as pd
import time
import io
print("All Modules are ok ...")
except Exception as e:
print("Error in Imports ")
s3_client = boto3.client('s3')
#df_a1 = pd.Series(dtype='object', name='test1')
#df_g1 = pd.Series(dtype='object', name='test2')
#df_j1 = pd.Series(dtype='object', name='test3')
#df_p1 = pd.Series(dtype='object', name='test4')
#df_r1 = pd.Series(dtype='object', name='test5')
#df_q1 = pd.Series(dtype='object', name='test6')
def Add_A1 (xyz, RC, string):
#DATA TO GRAB FROM STRING
global df_a1
IMG = boolCodeReturn(string[68:69].strip())
roa = string[71:73].strip()
#xyzName = string[71:73].strip()
#ADD RECORD TO DATAFRAME
series = pd.Series (data=[xyz, IMG, roa], index=['XYZ', 'IMG', 'Roa'])
df_a1 = df_a1.append(series, ignore_index=True)
def Add_G1 (xyz, RC, string):
global df_g1
#DATA TO GRAB FROM STRING
gcode = string[16:30].strip()
ggname = string[35:95].strip()
#ADD RECORD TO DATAFRAME
series = pd.Series (data=[xyz, gcode, ggname], index=['XYZ', 'Gcode', 'Ggname'])
df_g1 = df_g1.append(series, ignore_index=True)
def Add_J1 (xyz, RC, string):
#DATA TO GRAB FROM STRING
global df_j1
xyzName = string[56:81].strip()
#ADD RECORD TO DATAFRAME
series = pd.Series (data=[xyz, xyzName], index=['XYZ', 'XYZName'])
df_j1 = df_j1.append(series, ignore_index=True)
def Add_P01 (xyz, RC, string):
global df_p1
#DATA TO GRAB FROM STRING
giname = string[50:90].strip()
#ADD RECORD TO DATAFRAME
series = pd.Series (data=[xyz, giname], index=['XYZ', 'Giname'])
df_p1 = df_p1.append(series, ignore_index=True)
def Add_R01 (xyz, RC, string):
global df_r1
#DATA TO GRAB FROM STRING
Awperr = boolCodeReturn(string[16:17].strip())
#PPP= string[17:27].lstrip("0")
AUPO = int(string[27:40].lstrip("0"))
AUPO = AUPO / 100000
AupoED = string[40:48]
#ADD RECORD TO DATAFRAME
series = pd.Series (data=[xyz, AUPO, Awperr, AupoED], index = ['XYZ', 'AUPO', 'Awperr', 'AupoED'])
df_r1 = df_r1.append(series, ignore_index=True)
def Add_Q01 (xyz, RC, string):
global df_q1
#DATA TO GRAB FROM STRING
#PPP= string[17:27].lstrip("0")
UPPWA = int(string[27:40].lstrip("0"))
UPPWA = UPPWA / 100000
EDWAPPP = string[40:48]
#ADD RECORD TO DATAFRAME
series = pd.Series (data=[xyz, UPPWA, EDWAPPP], index = ['XYZ', 'UPPWA', 'EDWAPPPer'])
df_q1 = df_q1.append(series, ignore_index=True)
def boolCodeReturn (code):
if code == "X":
return 1
else:
return 0
def errorHandler(xyz, RC, string):
pass
def lambda_handler(event, context):
print(event)
#Get Bucket Name
bucket = event['Records'][0]['s3']['bucket']['name']
#get the file/key name
key = event['Records'][0]['s3']['object']['key']
response = s3_client.get_object(Bucket=bucket, Key=key)
print("Got Bucket! - pass")
print("Got Name! - pass ")
data = response['Body'].read().decode('utf-8')
print('reading data')
buf = io.StringIO(data)
print(buf.readline())
#data is the file uploaded
fileRow = buf.readline()
print('reading_row')
while fileRow:
currentString = fileRow
xyz = currentString[0:11].strip()
RC = currentString[12:15].strip() #this grabs the code the indicates what the data type is
#controls which function to run based on the code
switcher = {
"A1": Add_A1,
"J1": Add_J1,
"G1": Add_G1,
"P01": Add_P01,
"R01": Add_R01,
"Q01": Add_Q01
}
runfunc = switcher.get(RC, errorHandler)
runfunc (xyz, RC, currentString)
fileRow = buf.readline()
print(type(df_a1), "A1 FILE")
print(type(df_g1), 'G1 FILE')
buf.close()
##########STEP 3: JOIN THE DATA TOGETHER##########
df_merge = pd.merge(df_a1, df_g1, how="left", on="XYZ")
df_merge = pd.merge(df_merge, df_j1, how="left", on="XYZ")
df_merge = pd.merge(df_merge, df_p1, how="left", on="XYZ")
df_merge = pd.merge(df_merge, df_q1, how="left", on="XYZ")
df_merge = pd.merge(df_merge, df_r1, how="left", on="XYZ")
##########STEP 4: SAVE THE DATASET TO A JSON FILE##########
filename = 'Export-Records.json'
json_buffer = io.StringIO()
df_merge.to_json(json_buffer)
s3_client.put_object(Buket='file-etl',Key=filename, Body=json_buffer.getvalue())
t = time.localtime()
current_time = time.strftime("%H:%M:%S", t)
print("Finished processing at " + current_time)
response = {
"statusCode": 200,
'body': json.dumps("Code worked!")
}
return response
`