MongoDB手动同步主库(Change Stream)

Stella981
• 阅读 1280

主从同步原理

所有数据库同步原理几乎一样,MongoDB解析oplog,Mysql解析bin.log,今天实现了MongoDB同步机制,请关注小编下次更新Mysql同步机制。

  • intial sync:初始化所有数据。
  • replication:根据oplog实现增量同步。

初始化所有数据这个不说了,以下代码根据oplog实时读取并同步数据。


Change Stream

MongoDB官网提供操作流,通过watch机制监听oplog变更并反向通知程序。

MongoDB官网给出oplog操作类型:

  • insert:添加数据
  • delete:删除数据
  • replace:替换数据
  • update:更新数据
  • drop:删除表
  • rename:修改表名
  • dropDatabase:删除数据库
  • invalidate:drop/rename/dropDatabase 将导致invalidate被触发,并关闭 change stream

还可以提供了监听条件、开始监听时间、Resume Tokens断点恢复等功能。

注:断点恢复也属于监听条件,只支持一个监听条件

Change Events解析

watch监听返回信息

{
   
    
        _id : {
   
     // 存储元信息
            "_data" : <BinData|hex string> // resumeToken,用于断点恢复
        },
        "operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate,部分仅支持4.0后的版本,详情见下
        "fullDocument" : {
   
     <document> }, // 修改后的数据,出现在insert, replace, delete, update的事件中
        "ns" : {
   
     // namespace
            "db" : "<database>", // 操作库名
            "coll" : "<collection" // 操作表名
        },
        "to" : {
   
     // 只在operationType为rename的时候有效,表示改名以后的namespace
            "db" : "<database>",
            "coll" : "<collection"
        },
        "documentKey" : {
   
     "_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update事件中。正常只包含_id,对于sharded collection,还包括shard key
        "updateDescription" : {
   
     // 只在operationType为update的时候出现,相当于是增量的修改,而replace是替换
            "updatedFields" : {
   
     <document> }, // 更新的field的值
            "removedFields" : [ "<field>", ... ] // 删除的field列表
        },
        "clusterTime" : <Timestamp>, // 相当于ts字段
        "txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增
        "lsid" : {
   
     // 相当于lsid字段,只在事务里面出现。logic session id,请求所在的session的id
            "id" : <UUID>,
            "uid" : <BinData>
        }
    }

go 代码

新建go mod工程,目录如下:
MongoDB手动同步主库(Change Stream)

utils可以忽略,小编自己写的映射。

  • 获取mongo连接包: go get go.mongodb.org/mongo-driver/mongo

initMongo.go代码:

package mongo

import (
    "context"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "log"
    "time"
)

func initMasterDBClient() *mongo.Database {
   
    
    var err error
    clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)

    // 连接到MongoDB
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
   
    
        log.Fatal(err)
    }

    //选择数据库
    return client.Database("数据库")
}

func initLiveDBClient() *mongo.Database {
   
    
    var err error
    clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)

    // 连接到MongoDB
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
   
    
        log.Fatal(err)
    }

    //选择数据库
    return client.Database("数据库")
}

func initSlaveDBClient() *mongo.Database {
   
    
    var err error
    clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)

    // 连接到MongoDB
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
   
    
        log.Fatal(err)
    }

    //选择数据库
    return client.Database("数据库")
}

sync.go代码:

package mongo

import (
    "context"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "log"
    "time"
)

type StreamObject struct {
   
    
    Id                *WatchId `bson:"_id"`
    OperationType     string
    FullDocument      map[string]interface{
   
    }
    Ns                NS
    UpdateDescription map[string]interface{
   
    }
    DocumentKey       map[string]interface{
   
    }
}

type NS struct {
   
    
    Database   string `bson:"db"`
    Collection string `bson:"coll"`
}

type WatchId struct {
   
    
    Data string `bson:"_data"`
}

const (
    OperationTypeInsert  = "insert"
    OperationTypeDelete  = "delete"
    OperationTypeUpdate  = "update"
    OperationTypeReplace = "replace"
)

var resumeToken bson.Raw

func Sync() {
   
    
    go syncMaster()

    for {
   
    
        time.Sleep(2 * time.Second)
    }
}

func syncMaster() {
   
    
    for {
   
    
        //获得主库数据连接
        client := initMasterDBClient()
        watch(client)
    }
}

func watch(client *mongo.Database) {
   
    
    defer func() {
   
    
        err := recover()
        if err != nil {
   
    
            log.Printf("同步出现异常: %+v \n", err)
        }
    }()

    //设置过滤条件
    pipeline := mongo.Pipeline{
   
    
        bson.D{
   
    {
   
    "$match",
            bson.M{
   
    "operationType": bson.M{
   
    "$in": bson.A{
   
    "insert", "delete", "replace", "update"}}},
        }},
    }

    //当前时间前一小时
    now := time.Now()
    m, _ := time.ParseDuration("-1h")
    now = now.Add(m)
    timestamp := &primitive.Timestamp{
   
    
        T: uint32(now.Unix()),
        I: 0,
    }

    //设置监听option
    opt := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetStartAtOperationTime(timestamp)
    if resumeToken != nil {
   
    
        opt.SetResumeAfter(resumeToken)
        opt.SetStartAtOperationTime(nil)
    }

    //获得watch监听
    watch, err := client.Watch(context.TODO(), pipeline, opt)
    if err != nil {
   
    
        log.Fatal("watch监听失败:", err)
    }

    //获得从库连接
    slaveClient := initSlaveDBClient()

    for watch.Next(context.TODO()) {
   
    
        var stream StreamObject
        err = watch.Decode(&stream)
        if err != nil {
   
    
            log.Println("watch数据失败:", err)
        }

        log.Println("=============", stream.FullDocument["_id"])

        //保存现在resumeToken
        resumeToken = watch.ResumeToken()

        switch stream.OperationType {
   
    
        case OperationTypeInsert:
            syncInsert(slaveClient, stream)
        case OperationTypeDelete:
            filter := bson.M{
   
    "_id": stream.FullDocument["_id"]}
            _, err := slaveClient.Collection(stream.Ns.Collection).DeleteOne(context.TODO(), filter)
            if err != nil {
   
    
                log.Println("删除失败:", err)
            }
        case OperationTypeUpdate:
            filter := bson.M{
   
    "_id": stream.FullDocument["_id"]}
            update := bson.M{
   
    "$set": stream.FullDocument}
            _, err := slaveClient.Collection(stream.Ns.Collection).UpdateOne(context.TODO(), filter, update)
            if err != nil {
   
    
                log.Println("更新失败:", err)
            }
        case OperationTypeReplace:
            filter := bson.M{
   
    "_id": stream.FullDocument["_id"]}
            _, err := slaveClient.Collection(stream.Ns.Collection).ReplaceOne(context.TODO(), filter, stream.FullDocument)
            if err != nil {
   
    
                log.Println("替换失败:", err)
            }
        }
    }
}

func syncInsert(slaveClient *mongo.Database, stream StreamObject) {
   
    
    defer func() {
   
    
        _ = recover()
    }()

    _, err := slaveClient.Collection(stream.Ns.Collection).InsertOne(context.TODO(), stream.FullDocument)
    if err != nil {
   
    
        log.Println("插入失败:", err)
    }
}

main.go代码:

package main

import (
    "moneky-data-sync/mongo"
)

func main() {
   
    
    mongo.Sync()
}

git hub代码地址

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
MongoDB 定位 oplog 必须全表扫描吗?
MongoDBoplog(类似于MySQLbinlog)记录数据库的所有修改操作,除了用于主备同步;oplog还能玩出很多花样,比如1.全量备份增量备份所有的oplog,就能实现MongoDB恢复到任意时间点的功能2.通过oplog,除了实现到备节点的同步,也可以额外再往单独的集群同步数据(甚至是异构的数据库),实现容
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
mysql主从同步问题梳理
前言:MySQL主从复制故障机延迟原因有很多,之前详细介绍了Mysql主从复制的原理和部署过程,在mysql同步过程中会出现很多问题,导致数据同步异常。以下梳理了几种主从同步中可能存在的问题:1)slave运行过慢不能与master同步,也就是MySQL数据库主从同步延迟MySQL数据库slave服务器延迟的现象是非常普遍的,MySQ
Wesley13 Wesley13
3年前
MYSQL主从同步故障解决(主键重复)
MYSQL主从同步故障解决(主键重复)转载2010年04月05日18:52:00标签:mysql(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fso.csdn.net%2Fso%2Fsearch%2Fs.do%3Fq%
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
一种Mysql和Mongodb数据同步到Elasticsearch的实现办法和系统
一种Mysql和Mongodb数据同步到Elasticsearch的实现办法和系统
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这