Create custom alerts in Amazon Redshift

16 minute read
Content level: Advanced

This article provides a framework for establishing custom monitoring and alerting rules in Amazon Redshift based on user actions.

Introduction

Amazon Redshift provides out-of-the-box alerting on common metrics such as CPU utilisation, latency and throughput through Amazon CloudWatch metrics.

Sometimes, customers may want to detect and alert on very specific types of events or actions taken within their data warehouse. For example, customers may want to be notified if a large number of records have been unloaded from their environment. Similarly, they may want to be notified of attempted table drops, or even read actions on a specific entity. This article provides a framework for implementing such custom alerts using Amazon Redshift, AWS Lambda and Amazon Simple Notification Service (SNS) by working through an example scenario with public datasets.

Scenario

Consider a fictitious company that operates its data warehousing environment on Amazon Redshift Serverless. The company has configured alarms for resource and cost utilisation metrics, but would like to take additional measures to proactively monitor the actions that its users take within the data warehouse.

Specifically, the company wants to be notified if its users take any of the following actions:

Action type | Table | Threshold
Export data (UNLOAD) | customer | > 100 records exported
Export data (UNLOAD) | customer_address | > 10 records exported
Export data (UNLOAD) | customer_demographics | > 10 records exported
Attempt to drop table | customer | N/A
Attempt to drop table | customer_address | N/A
Attempt to drop table | customer_demographics | N/A
Frequent table reads | customer_address | > 10 selects in one day

Prerequisites

To work through the example scenario in this article, please ensure you have:

  • An Amazon Redshift Serverless workgroup
    • Note that deployment of this solution on a provisioned Amazon Redshift cluster requires some slight modifications to system table references. If you are interested in this, please post in the comments.
  • A default role attached to Redshift that has permission to
  • Sufficient IAM privileges to deploy and configure an Amazon Simple Notification Service (SNS) topic and AWS Lambda function
  • An Amazon S3 location where datasets can be written to for testing purposes

By following the below steps you will create the following objects in your account:

  • One Amazon Simple Notification Service (SNS) topic
  • One AWS Lambda function
  • One AWS Identity and Access Management (IAM) role
  • Multiple Amazon Redshift objects including tables, Lambda User Defined Functions (UDFs), and stored procedures

Architecture

The solution depicted below performs the following steps:

  1. A stored procedure interrogates system tables based on alerting rules defined in the configuration table cfg_event_monitoring
  2. If any alert-invoking events are detected, the procedure will call a Redshift Lambda User-Defined Function (Lambda UDF), providing detailed information about the event(s).
  3. The AWS Lambda function will make a call to Amazon Simple Notification Service (SNS) to notify required parties
  4. The stored procedure will then log the details of its execution to the logging table cfg_event_monitoring_log for historical analysis.
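The four steps above can be sketched in plain Python to make the control flow easier to follow. This is only an illustrative model, not the stored procedure itself: `run_checks`, `check_large_unload` and the `observed_unloads` dictionary are hypothetical stand-ins for the procedure, its per-event-type logic and the Redshift system tables.

```python
def run_checks(rules, checks, notify):
    """Walk active rules in event_id order, notify on findings, return one log row per rule."""
    log_rows = []
    for rule in sorted(rules, key=lambda r: r["event_id"]):
        if not rule["event_active_flag"]:
            continue  # inactive rules are skipped entirely
        check = checks[rule["event_name"]]  # one check per event type
        triggering_tables = []
        for entry in rule["events"]:
            findings = check(entry)  # list of (subject, message) pairs
            for subject, message in findings:
                notify(subject, message)
            if findings:
                triggering_tables.append(entry["table_name"])
        log_rows.append({
            "event_id": rule["event_id"],
            "event_notification_trigger_flag": bool(triggering_tables),
            "event_notification_details": triggering_tables,
        })
    return log_rows


# Hypothetical in-memory stand-in for the unload history system table.
observed_unloads = {"customer": 122600, "customer_address": 4}

def check_large_unload(entry):
    """Return an alert when more rows were unloaded than the rule allows."""
    rows = observed_unloads.get(entry["table_name"], 0)
    if rows > entry["num_rows"]:
        return [("Large unload: " + entry["table_name"], "Unloaded rows: %d" % rows)]
    return []

rules = [
    {"event_id": 1, "event_name": "large unload executed", "event_active_flag": True,
     "events": [{"table_name": "customer", "num_rows": 100},
                {"table_name": "customer_address", "num_rows": 10}]},
]
sent = []  # stands in for the Lambda UDF and SNS
log_rows = run_checks(rules, {"large unload executed": check_large_unload},
                      lambda subject, message: sent.append((subject, message)))
print(log_rows)
```

In this model the trigger flag is computed afresh for every rule, so one rule's result cannot leak into the log row of the next.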

Configuration: cfg_event_monitoring

The architecture above uses a configuration-driven approach to adding or removing alerting rules. For each custom alerting rule, a user is required to provide the following:

  • event_id: A unique identifier for the type of event (a single event type may relate to multiple monitored events)
  • event_name: A unique name for the type of event
  • event_details: A JSON representation of the entity-level details that instruct the stored procedure to run specific checks. Note that this is at a lower grain than event_id and event_name
  • event_active_flag: A boolean attribute that represents the state of each event check. When set to FALSE, the procedure will skip checking for that specific event.

For each type of alerting rule, a corresponding method for validating violations of that rule must be present in the stored procedure.
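As a rough illustration of the event_details document, the sketch below builds and validates the JSON for one rule before it is inserted into cfg_event_monitoring. The helper `build_event_details` and the `REQUIRED_KEYS` mapping are assumptions made for this example; the key names (`table_name`, `num_rows`, `num_selects`) are the ones used by the sample rules later in this article.

```python
import json

# Keys that each entry under "events" is expected to carry, per event type.
# Assumption: this mirrors the three sample rules in this article only.
REQUIRED_KEYS = {
    "large unload executed": {"table_name", "num_rows"},
    "attempted drop table": {"table_name"},
    "frequent selects on entity": {"table_name", "num_selects"},
}

def build_event_details(event_name, events):
    """Validate the entity-level entries and serialise them as the event_details JSON."""
    required = REQUIRED_KEYS[event_name]
    for entry in events:
        missing = required - entry.keys()
        if missing:
            raise ValueError(
                "{}: entry {} is missing {}".format(event_name, entry, sorted(missing))
            )
    return json.dumps({"events": events})

details = build_event_details(
    "large unload executed",
    [
        {"table_name": "customer", "num_rows": 100},
        {"table_name": "customer_address", "num_rows": 10},
    ],
)
print(details)
```

Validating the keys up front is a design choice for this sketch: a misspelt key would otherwise only surface when the procedure reads a NULL threshold.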

Implementation steps - SNS and Lambda

  1. In the AWS Management console, open Simple Notification Service > Topics and select ‘Create topic’.
  2. For Type, select Standard. Provide a topic Name [e.g. redshift-custom-alerting-topic] and optionally, a Display name. All other options can be left as default, or set per your preferences. Scroll to the bottom of the page and click ‘Create topic’.

  3. Upon topic creation, you will be taken to the topic configuration page. Click the yellow Create subscription button.
  4. Create as many subscriptions as you need, using the protocol(s) of your choice. These subscriptions determine who is notified when a custom alert is triggered, and through which channel. For the purpose of this post, we will use the Email protocol. Note that after subscriptions are created, they must be confirmed by the recipient.
  5. Next, you need to deploy a Lambda function. To do this, open Lambda from the AWS Management Console and click the Create function button.
  6. Use the Author from scratch option, provide a function name [e.g. redshift-custom-alerting-lambda] and select Python 3.11 as the Runtime.
  7. Under Permissions, expand the Change default execution role section and select the radio button for Create a new role from AWS policy templates. Provide a Role Name [e.g. redshift-custom-alerting-lambda-role], and search for the ‘Amazon SNS publish policy’ within Policy templates. Add the policy by clicking on it, and then click Create function.
  8. In the Code source section, add the below code, remembering to replace <SNS_TOPIC_ARN> with the ARN of your SNS topic. This can be found on the topic configuration page.
import json
import boto3

def send_sns(message, subject):
    # Publish a single alert to the SNS topic
    client = boto3.client("sns")
    topic_arn = "<SNS_TOPIC_ARN>"
    client.publish(TopicArn=topic_arn, Message=message, Subject=subject)

def lambda_handler(event, context):
    response = dict()
    try:
        # Redshift batches rows: each entry in "arguments" is one [subject, message] pair
        records = event["arguments"]
        results = []

        for record in records:
            subject = record[0]
            message = record[1]
            try:
                send_sns(message, subject)
                print("Message", subject, "published with detail:", message)
                results.append("Alert sent for {}".format(subject))
            except Exception as e:
                # Keep one result per input row; log the failure instead of hiding it
                print("Failed to publish", subject, "with error:", e)
                results.append(None)

        response['success'] = True
        response['results'] = results
    except Exception as e:
        response['success'] = False
        response['error_msg'] = str(e)

    return json.dumps(response)
  9. Deploy your code by clicking the Deploy button.
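Before wiring the function to Redshift, it can help to exercise the batch contract locally. The sketch below is a simplified variant of the handler above in which the SNS call is replaced by an injected `publish` function, so it runs without AWS access. `make_handler`, `fake_publish` and the sample event are hypothetical; a real Lambda UDF event carries additional fields besides `arguments`.

```python
import json

def make_handler(publish):
    """Build a handler with the same batch contract; `publish` stands in for the SNS call."""
    def handler(event, context):
        response = {}
        try:
            results = []
            for subject, message in event["arguments"]:
                try:
                    publish(message, subject)
                    results.append("Alert sent for {}".format(subject))
                except Exception:
                    results.append(None)  # still one result per input row
            response["success"] = True
            response["results"] = results
        except Exception as e:
            response["success"] = False
            response["error_msg"] = str(e)
        return json.dumps(response)
    return handler

sent = []

def fake_publish(message, subject):
    """Record the alert instead of sending it; reject an empty subject to simulate a failure."""
    if not subject:
        raise ValueError("empty subject")
    sent.append((subject, message))

sample_event = {
    "arguments": [
        ["Redshift Alert: large unload detected for table customer", "Unloaded rows: 122600"],
        ["", "row with an empty subject"],
    ],
}

handler = make_handler(fake_publish)
result = json.loads(handler(sample_event, None))
print(result)
```

The second row fails to publish, yet the handler still returns two results, which keeps the response aligned with the rows that were passed in.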

Implementation steps - Redshift

  1. Log into Amazon Redshift Query Editor v2 (QEv2) through the AWS console
  2. Connect to your Redshift Serverless Workgroup and open a new editor tab
  3. Execute the following CREATE FUNCTION statement to deploy the Lambda UDF for sending notifications. Make sure to update the AWS Lambda function name if you chose a different name when creating the function.
CREATE EXTERNAL FUNCTION send_custom_alert(VARCHAR,VARCHAR)
RETURNS VARCHAR
VOLATILE
LAMBDA 'redshift-custom-alerting-lambda'
IAM_ROLE DEFAULT;
  4. Execute the following CREATE TABLE statement to create the monitoring configuration table.
CREATE TABLE public.cfg_event_monitoring (
    event_id NUMERIC,
    event_name VARCHAR(100),
    event_details SUPER,
    event_active_flag BOOLEAN
);
  5. Execute the following INSERT statement to create your first event metadata records. Some sample events have been supplied for demonstration purposes.
INSERT INTO public.cfg_event_monitoring values (
    1,
    'large unload executed',
    JSON_PARSE('{
        "events":[
            {
                "table_name":"customer",
                "num_rows":100
            },
            {
                "table_name":"customer_address",
                "num_rows":10
            },
            {
                "table_name":"customer_demographics",
                "num_rows":10
            }
        ]
    }'),
    true
),
(
    2,
    'attempted drop table',
    JSON_PARSE('{
        "events":[
            {
                "table_name":"customer"
            },
            {
                "table_name":"customer_address"
            },
            {
                "table_name":"customer_demographics"
            }
        ]
    }'),
    true
),
(
    3,
    'frequent selects on entity',
    JSON_PARSE('{
        "events":[
            {
                "table_name":"customer_address",
                "num_selects":10
            }
        ]
    }'),
    true
);
  6. Execute the following CREATE TABLE statement to create the table used for logging historical executions.
CREATE TABLE public.cfg_event_monitoring_log
(
    event_id INTEGER,
    event_observation_time_utc TIMESTAMP DEFAULT sysdate,
    event_notification_trigger_flag BOOLEAN DEFAULT false,
    event_notification_details SUPER
);
  7. Create the stored procedure to be used for assessing events. This example procedure is able to monitor for the following event types:
    1. Data unloaded from the data warehouse
    2. Frequent selects on an entity
    3. Attempted table drops
CREATE OR REPLACE PROCEDURE custom_alerting_framework() AS $$
DECLARE
-- Generic variables
    event_record RECORD;
    event_details_record RECORD;
    event_details_table_name VARCHAR(200) := '';
    event_id_counter INTEGER := 0;
    event_loop_flag BOOLEAN := true;
    event_notification_trigger_flag BOOLEAN := false;
    notification_triggering_tables SUPER := ARRAY();
    lambda_function_subject VARCHAR(1000) := '';
    lambda_function_message VARCHAR(4000) := '';
    check_for_notification_record RECORD;
-- Event type specific variables
---- large unload executed
    event_details_num_rows INTEGER := 0;
---- frequent selects on entity
    event_details_num_selects INTEGER := 0;

BEGIN
    WHILE event_loop_flag LOOP
        SELECT INTO event_id_counter MIN(event_id) FROM public.cfg_event_monitoring WHERE event_id > event_id_counter AND event_active_flag;
        IF event_id_counter IS NULL THEN
            RAISE INFO 'Completed event checks';
            RETURN;
        END IF;

        SELECT INTO event_record * FROM public.cfg_event_monitoring WHERE event_id = event_id_counter;

        /*###################### CHECK: large table unloads ######################*/

        IF event_record.event_name = 'large unload executed' THEN 
            
            FOR event_details_record IN (
                SELECT  ev.table_name::varchar, ev.num_rows
                FROM    public.cfg_event_monitoring em, em.event_details.events ev
                WHERE   em.event_active_flag
                AND     em.event_id = event_record.event_id
            ) LOOP
                event_details_table_name = event_details_record.table_name;
                event_details_num_rows = event_details_record.num_rows;

                RAISE INFO 'Procedure is searching for unloads of table % with more than % rows unloaded', event_details_table_name, event_details_num_rows;

                /* Check to see if there have been any unloads that satisfy the provided configuration */
                SELECT INTO check_for_notification_record
                        send_custom_alert('Redshift Alert: large unload detected for table ' || event_details_table_name,
                                'Large unload detected for table ' || event_details_table_name || chr(10) ||
                                'User name: ' || user_name || chr(10) ||
                                'Unload start time: ' || start_time || chr(10) || 
                                'Unload end time: ' || end_time || chr(10) || 
                                'Unload location: ' || unloaded_location || chr(10) || 
                                'Unloaded files count: ' || unloaded_files_count || chr(10) ||
                                'Unloaded rows: ' || unloaded_rows || chr(10) || 
                                'Unloaded files size (GB): ' || unloaded_files_size_gb
                        ) AS output
                    FROM (
                        SELECT
                            uh.user_id, 
                            u.usename::varchar AS user_name,
                            uh.query_id, 
                            qh.query_text,
                            uh.status, 
                            uh.start_time, 
                            uh.end_time, 
                            uh.unloaded_location, 
                            uh.unloaded_rows, 
                            uh.unloaded_files_count, 
                            uh.unloaded_files_size*1e-9 AS unloaded_files_size_gb
                        FROM sys_unload_history uh
                        INNER JOIN pg_user u
                        ON uh.user_id = u.usesysid
                        LEFT JOIN sys_query_history qh
                        ON uh.query_id = qh.query_id
                        WHERE (
                                LOWER(qh.query_text) LIKE '%unload%from%' || event_details_table_name || '%to%'
                                AND 
                                LOWER(qh.query_text) NOT LIKE '%unload%from%\\_' || event_details_table_name || '%to%'
                                AND 
                                LOWER(qh.query_text) NOT LIKE '%unload%from%' || event_details_table_name || '\\_%to%'
                        )
                        AND uh.unloaded_rows > event_details_num_rows
                        AND uh.end_time > (SELECT NVL(MAX(event_observation_time_utc),sysdate-1) FROM public.cfg_event_monitoring_log hist WHERE hist.event_id = event_record.event_id AND hist.event_notification_trigger_flag)
                    ) qt
                    ;
                IF NOT FOUND THEN
                    RAISE INFO 'No large unload events detected for table %', event_details_table_name;
                ELSE 
                    event_notification_trigger_flag = true;
                    notification_triggering_tables = array_concat(notification_triggering_tables, array(event_details_table_name));

                    RAISE INFO 'Notification triggered for unload of table %', event_details_table_name;
                END IF;
            END LOOP;

            /* Log this check in configuration table for historical analysis*/ 
            EXECUTE 'INSERT INTO public.cfg_event_monitoring_log 
                            (event_id, event_notification_trigger_flag, event_notification_details) 
                            VALUES 
                            ('|| 
                                event_record.event_id ||','|| 
                                event_notification_trigger_flag::integer ||
                                ',json_parse('''|| json_serialize(notification_triggering_tables) ||
                            '''))';
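            /* reset notification triggering tables array and trigger flag so they do not carry over to the next event */
            notification_triggering_tables = ARRAY();
            event_notification_trigger_flag = false;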
        
        /*###################### CHECK: frequent selects on entity ######################*/

        ELSEIF event_record.event_name = 'frequent selects on entity' THEN 

            FOR event_details_record IN (
                SELECT  ev.table_name::varchar, ev.num_selects
                FROM    public.cfg_event_monitoring em, em.event_details.events ev
                WHERE   em.event_active_flag
                AND     em.event_id = event_record.event_id
            ) LOOP
                event_details_table_name = event_details_record.table_name;
                event_details_num_selects = event_details_record.num_selects;

                RAISE INFO 'Procedure is searching for selects on table % with more than % executions within a 1 day period', event_details_table_name, event_details_num_selects;
                
                /* Check to see if there have been any frequent selects that satisfy the provided configuration */
                SELECT INTO check_for_notification_record
                        send_custom_alert('Redshift Alert: large number of selects on table ' || event_details_table_name,
                                'Large number of selects on table ' || event_details_table_name || chr(10) ||
                                'User name: ' || user_name || chr(10) ||
                                'First query start time: ' || start_time || chr(10) || 
                                'Last query end time: ' || end_time || chr(10) ||                                 
                                'Total returned rows: ' || returned_rows || chr(10) || 
                                'Total returned payload size (GB): ' || returned_data_gb
                        ) AS output
                    FROM (
                        SELECT
                            qh.user_id, 
                            u.usename::varchar AS user_name,
                            MIN(qh.start_time) AS start_time, 
                            MAX(qh.end_time) AS end_time,  
                            SUM(qh.returned_rows) AS returned_rows, 
                            SUM(qh.returned_bytes*1e-9) AS returned_data_gb
                        FROM sys_query_history qh
                        INNER JOIN pg_user u
                        ON qh.user_id = u.usesysid
                        WHERE (
                                LOWER(qh.query_text) LIKE '%select%from%' || event_details_table_name || '%'
                                AND 
                                LOWER(qh.query_text) NOT LIKE '%select%from%\\_' || event_details_table_name || '%'
                                AND 
                                LOWER(qh.query_text) NOT LIKE '%select%from%' || event_details_table_name || '\\_%'
                        )
                        AND qh.end_time > (SELECT NVL(MAX(event_observation_time_utc),sysdate-1) FROM public.cfg_event_monitoring_log hist WHERE hist.event_id = event_record.event_id AND hist.event_notification_trigger_flag)
                        GROUP BY qh.user_id, u.usename
                        HAVING count(*) > event_details_num_selects
                    ) qt
                    ;
                IF NOT FOUND THEN
                    RAISE INFO 'The table % does not have a large number of selects', event_details_table_name;
                ELSE 
                    event_notification_trigger_flag = true;
                    notification_triggering_tables = array_concat(notification_triggering_tables, array(event_details_table_name));

                    RAISE INFO 'Notification triggered for large amount of selects on table %', event_details_table_name;
                END IF;
            END LOOP;

            /* Now record the latest runtime of the most-recent check for this event */
            EXECUTE 'INSERT INTO public.cfg_event_monitoring_log 
                            (event_id, event_notification_trigger_flag, event_notification_details) 
                            VALUES 
                            ('|| 
                                event_record.event_id ||','|| 
                                event_notification_trigger_flag::integer ||
                                ',json_parse('''|| json_serialize(notification_triggering_tables) ||
                            '''))';
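            /* reset notification details array and trigger flag so they do not carry over to the next event */
            notification_triggering_tables = array();
            event_notification_trigger_flag = false;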

        /*###################### CHECK: attempted drop table ######################*/

        ELSEIF event_record.event_name = 'attempted drop table' THEN 

            FOR event_details_record IN (
                SELECT  ev.table_name::varchar
                FROM    public.cfg_event_monitoring em, em.event_details.events ev
                WHERE   em.event_active_flag
                AND     em.event_id = event_record.event_id
            ) LOOP
                event_details_table_name = event_details_record.table_name;

                RAISE INFO 'Procedure is searching for attempts to drop table %', event_details_table_name;
                
                /* Check to see if there have been any attempted table drops that satisfy the provided configuration */
                SELECT INTO check_for_notification_record
                        send_custom_alert('Redshift Alert: Attempted drop table on entity ' || event_details_table_name || ' with status: ' || status,
                                'Attempted drop on table: ' || event_details_table_name || chr(10) ||
                                'User name: ' || user_name || chr(10) ||
                                'Query start time: ' || start_time || chr(10) ||  
                                'Query status: ' || status || chr(10)                         
                        ) AS output
                    FROM (
                        SELECT
                            qh.user_id, 
                            u.usename::varchar AS user_name,
                            qh.start_time, 
                            qh.status  
                        FROM sys_query_history qh
                        INNER JOIN pg_user u
                        ON qh.user_id = u.usesysid
                        WHERE (
                                LOWER(qh.query_text) LIKE '%drop table%' || event_details_table_name || '%'
                                AND 
                                LOWER(qh.query_text) NOT LIKE '%drop table%\\_' || event_details_table_name || '%'
                                AND 
                                LOWER(qh.query_text) NOT LIKE '%drop table%' || event_details_table_name || '\\_%'
                        )
                        AND qh.query_type = 'DDL'  
                        AND qh.end_time > (SELECT NVL(MAX(event_observation_time_utc),sysdate-1) FROM public.cfg_event_monitoring_log hist WHERE hist.event_id = event_record.event_id AND hist.event_notification_trigger_flag)
                    ) qt
                    ;
                IF NOT FOUND THEN
                    RAISE INFO 'The table % has no attempted drop statements', event_details_table_name;
                ELSE 
                    event_notification_trigger_flag = true;
                    notification_triggering_tables = array_concat(notification_triggering_tables, array(event_details_table_name));

                    RAISE INFO 'Notification triggered for attempted drop of table %', event_details_table_name;
                END IF;
            END LOOP;

            /* Now record the latest runtime of the most-recent check for this event */
            EXECUTE 'INSERT INTO public.cfg_event_monitoring_log 
                            (event_id, event_notification_trigger_flag, event_notification_details) 
                            VALUES 
                            ('|| 
                                event_record.event_id ||','|| 
                                event_notification_trigger_flag::integer ||
                                ',json_parse('''|| json_serialize(notification_triggering_tables) ||
                            '''))';
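            /* reset notification details array and trigger flag so they do not carry over to the next event */
            notification_triggering_tables = array();
            event_notification_trigger_flag = false;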

        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
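
Each check in the procedure identifies the table by combining one LIKE predicate with two NOT LIKE predicates on the query text. The following sketch models the unload variant in Python to show which statements would match. `like_to_regex` and `matches_unload_rule` are hypothetical helpers, the sample statements are invented, and the translation covers only `%`, `_` and backslash escapes, so treat it as an approximation of LIKE semantics rather than a replacement for testing in Redshift.

```python
import re

def like_to_regex(pattern):
    """Translate a LIKE pattern: % is any run, _ is one character, backslash escapes the next one."""
    out = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == "\\" and i + 1 < len(pattern):
            out.append(re.escape(pattern[i + 1]))  # e.g. \_ is a literal underscore
            i += 2
            continue
        if ch == "%":
            out.append(".*")
        elif ch == "_":
            out.append(".")
        else:
            out.append(re.escape(ch))
        i += 1
    return re.compile("".join(out), re.DOTALL)

def matches_unload_rule(query_text, table_name):
    """Apply the three predicates used by the unload check to one query text."""
    text = query_text.lower()
    include = like_to_regex("%unload%from%" + table_name + "%to%")
    prefixed = like_to_regex("%unload%from%\\_" + table_name + "%to%")
    suffixed = like_to_regex("%unload%from%" + table_name + "\\_%to%")
    return (
        include.fullmatch(text) is not None
        and prefixed.fullmatch(text) is None
        and suffixed.fullmatch(text) is None
    )

unload_customer = "UNLOAD ('SELECT * FROM public.customer') TO 's3://example-bucket/out/'"
unload_address = "UNLOAD ('SELECT * FROM public.customer_address') TO 's3://example-bucket/out/'"
unload_customers = "UNLOAD ('SELECT * FROM public.customers') TO 's3://example-bucket/out/'"

print(matches_unload_rule(unload_customer, "customer"))
print(matches_unload_rule(unload_address, "customer"))
print(matches_unload_rule(unload_address, "customer_address"))
print(matches_unload_rule(unload_customers, "customer"))
```

Under these assumptions, the underscore exclusions stop the customer rule from firing on customer_address, but a hypothetical table named customers would still match the customer rule. Text matching of this kind is a heuristic, so it is worth checking the patterns against your own naming conventions.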

Test the monitoring procedure

Execute the following steps to test the monitoring and alerting framework:

  1. Execute the following CREATE TABLE and COPY statements to import test data.
CREATE TABLE public.customer_address
(
 ca_address_sk int4 NOT NULL ,
  ca_address_id CHAR(16) NOT NULL ,
  ca_street_number CHAR(10) ,      
  ca_street_name VARCHAR(60) ,   
  ca_street_type CHAR(15) ,     
  ca_suite_number CHAR(10) ,    
  ca_city VARCHAR(60) ,         
  ca_county VARCHAR(30) ,       
  ca_state CHAR(2) ,            
  ca_zip CHAR(10) ,             
  ca_country VARCHAR(20) ,      
  ca_gmt_offset NUMERIC(5,2) ,  
  ca_location_type CHAR(20)     
  ,PRIMARY KEY (ca_address_sk)
) DISTKEY(ca_address_sk);

CREATE TABLE public.customer_demographics
(
  cd_demo_sk INT4 NOT NULL ,   
  cd_gender CHAR(1) ,          
  cd_marital_status CHAR(1) ,   
  cd_education_status CHAR(20) , 
  cd_purchase_estimate INT4 ,   
  cd_credit_rating CHAR(10) ,   
  cd_dep_count INT4 ,             
  cd_dep_employed_count INT4 ,    
  cd_dep_college_count INT4       
  ,PRIMARY KEY (cd_demo_sk)
)DISTKEY (cd_demo_sk);

CREATE TABLE public.customer
(
  c_customer_sk INT4 NOT NULL ,                 
  c_customer_id CHAR(16) NOT NULL ,             
  c_current_cdemo_sk INT4 ,   
  c_current_hdemo_sk INT4 ,   
  c_current_addr_sk INT4 ,    
  c_first_shipto_date_sk INT4 ,                 
  c_first_sales_date_sk INT4 ,
  c_salutation CHAR(10) ,     
  c_first_name CHAR(20) ,     
  c_last_name CHAR(30) ,      
  c_preferred_cust_flag CHAR(1) ,               
  c_birth_day INT4 ,          
  c_birth_month INT4 ,        
  c_birth_year INT4 ,         
  c_birth_country VARCHAR(20) ,                 
  c_login CHAR(13) ,          
  c_email_address CHAR(50) ,  
  c_last_review_date_sk INT4 ,
  PRIMARY KEY (c_customer_sk)
) DISTKEY(c_customer_sk);

COPY public.customer_address FROM 's3://redshift-downloads/TPC-DS/2.13/3TB/customer_address/' IAM_ROLE DEFAULT GZIP DELIMITER '|' EMPTYASNULL REGION 'us-east-1';
COPY public.customer_demographics FROM 's3://redshift-downloads/TPC-DS/2.13/3TB/customer_demographics/' IAM_ROLE DEFAULT GZIP DELIMITER '|' EMPTYASNULL REGION 'us-east-1';
COPY public.customer FROM 's3://redshift-downloads/TPC-DS/2.13/3TB/customer/' IAM_ROLE DEFAULT GZIP DELIMITER '|' EMPTYASNULL REGION 'us-east-1';
  2. UNLOAD in excess of 100 records from the customer table. Be sure to replace <S3_LOCATION> with an appropriate S3 location, as per the prerequisites. Note that the predicate in the UNLOAD statement restricts unloaded record count to 122,600.
UNLOAD ('SELECT * FROM public.customer WHERE TRIM(c_last_name) = ''Thompson''') TO '<S3_LOCATION>' IAM_ROLE DEFAULT;
  3. Execute the below SELECT statement more than 10 times.
SELECT * FROM public.customer_address LIMIT 10;
  4. DROP the customer_demographics table. Note that you may re-instantiate this table later by re-executing the relevant CREATE TABLE and COPY commands above.
DROP TABLE public.customer_demographics;
  5. You can now execute the monitoring procedure and inspect the logging table and email alerts.
CALL custom_alerting_framework();
  6. Once executed successfully, the stored procedure will insert 3 records into the history table cfg_event_monitoring_log. This can be confirmed by executing the below SQL query.
SELECT * FROM public.cfg_event_monitoring_log ORDER BY event_observation_time_utc DESC;
  7. You should also receive notifications relating to these entries, with specific details of the monitoring procedure’s findings. If your alerts do not arrive, please ensure you have confirmed the SNS subscription (this may require checking junk folders).
  8. Re-executing the procedure and then the SELECT statement above should reveal three new entries in cfg_event_monitoring_log with event_notification_trigger_flag = false. This is because a single detected event will only trigger one notification. Re-run the unload, drop, or multiple select statements to trigger a new notification.
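
The reason a second run finds nothing new is the lookback window each check applies: it only considers queries that ended after the last logged run that triggered a notification for that event, or within the past day if there is no such run. A minimal Python model of that rule is shown below. `window_start` and the timestamps are made up for illustration.

```python
from datetime import datetime, timedelta

def window_start(log_rows, event_id, now):
    """Return the time after which queries are considered for this event."""
    triggered_times = [
        row["event_observation_time_utc"]
        for row in log_rows
        if row["event_id"] == event_id and row["event_notification_trigger_flag"]
    ]
    # Fall back to one day ago when no earlier run has triggered a notification
    return max(triggered_times) if triggered_times else now - timedelta(days=1)

now = datetime(2024, 1, 10, 12, 0, 0)
log_rows = [
    {"event_id": 1, "event_observation_time_utc": datetime(2024, 1, 10, 11, 0, 0),
     "event_notification_trigger_flag": True},
    {"event_id": 1, "event_observation_time_utc": datetime(2024, 1, 10, 11, 30, 0),
     "event_notification_trigger_flag": False},
]

print(window_start([], 1, now))        # 2024-01-09 12:00:00
print(window_start(log_rows, 1, now))  # 2024-01-10 11:00:00
```

Runs that did not trigger a notification do not move the window forward. One consequence of this model is that, for the frequent selects check, the counting period can be longer than one day when the last triggering run is older than that.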

Conclusion

In this article, we have established a framework to enable monitoring and alerting on custom in-data-warehouse events. This example implementation has been designed to detect and alert on large data unloads, frequent selects on entities and attempted table drops (successful and failed).

To extend this framework for detection of other in-data-warehouse events such as truncates or new user logins, appropriate entries can be made in cfg_event_monitoring along with corresponding logic tests in the procedure custom_alerting_framework.

This stored procedure can now be scheduled as a regular workload to identify and alert on specific user events or actions.
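
One possible way to schedule the procedure, sketched below, is a second Lambda function that submits the CALL statement through the Amazon Redshift Data API and is invoked on a timer. This is an assumption rather than part of the solution above: the workgroup name `my-workgroup`, the database `dev`, and the function names are placeholders, and the function's role would need permission to use the Data API against your workgroup.

```python
def call_monitoring_procedure(client, workgroup_name, database):
    """Submit CALL custom_alerting_framework() and return the Data API statement id."""
    response = client.execute_statement(
        WorkgroupName=workgroup_name,
        Database=database,
        Sql="CALL custom_alerting_framework();",
    )
    return response["Id"]

def scheduled_handler(event, context):
    """Entry point for a scheduled invocation; names below are placeholders."""
    import boto3  # imported lazily so the helper above can be tried without AWS access

    client = boto3.client("redshift-data")
    return call_monitoring_procedure(client, "my-workgroup", "dev")

class RecordingClient:
    """Local stand-in for the Data API client, used only for a dry run."""

    def __init__(self):
        self.calls = []

    def execute_statement(self, **kwargs):
        self.calls.append(kwargs)
        return {"Id": "dry-run-statement"}

dry_run_client = RecordingClient()
statement_id = call_monitoring_procedure(dry_run_client, "my-workgroup", "dev")
print(statement_id, dry_run_client.calls)
```

The Data API call is asynchronous, so the returned id identifies a submitted statement rather than a completed one. The outcome of each run is still recorded by the procedure in cfg_event_monitoring_log.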

Clean up

To clean up your environment after executing the above steps, please:

  • Delete the SNS topic redshift-custom-alerting-topic
  • Delete the Lambda function redshift-custom-alerting-lambda
  • Delete the IAM Role redshift-custom-alerting-lambda-role
  • Delete the files unloaded to Amazon S3
  • Execute the following statements to remove objects created in Redshift:
DROP TABLE public.cfg_event_monitoring;
DROP TABLE public.cfg_event_monitoring_log;
DROP TABLE public.customer_address;
DROP TABLE IF EXISTS public.customer_demographics;
DROP TABLE public.customer;
DROP PROCEDURE public.custom_alerting_framework();
DROP FUNCTION send_custom_alert(VARCHAR,VARCHAR);

Authors

Rick Fraser - Specialist Data Solutions Architect

Sean Beath - Specialist Redshift Solutions Architect