Getting data in realtime from MySQL with Java
To follow this tutorial you will need some knowledge of Java programming and basic MySQL administration, plus the Java JDK, Maven, MySQL and Node.js installed on your machine.
Applications are more complex than ever before, and it’s not only about handling more users or reducing response times.
If your application has a database, chances are you’ll need its data in other places as soon as it’s written.
In this context, change data capture is the approach you use to capture and deliver the changes in the database to other sources.
In this tutorial, you’re going to learn how to stream, in realtime, the changes made to a table in a MySQL database to a React app. Something like this:
Prerequisites
Here’s what you need to have installed to follow this tutorial:
- Java JDK (8 or superior)
- Maven
- MySQL Community Server 5.7 (5.6 and 5.5 should also work, but MySQL 8.0 is not yet supported)
- Node.js (6 or superior)
- Optionally, a Java IDE and a JavaScript editor.
You’ll need to have knowledge of:
- Java programming (intermediate level)
- Basic MySQL management tasks
- React (beginner level)
If you want to track and determine if something in a database has changed, you have three main approaches:
- Poll the database every X seconds and determine if something has changed using a timestamp, version number or status field.
- Use database or application-level triggers to execute a piece of code when something changes.
- Use the database transaction/replication log, which records every change to the database.
I’m going to use the third approach because I think it’s the most robust: it doesn’t waste resources (like polling) or hurt performance (like triggers).
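To make the comparison concrete, here is a minimal, self-contained sketch of the first approach. The `Row` class and `changedSince` method are hypothetical stand-ins for a real table and a `SELECT * FROM products WHERE updated_at > :lastSeen` query:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the polling approach: an in-memory list of rows
// stands in for a real table with an updated_at TIMESTAMP column.
public class PollingSketch {

    public static final class Row {
        final int id;
        final long updatedAt; // epoch millis, like a TIMESTAMP column

        public Row(int id, long updatedAt) {
            this.id = id;
            this.updatedAt = updatedAt;
        }
    }

    // Returns the rows changed since the last poll, i.e. the result of
    // "SELECT * FROM products WHERE updated_at > :lastSeen".
    public static List<Row> changedSince(List<Row> table, long lastSeen) {
        List<Row> changed = new ArrayList<>();
        for (Row r : table) {
            if (r.updatedAt > lastSeen) {
                changed.add(r);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, 1000L));
        table.add(new Row(2, 2000L));
        // Only the second row changed after our last poll at t=1500.
        System.out.println(changedSince(table, 1500L).size()); // prints 1
    }
}
```

Notice the drawbacks: every poll runs even when nothing has changed, and deleted rows simply disappear from the result set, so deletions can’t be detected without extra bookkeeping. The replication log avoids both problems.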
We’ll read the database changes from the MySQL replication log using the library mysql-binlog-connector-java. Then, we’ll parse the event to extract the relevant data and publish it to a Pusher channel so it can be consumed by a React application.
Here’s the diagram that describes the above process:
For reference, here is a GitHub repository with all the code shown in this tutorial and instructions to run it.
Let’s start by creating a Pusher application.
Creating a Pusher application
If you haven’t already, create a free account at Pusher.
Then, go to your dashboard and create a Channels app, choosing a name, the cluster closest to your location, and optionally, React as the frontend tech and Java as the backend tech:
This will give you some sample code to get started:
Save your app id, key, secret and cluster values. We’ll need them later.
Configuring MySQL replication
The first thing you need to do is enable replication in MySQL.
Replication allows data from one MySQL server (the master) to be copied asynchronously to one or more MySQL servers (the slaves).
It works by writing all the changes on the master to a binary log file that is then shipped to the slaves, so they can apply all those changes.
For this tutorial, you don’t have to set up slave servers. We’re only interested in the binary log.
In the MySQL configuration file (usually at /etc/my.cnf
or C:\ProgramData\MySQL\MySQL Server 5.7\my.ini
), add the following lines:
[mysqld]
server-id = 1 #1
log_bin = /var/log/mysql/mysql-bin.log #2
expire_logs_days = 10 #3
max_binlog_size = 100M #4
binlog-format = row #5
Line #1 assigns an identifier to the server.
Line #2 specifies the directory where the logs will be stored. In Windows, it will be something like c:/logs/mysql-bin.log
. In Linux, make sure this directory has the necessary permissions for MySQL.
Lines #3 and #4 are optional; they specify the expiration time and maximum size of the log files.
Line #5 is important: it specifies the format in which the log will be written.
There are two main types of replication formats:
- Statement Based Replication (SBR), which replicates entire SQL statements, and
- Row Based Replication (RBR), which replicates only the changed rows.
For our purposes, RBR will be easier to work with. That’s why the file specifies this format.
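To see why, here’s a hypothetical, simplified model (not the actual binlog encoding) of what each format hands you for the same update: with SBR you get SQL text that you would have to parse yourself, while with RBR you get the column values of each changed row directly:

```java
// Simplified, hypothetical illustration of why RBR is easier for change
// data capture than SBR. This is a model, not the real binlog encoding.
public class FormatSketch {

    // With SBR you would receive something like this and have to parse it
    // to recover the new values:
    public static String statementEvent() {
        return "UPDATE products SET price = 100.01 WHERE id = 1";
    }

    // With RBR you receive the row image directly (id, name, price).
    public static Object[] rowEvent() {
        return new Object[] { 1, "laptop", "100.01" };
    }

    // Extracting the new price from a row event is a simple index lookup.
    public static String priceFromRowEvent(Object[] row) {
        return String.valueOf(row[2]);
    }

    public static void main(String[] args) {
        System.out.println(priceFromRowEvent(rowEvent())); // prints 100.01
    }
}
```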
Now restart the server.
In a terminal window, connect to the MySQL server using mysql
:
mysql -u <YOUR_USER> -p
Now choose or create a database and create the table that is going to be used by the application:
USE myDatabase;
CREATE TABLE products (
  id INT(11) NOT NULL AUTO_INCREMENT,
  name VARCHAR(50) DEFAULT NULL,
  price DECIMAL(6,2),
  PRIMARY KEY (id)
);
It’s not recommended to work with a user that has administrative privileges, like root
, so let’s create another user for the application:
CREATE USER '<YOUR_USER>'@'<YOUR_HOST>' IDENTIFIED BY '<YOUR_PASSWORD>';
Give it replication and table privileges:
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '<YOUR_USER>'@'<YOUR_HOST>';
GRANT ALL PRIVILEGES ON `<INSERT_YOUR_DB_NAME>`.* TO '<YOUR_USER>'@'<YOUR_HOST>';
FLUSH PRIVILEGES;
Now execute the following command to check if replication is enabled:
show master status;
It should show something like the following:
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 | 001 | | | |
+------------------+----------+--------------+------------------+-------------------+
It indicates the current log file and the position of the last statement.
If you’re getting <Empty set>
or something like that, execute:
show variables like "%log_bin%";
If replication is enabled, you should see something like this:
+---------------------------------+--------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/log/mysql/mysql-bin |
| log_bin_index | /var/log/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------+
Otherwise double check your configuration. You can learn more about replication here.
Now let’s create the Java program that will read the binary log.
Reading MySQL’s binary log
It turns out that reading binary logs for change data capture is more common than you think.
Microsoft SQL Server has built-in support for change data capture.
Oracle offers GoldenGate for real-time data integration and replication.
MongoDB offers Change Streams to access real-time data changes.
For MySQL, there are a lot of libraries for reading the binary log and streaming the changes as events to other sources. In this wiki, you can find many of these libraries.
Most of them were made for enterprise systems, so they work natively with Apache Kafka, a distributed publish/subscribe platform that streams events and records to multiple sources.
But if you don’t need something like that, you can use mysql-binlog-connector-java, which allows you to read the binary log file and listen for changes as events from any Java program.
So open your favorite IDE and create a Maven project.
Or just create a directory structure like the following:
src
|- main
|- java
|- pom.xml
In the pom.xml
file, specify the project information, Java version, and mysql-binlog-connector-java
and pusher-http-java
as dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>MySQLRealtime</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.16.1</version>
        </dependency>
        <dependency>
            <groupId>com.pusher</groupId>
            <artifactId>pusher-http-java</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>
Now create a class, let’s say src/main/java/ReadLog.java
, with the code to connect to MySQL and listen for log events:
import java.io.IOException;

import com.github.shyiko.mysql.binlog.BinaryLogClient;

public class ReadLog {
    public static void main(String[] args) throws IOException {
        BinaryLogClient client =
            new BinaryLogClient("localhost", 3306, "<MYSQL_USER>", "<MYSQL_PASSWORD>");
        client.registerEventListener(event -> {
            System.out.println(event);
        });
        client.connect();
    }
}
If you execute this class, the program will block until an event is received from the log.
For example, these are the events you receive when a database is created:
Event{header=EventHeaderV4{timestamp=1524607461000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=463, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1524607461000, eventType=QUERY, serverId=1, headerLength=19, dataLength=75, nextPosition=557, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='CREATE DATABASE test'}}
You receive an event for the creation of the global transaction identifier (GTID) and the actual query (CREATE DATABASE test
).
Here’s an example of the events you receive when a table is created:
Event{header=EventHeaderV4{timestamp=1524609716000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1696, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1524609716000, eventType=QUERY, serverId=1, headerLength=19, dataLength=181, nextPosition=1896, flags=0}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='create table products(id int(11) not null auto_increment, name varchar(50) default null, price decimal(6,2), primary key (id))'}}
When you insert a record:
Event{header=EventHeaderV4{timestamp=1524609804000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1961, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1524609804000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2033, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1524609804000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2090, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}
Event{header=EventHeaderV4{timestamp=1524609804000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=2140, flags=0}, data=WriteRowsEventData{tableId=109, includedColumns={0, 1, 2}, rows=[
[1, laptop, 999.99]
]}}
Event{header=EventHeaderV4{timestamp=1524609804000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=2171, flags=0}, data=XidEventData{xid=28}}
When you update a record:
Event{header=EventHeaderV4{timestamp=1524609897000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=2236, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1524609897000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2308, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1524609897000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2365, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}
Event{header=EventHeaderV4{timestamp=1524609897000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=47, nextPosition=2431, flags=0}, data=UpdateRowsEventData{tableId=109, includedColumnsBeforeUpdate={0, 1, 2}, includedColumns={0, 1, 2}, rows=[
{before=[1, laptop, 999.99], after=[1, laptop, 100.01]}
]}}
Event{header=EventHeaderV4{timestamp=1524609897000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=2462, flags=0}, data=XidEventData{xid=29}}
When you delete two records:
Event{header=EventHeaderV4{timestamp=1524610005000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=2805, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1524610005000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=2877, flags=8}, data=QueryEventData{threadId=6, executionTime=0, errorCode=0, database='test', sql='BEGIN'}}
Event{header=EventHeaderV4{timestamp=1524610005000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=38, nextPosition=2934, flags=0}, data=TableMapEventData{tableId=109, database='test', table='products', columnTypes=3, 15, -10, columnMetadata=0, 50, 518, columnNullability={1, 2}}}
Event{header=EventHeaderV4{timestamp=1524610005000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=49, nextPosition=3002, flags=0}, data=DeleteRowsEventData{tableId=109, includedColumns={0, 1, 2}, rows=[
[1, laptop, 100.01],
[2, laptop v2, 999.99]
]}}
Event{header=EventHeaderV4{timestamp=1524610005000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=3033, flags=0}, data=XidEventData{xid=31}}
This way, you can see that data manipulation (DML) statements are mapped as follows:
- Insert statements have the event type EXT_WRITE_ROWS, and you can find the information about the insertion in an object of type WriteRowsEventData.
- Update statements have the event type EXT_UPDATE_ROWS, and you can find the information about the update in an object of type UpdateRowsEventData.
- Delete statements have the event type EXT_DELETE_ROWS, and you can find the information about the deletion in an object of type DeleteRowsEventData.
In addition, all of these events are preceded by a TABLE_MAP
event with information about the table and columns that are being modified.
So we need to listen for these events.
The only problem is that if you need to keep track of the changes of many tables separately, you cannot rely on the tableId
field, because this ID may change between executions.
You could change the way events are deserialized, but a simpler approach is to keep track of the table names and IDs in a map.
Taking this into account, you can modify the program in this way:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;

public class ReadLog {
    public static void main(String[] args) throws IOException {
        final Map<String, Long> tableMap = new HashMap<>();
        BinaryLogClient client =
            new BinaryLogClient("localhost", 3306, "<MYSQL_USER>", "<MYSQL_PASSWORD>");
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                TableMapEventData tableData = (TableMapEventData) data;
                tableMap.put(tableData.getTable(), tableData.getTableId());
            }
        });
        client.connect();
    }
}
Notice how the program checks the subtype of EventData
to get the information.
Now, let’s add the Pusher object with the information you got when you created the app:
import com.pusher.rest.Pusher;

public class ReadLog {
    public static void main(String[] args) throws IOException {
        final Map<String, Long> tableMap = new HashMap<>();
        Pusher pusher =
            new Pusher("<PUSHER_APP_ID>", "<PUSHER_APP_KEY>", "<PUSHER_APP_SECRET>");
        pusher.setCluster("<PUSHER_APP_CLUSTER>");
        pusher.setEncrypted(true);
        // ...
    }
}
Then, when an event is an insert, update or delete, you can check if it corresponds to the products
table, extract the product information, and publish it as a map to a products
channel.
Here’s the code for INSERT
events:
public class ReadLog {

    // The table name, also used as the name of the Pusher channel
    static final String PRODUCT_TABLE_NAME = "products";

    public static void main(String[] args) throws IOException {
        // ...
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                // ...
            } else if (data instanceof WriteRowsEventData) {
                WriteRowsEventData eventData = (WriteRowsEventData) data;
                if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                    for (Object[] product : eventData.getRows()) {
                        pusher.trigger(
                            PRODUCT_TABLE_NAME, "insert", getProductMap(product)
                        );
                    }
                }
            }
        });
        client.connect();
    }

    static Map<String, String> getProductMap(Object[] product) {
        Map<String, String> map = new HashMap<>();
        map.put("id", String.valueOf(product[0]));
        map.put("name", String.valueOf(product[1]));
        map.put("price", String.valueOf(product[2]));
        return map;
    }
}
For the update event, only the after
data is needed. The before
and after
fields are formatted as a map entry, where after
is the value part of this structure:
public class ReadLog {
    public static void main(String[] args) throws IOException {
        // ...
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                // ...
            } else if (data instanceof WriteRowsEventData) {
                // ...
            } else if (data instanceof UpdateRowsEventData) {
                UpdateRowsEventData eventData = (UpdateRowsEventData) data;
                if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                    for (Map.Entry<Serializable[], Serializable[]> row :
                            eventData.getRows()) {
                        pusher.trigger(
                            PRODUCT_TABLE_NAME, "update", getProductMap(row.getValue())
                        );
                    }
                }
            }
        });
        client.connect();
    }

    // ...
}
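To illustrate that entry structure in isolation, here’s a self-contained sketch using AbstractMap.SimpleEntry from the standard library in place of the entries returned by UpdateRowsEventData.getRows() (which hold Serializable[] row images):

```java
import java.util.AbstractMap;
import java.util.Map;

// Sketch of the before/after structure of an updated row. The library hands
// you one Map.Entry per row: the key is the row image before the update and
// the value is the row image after it, so getValue() is all we publish.
public class UpdateEntrySketch {

    public static Object[] afterImage(Map.Entry<Object[], Object[]> row) {
        return row.getValue();
    }

    public static void main(String[] args) {
        Object[] before = { 1, "laptop", "999.99" };
        Object[] after  = { 1, "laptop", "100.01" };
        Map.Entry<Object[], Object[]> row =
            new AbstractMap.SimpleEntry<>(before, after);
        System.out.println(afterImage(row)[2]); // prints 100.01
    }
}
```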
For the delete event, you’ll only need the ID of the deleted record:
public class ReadLog {
    public static void main(String[] args) throws IOException {
        // ...
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                // ...
            } else if (data instanceof WriteRowsEventData) {
                // ...
            } else if (data instanceof UpdateRowsEventData) {
                // ...
            } else if (data instanceof DeleteRowsEventData) {
                DeleteRowsEventData eventData = (DeleteRowsEventData) data;
                if (eventData.getTableId() == tableMap.get(PRODUCT_TABLE_NAME)) {
                    for (Object[] product : eventData.getRows()) {
                        pusher.trigger(PRODUCT_TABLE_NAME, "delete", product[0]);
                    }
                }
            }
        });
        client.connect();
    }

    // ...
}
Now, any application subscribed to the products
channel will get the information about the database changes.
Let’s build a React client to show this.
Building the React application
Let’s use create-react-app to bootstrap a React app.
Execute the following command in a terminal window to create a new app:
npx create-react-app my-app
Now go into the app directory and install the Pusher dependency with npm
:
cd my-app
npm install --save pusher-js
Open the file src/App.css
and add the following CSS styles:
.table {
  border: 2px solid #FFFFFF;
  width: 100%;
  text-align: center;
  border-collapse: collapse;
}
.table td, .table th {
  border: 1px solid #FFFFFF;
  padding: 3px 4px;
}
.table tbody td {
  font-size: 13px;
}
.table thead {
  background: #FFFFFF;
  border-bottom: 4px solid #333333;
}
.table thead th {
  font-size: 15px;
  font-weight: bold;
  color: #333333;
  text-align: center;
  border-left: 2px solid #333333;
}
.table thead th:first-child {
  border-left: none;
}
Now let’s create a new component, src/Table.js
, to show the product information (received as a property) in a table:
import React, { Component } from 'react';
import './App.css';

export default class Table extends Component {
  render() {
    const rowsMapped = this.props.rows.map(row => (
      <tr key={row.id}>
        <td>{row.id}</td>
        <td>{row.name}</td>
        <td>{row.price}</td>
      </tr>
    ));

    return (
      <table className="table">
        <thead>
          <tr>
            <th>ID</th>
            <th>Name</th>
            <th>Price</th>
          </tr>
        </thead>
        <tbody>
          {rowsMapped}
        </tbody>
      </table>
    );
  }
}
Now modify the file src/App.js
to import this component and Pusher:
import React, { Component } from 'react';
import logo from './logo.svg';
import './App.css';
import Table from './Table.js';
import Pusher from 'pusher-js';
class App extends Component {
// ...
}
Let’s have the array of rows as the state of this component, and while we are at the constructor, let’s bind the functions we are going to use to insert, update and delete items:
// ...

class App extends Component {
  constructor(props) {
    super(props);
    this.state = { rows: [] };
    this.insert = this.insert.bind(this);
    this.update = this.update.bind(this);
    this.delete = this.delete.bind(this);
  }
}
In the componentDidMount
method, let’s configure the Pusher object and subscribe to the channel to get the events:
// ...

class App extends Component {
  constructor(props) {
    // ...
  }

  componentDidMount() {
    this.pusher = new Pusher('<PUSHER_APP_KEY>', {
      cluster: '<PUSHER_APP_CLUSTER>',
      encrypted: true,
    });
    this.channel = this.pusher.subscribe('products');
    this.channel.bind('insert', this.insert);
    this.channel.bind('update', this.update);
    this.channel.bind('delete', this.delete);
  }
}
These are the functions to insert, update and delete items from this.state.rows
:
// ...

class App extends Component {
  // ...

  insert(data) {
    this.setState(prevState => ({
      rows: [data, ...prevState.rows]
    }));
  }

  update(data) {
    this.setState(prevState => ({
      rows: prevState.rows.map(el =>
        el.id === data.id ? data : el
      )
    }));
  }

  delete(id) {
    this.setState(prevState => ({
      rows: prevState.rows.filter(el => el.id !== String(id))
    }));
  }
}
Finally, the render
function will look like this:
// ...

class App extends Component {
  // ...

  render() {
    return (
      <div className="App">
        <header className="App-header">
          <img src={logo} className="App-logo" alt="logo" />
          <h1 className="App-title">Welcome to React</h1>
        </header>
        <Table rows={this.state.rows} />
      </div>
    );
  }
}
And that’s it.
Let’s test the application.
Testing the application
Make sure the MySQL server is running with replication enabled.
If you’re working with an IDE, run the class ReadLog
.
Otherwise, you can add this property to the pom.xml
file:
<properties>
...
<exec.mainClass>ReadLog</exec.mainClass>
</properties>
And execute this command to run the app:
mvn exec:java
For the React app, inside the app directory, execute:
npm start
A browser window will open at http://localhost:3000/, and from there, you can connect to the database with the mysql
client and insert, update or delete records in the products
table:
Conclusion
In this tutorial, you have learned how to turn MySQL into a realtime database by using the replication log to capture the changes made to the database and publish them with Pusher.
You used mysql-binlog-connector-java to get the insert, update and delete events from the log. However, at the time of this writing, the current version of MySQL (MySQL 8.0.11) is not yet supported.
But there are other options. As mentioned before, in this wiki you can find more libraries to work with MySQL binary log.
In this blog post, you can find another way to extract data from MySQL using Alibaba’s open sourced Canal project.
The applications presented in this tutorial are simple, but they show how change data capture based on transaction logs works.
They can be extended in many ways:
- Support for more tables
- Detect when the Java application goes down and has to be restarted
- Resume reading the log from a given position
- Change the React implementation to support a bigger table in an efficient way
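For example, to resume reading the log from a given position, one common technique is to persist a checkpoint with the current binlog filename and position and restore it on startup. Here’s a minimal, stdlib-only sketch; the Checkpoint class and its properties-file format are hypothetical, not part of the tutorial’s code:

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical checkpoint store for resuming the binlog reader. Each
// EventHeaderV4 carries a nextPosition; persisting the current binlog
// filename and that position lets the program pick up where it left off.
public class Checkpoint {

    private final Path file;

    public Checkpoint(Path file) {
        this.file = file;
    }

    // Persist the current position, e.g. from the event listener.
    public void save(String binlogFilename, long position) {
        Properties props = new Properties();
        props.setProperty("filename", binlogFilename);
        props.setProperty("position", Long.toString(position));
        try (Writer w = Files.newBufferedWriter(file)) {
            props.store(w, "binlog checkpoint");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public String filename() {
        return load().getProperty("filename");
    }

    public long position() {
        // 4 is the position of the first event in a fresh binlog file
        // (the file starts with a 4-byte magic number).
        return Long.parseLong(load().getProperty("position", "4"));
    }

    private Properties load() {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (Reader r = Files.newBufferedReader(file)) {
                props.load(r);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }
}
```

On startup you would then pass the saved values to the client before connecting (BinaryLogClient exposes setBinlogFilename and setBinlogPosition setters for this), and call save(...) from the event listener as events are processed.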
Remember that all the source code for these applications is available on GitHub.
4 May 2018
by Esteban Herrera