flink-cdc之mysql到es

天翼云开发者社区
• 阅读 1

本文分享自天翼云开发者社区《flink-cdc之mysql到es》,作者:刘****猛

环境搭建

version: '2'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.1
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      discovery.type: single-node
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.1
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_URL: ://elasticsearch:9200

es加密开启,配置文件映射到宿主机


docker cp 39:/usr/share/elasticsearch/config /root/docker-build/es/config

docker cp 7b:/usr/share/kibana/config /root/docker-build/es/kibana/config

需要在配置文件中开启x-pack验证, 修改config目录下面的elasticsearch.yml文件,在里面添加如下内容,

xpack.security.enabled: true
xpack.license.self_generated.type: basic
xpack.security.transport.ssl.enabled: true

重启es

再次进入容器

修改kibana的配置文件kibana.yml

server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "://elasticsearch:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.username: "elastic"  # es账号
elasticsearch.password: "*******"   # es密码

通过kibana的develop界面执行相关指令

创建索引


PUT order_index
{
    "settings":{
        "index":{
            "number_of_shards":1,
            "number_of_replicas":0
        }
    }
}

创建mapping

PUT order_index/_mapping
{
    "properties":{
        "order_id":{
            "type":"long"
        },
        "goods_name":{
            "type":"text"
        },
        "goods_count":{
            "type":"long"
        },
        "goods_price":{
            "type":"text"
        },
        "order_money":{
            "type":"text"
        }
    }
}

查看索引详情

GET order_index 返回值

{
  "order_index" : {
    "aliases" : { },
    "mappings" : {
      "properties" : {
        "goods_count" : {
          "type" : "long"
        },
        "goods_name" : {
          "type" : "text"
        },
        "goods_price" : {
          "type" : "text"
        },
        "order_id" : {
          "type" : "long"
        },
        "order_money" : {
          "type" : "text"
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1685094234700",
        "number_of_shards" : "1",
        "number_of_replicas" : "0",
        "uuid" : "YLwsxO1pS6qWolb2N7cG5w",
        "version" : {
          "created" : "7060199"
        },
        "provided_name" : "order_index"
      }
    }
  }
}

mysql创建数据表及数据

CREATE TABLE `my_order` (
  `order_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `order_money` decimal(8,2) NOT NULL COMMENT '订单金额',
  `user_id` int(8) NOT NULL COMMENT '用户id',
  `sub_province` varchar(20) NOT NULL COMMENT '下单时 省',
  `sub_city` varchar(20)  NOT NULL COMMENT '下单时 市',
  `sub_district` varchar(20) NOT NULL COMMENT '下单时 区',
  `payment_status` int(1) NOT NULL DEFAULT '0' COMMENT '付款状态 0正常 1作废',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单表';

CREATE TABLE `my_order_goods` (
  `order_goods_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单商品id',
  `order_id` int(8) NOT NULL COMMENT '订单id',
  `goods_id` int(8) NOT NULL COMMENT '商品id',
  `sub_goods_name` varchar(50)  NOT NULL COMMENT '下单时商品名称',
  `sub_goods_price` decimal(8,2) NOT NULL COMMENT '下单时商品价格',
  `goods_count` int(11) NOT NULL COMMENT '下单了多少件',
  PRIMARY KEY (`order_goods_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单下单商品表';

CREATE TABLE `my_goods` (
  `goods_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '商品id',
  `goods_price` decimal(8,2) NOT NULL COMMENT '商品价格',
  `goods_name` varchar(50) NOT NULL COMMENT '商品名称',
  `goods_details` varchar(255) DEFAULT NULL COMMENT '商品详情',
  PRIMARY KEY (`goods_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟商品表';

写入样本数据

-- 初始化订单数据
INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (1, 19.80, 1, '北京', '北京市', '西城区', 0, '2021-06-10 11:02:29');
INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (2, 9.90, 1, '北京', '北京市', '丰台区', 0, '2021-06-10 11:02:59');
INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (3, 300.00, 1, '北京', '北京市', '朝阳区', 0, '2021-06-10 11:03:16');
INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (4, 66.60, 1, '北京', '北京市', '顺义区', 0, '2021-06-10 11:03:32');

-- 初始化商品数据
INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (1, 9.90, '两次性保温杯-改名称了~', '我是一只保温杯~');
INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (2, 100.00, '欧莱雅男士洗面奶', '只买贵的,不买对的~');
INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (3, 66.60, 'ipone13双面曲折屏', '是苹果,不是吃的那种...');

-- 初始化订单商品数据(暂时不考虑一对多)
INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (1, 1, 1, '一次性保温杯', 9.90, 2);
INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (2, 2, 1, '一次性保温杯', 9.90, 1);
INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (3, 3, 2, '欧莱雅洗面奶', 100.00, 3);
INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (4, 4, 3, '吃的苹果', 66.60, 1);

flinksql

CREATE TABLE my_order (
  order_id INT primary key not enforced,
  order_money DECIMAL(8, 2)
) WITH (
 'connector' = 'mysql-cdc',
  'hostname' = '101.43.164.4',
  'port' = '3306',
  'database-name' = 'cdc-source',
  'table-name' = 'my_order',
  'username' = 'root',
  'password' = '******',
  'jdbc.properties.useSSL' = 'false'

);
CREATE TABLE my_goods (
  goods_id INT primary key not enforced,
  goods_name STRING,
  goods_price DECIMAL(8, 2)
) WITH (
 'connector' = 'mysql-cdc',
  'hostname' = '101.43.164.4',
  'port' = '3306',
  'database-name' = 'cdc-source',
  'table-name' = 'my_goods',
  'username' = 'root',
  'password' = '******',
  'jdbc.properties.useSSL' = 'false'
);
CREATE TABLE my_order_goods (
  order_id INT primary key not enforced,
  goods_id INT,
  goods_count INT
) WITH (
   'connector' = 'mysql-cdc',
  'hostname' = '101.43.164.4',
  'port' = '3306',
  'database-name' = 'cdc-source',
  'table-name' = 'my_order_goods',
  'username' = 'root',
  'password' = '******',
  'jdbc.properties.useSSL' = 'false'
);
CREATE TABLE order_index(
  order_id INT,
  goods_name STRING,
  goods_count INT,
  goods_price DECIMAL(8, 2),
  order_money DECIMAL(8, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = '101.43.164.4:9200',
    'index' = 'order_index',
    'username' = 'elastic',
    'password' = '******'
);

insert into order_index
select mo.order_id, mg.goods_name, mog.goods_count,
 mg.goods_price, mo.order_money
from my_order mo 
left join my_order_goods mog on mo.order_id = mog.order_id
left join my_goods mg on mog.goods_id = mg.goods_id;

kibana中查询es数据

POST order_index/_search
{
  "size": 20,
  "query": {"match_all": {

  }}
}
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "order_index",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 3,
          "goods_name" : "欧莱雅男士洗面奶",
          "goods_count" : 3,
          "goods_price" : 100.0,
          "order_money" : 300.0
        }
      },
      {
        "_index" : "order_index",
        "_type" : "_doc",
        "_id" : "4",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 4,
          "goods_name" : "ipone13双面曲折屏~",
          "goods_count" : 1,
          "goods_price" : 66.6,
          "order_money" : 66.6
        }
      },
      {
        "_index" : "order_index",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 1,
          "goods_name" : "两次性保温杯-3342",
          "goods_count" : 2,
          "goods_price" : 9.9,
          "order_money" : 19.8
        }
      },
      {
        "_index" : "order_index",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "order_id" : 2,
          "goods_name" : "两次性保温杯-3342",
          "goods_count" : 1,
          "goods_price" : 9.9,
          "order_money" : 9.9
        }
      }
    ]
  }
}

此时修改mysql商品表

UPDATE `cdc-source`.`my_goods` SET `goods_name` = '两次性保温杯-我又改名了' WHERE `goods_id` = 1

此时查看es中的数据

{ "took" : 363, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 4, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "order_index", "_type" : "_doc", "_id" : "4", "_score" : 1.0, "_source" : { "order_id" : 4, "goods_name" : "ipone13双面曲折屏", "goods_count" : 1, "goods_price" : 66.6, "order_money" : 66.6 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "3", "_score" : 1.0, "_source" : { "order_id" : 3, "goods_name" : "欧莱雅男士洗面奶", "goods_count" : 3, "goods_price" : 100.0, "order_money" : 300.0 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "2", "_score" : 1.0, "_source" : { "order_id" : 2, "goods_name" : "两次性保温杯-我又改名了", "goods_count" : 1, "goods_price" : 9.9, "order_money" : 9.9 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "order_id" : 1, "goods_name" : "两次性保温杯-我又改名了", "goods_count" : 2, "goods_price" : 9.9, "order_money" : 19.8 } } ] } } ```

点赞
收藏
评论区
推荐文章
Springfox与SpringDoc——swagger如何选择(SpringDoc入门)
本文分享自天翼云开发者社区@《》,作者:才开始学技术的小白0.引言之前写过一篇关于swagger(实际上是springfox)的使用指南(https://www.ctyun.cn/developer/article/371704742199365),涵盖了
Easter79 Easter79
3年前
Springboot整合elasticsearch以及接口开发
Springboot整合elasticsearch以及接口开发搭建elasticsearch集群搭建过程略(我这里用的是elasticsearch5.5.2版本)写入测试数据新建索引book(非结构化索引)PUThttp://192.168.100.102:9200/book
Stella981 Stella981
3年前
ELK学习笔记之ElasticSearch的索引详解
0x00ElasticSearch的索引和MySQL的索引方式对比Elasticsearch是通过Lucene的倒排索引技术实现比关系型数据库更快的过滤。特别是它对多条件的过滤支持非常好,比如年龄在18和30之间,性别为女性这样的组合查询。倒排索引很多地方都有介绍,但是其比关系型
基于ubuntu系统部署FateLLM1.3.0
本文分享自天翼云开发者社区《》,作者:刘阳一、基础环境本次测试是基于K8S集群管理的两个POD节点进行部署,软硬件信息如下:服务器规格:CPU:8c80GGPU:V100/40G2系统环境:ubuntu18.04cuda11.7deepspeed0.9.5
kubelet报listen tcp [::1]:0: bind问题解决
本文分享自天翼云开发者社区《》,作者:SummerSnow目录1.环境目录2.问题现象3.问题定位4.问题解决环境介绍k8s集群环境如下:root@k8smaster$kubectlversionClientVersion:version.Info
NFS使用
本文分享自天翼云开发者社区《》,作者:2m安装nfsnfs依赖于rpc,故需安装nfsutilsrpcbindyuminstallynfsutilsrpcbind​指定nfs监听端口vim/etc/sysconfig/nfs​RQUOTADPORT3000
安装minio集群
本文分享自天翼云开发者社区《》,作者:2m1.创建minio用户创建用户useraddminio赋予密码(生产环境需强密码)passwdminio设置密码有效期为99999天chageM99999minio2.创建挂载磁盘路径mkdirp/data/min
搭建MySQL主从
本文分享自天翼云开发者社区《》,作者:2m——本文基于MySQL5.7.36进行演示1、下载MySQL安装包官网网址:https://downloads.mysql.com/archives/community2、解压MySQL安装包将以下包上传至服务器:
基于Linux系统的PXE搭建方法
本文分享自天翼云开发者社区《》,作者:tn一、底层环境准备1、安装RedHat7.6系统2、关闭防火墙和Selinuxsystemctlstopfirewalldchkconfigfirewalldoffvim/etc/sysconfig/selinux修
ES集群迁移方案总结
本文分享自天翼云开发者社区《ES集群迁移方案总结》,作者:刘鑫ES集群迁移可以通过以下几种方式实现,具体方案的选择,需要根据数据量、索引类型、网络情况等进行方案评估和选择。在实施迁移时,需确保目标集群能够承载迁移的数据量,并考虑到集群的可用性、数据一致性和
天翼云开发者社区
天翼云开发者社区
Lv1
天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
文章
908
粉丝
16
获赞
40