主从同步原理
所有数据库同步原理几乎一样,MongoDB解析oplog,Mysql解析bin.log,今天实现了MongoDB同步机制,请关注小编下次更新Mysql同步机制。
- intial sync:初始化所有数据。
- replication:根据oplog实现增量同步。
初始化所有数据这个不说了,以下代码根据oplog实时读取并同步数据。
Change Stream
MongoDB官网提供操作流,通过watch机制监听oplog变更并反向通知程序。
MongoDB官网给出oplog操作类型:
- insert:添加数据
- delete:删除数据
- replace:替换数据
- update:更新数据
- drop:删除表
- rename:修改表名
- dropDatabase:删除数据库
- invalidate:drop/rename/dropDatabase 将导致invalidate被触发,并关闭 change stream
还可以提供了监听条件、开始监听时间、Resume Tokens断点恢复等功能。
注:断点恢复也属于监听条件,只支持一个监听条件
Change Events解析
watch监听返回信息
{
_id : {
// 存储元信息
"_data" : <BinData|hex string> // resumeToken,用于断点恢复
},
"operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate,部分仅支持4.0后的版本,详情见下
"fullDocument" : {
<document> }, // 修改后的数据,出现在insert, replace, delete, update的事件中
"ns" : {
// namespace
"db" : "<database>", // 操作库名
"coll" : "<collection" // 操作表名
},
"to" : {
// 只在operationType为rename的时候有效,表示改名以后的namespace
"db" : "<database>",
"coll" : "<collection"
},
"documentKey" : {
"_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update事件中。正常只包含_id,对于sharded collection,还包括shard key
"updateDescription" : {
// 只在operationType为update的时候出现,相当于是增量的修改,而replace是替换
"updatedFields" : {
<document> }, // 更新的field的值
"removedFields" : [ "<field>", ... ] // 删除的field列表
},
"clusterTime" : <Timestamp>, // 相当于ts字段
"txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增
"lsid" : {
// 相当于lsid字段,只在事务里面出现。logic session id,请求所在的session的id
"id" : <UUID>,
"uid" : <BinData>
}
}
go 代码
新建go mod工程,目录如下:
utils可以忽略,小编自己写的映射。
- 获取mongo连接包: go get go.mongodb.org/mongo-driver/mongo
initMongo.go代码:
package mongo
import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
"time"
)
func initMasterDBClient() *mongo.Database {
var err error
clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)
// 连接到MongoDB
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
log.Fatal(err)
}
//选择数据库
return client.Database("数据库")
}
func initLiveDBClient() *mongo.Database {
var err error
clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)
// 连接到MongoDB
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
log.Fatal(err)
}
//选择数据库
return client.Database("数据库")
}
func initSlaveDBClient() *mongo.Database {
var err error
clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)
// 连接到MongoDB
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
log.Fatal(err)
}
//选择数据库
return client.Database("数据库")
}
sync.go代码:
package mongo
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
"time"
)
type StreamObject struct {
Id *WatchId `bson:"_id"`
OperationType string
FullDocument map[string]interface{
}
Ns NS
UpdateDescription map[string]interface{
}
DocumentKey map[string]interface{
}
}
type NS struct {
Database string `bson:"db"`
Collection string `bson:"coll"`
}
type WatchId struct {
Data string `bson:"_data"`
}
const (
OperationTypeInsert = "insert"
OperationTypeDelete = "delete"
OperationTypeUpdate = "update"
OperationTypeReplace = "replace"
)
var resumeToken bson.Raw
func Sync() {
go syncMaster()
for {
time.Sleep(2 * time.Second)
}
}
func syncMaster() {
for {
//获得主库数据连接
client := initMasterDBClient()
watch(client)
}
}
func watch(client *mongo.Database) {
defer func() {
err := recover()
if err != nil {
log.Printf("同步出现异常: %+v \n", err)
}
}()
//设置过滤条件
pipeline := mongo.Pipeline{
bson.D{
{
"$match",
bson.M{
"operationType": bson.M{
"$in": bson.A{
"insert", "delete", "replace", "update"}}},
}},
}
//当前时间前一小时
now := time.Now()
m, _ := time.ParseDuration("-1h")
now = now.Add(m)
timestamp := &primitive.Timestamp{
T: uint32(now.Unix()),
I: 0,
}
//设置监听option
opt := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetStartAtOperationTime(timestamp)
if resumeToken != nil {
opt.SetResumeAfter(resumeToken)
opt.SetStartAtOperationTime(nil)
}
//获得watch监听
watch, err := client.Watch(context.TODO(), pipeline, opt)
if err != nil {
log.Fatal("watch监听失败:", err)
}
//获得从库连接
slaveClient := initSlaveDBClient()
for watch.Next(context.TODO()) {
var stream StreamObject
err = watch.Decode(&stream)
if err != nil {
log.Println("watch数据失败:", err)
}
log.Println("=============", stream.FullDocument["_id"])
//保存现在resumeToken
resumeToken = watch.ResumeToken()
switch stream.OperationType {
case OperationTypeInsert:
syncInsert(slaveClient, stream)
case OperationTypeDelete:
filter := bson.M{
"_id": stream.FullDocument["_id"]}
_, err := slaveClient.Collection(stream.Ns.Collection).DeleteOne(context.TODO(), filter)
if err != nil {
log.Println("删除失败:", err)
}
case OperationTypeUpdate:
filter := bson.M{
"_id": stream.FullDocument["_id"]}
update := bson.M{
"$set": stream.FullDocument}
_, err := slaveClient.Collection(stream.Ns.Collection).UpdateOne(context.TODO(), filter, update)
if err != nil {
log.Println("更新失败:", err)
}
case OperationTypeReplace:
filter := bson.M{
"_id": stream.FullDocument["_id"]}
_, err := slaveClient.Collection(stream.Ns.Collection).ReplaceOne(context.TODO(), filter, stream.FullDocument)
if err != nil {
log.Println("替换失败:", err)
}
}
}
}
func syncInsert(slaveClient *mongo.Database, stream StreamObject) {
defer func() {
_ = recover()
}()
_, err := slaveClient.Collection(stream.Ns.Collection).InsertOne(context.TODO(), stream.FullDocument)
if err != nil {
log.Println("插入失败:", err)
}
}
main.go代码:
package main
import (
"moneky-data-sync/mongo"
)
func main() {
mongo.Sync()
}