import json
import re
import smtplib
import dns.resolver
from disposable_email_domains import blocklist
from concurrent.futures import ThreadPoolExecutor
class SetEncoder(json.JSONEncoder):
    """JSON encoder that can also serialise ``set`` objects.

    Sets are written out as JSON arrays; every other type is handled exactly
    as the standard encoder handles it.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for *obj*.

        Called by the encoder for objects it cannot serialise natively.
        Anything that is not a set is passed on to the base class.
        """
        if not isinstance(obj, set):
            return super().default(obj)
        return list(obj)
# Envelope sender announced in the SMTP MAIL FROM command while probing a
# mailbox.
fromAddress = 'corn@bt.com'

# Syntax check for an address: local part, '@', a dotted domain and a purely
# alphabetic top-level label of at least two letters.  The label length has
# no upper bound because top-level domains such as "museum" or "technology"
# are longer than four letters.
regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
# Known disposable e-mail domains, taken from the disposable_email_domains
# package's blocklist.
disposable_domains = blocklist


def is_disposable_domain(domain):
    """Return True if *domain* is listed in ``disposable_domains``.

    Domain names are case-insensitive, so the lower-cased form is checked as
    well as the domain exactly as given; otherwise "Mailinator.COM" would
    slip past an entry stored as "mailinator.com".
    NOTE(review): this assumes blocklist entries are stored in lower case —
    confirm against the package.
    """
    return domain in disposable_domains or domain.lower() in disposable_domains
def _rcpt_reply_code(mx_host, email, timeout):
    """Ask *mx_host* whether it accepts mail for *email*.

    Runs HELO, MAIL FROM and RCPT TO without sending a message and returns
    the numeric reply code of the RCPT command.  Connection and protocol
    errors propagate to the caller; the connection is released either way.
    """
    # Without an explicit timeout a blocked or filtered port 25 makes the
    # connection attempt hang until the operating system gives up.
    server = smtplib.SMTP(timeout=timeout)
    server.set_debuglevel(0)
    try:
        server.connect(mx_host)
        server.helo(server.local_hostname)
        server.mail(fromAddress)
        code, _message = server.rcpt(email)
        return code
    finally:
        try:
            server.quit()
        except (OSError, smtplib.SMTPException):
            # The session is already broken or was never opened; just make
            # sure the socket is dropped.
            server.close()


def verify_email(email, smtp_timeout=10):
    """Classify *email* using syntax, blocklist, DNS MX and SMTP RCPT checks.

    Args:
        email: The address to check.
        smtp_timeout: Seconds to wait for each SMTP connection attempt and
            each server reply.  Every MX host that has to be tried can take
            up to this long.

    Returns:
        One of the status strings 'Invalid Syntax', 'Disposable Domain',
        'Valid', 'Invalid', 'Invalid (DNS Error)',
        'Invalid (SMTP Connect Error)', 'Invalid (SMTP Server Disconnected)',
        'Invalid (SMTP Mailbox Unavailable)' or 'Error: <details>'.
        Failures during the DNS and SMTP phases are reported through these
        strings rather than raised.
    """
    # fullmatch (rather than match) also rejects an address followed by a
    # newline, which the pattern's trailing `$` alone would let through.
    if re.fullmatch(regex, email) is None:
        return 'Invalid Syntax'

    # The pattern allows exactly one '@', so index 1 is the domain.
    domain = email.split('@')[1]
    if is_disposable_domain(domain):
        return 'Disposable Domain'

    try:
        resolver = dns.resolver.Resolver()
        resolver.timeout = 5   # per-nameserver wait, in seconds
        resolver.lifetime = 5  # total time allowed for the lookup
        # Use resolve() where the installed dnspython provides it and fall
        # back to the older query() spelling otherwise.
        lookup = getattr(resolver, 'resolve', None) or resolver.query
        answers = sorted(lookup(domain, 'MX'),
                         key=lambda record: record.preference)
        mx_hosts = [str(record.exchange) for record in answers]

        # Try the exchangers in preference order.  A host that cannot be
        # reached is skipped; the first host that answers RCPT decides.
        last_error = None
        for mx_host in mx_hosts:
            try:
                code = _rcpt_reply_code(mx_host, email, smtp_timeout)
            except OSError as exc:
                # Covers socket errors and timeouts as well as
                # smtplib.SMTPException, which derives from OSError.
                last_error = exc
                continue
            return 'Valid' if code in (250, 251, 252) else 'Invalid'

        if last_error is not None:
            # No exchanger could be queried: report the last failure through
            # the handlers below.
            raise last_error
        return 'Invalid'
    except (dns.resolver.NXDOMAIN, dns.resolver.NoNameservers,
            dns.resolver.NoAnswer):
        # The domain does not exist or publishes no usable MX records.
        return 'Invalid (DNS Error)'
    except smtplib.SMTPConnectError:
        return 'Invalid (SMTP Connect Error)'
    except smtplib.SMTPServerDisconnected:
        return 'Invalid (SMTP Server Disconnected)'
    except smtplib.SMTPResponseException as e:
        if e.smtp_code == 550:
            return 'Invalid (SMTP Mailbox Unavailable)'
        return f'Error: {str(e)}'
    except Exception as e:
        return f'Error: {str(e)}'
# Addresses checked when the invocation event does not name any.
_DEFAULT_EMAILS = (
    "admin@atticusaccountants.com.au",
    "anishs@accountingsns.com.au",
    "bchungong@deloitte.com",
    "bill.cole@bdo.com.au",
    "bmurphy@deloitte.com",
    "brendan@daleysfruit.com.au",
)


def _emails_from_event(event):
    """Return the addresses requested by *event*, or None if it names none.

    An ``emails`` entry is looked up first in the JSON ``body`` and then in
    ``queryStringParameters``.  A single string is treated as a
    comma-separated list.  A body that is not valid JSON raises ValueError.
    """
    if not isinstance(event, dict):
        return None

    requested = None
    body = event.get('body')
    if body:
        payload = json.loads(body) if isinstance(body, (str, bytes)) else body
        if isinstance(payload, dict):
            requested = payload.get('emails')
    if requested is None:
        # `or {}` also covers a queryStringParameters key whose value is None.
        params = event.get('queryStringParameters') or {}
        if isinstance(params, dict):
            requested = params.get('emails')

    if requested is None:
        return None
    if isinstance(requested, str):
        requested = requested.split(',')
    cleaned = (str(item).strip() for item in requested)
    return [address for address in cleaned if address]


def lambda_handler(event, context):
    """AWS Lambda entry point: verify a batch of e-mail addresses.

    Args:
        event: Invocation payload.  When it carries an ``emails`` entry (in
            its JSON ``body`` or in ``queryStringParameters``) those
            addresses are checked; otherwise the built-in sample list is.
        context: Lambda context object; not used.

    Returns:
        A dict with ``statusCode`` 200 and a JSON ``body`` of the form
        ``{"Email_Status": [...]}`` holding one status per address in input
        order, or ``statusCode`` 500 and ``{"error": "Error: ..."}`` if
        anything fails.
    """
    try:
        emails = _emails_from_event(event)
        if emails is None:
            emails = list(_DEFAULT_EMAILS)

        # The checks are network-bound, so run them on a small thread pool.
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(verify_email, emails))

        data = {
            "Email_Status": results
        }
        return {
            'statusCode': 200,
            'body': json.dumps(data, cls=SetEncoder)
        }
    except Exception as e:
        error_message = f'Error: {str(e)}'
        return {
            'statusCode': 500,
            'body': json.dumps({'error': error_message})
        }
I have written this Lambda function, but it is giving this error: {
"statusCode": 200,
"body": "{"Email_Status": ["Error: [Errno 110] Connection timed out", "Error: [Errno 97] Address family not supported by protocol", "Error: [Errno 110] Connection timed out", "Error: [Errno 110] Connection timed out", "Error: [Errno 110] Connection timed out", "Error: [Errno 97] Address family not supported by protocol"]}"
}