1. MySQL 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/etc/my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    数据库重启后, 简单测试 my.cnf 配置是否生效:

    mysql> show variables like 'binlog_format';
    mysql> show variables like 'log_bin';
    
  2. 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
  3. 下载压缩包

    ​ 到canal官网地址(release)下载最新压缩包,请下载 canal.deployer-latest.tar.gz

  4. 将canal.deployer 复制到固定目录并解压

    mkdir -p /usr/local/canal
    mv canal.deployer-1.1.5.tar.gz /usr/local/canal/
    tar -zxvf canal.deployer-1.1.5.tar.gz
    
  5. 配置修改参数(具体配置说明:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

    a. 修改instance 配置文件sudo vim conf/example/instance.properties

    #  按需修改成自己的数据库信息
    #################################################
    ...
    canal.instance.master.address=127.0.0.1:3306
    # username/password,数据库的用户名和密码
    ...
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    ...
    # table regex
    canal.instance.filter.regex=distribution\\.dis_course,distribution\\.dis_group_buy
    # table black regex
    canal.instance.filter.black.regex=
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    canal.instance.filter.field=distribution\\.dis_course:title/status/group_status/is_show/is_visible/course_status
    # mq config
    canal.mq.topic=xd_test
    # 针对库名或者表名发送动态topic
    canal.mq.dynamicTopic=xd_course_canal:distribution\\.dis_course,xd_group_canal:distribution\\.dis_group_buy
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #库名.表名: 唯一主键,多个表之间用逗号分隔
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    

    b. 修改canal 配置文件sudo vim /usr/local/canal/conf/canal.properties

    # ...
    # 可选项: tcp(默认), kafka, RocketMQ
    canal.serverMode = kafka
    # 是否为flat json格式对象
    canal.mq.flatMessage = true
    # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
    canal.mq.canalBatchSize = 50
    # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
    canal.mq.canalGetTimeout = 100
    # ...
    # kafka 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
    kafka.bootstrap.servers = 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
    kafka.acks = all
    kafka.compression.type = none
    # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
    kafka.batch.size = 16384
    kafka.max.request.size = 1048576
    # flatMessage模式下请将该值改大, 建议50-200
    kafka.linger.ms = 200
    kafka.buffer.memory = 33554432
    # 发送失败重试次数
    kafka.retries = 0
    
  6. 启动/日志查看/关闭

    a. 启动

    cd /usr/local/canal/
    sudo sh bin/startup.sh
    

    b. 查看日志

    vi logs/canal/canal.log
    vi logs/example/example.log
    

    c. 关闭

    cd /usr/local/canal/
    sudo sh bin/stop.sh
    
  7. Canal MQ Performance(性能测试文档)

    如果遇到性能问题,可以查看对应的做一些调整。

    https://github.com/alibaba/canal/wiki/Canal-MQ-Performance

  8. 常见问题解答

    安装或者运行过程中遇到问题,可查阅:

    <https://github.com/alibaba/canal/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94