-
Notifications
You must be signed in to change notification settings - Fork 8
部署canal.deployer(OB作为数据源)
参考:https://github.com/alibaba/canal/wiki/AdminGuide#spring%E9%85%8D%E7%BD%AE
- 部署OceanBase社区版:https://github.com/oceanbase/oceanbase
- 部署OceanBase Log Proxy服务:https://github.com/oceanbase/oblogproxy
# canal监听的端口
canal.port = 11111
# 指定文件存储目录,包含日志转换位点信息、客户端消费位点信息等。默认为配置目录
canal.file.data.dir = ${canal.conf.dir}
# 指定instance的destination,若为多个则以逗号分隔
canal.destinations = example
# 指定配置目录
canal.conf.dir = ../conf
# 指定instance的spring配置
canal.instance.global.spring.xml = classpath:spring/ob-file-instance.xml
tcp模式下
canal.serverMode = tcp
mq模式下,以kafka为例
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
<!-- 继承ob的instance公共配置 -->
<!-- 其中会指定读取instance配置目录,默认为example/ob-instance.properties -->
<import resource="classpath:spring/ob-base-instance.xml"/>
<!-- 设置位点管理器,默认使用内存+metaManager -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<constructor-arg ref="metaManager"/>
</bean>
</constructor-arg>
</bean>
</property>
<!-- metaManager默认使用FileMixedMetaManager,先写内存,定期刷数据到文件 -->
<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
<property name="dataDir" value="${canal.file.data.dir:../conf}"/>
<property name="period" value="${canal.file.flush.period:1000}"/>
</bean>
首先,修改ob-instance.properties名为instance.properties,以替换原有的instance.properties文件 之后配置instance参数:
# oceanbase集群参数
canal.instance.oceanbase.rsList=127.0.0.1:2882:2881
canal.instance.oceanbase.username=username@test
canal.instance.oceanbase.password=password
canal.instance.oceanbase.startTimestamp=0
# 监听的租户,4.x 版本只能监听非 sys 租户
canal.instance.oceanbase.tenant=test
# oceanbase logproxy的地址
canal.instance.oceanbase.logproxy.address=127.0.0.1:2983
# 与oceanbase logproxy间通信可以使用ssl加密,默认关闭
canal.instance.oceanbase.logproxy.sslEnabled=false
canal.instance.oceanbase.logproxy.serverCert=../conf/${canal.instance.destination:}/ca.crt
canal.instance.oceanbase.logproxy.clientCert=../conf/${canal.instance.destination:}/client.crt
canal.instance.oceanbase.logproxy.clientKey=../conf/${canal.instance.destination:}/client.key
# 监听表范围
canal.instance.filter.regex=schema1.tableName1,schema2.tableName2
若是在mq模式下,需要增加mq的配置
# 指定当前instance对应的topic
canal.mq.topic=example
# 动态路由topic的规则,若设置则会覆盖上面的canal.mq.topic
#canal.mq.dynamicTopic=canaltest\\..*
# 指定使用的partition
canal.mq.partition=0
配置完成后,在根目录下执行bin/startup.sh即可启动
Canal的高可用集群依赖于zookeeper,假设当前的zookeeper地址为127.0.0.1:2181
# canal监听的端口
canal.port = 11111
# 指定zookeeper的地址
canal.zkServers =127.0.0.1:2181
# 指定instance的destination,若为多个则以逗号分隔
canal.destinations = example
# 指定instance的spring配置
canal.instance.global.spring.xml = classpath:spring/ob-default-instance.xml
tcp模式下
canal.serverMode = tcp
mq模式下,以kafka为例
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
<!-- 继承ob的instance公共配置 -->
<!-- 其中会指定读取instance配置目录,默认为example/ob-instance.properties -->
<import resource="classpath:spring/ob-base-instance.xml"/>
<!-- 设置位点管理器,默认使用内存+metaManager -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<constructor-arg ref="metaManager"/>
</bean>
</constructor-arg>
</bean>
</property>
<!-- metaManager默认使用PeriodMixedMetaManager,先写内存,定期刷数据到metaManager -->
<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
<property name="zooKeeperMetaManager">
<bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
<property name="zkClientx" ref="zkClientx" />
</bean>
</property>
<property name="period" value="${canal.zookeeper.flush.period:1000}" />
</bean>
<!-- zkClientx -->
<bean id="zkClientx" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean" >
<property name="targetClass" value="com.alibaba.otter.canal.common.zookeeper.ZkClientx" />
<property name="targetMethod" value="getZkClient" />
<property name="arguments">
<list>
<value>${canal.zkServers:127.0.0.1:2181}</value>
</list>
</property>
</bean>
首先,修改ob-instance.properties名为instance.properties,以替换原有的instance.properties文件 之后配置instance参数:
# oceanbase集群参数
canal.instance.oceanbase.rsList=127.0.0.1:2882:2881
canal.instance.oceanbase.username=username
canal.instance.oceanbase.password=password
canal.instance.oceanbase.startTimestamp=0
# oceanbase logproxy参数
canal.instance.oceanbase.logproxy.address=127.0.0.1:2983
canal.instance.oceanbase.logproxy.sslEnabled=true
canal.instance.oceanbase.logproxy.serverCert=../conf/${canal.instance.destination:}/ca.crt
canal.instance.oceanbase.logproxy.clientCert=../conf/${canal.instance.destination:}/client.crt
canal.instance.oceanbase.logproxy.clientKey=../conf/${canal.instance.destination:}/client.key
# 是否要在库名中去掉租户前缀。logproxy输出的日志中库名默认为 [tenant].[db]
canal.instance.parser.excludeTenantInDbName=true
canal.instance.oceanbase.tenant=sys
# 日志过滤。格式为[tenant].[database].[table],支持正则
canal.instance.filter.regex=sys.canal_test.*
若是在mq模式下,需要增加mq的配置
# 指定当前instance对应的topic
canal.mq.topic=example
# 动态路由topic的规则,若设置则会覆盖上面的canal.mq.topic
#canal.mq.dynamicTopic=canaltest\\..*
# 指定使用的partition
canal.mq.partition=0
配置完成后,在根目录下执行bin/startup.sh即可启动。
对于集群当前的运行状态,可以通过zkCli连接zookeeper后查询得到
# 查看example-ob下的cluster信息
ls /otter/canal/destinations/example/cluster
# 查看example-ob的当前运行状态
get /otter/canal/destinations/example/running
参考 https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart
# 指定admin服务监听的端口
server:
port: 8089
# 数据库配置
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
# canal server连接时使用的鉴权信息
canal:
adminUser: admin
adminPasswd: admin
此处的密码为原始密码,其通过SHA-1处理后的值对应canal server的配置项canal.admin.passwd,处理函数为 com.alibaba.otter.canal.protocol.SecurityUtil.scrambleGenPass。
确认前述配置文件中数据库的用户名和密码可用,之后导入元数据。
# 导入初始化SQL
> source conf/canal_manager.sql
之后就可以通过bin/startup.sh启动该服务。
进入 http://127.0.0.1:8089/
。用户名和密码默认是admin和123456,这里的用户信息保存在canal_manager.canal_user表,表中的密码同样是
com.alibaba.otter.canal.protocol.SecurityUtil.scrambleGenPass处理过的。
配置conf/canal_local.properties
# register ip
canal.register.ip =
# canal admin地址
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =
单机版的Canal Server需要启动时,可以直接通过如下命令启动:
sh bin/startup.sh local
启动后的Canal Server可以在Canal Admin的server管理页面进行配置修改等管理操作。
启动集群版的流程:
- 在Canal Admin的集群管理页面创建一个集群,点击主配置,编辑信息并保存。主要需要修改以下几项:
- canal.zkServers:填写当前使用的zookeeper
- canal.serverMode:填写使用的消费方式
- canal.instance.global.spring.xml:填写使用的xml,此处应为
classpath:spring/ob-default-instance.xml
- 编辑Canal Server的conf/canal_local.properties文件,确保如下两条填写无误:
# 集群名称,与Canal Admin集群管理页面的集群名称保持一致
canal.admin.register.cluster =
# 本Canal Server的名称
canal.admin.register.name =
- 启动Canal Server
sh bin/startup.sh local
此时,在Canal Admin的Server管理页面应当可以看到已经启动的Canal Server。
- 启动Canal Instance
在Canal Server的配置中,指定canal.instance.global.spring.xml
# 集群模式。位点信息存入zookeeper
canal.instance.global.spring.xml = classpath:spring/ob-default-instance.xml
# 单机模式。位点信息存入本地文件
canal.instance.global.spring.xml = classpath:spring/ob-file-instance.xml
进入Instance管理页面,创建instance,配置参考Canal Server中的conf/example/ob-instance.properties。 创建完毕后保存,instance将自动启动。