How do I replicate tables in RDS for PostgreSQL using logical replication?

7 minute read
0

I want to replicate tables between databases in Amazon Relational Database Service (Amazon RDS) for PostgreSQL without using any extensions.

Resolution

A typical use case for logical replication is replicating a set of tables between two Amazon RDS for PostgreSQL DB instances. RDS for PostgreSQL supports logical replication with PostgreSQL 10.4 and later. Amazon Aurora PostgreSQL-Compatible Edition version 2.2.0 and later supports logical replication with PostgreSQL 10.6 and later.

In the resolution provided, two source tables are replicated to two target tables using logical replication in RDS for PostgreSQL. Logical replication first performs the initial load of data already present in the source tables and then continues to replicate ongoing changes.

Turn on logical replication

To turn on logical replication in RDS for PostgreSQL, modify a custom parameter group to set rds.logical_replication to 1 and attach rds.logical_replication to the DB instance. Update the parameter group to set rds.logical_replication to 1 if a custom parameter group is attached to a DB instance. The rds.logical_replication parameter is a static parameter that requires a DB instance reboot to take effect. When the DB instance reboots, the wal_level parameter is set to logical.

Verify the values for wal_level and rds.logical_replication:

postgres=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
          name           | setting
-------------------------+---------
 rds.logical_replication | on
 wal_level               | logical
(2 rows)

Connect to source database in source DB instance

Connect to the source database in the source RDS for PostgreSQL DB instance. Create the source tables:

source=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
source=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE

Insert data into the source tables:

source=> INSERT INTO reptab1 VALUES (generate_series(1,1000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR ('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50

Create a publication for source tables

Create a publication for the source tables. Use a SELECT query to verify the details of the publication that was created:

source=> CREATE PUBLICATION testpub FOR TABLE reptab1,reptab2;
CREATE PUBLICATION
source=> SELECT * FROM pg_publication;
  oid   | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
--------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
 115069 | testpub |    16395 | f            | t         | t         | t         | t           | f
(1 row)

Verify that the source tables are added to the publication:

source=> SELECT * FROM pg_publication_tables;
 pubname | schemaname | tablename
---------+------------+-----------
 testpub | public     | reptab1
 testpub | public     | reptab2
(2 rows)

Note: To replicate all the tables in a database, run this command:

CREATE PUBLICATION testpub FOR ALL TABLES;

Connect to target database and create target tables

Connect to the target database in the target DB instance. Create the target tables with the same names as the source tables. Be sure that there's no data present in the target tables by running a SELECT query on the target tables:

target=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
target=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE
target=> SELECT count(*) FROM reptab1;
 count
-------
     0
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
     0
(1 row)

Create and verify subscription in target database

Create the subscription in the target database. Use a SELECT query to verify whether the subscription is enabled:

target=> CREATE SUBSCRIPTION testsub CONNECTION 'host=<source RDS/host endpoint> port=5432 dbname=<source_db_name> user=<user> password=<password>' PUBLICATION testpub;
NOTICE:  Created replication slot "testsub" on publisher
CREATE SUBSCRIPTION
target=> SELECT oid,subname,subenabled,subslotname,subpublications FROM pg_subscription;
  oid  | subname | subenabled | subslotname | subpublications
-------+---------+------------+-------------+-----------------
 16434 | testsub | t          | testsub     | {testpub}
(1 row)

Important: To avoid storing a plaintext version of your user name and password in the database logs, before creating the subscription, run the following commands:

target=> SET log_min_messages to 'PANIC';
SET
target=> SET log_statement to NONE;
SET

When the subscription is created, the subscription loads all the data present in the source tables to the target tables. Run a SELECT query on the target tables to verify that the initial data loads:

target=> SELECT count(*) FROM reptab1;
 count
-------
  1000
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
    50
(1 row)

Verify replication slot in source database

The creation of a subscription in the target database creates a replication slot in the source database. Verify the replication slot details by running the following SELECT query on the source database:

source=> SELECT * FROM pg_replication_slots;
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
 ----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
 testsub   | pgoutput | logical   | 115048 | source   | f         | t      |        846 |      |         6945 | 58/B4000568 | 58/B40005A0         | reserved   |
(1 row)

Test replication from source tables

Test whether data changes in the source tables are being replicated to the target tables by inserting rows into the source tables:

source=> INSERT INTO reptab1 VALUES(generate_series(1001,2000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50
source=> SELECT count(*) FROM reptab1;
 count
-------
  2000
(1 row)
source=> SELECT count(*) FROM reptab2; count
-------
   100
(1 row)

Test replication by verifying number of rows in target tables

Verify the number of rows in the target tables to confirm that new inserts are being replicated to the target tables:

target=> SELECT count(*) FROM reptab1;
 count
-------
  2000
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
   100
(1 row)

Clean up and turn off logical replication

Clean up and turn off logical replication when the replication is complete and is no longer required. Inactive replication slots cause the accumulation of WAL files on the source DB instances. WAL files might fill storage and cause outages.

Drop the subscription on the target database:

target=> DROP SUBSCRIPTION testsub;
NOTICE:  Dropped replication slot "testsub" on publisher
DROP SUBSCRIPTION
target=> SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subconninfo | subslotname | subsynccommit | subpublications
----+---------+---------+----------+------------+-------------+-------------+---------------+-----------------
(0 rows)

Note: Dropping the subscription also drops the replication slot the subscription created.

Verify that the replication slot is dropped from the source database by running the following SELECT query statement on the source:

source=> SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------
(0 rows)

Drop the publication. Verify that the publication is dropped successfully:

source=> DROP PUBLICATION testpub;
DROP PUBLICATION
source=> SELECT * FROM pg_publication;
 oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-----+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
(0 rows)
source=> SELECT * FROM pg_publication_tables;
 pubname | schemaname | tablename
---------+------------+-----------
(0 rows)

Modify rds.logical_replication to 0 in the custom parameter group that is attached to the DB instance. Reboot the DB instance as required if the DB instance is not using logical replication.

Review max_replication_slots, max_wal_senders, max_logical_replication_workers, max_worker_processes, and max_sync_workers_per_subscription based on your usage.

Note: The following commands check whether there are any inactive replication slots, determine the respective sizes of the slots, and then drop the slots, if required.

SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) AS replicationSlotLag, active FROM pg_replication_slots ;
SELECT pg_drop_replication_slot('Your_slotname_name');

Related information

PostgreSQL documentation for Replication

3 Comments

I'm connected to my RDS DB as the user postgres. I get the following error whenever I try to create a subscription to an external DB ERROR: could not connect to the publisher: FATAL: must be superuser or replication role to start walsender. I've already assigned the rds_replication role to the user but that didn't help. Which user should I be using to create a subscription?

I also noticed that the rds_replication role doesn't actually seem to have the replication role set on them. Is this a bug?

=> SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'rds_replication';
     rolname     | rolreplication
-----------------+----------------
 rds_replication | f
(1 row)
replied a year ago

Thank you for your comment. We'll review and update the Knowledge Center article as needed.

profile pictureAWS
MODERATOR
replied a year ago

@aws_official I believe this is actually a bug how do I report it?

replied a year ago