广告位联系
返回顶部
分享到

Springboot2.3.x整合Canal的代码

java 来源:互联网 作者:秩名 发布时间:2022-02-20 08:58:44 人浏览
摘要

一、故事背景 前言 最近工作中遇到了一个数据同步的问题 我们这边系统的一个子业务需要依赖另一个系统的数据,当另一个系统数据变更时,我们这边的数据库要对数据进行同步 那么

一、故事背景

前言…

最近工作中遇到了一个数据同步的问题

我们这边系统的一个子业务需要依赖另一个系统的数据,当另一个系统数据变更时,我们这边的数据库要对数据进行同步…

那么我自己想到的同步方式呢就两种:

1、MQ订阅,另一个系统数据变更后将变更数据方式到MQ 我们这边订阅接受

2、数据库的触发器

但是呢,两者都被组长paas了!

1、MQ呢,会造成代码侵入,但是另一个系统暂时不会做任何代码更改…

2、数据库的触发器会直接跟生产数据库强关联,会抢占资源,甚至有可能造成生产数据库的不稳定…

对此很是苦恼…

于是啊,只能借由强大的google、百度,看看能不能解决我这个问题!一番搜索,有学习了一个很有趣的东西…

Canal

二、什么是Canal

canal:阿里开源mysql binlog 数据组件

官网解释的相当详细了(国产牛逼)…下边我也是照搬过来的…

官网地址如下:https://github.com/alibaba/canal/wiki

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析

image-20200926161534433

canal [k?’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

canal呢,实际是就是运用了Mysql的主从复制原理…

MySQL主从复制实现

image-20200926161859141

复制遵循三步过程:

  • 主服务器将更改记录到binlog中(这些记录称为binlog事件,可以通过来查看show binary events)
  • 从服务器将主服务器的二进制日志事件复制到其中继日志。
  • 中继日志中的从服务器重做事件随后将更新其旧数据。

如何运作

image-20200926161913848

原理很简单:

  • Canal模拟MySQL从站的交互协议,伪装成MySQL从站,然后将转储协议发送到MySQL主服务器。
  • MySQL Master接收到转储请求,并开始将二进制日志推送到slave(即运河)。
  • 运河将二进制日志对象解析为其自己的数据类型(最初为字节流)

通过官网的介绍,让我们了解到,canal实际上就是伪装为了一个从库,我们只需要订阅到数据变更的主库,那么canal就会以从库的身份读取到其主库的binlog日志!我们拿到canal解析好的binlog日志信息,就等于拿到了变更的数据啦!…

这样的话呢,我们即保证了不影响其系统数据库正常使用,又不会侵入他的项目代码,一举两得

ok,接下来开始实战篇…

三、Canal安装

(1)事前准备

(1)数据库开启binlog

使用canal呢,有一个前提条件,即被订阅的数据库需要开启binlog

如何查看是否开启binlog呢?

登录服务器上数据库或在可视化工具中 执行查询语句: 如果出现 log_bin ON 表示已开启Binlog

1

show variables like 'log_bin';

image-20200927230024401

如果服务器上的数据库为自己安装的,则找到配置文件my.conf 添加以下内容,如果买的云实例,则询问厂商开启即可

image-20200926164518333

在my.conf文件中的 [mysqld] 下添加以下三行内容

1

2

3

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式 读行

server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

(2)数据库新建账号,开启MySQL slav权限

canaltest:作为slave 角色的账户 Canal123…:为密码

1

2

3

4

CREATE USER canaltest IDENTIFIED BY 'Canal123..'; 

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaltest'@'%';

GRANT ALL PRIVILEGES ON *.* TO 'canaltest'@'%' ;

FLUSH PRIVILEGES;

image-20200926165912070

连接测试

image-20200926170124796

那么到这里,准备工作就好了!

可能呢,有的小伙伴有点懵,你这是在干啥?那么咱们就来理那么一理! 敲黑板了哈!

image-20200926170334603

1、事前准备,是针对于订阅数据库的(即主库)

2、实际步骤也就两步 1:更改配置,开启binlog 2:设置新账号,赋予slave权限,供canal读取Binlog桥梁使用

3、以上操作与canal本身没啥关系,仅仅是使用canal的前提条件罢辽…

(2)Canal Admin 安装

canal admin 是 一个可视化的 canal web管理运维工程,脱离以往服务器运维,面向web…

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

canal-admin的限定依赖:

  • MySQL,用于存储配置和节点等相关数据
  • canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
  • 需要JRE 环境 (安装JDK)

下载

1

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解压

1

2

mkdir /usr/local/canal-admin

tar zxvf canal.admin-1.1.4.tar.gz  -C /usr/local/canal-admin

进入canal-admin目录下查看

1

cd /usr/local/canal-admin

修改配置

1

vim conf/application.yml

里边的配置 按照自己的实际情况更改…

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

server:

  port: 8089

spring:

  jackson:

    date-format: yyyy-MM-dd HH:mm:ss

    time-zone: GMT+8

#这里是配置canal-admin 所依赖的数据库,,,存放web管理中设置的配置等,,,

spring.datasource:

  address: 127.0.0.1:3306

  database: canal_manager

  username: root   

  password: 123456

  driver-class-name: com.mysql.jdbc.Driver

  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

  hikari:

    maximum-pool-size: 30

    minimum-idle: 1

# 连接所用的账户密码

canal:

  adminUser: admin

  adminPasswd: leitest

导入canaladmin 所需要的数据库文件

这里需要注意了,要和 application.yml中的数据库名对应,你可以选择命令导入,也可以Navicat 可视化拖sql文件导入…一切…看你喜欢.

我这个玩canal的服务器呢,是新安装的,mysql直接用docker安装即可,具体可查看我的博客:

Docker在CentOS7下不能下载镜像timeout的解决办法(图解)

CentOS 7安装Docker

需要注意的是,使用docker 安装的mysql 是无法直接使用 mysql -uroot -p 命令的哦,需要先将脚本复制到容器中,docker不熟练或觉得麻烦的同鞋,请直接使用Navicat可视化工具…

导入canal-admin服务所必需的sql文件

如果是服务器软件软件安装的mysql 则直接执行以下命令即可

1

2

3

4

mysql -uroot -p

#.........

# 导入初始化SQL

> source conf/canal_manager.sql

image-20200926215220008

启动

直接执行启动脚本即可

1

2

cd bin

./startup.sh

image-20200926220024950

默认账户密码:

1

admin:123456

image-20200926220129110

(3)Canal Server 安装

canal-server 才是canal的核心我们前边所讲的canal的功能,实际上讲述的就是canal-server的功能…admin 仅仅只是一个web管理而已,不要搞混主次关系…

下载

1

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压

1

2

mkdir /usr/local/canal-server

tar zxvf canal.deployer-1.1.4.tar.gz  -C /usr/local/canal-server

启动,并连接到canal-admin web端

首先,我们需要修改配置文件

1

2

3

cd /usr/local/canal-server

 

vim /conf/canal_local.properties

image-20200926221410324

image-20200926221344585

注意了,密码如何加密!!!

要记得,前边 canal-admin 的 aplication.yml 中设置了账户密码为 admin:leitest

1

2

3

4

# 连接所用的账户密码

canal:

  adminUser: admin

  adminPasswd: leitest

所以,我们这里需要对明文 leitest 加密并替换即可

使用数据库函数 PASSWORD 加密即可

SELECT PASSWORD(‘要加密的明文’),然后去掉前边的* 号就行

image-20200926221859106

启动并连接到admin

1

sh bin/startup.sh local

查看端口看是否有 11110 、11111、11112

netstat -untlp 看了一下,发现没有,说明server 没有启动成功

image-20200926222317087

看下日志

1

vim logs/canal/canal.log

image-20200926222442734

解决办法:

1、canal-admin 先停止后从起

2、canal server 先以之前的形式运行,不输入后边 local 命令

3、关闭canal server

4、再以canal server 连接 admin 形式启动

image-20200926230725865

admin页面上新建server

image-20200926230834367

修改配置,注释 (instance连接信息,我们还是以前边设置的 admin:leitest 为准,所有这里需要注释掉,如果不注释,那么我们代码中连接则需要使用此账号以及密码)

image-20200926234156162

接下来咱们创建instance

如何理解server 和instance 呢,我认为,可以把它当做 java 中的 class 和 bean 即 类和对象

server 为类 instance 为其具体的实例对象 ,可创建多个不同的实例…

而我们这边监听到主库变化的呢,则是根据业务,对不同的实例即(instance )做不同配置即可…

image-20200926231123562

image-20200926231135593

image-20200926231429803

image-20200926231516786

根据自己情况进行过滤数据

canal.instance.filter.regex mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子:1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal\.test15. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)    
canal.instance.filter.druid.ddl 是否使用druid处理所有的ddl解析来获取库和表名 true  
canal.instance.filter.query.dcl 是否忽略dcl语句 false  
canal.instance.filter.query.dml 是否忽略dml语句 (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档) false  
canal.instance.filter.query.ddl 是否忽略ddl语句 false  

更多设置请见官网:https://github.com/alibaba/canal/wiki/AdminGuide

如此一来,一个简单的canal环境就搭建好了,接下来,咱们开始测试吧!

(4)springboot demo示例

引入canal所需依赖

1

2

3

4

5

<dependency>

            <groupId>com.alibaba.otter</groupId>

            <artifactId>canal.client</artifactId>

            <version>1.1.4</version>

        </dependency>

配置

1

2

3

4

5

6

7

8

9

10

11

canal:

  # instance 实例所在ip

  host: 192.168.96.129

  # tcp通信端口

  port: 11111

  # 账号  canal-admin application.yml 设置的

  username: admin

  # 密码

  password: leitest

  #实例名称

  instance: test

代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

package com.leilei;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import java.util.List;

/**

 * @author lei

 * @version 1.0

 * @date 2020/9/27 22:23

 * @desc 读取binlog日志

 */

@Component

public class ReadBinLogService implements ApplicationRunner {

    @Value("${canal.host}")

    private String host;

    @Value("${canal.port}")

    private int port;

    @Value("${canal.username}")

    private String username;

    @Value("${canal.password}")

    private String password;

    @Value("${canal.instance}")

    private String instance;

    @Override

    public void run(ApplicationArguments args) throws Exception {

        CanalConnector conn = getConn();

        while (true) {

            conn.connect();

            //订阅实例中所有的数据库和表

            conn.subscribe(".*\\..*");

            // 回滚到未进行ack的地方

            conn.rollback();

            // 获取数据 每次获取一百条改变数据

            Message message = conn.getWithoutAck(100);

            long id = message.getId();

            int size = message.getEntries().size();

            if (id != -1 && size > 0) {

                // 数据解析

                analysis(message.getEntries());

            }else {

                Thread.sleep(1000);

            }

            // 确认消息

            conn.ack(message.getId());

            // 关闭连接

            conn.disconnect();

        }

    }

    /**

     * 数据解析

     */

    private void analysis(List<CanalEntry.Entry> entries) {

        for (CanalEntry.Entry entry : entries) {

            // 只解析mysql事务的操作,其他的不解析

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) {

                continue;

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {

            // 解析binlog

            CanalEntry.RowChange rowChange = null;

            try {

                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

            } catch (Exception e) {

                throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);

            if (rowChange != null) {

                // 获取操作类型

                CanalEntry.EventType eventType = rowChange.getEventType();

                // 获取当前操作所属的数据库

                String dbName = entry.getHeader().getSchemaName();

                // 获取当前操作所属的表

                String tableName = entry.getHeader().getTableName();

                // 事务提交时间

                long timestamp = entry.getHeader().getExecuteTime();

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {

                    dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);

                    System.out.println("-------------------------------------------------------------");

                }

     * 解析具体一条Binlog消息的数据

     *

     * @param dbName    当前操作所属数据库名称

     * @param tableName 当前操作所属表名称

     * @param eventType 当前操作类型(新增、修改、删除)

    private static void dataDetails(List<CanalEntry.Column> beforeColumns,

                                    List<CanalEntry.Column> afterColumns,

                                    String dbName,

                                    String tableName,

                                    CanalEntry.EventType eventType,

                                    long timestamp) {

        System.out.println("数据库:" + dbName);

        System.out.println("表名:" + tableName);

        System.out.println("操作类型:" + eventType);

        if (CanalEntry.EventType.INSERT.equals(eventType)) {

            System.out.println("新增数据:");

            printColumn(afterColumns);

        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {

            System.out.println("删除数据:");

            printColumn(beforeColumns);

        } else {

            System.out.println("更新数据:更新前数据--");

            System.out.println("更新数据:更新后数据--");

        System.out.println("操作时间:" + timestamp);

    private static void printColumn(List<CanalEntry.Column> columns) {

        for (CanalEntry.Column column : columns) {

            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());

     * 获取连接

    public CanalConnector getConn() {

        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);

}

测试查看

数据库修改数据库时

image-20200927222635688

数据新增数据时

image-20200927222734833

删除数据(把我们才添加的小明删掉)

image-20200927222834002

当我们操作监控的数据库DM L操作的时候呢,会被canal监听到…我们呢,通过canal监听,拿到修改的库,修改的表,修改的字段,便可以根据自己业务进行数据处理了!

哎,这个时候啊,可能有小伙伴就要问了,那么,我能不能直接获取其操作的sql语句呢?

目前,我是自己解析其列来手动拼接的sql语句实现了

话不多说,先上效果:

canal 监听到主库sql变化----> update students set  id = '2', age = '999', name = '小三', city = '11', date = '2020-09-27 17:41:44', birth = '2020-09-27 18:00:48' where id=2
canal 监听到主库sql变化----> delete from students where id=6
canal 监听到主库sql变化----> insert into students (id,age,name,city,date,birth) VALUES ('89','98','测试新增','深圳','2020-09-27 22:46:53','')
canal 监听到主库sql变化----> update students set  id = '89', age = '98', name = '测试新增', city = '深圳', date = '2020-09-27 22:46:53', birth = '2020-09-27 22:46:56' where id=89

image-20200927224716304

实际上呢,我们也就是拿到其执行前列数据变化 执行后列数据变化,自己拼接了一个sql罢了…附上代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

package com.leilei;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.*;

import com.alibaba.otter.canal.protocol.Message;

import com.alibaba.otter.canal.protocol.exception.CanalClientException;

import com.google.protobuf.InvalidProtocolBufferException;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import java.util.List;

import java.util.Queue;

import java.util.concurrent.ConcurrentLinkedQueue;

/**

 * @author lei

 * @version 1.0

 * @date 2020/9/27 22:33

 * @desc 读取binlog日志

 */

@Component

public class ReadBinLogToSql implements ApplicationRunner {

    //读取的binlog sql 队列缓存 一边Push 一边poll

    private Queue<String> canalQueue = new ConcurrentLinkedQueue<>();

    @Value("${canal.host}")

    private String host;

    @Value("${canal.port}")

    private int port;

    @Value("${canal.username}")

    private String username;

    @Value("${canal.password}")

    private String password;

    @Value("${canal.instance}")

    private String instance;

    @Override

    public void run(ApplicationArguments args) throws Exception {

        CanalConnector conn = getConn();

        while (true) {

            try {

                conn.connect();

                //订阅实例中所有的数据库和表

                conn.subscribe(".*\\..*");

                // 回滚到未进行ack的地方

                conn.rollback();

                // 获取数据 每次获取一百条改变数据

                Message message = conn.getWithoutAck(100);

                long id = message.getId();

                int size = message.getEntries().size();

                if (id != -1 && size > 0) {

                    // 数据解析

                    analysis(message.getEntries());

                } else {

                    Thread.sleep(1000);

                }

                // 确认消息

                conn.ack(message.getId());

            } catch (CanalClientException | InvalidProtocolBufferException | InterruptedException e) {

                e.printStackTrace();

            } finally {

                // 关闭连接

                conn.disconnect();

            }

        }

    }

    private void analysis(List<Entry> entries) throws InvalidProtocolBufferException {

        for (Entry entry : entries) {

            if (EntryType.ROWDATA == entry.getEntryType()) {

                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

                EventType eventType = rowChange.getEventType();

                if (eventType == EventType.DELETE) {

                    saveDeleteSql(entry);

                } else if (eventType == EventType.UPDATE) {

                    saveUpdateSql(entry);

                } else if (eventType == EventType.INSERT) {

                    saveInsertSql(entry);

                }

            }

        }

    }

    /**

     * 保存更新语句

     *

     * @param entry

     */

    private void saveUpdateSql(Entry entry) {

        try {

            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

            List<RowData> dataList = rowChange.getRowDatasList();

            for (RowData rowData : dataList) {

                List<Column> afterColumnsList = rowData.getAfterColumnsList();

                StringBuffer sql = new StringBuffer("update " +

                        entry.getHeader().getTableName() + " set ");

                for (int i = 0; i < afterColumnsList.size(); i++) {

                    sql.append(" ")

                            .append(afterColumnsList.get(i).getName())

                            .append(" = '").append(afterColumnsList.get(i).getValue())

                            .append("'");

                    if (i != afterColumnsList.size() - 1) {

                        sql.append(",");

                    }

                }

                sql.append(" where ");

                List<Column> oldColumnList = rowData.getBeforeColumnsList();

                for (Column column : oldColumnList) {

                    if (column.getIsKey()) {

                        sql.append(column.getName()).append("=").append(column.getValue());

                        break;

                    }

                }

                canalQueue.add(sql.toString());

            }

        } catch (InvalidProtocolBufferException e) {

            e.printStackTrace();

        }

    }

    /**

     * 保存删除语句

     *

     * @param entry

     */

    private void saveDeleteSql(Entry entry) {

        try {

            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

            List<RowData> rowDatasList = rowChange.getRowDatasList();

            for (RowData rowData : rowDatasList) {

                List<Column> columnList = rowData.getBeforeColumnsList();

                StringBuffer sql = new StringBuffer("delete from " +

                        entry.getHeader().getTableName() + " where ");

                for (Column column : columnList) {

                    if (column.getIsKey()) {

                        sql.append(column.getName()).append("=").append(column.getValue());

                        break;

                    }

                }

                canalQueue.add(sql.toString());

            }

        } catch (InvalidProtocolBufferException e) {

            e.printStackTrace();

        }

    }

    /**

     * 保存插入语句

     *

     * @param entry

     */

    private void saveInsertSql(Entry entry) {

        try {

            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

            List<RowData> datasList = rowChange.getRowDatasList();

            for (RowData rowData : datasList) {

                List<Column> columnList = rowData.getAfterColumnsList();

                StringBuffer sql = new StringBuffer("insert into " +

                        entry.getHeader().getTableName() + " (");

                for (int i = 0; i < columnList.size(); i++) {

                    sql.append(columnList.get(i).getName());

                    if (i != columnList.size() - 1) {

                        sql.append(",");

                    }

                }

                sql.append(") VALUES (");

                for (int i = 0; i < columnList.size(); i++) {

                    sql.append("'" + columnList.get(i).getValue() + "'");

                    if (i != columnList.size() - 1) {

                        sql.append(",");

                    }

                }

                sql.append(")");

                canalQueue.add(sql.toString());

            }

        } catch (InvalidProtocolBufferException e) {

            e.printStackTrace();

        }

    }

    /**

     * 获取连接

     */

    public CanalConnector getConn() {

        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);

    }

    /**

     * 模拟消费canal转换的sql语句

     */

    public void executeQueueSql() {

        int size = canalQueue.size();

        for (int i = 0; i < size; i++) {

            String sql = canalQueue.poll();

            System.out.println("canal 监听到主库sql变化----> " + sql);

        }

    }

}

当然了,这只是简单的demo 演示,您可根据自己的业务进行修改完善即可…

上边的安装步骤呢,我也是不断的测试过,没有问题,当然可能或多或少有些坑没有踩到,但是如果您按照我的步骤来,大概率是一马平川的…

附上项目源码:springboot-canal https://github.com/leilei0220/springboot-learn


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。

您可能感兴趣的文章 :

原文链接 : https://blog.csdn.net/leilei1366615/article/details/108819651
    Tag :
相关文章
  • SpringBoot自定义错误处理逻辑介绍

    SpringBoot自定义错误处理逻辑介绍
    1. 自定义错误页面 将自定义错误页面放在 templates 的 error 文件夹下,SpringBoot 精确匹配错误信息,使用 4xx.html 或者 5xx.html 页面可以打印错误
  • Java实现手写一个线程池的代码

    Java实现手写一个线程池的代码
    线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和
  • Java实现断点续传功能的代码

    Java实现断点续传功能的代码
    题目实现:网络资源的断点续传功能。 二、解题思路 获取要下载的资源网址 显示网络资源的大小 上次读取到的字节位置以及未读取的字节
  • 你可知HashMap为什么是线程不安全的
    HashMap 的线程不安全 HashMap 的线程不安全主要体现在下面两个方面 在 jdk 1.7 中,当并发执行扩容操作时会造成环形链和数据丢失的情况 在
  • ArrayList的动态扩容机制的介绍

    ArrayList的动态扩容机制的介绍
    对于 ArrayList 的动态扩容机制想必大家都听说过,之前的文章中也谈到过,不过由于时间久远,早已忘却。 所以利用这篇文章做做笔记,加
  • JVM基础之字节码的增强技术介绍

    JVM基础之字节码的增强技术介绍
    字节码增强技术 在上文中,着重介绍了字节码的结构,这为我们了解字节码增强技术的实现打下了基础。字节码增强技术就是一类对现有字
  • Java中的字节码增强技术

    Java中的字节码增强技术
    1.字节码增强技术 字节码增强技术就是一类对现有字节码进行修改或者动态生成全新字节码文件的技术。 参考地址 2.常见技术 技术分类 类
  • Redis BloomFilter布隆过滤器原理与实现

    Redis BloomFilter布隆过滤器原理与实现
    Bloom Filter 概念 布隆过滤器(英语:Bloom Filter)是1970年由一个叫布隆的小伙子提出的。它实际上是一个很长的二进制向量和一系列随机映射
  • Java C++算法题解leetcode801使序列递增的最小交换次

    Java C++算法题解leetcode801使序列递增的最小交换次
    题目要求 思路:状态机DP 实现一:状态机 Java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Solution { public int minSwap(int[] nums1, int[] nums2) { int n
  • Mybatis结果集映射与生命周期介绍

    Mybatis结果集映射与生命周期介绍
    一、ResultMap结果集映射 1、设计思想 对简单的语句做到零配置,对于复杂一点的语句,只需要描述语句之间的关系就行了 2、resultMap的应用场
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计