kafka数据同步到mysql

天翼云开发者社区
• 阅读 18

本文分享自天翼云开发者社区《kafka数据同步到mysql》,作者:刘****猛

kafka安装 使用docker-compose进行安装,docker-compose文件如下:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      proxy:
        ipv4_address: 172.16.0.8
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      proxy:
        ipv4_address: 172.16.0.9
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24

这样安装的kakfa是没有密码的,下面为kafka配置密码 先将kafka的配置文件映射到本机目录

docker cp 277:/opt/kafka/config /root/docker-build/kafka/config/
docker cp 277:/opt/kafka/bin /root/docker-build/kafka/bin/

添加密码 然后将容器删除

修改config目录下的server.properties

############################# Server Basics #############################
broker.id=-1
listeners=SASL_PLAINTEXT://192.168.183.137:9092
advertised.listeners=SASL_PLAINTEXT://192.168.183.137:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

修改bin目录下的kafka-server-start.sh文件,修改如下 kafka数据同步到mysql 重新启动kafka,修改docker-compose.yml文件如下

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      proxy:
        ipv4_address: 172.16.0.8
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
        - ./config:/opt/kafka/config
        - ./bin:/opt/kafka/bin
    networks:
      proxy:
        ipv4_address: 172.16.0.9
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24

启动容器 docker-compose up -d 这样把kafka启动起来 测试步骤 在sp中启动任务 sql脚本

create table goods_source (
  goods_id int,
  goods_price decimal(8,2),
  goods_name varchar,
  goods_details varchar
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '101.43.164.4:9092',
  'topic' = 'test_kafka',
  'properties.group.id' = 'test-consumer-group-1',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="******";',
  'scan.startup.mode' =  'earliest-offset',
  'format' =  'json'

);
create table goods_target (
  goods_id int,
  goods_price decimal(8,2),
  goods_name varchar,
  goods_details varchar,
  PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://101.43.164.4:3306/cdc-sink?useSSL=false&characterEncoding=utf-8',
  'table-name' = 'my_goods_kafka',
  'username' = 'root',
  'password' = '******'
);
insert into
  goods_target
select
  *
from
  goods_source;

然后写代码,推送十条数据 这里是java写的推送,仅供参考

@Test
    public void test1() throws ExecutionException, InterruptedException {
        for (int i = 10; i <= 20; i++) {
            CdcTestGoods cdcTestGoods = new CdcTestGoods();
            cdcTestGoods.setGoods_id(5 + i);
            cdcTestGoods.setGoods_name("iphone 14 pro max 128G  " + i);
            cdcTestGoods.setGoods_details("京东618大降价,买到就是赚  " + i);
            cdcTestGoods.setGoods_price(5899f);
            SendResult<String, String> result = kafkaTemplate.send("test_kafka", JacksonUtils.getString(cdcTestGoods)).get();
            log.info("sendMessageSync =>  {},message: {}", result, JacksonUtils.getString(cdcTestGoods));
        }
    }

查看mysql表,出现相关内容,kafaka只支持insert 不支持update kafka数据同步到mysql

点赞
收藏
评论区
推荐文章
Springfox与SpringDoc——swagger如何选择(SpringDoc入门)
本文分享自天翼云开发者社区@《》,作者:才开始学技术的小白0.引言之前写过一篇关于swagger(实际上是springfox)的使用指南(https://www.ctyun.cn/developer/article/371704742199365),涵盖了
基于Linux系统的PXE搭建方法
本文分享自天翼云开发者社区《》,作者:tn一、底层环境准备1、安装RedHat7.6系统2、关闭防火墙和Selinuxsystemctlstopfirewalldchkconfigfirewalldoffvim/etc/sysconfig/selinux修
大数据通用组件故障处理
本文分享自天翼云开发者社区《》,作者:fnHDFS1.HDFS服务一直异常检查HDFS是否处于安全模式。检查ZooKeeper服务是否运行正常。2.HDFS维护客户端出现OutOfMemoryError异常使用HDFS客户端之前,需要在HADOOPCLIE
基于ubuntu系统部署FateLLM1.3.0
本文分享自天翼云开发者社区《》,作者:刘阳一、基础环境本次测试是基于K8S集群管理的两个POD节点进行部署,软硬件信息如下:服务器规格:CPU:8c80GGPU:V100/40G2系统环境:ubuntu18.04cuda11.7deepspeed0.9.5
kubelet报listen tcp [::1]:0: bind问题解决
本文分享自天翼云开发者社区《》,作者:SummerSnow目录1.环境目录2.问题现象3.问题定位4.问题解决环境介绍k8s集群环境如下:root@k8smaster$kubectlversionClientVersion:version.Info
NFS使用
本文分享自天翼云开发者社区《》,作者:2m安装nfsnfs依赖于rpc,故需安装nfsutilsrpcbindyuminstallynfsutilsrpcbind​指定nfs监听端口vim/etc/sysconfig/nfs​RQUOTADPORT3000
搭建MySQL主从
本文分享自天翼云开发者社区《》,作者:2m——本文基于MySQL5.7.36进行演示1、下载MySQL安装包官网网址:https://downloads.mysql.com/archives/community2、解压MySQL安装包将以下包上传至服务器:
flink-cdc之mysql到es
本文分享自天翼云开发者社区《》,作者:刘猛环境搭建version:'2'services:elasticsearch:image:docker.elastic.co/elasticsearch/elasticsearch:7.6.1ports:"9200:
使用element-ui 的上传组件upload完成自定义上传到天翼云oss云服务器
本文分享自天翼云开发者社区@《》,作者:我是小朋友首先配置天翼云,如下操作1、要求在使用OOS之前,首先需要在www.ctyun.cn注册一个账号(Account)。创建AccessKeyId和AccessSecretKey。AccessKeyId和Acc
Flink Parallelism、Flink Slot的关系
本文分享自天翼云开发者社区《》,作者:王帅1、Parallelism(并行度)的概念parallelism在Flink中表示每个算子的并行度。举两个例子(1)比如kafka某个topic数据量太大,设置了10个分区,但source端的算子并行度却为1,只有
天翼云开发者社区
天翼云开发者社区
Lv1
天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
文章
917
粉丝
16
获赞
40