Migrating data from MySQL to OmniFabric using Spark
This chapter shows how to use the Spark compute engine to write MySQL data to OmniFabric in bulk.
Prerequisites
This exercise requires the following software to be installed and deployed:
-
Install and start OmniFabric.
-
Download and install IntelliJ IDEA version 2022.2.1 or later.
-
Download and install JDK 8+.
-
Download and install MySQL.
Operational steps
Step One: Initialize the Project
-
Launch IDEA, click File > New > Project, select Spring Initializr, and fill in the following configuration parameters:
-
Name: mo-spark-demo
-
Location: ~\Desktop
-
Language: Java
-
Type: Maven
-
Group: com.example
-
Artifact: omnifabric-spark-demo
-
Package name: com.omnifabric.demo
-
JDK: 1.8
-
Add the project dependencies by editing the contents of pom.xml in the project root as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.mo</groupId>
    <artifactId>mo-spark-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>3.2.1</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
    </dependencies>
</project>
Step Two: Read OmniFabric Data
Connect to OmniFabric with a MySQL client, then create the database and table needed for the demo.
-
Create the database and table, then insert data in OmniFabric:
CREATE DATABASE test;
USE test;
CREATE TABLE `person` (`id` INT DEFAULT NULL, `name` VARCHAR(255) DEFAULT NULL, `birthday` DATE DEFAULT NULL);
INSERT INTO test.person (id, name, birthday) VALUES(1, 'zhangsan', '2023-07-09'),(2, 'lisi', '2023-07-08'),(3, 'wangwu', '2023-07-12');
-
Create a MoRead.java class in IDEA to read OmniFabric data using Spark:
```java
package com.matrixone.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
 * @author OmniFabric
 * @desc Read OmniFabric data
 */
public class MoRead {

    // parameters
    private static String master = "local[2]";
    private static String appName = "mo_spark_demo";

    private static String srcHost = "xx.xx.xx.xx";
    private static Integer srcPort = 6001;
    private static String srcUserName = "root";
    private static String srcPassword = "111";
    private static String srcDataBase = "test";
    private static String srcTable = "person";

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName(appName).master(master).getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);

        // Credentials for the JDBC connection to OmniFabric
        Properties properties = new Properties();
        properties.put("user", srcUserName);
        properties.put("password", srcPassword);

        // Read the whole table over JDBC and print it to the console
        Dataset<Row> dataset = sqlContext.read()
                .jdbc("jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase, srcTable, properties);
        dataset.show();
    }
}
```
- Run MoRead.main() in IDEA. The console prints the contents of the person table.
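The JDBC URL in MoRead is assembled by string concatenation. As a minimal sketch, assuming you want to keep that logic in one place, the helper below builds the same jdbc:mysql://host:port/database form and optionally appends query parameters. The class name JdbcUrls and its build method are hypothetical and not part of the demo project, and parameter values are appended as-is, without URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; not part of the demo project.
final class JdbcUrls {
    private JdbcUrls() {}

    /** Builds jdbc:mysql://host:port/database, followed by ?k1=v1&k2=v2 when params is non-empty. */
    static String build(String host, int port, String database, Map<String, String> params) {
        StringBuilder url = new StringBuilder("jdbc:mysql://")
                .append(host).append(':').append(port).append('/').append(database);
        if (!params.isEmpty()) {
            // Join the parameters in the iteration order of the map; values are not URL-encoded.
            StringJoiner query = new StringJoiner("&", "?", "");
            params.forEach((key, value) -> query.add(key + "=" + value));
            url.append(query.toString());
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the query string is stable.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("useSSL", "false");
        System.out.println(build("xx.xx.xx.xx", 6001, "test", params));
    }
}
```

The returned string can be passed as the first argument of the jdbc(...) call in place of the inline concatenation.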
Step Three: Write MySQL Data to OmniFabric
You can now start migrating MySQL data to OmniFabric using Spark.
-
Prepare MySQL data: On node3, connect to your local MySQL instance using the MySQL client, create the required database and table, and insert the data:
mysql -h127.0.0.1 -P3306 -uroot -proot
mysql> CREATE DATABASE motest;
mysql> USE motest;
mysql> CREATE TABLE `person` (`id` int DEFAULT NULL, `name` varchar(255) DEFAULT NULL, `birthday` date DEFAULT NULL);
mysql> INSERT INTO motest.person (id, name, birthday) VALUES(2, 'lisi', '2023-07-09'),(3, 'wangwu', '2023-07-13'),(4, 'zhaoliu', '2023-08-08');
-
Clear the OmniFabric table data:
On node3, connect to OmniFabric on node1 using a MySQL client. Since this example reuses the test database from the earlier example that read OmniFabric data, we first need to empty the person table.
-- On node3, connect to OmniFabric on node1 using the MySQL client
mysql -hxx.xx.xx.xx -P6001 -uroot -p111
mysql> TRUNCATE TABLE test.person;
-
Write code in IDEA:
Create the Person.java and Mysql2Mo.java classes to read MySQL data using Spark. The code for the Mysql2Mo.java class can follow this example:
package com.matrixone.spark;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

import java.sql.SQLException;
import java.util.Properties;

/**
 * @author OmniFabric
 * @desc Read MySQL data, transform it, and write it to OmniFabric
 */
public class Mysql2Mo {

    // parameters
    private static String master = "local[2]";
    private static String appName = "app_spark_demo";

    private static String srcHost = "127.0.0.1";
    private static Integer srcPort = 3306;
    private static String srcUserName = "root";
    private static String srcPassword = "root";
    private static String srcDataBase = "motest";
    private static String srcTable = "person";

    private static String destHost = "xx.xx.xx.xx";
    private static Integer destPort = 6001;
    private static String destUserName = "root";
    private static String destPassword = "111";
    private static String destDataBase = "test";
    private static String destTable = "person";

    public static void main(String[] args) throws SQLException {
        SparkSession sparkSession = SparkSession.builder().appName(appName).master(master).getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);

        // Connection properties for the MySQL source
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", srcUserName);
        connectionProperties.put("password", srcPassword);
        connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver");

        // URL form: jdbc:mysql://127.0.0.1:3306/database
        String url = "jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase + "?characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai";

        // Read the table contents over JDBC
        System.out.println("Read table contents of person in database");
        // Read all data in the table
        Dataset<Row> rowDataset = sqlContext.read().jdbc(url, srcTable, connectionProperties).select("*");
        // Show data
        //rowDataset.show();

        // Filter data with id > 2 and add spark_ prefix to name field
        Dataset<Row> dataset = rowDataset.filter("id > 2")
                .map((MapFunction<Row, Row>) row -> RowFactory.create(row.getInt(0), "spark_" + row.getString(1), row.getDate(2)),
                        RowEncoder.apply(rowDataset.schema()));
        // Show data
        //dataset.show();

        // Connection properties for the OmniFabric destination
        Properties properties = new Properties();
        properties.put("user", destUserName);
        properties.put("password", destPassword);

        // Append the transformed rows to the destination table
        dataset.write()
                .mode(SaveMode.Append)
                .jdbc("jdbc:mysql://" + destHost + ":" + destPort + "/" + destDataBase, destTable, properties);
    }
}
In the above code, a simple ETL operation (filtering data with id > 2 and adding the prefix "spark_" to the name field) is performed and the processed data is written to the OmniFabric database.
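To make the transformation itself easier to follow, here is a minimal sketch of the same filter-and-prefix logic in plain Java, applied to an in-memory list rather than a Spark Dataset. The PersonEtlSketch class and its nested Person class are hypothetical stand-ins for the columns of the person table; they are not the Person.java from this project, and the sketch does not touch Spark or JDBC.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the ETL rule only; no Spark or database access.
final class PersonEtlSketch {

    // Hypothetical value class mirroring the id, name, and birthday columns.
    static final class Person {
        final int id;
        final String name;
        final LocalDate birthday;

        Person(int id, String name, LocalDate birthday) {
            this.id = id;
            this.name = name;
            this.birthday = birthday;
        }

        @Override
        public String toString() {
            return id + ", " + name + ", " + birthday;
        }
    }

    /** Keeps rows with id > 2 and prefixes name with "spark_"; birthday is passed through unchanged. */
    static List<Person> transform(List<Person> source) {
        return source.stream()
                .filter(p -> p.id > 2)
                .map(p -> new Person(p.id, "spark_" + p.name, p.birthday))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same rows as inserted into motest.person earlier in this step.
        List<Person> source = Arrays.asList(
                new Person(2, "lisi", LocalDate.of(2023, 7, 9)),
                new Person(3, "wangwu", LocalDate.of(2023, 7, 13)),
                new Person(4, "zhaoliu", LocalDate.of(2023, 8, 8)));
        transform(source).forEach(System.out::println);
    }
}
```

The row with id 2 is dropped by the filter, so only two of the three source rows reach the destination table.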
Step Four: View the Execution Results
Execute the following SQL in OmniFabric to view the execution results:
select * from test.person;
+------+---------------+------------+
| id | name | birthday |
+------+---------------+------------+
| 3 | spark_wangwu | 2023-07-12 |
| 4 | spark_zhaoliu | 2023-08-07 |
+------+---------------+------------+
2 rows in set (0.01 sec)