
How to disable Amazon Redshift user after X failed login attempts

8 minute read
Content level: Expert

If your security requirements call for locking Amazon Redshift users after a configurable number (X) of failed login attempts, you can use the solution provided in this article.

This solution leverages Amazon Redshift's ability to export audit logs to Amazon CloudWatch Logs to monitor and respond to authentication failures. When an authentication failure occurs, the log record is forwarded to CloudWatch Logs, and a subscription filter triggers a Lambda function to take action. The Lambda function updates a Redshift table to track the authentication failure history for each user, and if the number of consecutive failed login attempts reaches a configurable threshold, it automatically disables the user's account. This automated response helps prevent further unauthorized access attempts and protects the integrity of the Amazon Redshift data warehouse. The architecture diagram below illustrates the flow.

By implementing this solution, organizations can enhance the overall security of their Amazon Redshift environment, proactively detect and respond to credential compromise attempts, and maintain a comprehensive audit trail of authentication-related events. Below are the steps to implement this solution:

Enable audit logging to CloudWatch in Amazon Redshift

  1. Sign in to the AWS Management Console and open the Amazon Redshift console.

  2. On the navigation menu, choose Clusters, then choose the cluster that you want to update.

  3. Choose the Properties tab. On the Database configurations panel, choose Edit, then Edit audit logging.

  4. On the Edit audit logging page, choose Turn on and select CloudWatch.

  5. Choose all the logs to export.

  6. To save your choices, choose Save changes.
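If you prefer to script this step, the same configuration can be applied through the API. Below is a minimal boto3 sketch, assuming a provisioned cluster with the placeholder identifier redshift-cluster-1 (the same placeholder used in the Lambda code later in this article):

import boto3

redshift = boto3.client('redshift')

# Turn on audit logging and export all log types to CloudWatch Logs;
# 'redshift-cluster-1' is a placeholder cluster identifier
redshift.enable_logging(
    ClusterIdentifier='redshift-cluster-1',
    LogDestinationType='cloudwatch',
    LogExports=['connectionlog', 'userlog', 'useractivitylog'],
)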

Create an IAM Policy and IAM Role that AWS Lambda can assume

Create an IAM policy with permissions for Amazon Redshift

  1. Navigate to the IAM Policies page
  2. Choose the Create policy button
  3. Choose JSON on the Specify permissions screen to open the Policy Editor
  4. Paste the below policy into the Policy Editor and choose Next
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"redshift-data:*",
				"redshift:GetClusterCredentialsWithIAM",
				"redshift:GetClusterCredentials",
				"redshift-serverless:GetCredentials"
			],
			"Resource": "*"
		}
	]
}
  5. On the Review and create page, for Policy name enter a descriptive name like redshift-lambda-authentication-failure-policy and enter a Description that describes the policy
  6. Choose the Create policy button
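If you would rather create the policy programmatically, here is a minimal boto3 sketch (the policy name and description are the examples from above; the returned ARN is kept for the next step):

import json
import boto3

iam = boto3.client('iam')

policy_document = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "redshift-data:*",
            "redshift:GetClusterCredentialsWithIAM",
            "redshift:GetClusterCredentials",
            "redshift-serverless:GetCredentials"
        ],
        "Resource": "*"
    }]
}

response = iam.create_policy(
    PolicyName='redshift-lambda-authentication-failure-policy',
    PolicyDocument=json.dumps(policy_document),
    Description='Allows Lambda to run statements against Amazon Redshift via the Data API'
)
policy_arn = response['Policy']['Arn']  # used when attaching the policy to the role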

Create an IAM role that AWS Lambda can assume and attach the policy created in the previous step

  1. Navigate to the IAM Roles page
  2. Choose the Create role button
  3. For Trusted entity type choose AWS service. For Use case choose Lambda, then choose Next
  4. On the Add permissions page, choose the policy you created in the previous step and choose Next
  5. On the Name, review, and create page, for Role name enter a descriptive name like redshift-lambda-authentication-failure-role
  6. For Description, enter text that describes the role, for example: Allows Lambda functions to call Amazon Redshift on your behalf
  7. Choose the Create role button
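The equivalent boto3 sketch is below. It also shows the trust policy that the console creates behind the scenes, and additionally attaches the AWS managed policy AWSLambdaBasicExecutionRole so the function can write its own logs to CloudWatch (policy_arn is the ARN returned by create_policy in the previous step):

import json
import boto3

iam = boto3.client('iam')

# Trust policy that lets the Lambda service assume this role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

iam.create_role(
    RoleName='redshift-lambda-authentication-failure-role',
    AssumeRolePolicyDocument=json.dumps(trust_policy),
    Description='Allows Lambda functions to call Amazon Redshift on your behalf'
)

# Attach the Redshift policy created earlier plus basic Lambda logging permissions
iam.attach_role_policy(
    RoleName='redshift-lambda-authentication-failure-role',
    PolicyArn=policy_arn  # ARN returned by create_policy in the previous step
)
iam.attach_role_policy(
    RoleName='redshift-lambda-authentication-failure-role',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
)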

Create a table in Amazon Redshift that tracks authentication failures

Create the following table in your Amazon Redshift data warehouse:

CREATE TABLE failed_login_attempts (
    username            VARCHAR(100)   NOT NULL,  -- Username of the user who failed to log in
    consecutive_failures INT           NOT NULL,  -- Number of consecutive login failures
    last_failure_time   TIMESTAMP      NOT NULL,  -- Timestamp of the last login failure
    PRIMARY KEY (username)                        -- Assuming username uniquely identifies a user's login failures
);
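Note that Amazon Redshift does not enforce primary key constraints; they are informational only, so the insert-or-update logic in the Lambda function below is what keeps this table to one row per user. To spot-check the tracked failures at any time, you can query the table through the Data API. A minimal sketch, using the same placeholder cluster, database, and user names as the Lambda code below:

import boto3

client = boto3.client('redshift-data')

# Placeholders match the constants used in the Lambda function below
response = client.execute_statement(
    ClusterIdentifier='redshift-cluster-1',
    Database='dev',
    DbUser='awsuser',
    Sql='SELECT username, consecutive_failures, last_failure_time '
        'FROM failed_login_attempts ORDER BY consecutive_failures DESC;'
)
statement_id = response['Id']  # poll with describe_statement, then fetch with get_statement_result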

Create a Lambda function that will be executed in the event of an authentication failure

  1. Navigate to the AWS Lambda console page
  2. Choose Create function
  3. On the Create function page, for Function name enter a descriptive name, for example: RedshiftAuthenticationFailuresCompliance
  4. For Runtime choose the latest Python version, for example Python 3.12 (the latest at the time of this writing)
  5. Expand the Change default execution role section and choose the IAM role previously created
  6. Choose Create function
  7. Use the below code in Lambda and deploy the changes. This code has NO_ATTEMPTS set to 10, which means that a user will be locked after 10 consecutive failed login attempts. You can change it as per your requirements
import time
import boto3
import base64
import gzip
import json
from io import BytesIO
from datetime import datetime
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize the Redshift Data API client
redshift_client = boto3.client('redshift-data')

# Parameters to your Redshift Cluster
CLUSTER_ID = 'redshift-cluster-1'
DATABASE_NAME = 'dev'
DB_USER = 'awsuser'
DISABLED_PASSWORD = 'DisabledPassword1'
DISABLED_VALID_UNTIL_DATE = '1990-01-01'
NO_ATTEMPTS = 10

def lambda_handler(event, context):
    logger.info(f"Raw Event is: {event}")
    # Step 1: Decode the base64 encoded data
    compressed_data = event['awslogs']['data']
    decoded_data = base64.b64decode(compressed_data)
    # Step 2: Decompress the gzip data
    with gzip.GzipFile(fileobj=BytesIO(decoded_data)) as gzipfile:
        decompressed_payload = gzipfile.read()
    # Step 3: Parse the decompressed data as JSON
    payload = json.loads(decompressed_payload)
    logger.info(f"Authentication failure event raw record is: {payload}. Starting processing")
    for log_event in payload['logEvents']:
        log_message = log_event['message']
        process_log_message(log_message)
        
def process_log_message(log_message):
    # Assuming the log message is pipe-delimited
    columns = log_message.split('|')
    # The subscription filter only forwards authentication failures, so extract fields directly
    failure_time_str = columns[1].strip()  # 2nd field is the timestamp of the failure
    username = columns[6].strip()  # 7th field is the username
    logger.info(f"Data extracted from raw record. Username is: {username}. Authentication failure timestamp is: {failure_time_str}")
    # Convert the timestamp string to a datetime object
    failure_time = datetime.strptime(failure_time_str, "%a, %d %b %Y %H:%M:%S:%f")
    logger.info(f"Extracted data is standardized. Username is: {username}. Authenication failure timestamp is:{failure_time}")
    # Handle the authentication failure for the user
    handle_failure(username, failure_time)
        
def handle_failure(username, failure_time):
    # Step 1: Check if there has been a successful login
    logger.info(f"Checking if there is a successful login for Username: {username}")
    if has_successful_login(username, failure_time):
        # Reset the failure count if a successful login was detected
        logger.info(f"There is a successful login since last authentication failure. Resetting failure count for username: {username}")
        reset_failure_count(username)
    else:
        # Step 2: Get the current failure count from Redshift
        logger.info(f"There is no successful login since last authentication failure. Retrieving total number of authentication failures")
        current_failures = get_consecutive_failures(username)
        logger.info(f"Total number of authentication failures: {current_failures}")
        # Step 3: Increment the failure count
        new_failures = current_failures + 1
        # Step 4: Insert or update the failure count in Redshift
        if current_failures > 0:
            logger.info(f"Incrementing total failures by performing an update")
            update_failure_count(username, new_failures, failure_time)
        else:
            logger.info(f"Inserting a row for failure")
            insert_failure_record(username, new_failures, failure_time)
        # Step 5: Lock the user if consecutive failures reach threshold
        if new_failures >= NO_ATTEMPTS:
            logger.info(f"Total consecutive failures is greater than or equal to {NO_ATTEMPTS}. Locking user")
            lock_user(username)
            logger.info(f"Resetting the failure row for user: {username}")
            reset_failure_count(username)
            
def has_successful_login(username, last_failure_time):
    # Query Redshift's STL_CONNECTION_LOG table to check for successful logins
    query = f"""
        SELECT COUNT(*)
        FROM stl_connection_log
        WHERE username = '{username}'
        AND event = 'authenticated'
        AND recordtime > '{last_failure_time}';
    """
    response = execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
    records = response['sql_result']
    if records and records[0][0]['longValue'] > 0:
        return True
    return False
    
def get_consecutive_failures(username):
    query = f"SELECT consecutive_failures FROM failed_login_attempts WHERE username = '{username}';"
    response = execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
    records = response['sql_result']
    if records:
        return int(records[0][0]['longValue'])
    return 0

def insert_failure_record(username, failures, last_failure_time):
    query = f"""
        INSERT INTO failed_login_attempts (username, consecutive_failures, last_failure_time)
        VALUES ('{username}', {failures}, '{last_failure_time}');
    """
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
    
def update_failure_count(username, failures, last_failure_time):
    query = f"""
        UPDATE failed_login_attempts
        SET consecutive_failures = {failures}, last_failure_time = '{last_failure_time}'
        WHERE username = '{username}';
    """
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
    
def reset_failure_count(username):
    query = f"DELETE FROM failed_login_attempts WHERE username = '{username}';"
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
    
def lock_user(username):
    # Lock the user in Redshift by disabling their access
    query = f"ALTER USER {username} PASSWORD '{DISABLED_PASSWORD}' VALID UNTIL '{DISABLED_VALID_UNTIL_DATE}';"
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)

def execute_sql_data_api(redshift_data_api_client, redshift_database_name, query, redshift_user, redshift_cluster_id):
    logger.info(f"Submitted Query using data API: {query}")
    res = None
    try:
        res = redshift_data_api_client.execute_statement(Database=redshift_database_name, DbUser=redshift_user, Sql=query, ClusterIdentifier=redshift_cluster_id)
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        # Concatenating the exception object directly would raise a TypeError; use str(e)
        raise Exception('Error initiating query: ' + str(e))
    logger.info(f"Response from data API: {res}")
    query_id = res['Id']
    logger.info(f"Statement ID from response is: {query_id}")
    done = False
    result = {}

    # Poll until the query finishes; raise if it fails or is aborted so the loop cannot spin forever
    while not done:
        time.sleep(1)
        desc = redshift_data_api_client.describe_statement(Id=query_id)
        logger.info(f"Checking the status for query id: {query_id}")
        query_status = desc['Status']
        logger.info(f"Query status is: {query_status}")
        if query_status in ("FAILED", "ABORTED"):
            raise Exception('SQL query failed: ' + query_id + ": " + desc.get('Error', query_status))
            
        elif query_status == "FINISHED":
            logger.info(f"Statement response: {desc}")
            done = True
            result['status'] = query_status
            # capture the result set if there is one (typically from a SELECT statement)
            if desc['HasResultSet']:
                logger.info(f"Query has result rows. Retrieving them")
                response = redshift_data_api_client.get_statement_result(Id=query_id)
                logger.info(f"Printing result --> {response['Records']}")
                result['sql_result'] = response['Records']
    return result
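Before wiring up the subscription filter, you can exercise the handler locally with a synthetic event. CloudWatch Logs delivers subscription data as base64-encoded, gzip-compressed JSON under awslogs.data, so the sketch below builds such an event from an illustrative pipe-delimited connection log line (the field values are made up; what matters is that the handler reads the timestamp from field 2 and the username from field 7). Paste it below the handler in the same file, or import lambda_handler, and note that it will issue real Data API calls against the cluster named in the constants:

import base64
import gzip
import json

# Illustrative log line; fields 2 and 7 (timestamp, username) are what the handler parses
sample_message = (
    "authentication failure |Mon, 06 May 2024 11:15:03:123456"
    "|10.0.0.12 |42590 |12345 |dev |baduser |password |0 | | | | | | | |"
)

log_payload = {
    "logEvents": [{"id": "1", "timestamp": 1714994103000, "message": sample_message}]
}

# CloudWatch Logs subscription events arrive gzip-compressed and base64-encoded
event = {
    "awslogs": {
        "data": base64.b64encode(gzip.compress(json.dumps(log_payload).encode())).decode()
    }
}

lambda_handler(event, None)

Once a user has been locked, re-enabling it is a manual administrative action, for example: ALTER USER baduser PASSWORD 'NewSecurePassword1' VALID UNTIL 'infinity';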

Create a CloudWatch subscription filter that executes the Lambda function in the event of an authentication failure

  1. Navigate to the CloudWatch Log groups console page
  2. Enter your Amazon Redshift cluster name in the search bar to see the log groups associated with that cluster. Choose the log group that ends in connectionlog
  3. On the page that opens, choose Actions, then Subscription filters, then Create Lambda subscription filter
  4. On the Create Lambda subscription filter page:
     a. For Lambda function, choose the Lambda function you created in the previous step
     b. Choose Other for Log format
     c. Enter "authentication failure" for Subscription filter pattern
     d. For Subscription filter name, enter a descriptive name, for example: Redshift-Cluster-1 Authentication Failures Filter
     e. Choose Start streaming
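The console performs two actions here on your behalf: it grants CloudWatch Logs permission to invoke the function and then creates the filter. If you script this step instead, both calls are needed. A minimal boto3 sketch, where the log group name, region, account ID, and function ARN are all placeholders to substitute with your own values:

import boto3

logs = boto3.client('logs')
lambda_client = boto3.client('lambda')

# Placeholder names and ARNs; substitute your own log group, region, account, and function
log_group = '/aws/redshift/cluster/redshift-cluster-1/connectionlog'
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:RedshiftAuthenticationFailuresCompliance'

# Allow CloudWatch Logs to invoke the Lambda function (the console does this automatically)
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId='redshift-connectionlog-invoke',
    Action='lambda:InvokeFunction',
    Principal='logs.amazonaws.com',
    SourceArn='arn:aws:logs:us-east-1:123456789012:log-group:' + log_group + ':*',
)

# Forward only authentication failure records to the function
logs.put_subscription_filter(
    logGroupName=log_group,
    filterName='Redshift-Cluster-1 Authentication Failures Filter',
    filterPattern='"authentication failure"',
    destinationArn=function_arn,
)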