Write Oracle data to OmniFabric using Flink

This chapter describes how to write Oracle data to OmniFabric using Flink.

Pre-preparation

This practice requires the installation and deployment of the following software environments:

Operational steps

Create a table in Oracle and insert data

create table flinkcdc_empt
(
    EMPNO    NUMBER not null primary key,
    ENAME    VARCHAR2(10),
    JOB      VARCHAR2(9),
    MGR      NUMBER(4),
    HIREDATE DATE,
    SAL      NUMBER(7, 2),
    COMM     NUMBER(7, 2),
    DEPTNO   NUMBER(2)
)

--Modify the FLINKCDC_EMPT table to support incremental logging
ALTER TABLE scott.FLINKCDC_EMPT ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

--Insert test data:
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(1, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(2, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(3, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(4, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(5, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(6, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);
INSERT INTO SCOTT.FLINKCDC_EMPT (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO) VALUES(5989, 'TURNER', 'SALESMAN', 7698, TIMESTAMP '2022-10-31 16:21:11.000000', 1500, 0, 30);

Creating a Target Table in OmniFabric

create database test;
use test;
CREATE TABLE `oracle_empt` (
    `empno` bigint NOT NULL COMMENT "",
    `ename` varchar(10) NULL COMMENT "",
    `job` varchar(9) NULL COMMENT "",
    `mgr` int NULL COMMENT "",
    `hiredate` datetime NULL COMMENT "",
    `sal` decimal(7, 2) NULL COMMENT "",
    `comm` decimal(7, 2) NULL COMMENT "",
    `deptno` int NULL COMMENT ""
);

Copy the jar package

Copy flink-sql-connector-oracle-cdc-2.2.1.jar, flink-connector-jdbc_2.11-1.13.6.jar, mysql-connector-j-8.0.31.jar to flink-1.13.6/lib/.

If flink is already started, you need to restart flink and load the effective jar package.

./bin/start-cluster.sh
./bin/sql-client.sh

Turn on checkpoint

SET execution.checkpointing.interval = 3s;
-- Create source table (oracle)
CREATE TABLE `oracle_source` (
    EMPNO bigint NOT NULL,
    ENAME VARCHAR(10),
    JOB VARCHAR(9),
    MGR int,
    HIREDATE timestamp,
    SAL decimal(7,2),
    COMM decimal(7,2),
    DEPTNO int,
    PRIMARY KEY(EMPNO) NOT ENFORCED
) WITH (
 'connector' = 'oracle-cdc',
 'hostname' = 'xx.xx.xx.xx',
 'port' = '1521',
 'username' = 'scott',
 'password' = 'tiger',
 'database-name' = 'ORCLCDB',
 'schema-name' = 'SCOTT',
 'table-name' = 'FLINKCDC_EMPT',
 'debezium.database.tablename.case.insensitive'='false',
 'debezium.log.mining.strategy'='online_catalog'
    );

-- Creating a sink table (mo)
CREATE TABLE IF NOT EXISTS `oracle_sink` (
    EMPNO bigint NOT NULL,
   ENAME VARCHAR(10),
   JOB VARCHAR(9),
   MGR int,
   HIREDATE timestamp,
   SAL decimal(7,2),
   COMM decimal(7,2),
   DEPTNO int,
    PRIMARY KEY(EMPNO) NOT ENFORCED
) with (
'connector' = 'jdbc',
 'url' = 'jdbc:mysql://ip:6001/test',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '111',
  'table-name' = 'oracle_empt'
);

-- Read and insert the source table data into the sink table.
insert into `oracle_sink` select * from `oracle_source`;

Query correspondence table data in OmniFabric

select * from oracle_empt;