Canal安装及配置使用
-
MySQL 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/etc/my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
数据库重启后, 简单测试
my.cnf
配置是否生效:mysql> show variables like 'binlog_format'; mysql> show variables like 'log_bin';
-
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
-
下载压缩包
到canal官网地址(release)下载最新压缩包,请下载 canal.deployer-
latest
.tar.gz -
将canal.deployer 复制到固定目录并解压
mkdir -p /usr/local/canal mv canal.deployer-1.1.5.tar.gz /usr/local/canal/ tar -zxvf canal.deployer-1.1.5.tar.gz
-
配置修改参数(具体配置说明:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart)
a. 修改instance 配置文件sudo vim conf/example/instance.properties
# 按需修改成自己的数据库信息 ################################################# ... canal.instance.master.address=127.0.0.1:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # table regex canal.instance.filter.regex=distribution\\.dis_course,distribution\\.dis_group_buy # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) canal.instance.filter.field=distribution\\.dis_course:title/status/group_status/is_show/is_visible/course_status # mq config canal.mq.topic=xd_test # 针对库名或者表名发送动态topic canal.mq.dynamicTopic=xd_course_canal:distribution\\.dis_course,xd_group_canal:distribution\\.dis_group_buy canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################
b. 修改canal 配置文件sudo vim /usr/local/canal/conf/canal.properties
# ... # 可选项: tcp(默认), kafka, RocketMQ canal.serverMode = kafka # 是否为flat json格式对象 canal.mq.flatMessage = true # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # ... # kafka 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 kafka.bootstrap.servers = 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 kafka.acks = all kafka.compression.type = none # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 kafka.batch.size = 16384 kafka.max.request.size = 1048576 # flatMessage模式下请将该值改大, 建议50-200 kafka.linger.ms = 200 kafka.buffer.memory = 33554432 # 发送失败重试次数 kafka.retries = 0
-
启动/日志查看/关闭
a. 启动
cd /usr/local/canal/ sudo sh bin/startup.sh
b. 查看日志
vi logs/canal/canal.log vi logs/example/example.log
c. 关闭
cd /usr/local/canal/ sudo sh bin/stop.sh
-
Canal MQ Performance(性能测试文档)
如果遇到性能问题,可以查看对应的做一些调整。
-
常见问题解答
安装或者运行过程中遇到问题,可查阅:
<https://github.com/alibaba/canal/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94