I am trying to take all the tables from pdf with aws Textract and convert to CSV file each of them but when I try it doesn't recognize some of them. I tried the demo version of Textract and it is working fine so I think I have some mistake in my code and I couldn't find it.
this is lambda function:
def lambda_handler(event, context):
s3_client = boto3.client("s3")
textract = boto3.client("textract")
if event:
file_obj = event["Records"][0]
bucket_name = str(file_obj["s3"]["bucket"]["name"])
s3_key = unquote_plus(str(file_obj["s3"]["object"]["key"]))
file_name = s3_key.split('/')[-1].split('.')[0] # Extracting file name without extension
unique_folder = f"{file_name}_{uuid.uuid4().hex}"
output_prefix = f"analyze-output/{unique_folder}"
try:
response = textract.start_document_analysis(
DocumentLocation={'S3Object': {'Bucket': bucket_name, 'Name': s3_key}},
FeatureTypes=["TABLES"]
)
job_id = response['JobId']
logging.info(f"Started Textract job with Job ID: {job_id}")
status = ""
while True:
response = textract.get_document_analysis(JobId=job_id)
status = response['JobStatus']
if status in ["SUCCEEDED", "FAILED"]:
break
time.sleep(5)
if status == "SUCCEEDED":
finished = False
nextToken = None
while not finished:
response = textract.get_document_analysis(JobId=job_id, NextToken=nextToken) if nextToken else textract.get_document_analysis(JobId=job_id)
logging.info(f"Processing response batch for file: {output_prefix}")
process_textract_response(s3_client, response, output_prefix, bucket_name)
nextToken = response.get('NextToken')
finished = nextToken is None
else:
logging.error("Textract job failed")
return {
"statusCode": 500,
"body": json.dumps("Textract job failed"),
}
except Exception as e:
logging.error(f"Error processing file: {e}")
return {
"statusCode": 500,
"body": json.dumps("Error in processing PDF"),
}
return {"statusCode": 200, "body": json.dumps("Hello")}
if __name__ == "__main__":
lambda_handler(None, None)
this is helper function:
def get_rows_columns_map(table_result, blocks_map):
rows = {}
for relationship in table_result['Relationships']:
if relationship['Type'] == 'CHILD':
for child_id in relationship['Ids']:
cell = blocks_map[child_id]
if cell['BlockType'] == 'CELL':
row_index = cell['RowIndex']
col_index = cell['ColumnIndex']
if row_index not in rows:
rows[row_index] = {}
rows[row_index][col_index] = get_text(cell, blocks_map)
return rows
def get_text(result, blocks_map):
text = ''
if 'Relationships' in result:
for relationship in result['Relationships']:
if relationship['Type'] == 'CHILD':
for child_id in relationship['Ids']:
word = blocks_map[child_id]
if word['BlockType'] == 'WORD':
text += word['Text'] + ' '
elif word['BlockType'] == 'SELECTION_ELEMENT':
if word['SelectionStatus'] == 'SELECTED':
text += 'X '
return text.strip()
def upload_to_s3(s3_client, file_path, bucket_name, s3_key):
try:
s3_client.upload_file(file_path, bucket_name, s3_key)
logging.info(f"File {file_path} uploaded to {bucket_name}/{s3_key}")
except Exception as e:
logging.error(f"Error uploading file to S3: {e}")
def process_textract_response(s3_client, response, file_name, bucket_name):
blocks_map = {}
page_tables = {}
logging.info(f"Starting to process response for file: {file_name}")
for block in response['Blocks']:
blocks_map[block['Id']] = block
if block['BlockType'] == "TABLE":
page_number = block.get('Page', 1)
if page_number not in page_tables:
page_tables[page_number] = []
page_tables[page_number].append(block)
if not page_tables:
logging.info("No tables found")
return
output_dir = f"/tmp/{file_name}"
os.makedirs(output_dir, exist_ok=True)
for page_number, tables in page_tables.items():
for index, table in enumerate(tables):
try:
csv_content = generate_table_csv(table, blocks_map)
output_file = f"{output_dir}/table_page_{page_number}_index_{index}.csv"
with open(output_file, "w") as f:
f.write(csv_content)
logging.info(f"Table {index} on Page {page_number} saved to {output_file}")
s3_key = f"{file_name}/table_page_{page_number}_index_{index}.csv"
upload_to_s3(s3_client, output_file, bucket_name, s3_key)
except Exception as e:
logging.error(f"Error processing table on page {page_number}, index {index}: {e}")
logging.info(f"Completed processing for file: {file_name}")
def generate_table_csv(table_result, blocks_map):
rows = get_rows_columns_map(table_result, blocks_map)
csv = ''
for row_index, cols in sorted(rows.items()):
for col_index, text in sorted(cols.items()):
csv += f'"{text}",'
csv = csv[:-1] + '\n'
return csv
the error I am getting: