spring cloud集成canal
2020-12-13 14:50
标签:common let 类型 man ade str example rom 提交 前提 加入canal依赖 把ip、端口、监听表名做成配置文件 代码实现 新增 修改 删除 注意:拿到的值都是字符串,建议拿到id反查数据库,拿到对象再同步到自己的缓存。 spring cloud集成canal 标签:common let 类型 man ade str example rom 提交 原文地址:https://www.cnblogs.com/xiaostudy/p/11569750.htmlwin运行canal
1 dependency>
2 groupId>com.alibaba.ottergroupId>
3 artifactId>canal.clientartifactId>
4 version>1.1.3version>
5 dependency>
1 package com.frame.modules.dabis.archives.thread;
2
3 import com.alibaba.fastjson.JSONObject;
4 import com.alibaba.otter.canal.client.CanalConnector;
5 import com.alibaba.otter.canal.client.CanalConnectors;
6 import com.alibaba.otter.canal.protocol.CanalEntry;
7 import com.alibaba.otter.canal.protocol.Message;
8 import com.frame.solr.em.SolrCode;
9 import com.frame.utils.PropertiesLoader;
10 import org.apache.commons.logging.Log;
11 import org.apache.commons.logging.LogFactory;
12
13 import java.net.InetSocketAddress;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17
18 /**
19 * @author liwei
20 * @date 2019/8/2 14:39
21 * @desc Created with IntelliJ IDEA.
22 */
23 public class CanalThread implements Runnable {
24
25 Log log = LogFactory.getLog(CanalThread.class);
26
27 private String solrName = SolrCode.ARCHIVES.getValue();
28
29
30 @Override
31 public void run() {
32 PropertiesLoader loader = new PropertiesLoader("solrConfig.properties");
33 listener(loader.getProperty("canalHost"), loader.getProperty("canalPort"), loader.getProperty("canalTable"));
34 }
35
36
37 public void listener(String canalHost, String canalPort, String table) {
38 // 创建链接
39 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, Integer.valueOf(canalPort)), "example", "", "");
40 int batchSize = 1000;
41 try {
42 // 连接
43 connector.connect();
44 // 监听表
45 connector.subscribe(table);
46 connector.rollback();
47 // 一直循环监听
48 while (true) {
49 // 获取指定数量的数据
50 Message message = connector.getWithoutAck(batchSize);
51 long batchId = message.getId();
52 if(-1 != batchId && 0 != message.getEntries().size()) {
53 printEntry(message.getEntries());
54 }
55 // 提交确认
56 connector.ack(batchId);
57 }
58 } finally {
59 connector.disconnect();
60 }
61 }
62
63 /**
64 * 打印具体变化
65 * @param entrys
66 */
67 private void printEntry(List
上一篇:【原】win7下调整分区
下一篇:python快速排序