目录
项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求
原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。
具体步骤
一、打开mysql的binlog
1.1 打开 MySQL 配置文件 my.cnf
(通常位于 /etc/mysql/my.cnf
或 /etc/my.cnf
)并添加或修改以下设置:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row
注意 :确保binlog-format是 row模式
1.2 重启mysql服务
具体命令根据你的服务器类型决定
1.3 验证是否生效
SHOW MASTER STATUS;
二、 部署canal 服务端(docker)
2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=.*\\..*
参数解释:
-e canal.auto.scan=false:
关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-e canal.destinations=test:
设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address=127.0.0.1:3306:
指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-e canal.instance.dbUsername=canal:
设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPassword=canal:
设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-e canal.instance.connectionCharset=UTF-8:
设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enable=true:
启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-e canal.instance.gtidon=false:
关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-e canal.instance.filter.regex=.*\\..*:
设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>
可以看到这样的打印:
三、springboot端集成canal客户端
3.1 添加依赖 /配置
<!-- canal begin-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.0</version>
</dependency>
<!-- canal end-->
canal:
host: 127.0.0.1 #自己的canal服务器ip
port: 11111 #canal默认端口
destination: test #配置文件配置的名称
username: root
password: 214365
batch:
size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
@Component
public class CanalClient implements InitializingBean, DisposableBean {
@Value("${canal.host}")
private String canalHost;
@Value("${canal.port}")
private int canalPort;
@Value("${canal.destination}")
private String canalDestination;
@Value("${canal.username}")
private String canalUsername;
@Value("${canal.password}")
private String canalPassword;
@Value("${canal.batch.size}")
private int batchSize;
private final RecordService recordService;
private CanalConnector canalConnector;
private ExecutorService executorService;
public CanalClient(RecordService recordService) {
this.recordService = recordService;
}
@Override
public void afterPropertiesSet() throws Exception {
this.canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(canalHost, canalPort),
canalDestination,
canalUsername,
canalPassword
);
this.executorService = Executors.newSingleThreadExecutor();
this.executorService.execute(new Task());
}
@Override
public void destroy() throws Exception {
if (executorService != null) {
executorService.shutdown();
}
}
private class Task implements Runnable {
@Override
public void run() {
while (true) {
try {
//连接
canalConnector.connect();
//订阅
canalConnector.subscribe();
while (true) {
Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
try {
//如果没有数据
if (batchId == -1 || size == 0) {
// log.info("无数据");
// 线程休眠2秒
Thread.sleep(2000);
} else {
// 如果有数据,处理数据
printEntry(message.getEntries());
// 确认处理完成
canalConnector.ack(batchId);
}
} catch (Exception e) {
log.error(e.getMessage());
// 程序错误,也直接确认,跳过这次偏移
canalConnector.ack(batchId);
}
} catch (Exception e) {
log.error("Error occurred when running Canal Client", e);
} finally {
canalConnector.disconnect();
}
}
}
}
private void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (isTransactionEntry(entry)){
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
CanalEntry.EventType eventType = rowChange.getEventType();
//打印Header信息
log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
//判断是否是DDL语句
if (rowChange.getIsDdl()) {
log.info("================》;isDdl: true,sql:{}", rowChange.getSql());
}
log.info(rowChange.getSql());
//获取RowChange对象里的每一行数据,打印出来
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
//如果是删除语句
if (eventType == CanalEntry.EventType.DELETE) {
log.info(">>>>>>>>>> 删除 >>>>>>>>>>");
printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");
//如果是新增语句
} else if (eventType == CanalEntry.EventType.INSERT) {
log.info(">>>>>>>>>> 新增 >>>>>>>>>>");
printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");
//如果是更新的语句
} else {
log.info(">>>>>>>>>> 更新 >>>>>>>>>>");
//变更前的数据
log.info("------->; before");
printColumnAndExecute(rowData.getBeforeColumnsList(), null);
//变更后的数据
log.info("------->; after");
printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");
}
}
}
}
/**
* 执行数据同步
* @param columns
* @param type
*/
private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {
if(type == null){
return;
}
JSONObject jsonObject = new JSONObject();
for (CanalEntry.Column column : columns) {
jsonObject.put(column.getName(), column.getValue());
}
// 此处使用json转对象的方式进行转换
Record bean = jsonObject.toBean(Record.class);
if(type.equals("INSERT")){
// 执行新增
recordService.save(bean);
log.info("新增成功->{}", jsonObject.toJSONString(0));
}else if (type.equals("UPDATE")){
// 执行编辑
recordService.updateById(bean);
log.info("编辑成功->{}", jsonObject.toJSONString(0));
}else if (type.equals("DELETE")){
// 执行删除
recordService.removeById(bean.getRecordId());
log.info("删除成功->{}", jsonObject.toJSONString(0));
}
}
/**
* 判断当前entry是否为事务日志
*/
private boolean isTransactionEntry(CanalEntry.Entry entry){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){
log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getEntryType()
);
return true;
}else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getEntryType()
);
return true;
}else {
return false;
}
}
}
3.3 数据同步效果
有点感叹需求就是最好的老师,但是完不成需求就不好玩了
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » springboot集成canal
发表评论 取消回复