Write PostgreSQL data to OmniFabric using Flink

This chapter describes how to write PostgreSQL data to OmniFabric using Flink.

Preparation

This practice requires the installation and deployment of the following software environments:

Steps

Download the Flink CDC connector

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1.1/flink-sql-connector-postgres-cdc-2.1.1.jar
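The download above can also be parameterized so that the connector version lives in one place. This is a minimal sketch: `cdc_connector_url` and `download_cdc_connector` are hypothetical helpers. Only the 2.1.1 URL comes from this guide. Other versions are assumed to follow the same Maven Central path layout, which may not hold for every release.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the Maven Central URL of the
# flink-sql-connector-postgres-cdc jar for a given connector version,
# using the same path layout as the wget URL above (assumed for other versions).
cdc_connector_url() {
  local version="$1"
  local base="https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc"
  echo "${base}/${version}/flink-sql-connector-postgres-cdc-${version}.jar"
}

# Hypothetical helper: download the jar into the current directory.
# wget -nc (no-clobber) skips the download if the file already exists.
download_cdc_connector() {
  local version="${1:-2.1.1}"
  wget -nc "$(cdc_connector_url "$version")"
}
```

For example, `download_cdc_connector 2.1.1` fetches the same jar as the wget command above.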

Copy the jar packages

Copy the Flink CDC connector jar, together with flink-connector-jdbc_2.12-1.13.6.jar and mysql-connector-j-8.0.33.jar, to flink-1.13.6/lib/. If Flink is already running, restart it so that the new jar packages are loaded.
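The copy step can be scripted as follows. This is a sketch that assumes all three jars were downloaded into the current directory. `copy_connector_jars` is a hypothetical helper, and the Flink installation path is passed in as an argument. The helper stops at the first missing jar so that a partial set is easy to notice.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy the connector jars from the current directory
# into <flink_home>/lib/. Assumes the jars were downloaded beforehand.
copy_connector_jars() {
  local flink_home="$1"
  local jars=(
    "flink-sql-connector-postgres-cdc-2.1.1.jar"
    "flink-connector-jdbc_2.12-1.13.6.jar"
    "mysql-connector-j-8.0.33.jar"
  )
  local jar
  for jar in "${jars[@]}"; do
    # Fail loudly instead of silently starting Flink without a connector.
    if [[ ! -f "$jar" ]]; then
      echo "missing: $jar" >&2
      return 1
    fi
    cp "$jar" "${flink_home}/lib/"
  done
}
```

For example, run `copy_connector_jars /path/to/flink-1.13.6`. Then, from the Flink directory, restart a running cluster with `./bin/stop-cluster.sh` followed by `./bin/start-cluster.sh`.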

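Enable CDC in PostgreSQL

Edit the following two files. Then restart PostgreSQL so that the changes take effect, because wal_level, max_wal_senders and max_replication_slots can only be changed at server start.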

  1. postgresql.conf configuration

    # change the maximum number of WAL sender processes (default is 10); use the same value as max_replication_slots
    max_wal_senders = 10            # max number of walsender processes
    # terminate replication connections that have been inactive for longer than this; it can be set somewhat larger (default 60s)
    wal_sender_timeout = 180s       # in milliseconds; 0 disables
    # change the maximum number of replication slots (default is 10); each flink-cdc source table uses one slot by default
    max_replication_slots = 10      # max number of replication slots
    # set the WAL level to logical
    wal_level = logical             # minimal, replica, or logical
    
  2. pg_hba.conf configuration

    #IPv4 local connections:
    host all all 0.0.0.0/0 password
    host replication all 0.0.0.0/0 password
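Before restarting PostgreSQL, the edited postgresql.conf can be sanity-checked from the shell. This is a minimal sketch. `conf_value` and `check_cdc_conf` are hypothetical helpers that handle only unquoted `name = value` lines, and the file path, which varies by installation, is passed in. A later uncommented assignment overrides an earlier one, matching how PostgreSQL reads the file.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the value of the last active (uncommented)
# "name = value" line for a setting, with any inline comment removed.
conf_value() {
  local file="$1" name="$2"
  grep -E "^[[:space:]]*${name}[[:space:]]*=" "$file" \
    | tail -n 1 \
    | sed -E 's/#.*//; s/^[^=]*=[[:space:]]*//; s/[[:space:]]*$//'
}

# Hypothetical helper: check the settings that logical decoding needs.
check_cdc_conf() {
  local file="$1" name
  if [[ "$(conf_value "$file" wal_level)" != "logical" ]]; then
    echo "wal_level is not logical" >&2
    return 1
  fi
  for name in max_wal_senders max_replication_slots; do
    if [[ -z "$(conf_value "$file" "$name")" ]]; then
      echo "$name is not set" >&2
      return 1
    fi
  done
}
```

For example, run `check_cdc_conf /path/to/postgresql.conf`. After the restart, `psql -c 'SHOW wal_level'` reports the value the running server actually uses.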
    

Create a table in PostgreSQL and insert data

create table student
(
    stu_id integer not null unique,
    stu_name varchar(50),
    stu_age integer,
    stu_bth date
);

INSERT INTO student VALUES (1, 'lisa', 12, '2022-10-12');
INSERT INTO student VALUES (2, 'tom', 23, '2021-11-10');
INSERT INTO student VALUES (3, 'jenny', 11, '2024-02-19');
INSERT INTO student VALUES (4, 'henry', 12, '2022-04-22');

Create the table in OmniFabric

create table student
(
    stu_id integer not null unique,
    stu_name varchar(50),
    stu_age integer,
    stu_bth date
);

Start the Flink cluster

Switch to the Flink directory and run the following commands:

./bin/start-cluster.sh
./bin/sql-client.sh

Enable checkpointing

Set the checkpoint interval to 3 seconds:

SET execution.checkpointing.interval = 3s;

Create the source table

CREATE TABLE pgsql_bog (
  stu_id   int not null,
  stu_name varchar(50),
  stu_age  int,
  stu_bth  date,
  primary key (stu_id) not enforced
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'xx.xx.xx.xx',
  'port' = '5432',
  'username' = 'postgres',
  'password' = '123456',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'student',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.snapshot.mode' = 'initial'
);

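When the source is defined in Flink SQL, the decoding plugin must be set explicitly. pgoutput is the standard logical decoding output plugin built into PostgreSQL 10 and later. Without 'decoding.plugin.name' = 'pgoutput', the connector falls back to its default plugin, decoderbufs, which is not installed with PostgreSQL, and the job fails with: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory.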
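Because pgoutput exists only in PostgreSQL 10 and later, it can be worth confirming the server version before creating the source table. This is a minimal sketch. `pg_supports_pgoutput` is a hypothetical helper, and psql is assumed to reach the server through the standard libpq environment variables (PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE).

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if the server is PostgreSQL 10 or later.
# server_version_num is an integer, e.g. 140011 for 14.11 or 90624 for 9.6.24.
pg_supports_pgoutput() {
  local num
  # -t: tuples only, -A: unaligned output, so only the number is printed.
  num="$(psql -tAc 'SHOW server_version_num')" || return 1
  [[ "$num" =~ ^[0-9]+$ ]] && (( num >= 100000 ))
}
```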

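Create the sink table

The sink table writes to OmniFabric over the MySQL protocol through the Flink JDBC connector. Set 'url' to the OmniFabric address and to the database in which the student table was created (postgre in this example).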

CREATE TABLE test_pg (
  stu_id   int not null,
  stu_name varchar(50),
  stu_age  int,
  stu_bth  date,
  primary key (stu_id) not enforced
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xx.xx.xx.xx:6001/postgre',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '111',
  'table-name' = 'student'
);

Import PostgreSQL data into OmniFabric

insert into test_pg select * from pgsql_bog;
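Once the INSERT job is running, the postgres-cdc source holds a logical replication slot in PostgreSQL. Checking pg_replication_slots is therefore a quick way to confirm that change capture is active. This is a minimal sketch. `replication_slot_active` is a hypothetical helper, and psql is assumed to connect through the libpq environment variables. The slot name comes from the connector's 'slot.name' option, which the source table above does not set, so the name you pass in is an assumption to adjust for your setup.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if the named logical replication slot exists
# and is currently in use (active = 't' in pg_replication_slots).
replication_slot_active() {
  local slot="$1" active
  active="$(psql -tAc "select active from pg_replication_slots where slot_name = '${slot}'")" || return 1
  [[ "$active" == "t" ]]
}
```

Because the slot name is interpolated into the query, pass only trusted values.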

Query the table data in OmniFabric:

mysql> select * from student;

+--------+----------+---------+------------+
| stu_id | stu_name | stu_age | stu_bth    |
+--------+----------+---------+------------+
|      1 | lisa     |      12 | 2022-10-12 |
|      2 | tom      |      23 | 2021-11-10 |
|      3 | jenny    |      11 | 2024-02-19 |
|      4 | henry    |      12 | 2022-04-22 |
+--------+----------+---------+------------+
4 rows in set (0.00 sec)

The existing data has been imported.
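Instead of comparing the listings by eye, the row counts on both sides can be compared from the shell. This is a minimal sketch. `row_counts_match` is a hypothetical helper, and the connection values reuse this guide's placeholders. psql reads its password from PGPASSWORD or ~/.pgpass, and OmniFabric is queried with the mysql client, as in the queries above. Synchronization is asynchronous, so the counts may differ briefly right after a write.

```shell
#!/usr/bin/env bash
# Placeholder connection settings taken from the table definitions above.
PG_ARGS=(-h xx.xx.xx.xx -p 5432 -U postgres -d postgres)
OF_ARGS=(-h xx.xx.xx.xx -P 6001 -u root -p111 -D postgre)

# Hypothetical helper: print both counts and succeed if they are equal.
row_counts_match() {
  local pg_count of_count
  pg_count="$(psql "${PG_ARGS[@]}" -tAc 'select count(*) from public.student')" || return 1
  # -N suppresses the column header so only the number is printed.
  of_count="$(mysql "${OF_ARGS[@]}" -N -e 'select count(*) from student')" || return 1
  echo "postgresql=${pg_count} omnifabric=${of_count}"
  [[ "$pg_count" == "$of_count" ]]
}
```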

Add data in PostgreSQL

insert into public.student values (51, '58', 39, '2020-01-03');

Query the table data in OmniFabric:

mysql> select * from student;

+--------+----------+---------+------------+
| stu_id | stu_name | stu_age | stu_bth    |
+--------+----------+---------+------------+
|      1 | lisa     |      12 | 2022-10-12 |
|      2 | tom      |      23 | 2021-11-10 |
|      3 | jenny    |      11 | 2024-02-19 |
|      4 | henry    |      12 | 2022-04-22 |
|     51 | 58       |      39 | 2020-01-03 |
+--------+----------+---------+------------+
5 rows in set (0.01 sec)

The new row has been synchronized to the corresponding table in OmniFabric.

Delete data in PostgreSQL:

delete from public.student where stu_id=1;

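If the delete fails with the following error:

cannot delete from table "student" because it does not have a replica identity and publishes deletes

the table belongs to a publication that publishes deletes, but it has no primary key, so its default replica identity cannot identify the deleted row. Set the replica identity to FULL, then run the delete again:

alter table public.student replica identity full;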
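When several source tables lack a primary key, this fix can be applied from the shell, and it is skipped for tables that are already set. This is a minimal sketch. `ensure_replica_identity_full` is a hypothetical helper, psql is assumed to connect through the libpq environment variables, and table names are interpolated into SQL, so pass only trusted values. In pg_class, relreplident is 'd' (default), 'n' (nothing), 'f' (full) or 'i' (index).

```shell
#!/usr/bin/env bash
# Hypothetical helper: set REPLICA IDENTITY FULL on a table unless it
# already has it, based on pg_class.relreplident.
ensure_replica_identity_full() {
  local table="$1" ident
  ident="$(psql -tAc "select relreplident from pg_class where oid = '${table}'::regclass")" || return 1
  if [[ "$ident" == "f" ]]; then
    echo "${table}: replica identity already full"
    return 0
  fi
  psql -c "alter table ${table} replica identity full"
}
```

For example, `ensure_replica_identity_full public.student` issues the same ALTER TABLE statement shown above when it is needed.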

Query the table data in OmniFabric:

mysql> select * from student;

+--------+----------+---------+------------+
| stu_id | stu_name | stu_age | stu_bth    |
+--------+----------+---------+------------+
|      2 | tom      |      23 | 2021-11-10 |
|      3 | jenny    |      11 | 2024-02-19 |
|      4 | henry    |      12 | 2022-04-22 |
|     51 | 58       |      39 | 2020-01-03 |
+--------+----------+---------+------------+
4 rows in set (0.00 sec)