new ServiceBroker
default settings
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker();
custom settings
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
logLevel: "info"
});
communicate with remote nodes
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
nodeID: "node-1",
transporter: "nats://localhost:4222",
logLevel: "debug",
requestTimeout: 5 * 1000,
requestRetry: 3
});
broker options
- logLevel
- type:string
- default:info
- des:可选项目还有 trace、debug、 info、 warn、 error、 fatal
- middlewares
- type:Array
- default:null
- des:中间件
- type:Array
- created
- type:Function
- default:null
- des:broker 实例被创建的时候将会触发此函数
- started
- type:Function
- default:null
- des:broker 实例开始执行时触发此函数
- stopped
- type:Function
- default:null
- des:broker 实例停止执行时触发此函数
- hotReload
- type:Boolean
- default:false
- des:是否启动热加载
- cacher
- type:String、Object、Cacher
- default:null
- des:若是启动缓存,两个相同模型的 broker.call,只有第一个 call 会让 action 中对应 handler 完整的执行一遍,第二个 call 就不会了,它会直接从缓存中取数据,不常用
- https://moleculer.services/docs/0.13/caching.html
- transporter
- type:string、object、Transporter
- default:null
- des:多个节点通讯,需要一个传输中心,这就是消息代理服务器,这是微服务核心。一般选择 NATS,这需要安装 NATS 依赖包
- https://moleculer.services/docs/0.13/networking.html#Transporters
- serializer
- type:string、Serializer
- default:JSONSerializer
- des:系统默认以 json 格式序列化,不常用
- https://moleculer.services/docs/0.13/networking.html#Serialization
- nodeID
- type:string
- default:hostname + PID
- des:这是节点的id,挂载在 某一个 namespace 中是不能够同名的
- namespace
- type:string
- defalut:”“
- des:分割一个网咯中的不同区域,基本上用不到,除非项目特别复杂,子服务特别多
- requestTimeout
- type:Number
- default:0
- des:请求超时设置,单位毫秒
createService
该服务表示Moleculer框架中的一个微服务。您可以定义操作并订阅事件。若要创建服务,必须定义架构。服务模式类似于VueJS的一个组件
// 定义了两个actions
broker.createService({
name: "math",
actions: {
add(ctx) {
return Number(ctx.params.a) + Number(ctx.params.b);
},
sub(ctx) {
return Number(ctx.params.a) - Number(ctx.params.b);
}
}
});
name
- 强制属性,最后去 call 某一个微服务的时候必须带上 name
version
- 可以区分同一个 action 不同版本,一般不带
broker.call("v1.math.add")
- restful call:https://moleculer.services/docs/0.13/moleculer-web.html
settings
- 此属性相当于仓库
- 赋值可以是对象,对象中设置任意键值对,action 中通过
this.settings.xxxx
能够访问到设置项 - 远程节点上可以获得这些设置项
- 有一些内部设置是由核心模块使用的。这些设置名称以$(美元符号)开头
- $noVersionPrefix
- type:Boolean
- default:false
- des:禁用 action 版本前缀
- $noServiceNamePrefix
- type:Boolean
- default:false
- des:禁用 action 中的服务名称前缀。
- $dependencyTimeout
- type:Number
- default:0
- des:依赖等待超时
- $shutdownTimeout
- type:Number
- default:0
- des:关闭时等待活动请求的超时
- $noVersionPrefix
- 赋值可以是对象,对象中设置任意键值对,action 中通过
mixins
Mixins是一种为Moleculer服务分发可重用功能的灵活方法。服务构造函数将这些混合与当前架构合并。它是在您的服务中扩展其他服务。当服务使用混音时,混音中的所有属性都将“混合”到当前服务中。
const ApiGwService = require("moleculer-web");
module.exports = {
name: "api",
mixins: [ApiGwService]
settings: {
// Change port setting
port: 8080
},
actions: {
myAction() {
// Add a new action to apiGwService service
}
}
}
上面的示例创建了一个API服务,该服务继承了ApiGwService的所有内容,但是覆盖了端口设置,并使用新的myAction操作对其进行了扩展
actions
action 是服务中可调用的公共方法,broker.call 或 ctx.call,具体 action 必须在 action 中,可以是一个函数,可以是一个对象
- 调用时
- const res = await broker.call("math.add", { a: 5, b: 7 })
- const res = await broker.call("math.mult", { a: 10, b: 31 })
- https://moleculer.services/docs/0.13/services.html#Actions
- 调用时
events
- 事件订阅
lifecycle events
- 有一些生命周期服务事件,这些事件将由代理触发。它们被放置在模式的根中
- created:broker.loadService 或者 broker.createService 触发
- started:broker.start() 触发
- stopped:broker.stop() 触发
methods
- 创建私有方法,以供 action、event、lifecycle event 使用
dependencies
- 如果您的服务依赖于其他服务,请使用架构中的依赖项属性。服务在调用已启动的生命周期事件处理程序之前等待依赖服务
- 除了配做中添加 dependencies 属性,也可以用 broker 实例进行外部设置
- broker.waitForServices(["posts", "users"]),返回以恶 promise 对象
- broker.waitForServices("accounts", 10 * 1000, 500),设置超时事件和
metadata
- 元数据属性,您可以在这里存储有关服务的任何元信息。在服务函数中可以访问到元数据
- 元数据时可以被远程节点获取的
this
- 微服务中 this 指向微服务的实例,有些方法是可以直接被点出来的,具体查看请点击
- https://moleculer.services/docs/0.13/services.html#Properties-of-Service-instances
broker.createService
// 创建微服务实例方式之一
broker.createService({
name: "math",
actions: {
add(ctx) {
return Number(ctx.params.a) + Number(ctx.params.b);
}
}
});
load service from file
math.service.js
// Export the schema of service
module.exports = {
name: "math",
actions: {
add(ctx) {
return Number(ctx.params.a) + Number(ctx.params.b);
},
sub(ctx) {
return Number(ctx.params.a) - Number(ctx.params.b);
}
}
}
// Create broker
const broker = new ServiceBroker();
// Load service
broker.loadService("./math.service");
// Start broker
broker.start();
推荐使用这样的方式,一目了然,不会在一个文件写过多的代码
Load multiple services from a folder
如果您有很多服务,建议将它们放到一个服务文件夹中,并使用 Serge.loadService s方法加载所有这些服务
broker.loadServices(folder = "./services", fileMask = "**/*.service.js");
// 从 ./services 文件夹(包括子文件夹)加载每个 *.service.js 文件
broker.loadServices();
// 从当前文件夹(包括子文件夹)加载每个 *.service.js 文件
broker.loadServices("./");
// 从“./svc”文件夹加载每个用户*.service.js文件
broker.loadServices("./svc", "user*.service.js");
hot reloading services
Moleculer具有内置的热重加载功能.在开发期间,注意只针对 service.js 文件的修改被启动热重启,其他位置可以使用 nodemon
const broker = new ServiceBroker({
hotReload: true
});
broker.loadService("./services/test.service.js");
Internal services
// 列出所有已知节点(包括本地节点)
broker.call("$node.list").then(res => console.log(res))
// 列出所有注册的服务(本地和远程)
broker.call("$node.services").then(res => console.log(res))
// 列出所有已注册 action(本地和远程)。
broker.call("$node.actions").then(res => console.log(res))
// 列出所有订阅的事件
broker.call("$node.events").then(res => console.log(res))
// 列出本地节点的健康信息(包括进程和OS信息)
broker.call("$node.health").then(res => console.log(res));
action
action 是服务的可调用的公共方法。action 调用表示远程过程调用(RPC)。它有请求参数&返回响应,就像HTTP请求一样。如果您有多个服务实例,代理将在实例之间负载平衡请求
call services
若要调用服务,请使用 broke.Call 方法。代理查找具有给定 action service (可能在某一个节点上)并调用它。调用之后将会返回一个承诺
const res = await broker.call(actionName, params, opts)
params:参数是作为上下文的一部分传递给 action,action service 可以通过 ctx.params 访问传递参数,这是可选的
ops:一个对象,用于设置或者覆盖某些请求参数,例如:timeout、retry Count,这是可选的
- tiemout:请求超时,以毫秒为单位。如果请求超时,而您没有定义应急响应,将会报错。若要禁用设置0,请执行以下操作。如果未定义,将会启用 new ServiceBroker 中的 requestTimeout 设置
- retries :请求重试次数,如果请求超时,代理将再次尝试调用。若要禁用设置0。如果没有定义,将启用 new ServiceBroker 中的配置
- fallbackResponse :若请求失败就返回,这是一个 Function
- nodeID:目标节点,如果设置,它将直接调用给定的节点
- meta:请求元数据,通过操作处理程序中的 ctx.meta 访问它,它也将在嵌套调用中被传输和合并
- parentCtx:父亲的上下文实例
- requestID:请求ID或相关ID。它出现在标准事件中
broker.call("user.recommendation", { limit: 5 }, { timeout: 500, retries: 3, fallbackResponse: defaultRecommendation }).then(res => console.log("Result: ", res));
meta
- 元信息发送到具有元属性的服务,通过 action 处理程序中的ctx.meta访问它。请注意,在嵌套调用时,元被合并。
Streaming
Moleculer支持Node.js流作为请求参数和响应。使用它来传输从网关上传的文件,或者编码/解码或压缩/解压缩流
const stream = fs.createReadStream(fileName);
broker.call("storage.save", stream, { meta: { filename: "avatar-123.jpg" }});
请注意,参数应该是一个流,您不能向参数中添加更多的变量。使用元属性传输其他数据。
服务中接受流
module.exports = { name: "storage", actions: { save(ctx) { const s = fs.createWriteStream(
/tmp/${ctx.meta.filename}
); ctx.params.pipe(s); } } };将流作为服务中的响应返回
module.exports = { name: "storage", actions: { get: { params: { filename: "string" }, handler(ctx) { return fs.createReadStream(
/tmp/${ctx.params.filename}
); } } } };调用方接受流
const filename = "avatar-123.jpg"; broker.call("storage.get", { filename }) .then(stream => { const s = fs.createWriteStream(
./${filename}
); stream.pipe(s); s.on("close", () => broker.logger.info("File has been received")); })AES编解码示例服务
const crypto = require("crypto"); const password = "moleculer";
module.exports = { name: "aes", actions: { encrypt(ctx) { const encrypt = crypto.createCipher("aes-256-ctr", password); return ctx.params.pipe(encrypt); }, decrypt(ctx) { const decrypt = crypto.createDecipher("aes-256-ctr", password); return ctx.params.pipe(decrypt); } } };
action visibility
visibility:该属性控制 action service 是否可见、可调用
- published:公共的 action,它可以在本地调用,也可以远程调用,并且可以通过API网关发布
- pulic:公共的 action ,可以在本地或者远程调用,但不能通过APIGW发布
- protected:只能在本地 action service 调用(从本地服务调用)
- private:只能在内部调用(通过 this.actions.xy() 内部服务)
- 不设置,默认是 null,也就是 published,公共的
module.exports = { name: "posts", actions: { // It's published by default find(ctx) {}, clean: { // Callable only via
this.actions.clean
visibility: "private", handler(ctx) {} } }, methods: { cleanEntities() { // Call the action directly return this.actions.clean(); } } }
action hooks
定义 action 钩子来包装来自混合器的某些 action
有 before、after、error 钩子,将其分配给指定的 action 或者所有 action service (*)
钩子可以是函数,也可以是字符串。字符串必须是本地服务方法名。
const DbService = require("moleculer-db"); // before hook module.exports = { name: "posts", mixins: [DbService] hooks: { before: { // Define a global hook for all actions // The hook will call the
resolveLoggedUser
method. "*": "resolveLoggedUser", // Define multiple hooks remove: [ function isAuthenticated(ctx) { if (!ctx.user) throw new Error("Forbidden"); }, function isOwner(ctx) { if (!this.checkOwner(ctx.params.id, ctx.user.id)) throw new Error("Only owner can remove it."); } ] } }, methods: { async resolveLoggedUser(ctx) { if (ctx.meta.user) ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id }); } } }const DbService = require("moleculer-db"); // after hook // error hook module.exports = { name: "users", mixins: [DbService] hooks: { after: { // Define a global hook for all actions to remove sensitive data "": function(ctx, res) { // Remove password delete res.password; // Please note, must return result (either the original or a new) return res; }, get: [ // Add a new virtual field to the entity async function (ctx, res) { res.friends = await ctx.call("friends.count", { query: { follower: res._id }}); return res; }, // Populate the
referrer
field async function (ctx, res) { if (res.referrer) res.referrer = await ctx.call("users.get", { id: res._id }); return res; } ] }, error: { // Global error handler "": function(ctx, err) { this.logger.error(Error occurred when '${ctx.action.name}' action was called
, err); // Throw further the error throw err; } } } };推荐的用例是创建混合元素,用方法填充服务,并在钩子中设置方法名
module.exports = { methods: { checkIsAuthenticated(ctx) { if (!ctx.meta.user) throw new Error("Unauthenticated"); }, checkUserRole(ctx) { if (ctx.action.role && ctx.meta.user.role != ctx.action.role) throw new Error("Forbidden"); }, checkOwner(ctx) { // Check the owner of entity } } }
// Use mixin methods in hooks const MyAuthMixin = require("./my.mixin");
module.exports = { name: "posts", mixins: [MyAuthMixin] hooks: { before: { "*": ["checkIsAuthenticated"], create: ["checkUserRole"], update: ["checkUserRole", "checkOwner"], remove: ["checkUserRole", "checkOwner"] } }, actions: { find: { // No required role handler(ctx) {} }, create: { role: "admin", handler(ctx) {} }, update: { role: "user", handler(ctx) {} } } };
context
当你去 call 一个 action service,broker 就会创建一个上下文 context 实例,这个实例包含着所有的请求信息,最后这些信息都会被当做 action service 中 handler 的一个参数 ctx 进行传递使用
在 handler 中可以点出的上下文信息(属性或者方法)
- ctx.id:context id
- ctx.broker:broker 对象实例
- ctx.action:action 定义实例
- ctx.nodeID:caller 或者 目标节点 id
- ctx.requestID:请求ID,如果在 nested-calls 中使用,它将是相同的ID。
- ctx.parentID:父亲上下文实例 id(在 nested-calls 中使用)
- ctx.params:请求参数,也就是 broker.call 中第二个参数具体设置
- ctx.meta:请求元数据,它将会传递到 nested-calls 中
- ctx.level:请求等级(在 nested-calls 内部使用),第一层等级为1
- ctx.call():在 nested-calls 中触发 action service,参数形式与 broker.call 一样
- ctx.emit():emit an event,same as broker.emit
- ctx.broadcast():Broadcast an event, same as broker.broadcast
优雅地关闭服务,请在代理选项中启用上下文跟踪功能。如果启用它,所有服务都将在关闭之前等待所有正在运行的上下文
- 一个超时值可以通过关闭Timeout Broker选项来定义。默认值为5秒
- 在 action services 中,关闭超时设置可以通过 $Shupdown Timeout 属性重写
const broker = new ServiceBroker({ nodeID: "node-1", tracking: { enabled: true, shutdownTimeout: 10 * 1000 } });
broker.call("posts.find", {}, { tracking: false }) // 关闭追踪
event
- Broker 有一个内置的事件总线来支持事件驱动体系结构,并将事件发送到本地和远程服务
- 事件侦听器被排列成逻辑组,这意味着每个组中只触发一个侦听器
- 例如你有两个主要服务 users、payments,这两个服务都订阅了 user.created 事件。此时,从 users 服务上注册 3 个具体实例,同时从 paymengs 服务上注册 2 个具体实例,当 emit 触发 user.created 事件,只有一个 user 和一个 payments 服务会被触发,效果如下
组名来自服务名称,但可以在服务中的事件定义中覆盖它。
module.exports = { name: "payment", events: { "order.created": { // Register handler to the "other" group instead of "payment" group. group: "other", handler(payload) { // ... } } } }
Emit balanced events
broker.emit 函数发送平衡的事件,第一个参数是事件的名称,第二个参数是传递的载荷,如果是复杂数据,可以传递一个对象
// The
user
will be serialized to transportation. broker.emit("user.created", user);指定哪些组/服务接收事件
// Only the
mail
&payments
services receives it broker.emit("user.created", user, ["mail", "payments"]);
Broadcast event
- 广播事件被发送到所有可用的本地和远程服务,它是不平衡的,所有服务实例都会收到它
利用
broker.broadcast
发送广播broker.broadcast("config.changed", config);
指定哪些组/服务接收事件
// Send to all "mail" service instances broker.broadcast("user.created", { user }, "mail");
// Send to all "user" & "purchase" service instances. broker.broadcast("user.created", { user }, ["user", "purchase"]);
Local broadcast event
Send broadcast events to only all local services with
broker.broadcastLocal
methodbroker.broadcastLocal("config.changed", config);
Subscribe to events
通过 service 中的属性 event 可以订阅具体事件,在事件名称中可以使用通配符
module.exports = { events: { // Subscribe to
user.created
event "user.created"(user) { console.log("User created:", user); }, // Subscribe to alluser
events "user.*"(user) { console.log("User event:", user); } // Subscribe to all internal events "$**"(payload, sender, event) { console.log(Event '${event}' received from ${sender} node:
, payload); } } }
Internal events
- broker broadcasts 广播内部事件,这些事件总是以$前缀开头
- $services.changed
- 如果本地节点或远程节点加载或破坏服务,代理将发送此事件
- $circuit-breaker.opened
- The broker sends this event when the circuit breaker module change its state to
open
- The broker sends this event when the circuit breaker module change its state to
- $circuit-breaker.half-opened
- The broker sends this event when the circuit breaker module change its state to
half-open
.
- The broker sends this event when the circuit breaker module change its state to
- $circuit-breaker.closed
- The broker sends this event when the circuit breaker module change its state to
closed
.
- The broker sends this event when the circuit breaker module change its state to
- $node.connected
- The broker sends this event when a node connected or reconnected.
- $node.updated
- The broker sends this event when it has received an INFO message from a node, (i.e. a service is loaded or destroyed).
- $node.disconnected
- The broker sends this event when a node disconnected (gracefully or unexpectedly).
- $broker.started
- The broker sends this event once
broker.start()
is called and all local services are started.
- The broker sends this event once
- $broker.stopped
- The broker sends this event once
broker.stop()
is called and all local services are stopped.
- The broker sends this event once
- $transporter.connected
- The transporter sends this event once the transporter is connected.
- $transporter.disconnected
- The transporter sends this event once the transporter is disconnected.
lifecycle
Broker lifecycle
- starting logic
- broker 启动传输连接,但是不会将本地服务列表发送到远程节点
- 完成后,broker 将启动所有服务(call service
started
handler) - 一旦所有服务启动成功,broker 就会将本地服务列表发布到远程节点上
- 因此,远程节点只有在所有本地服务正确启动之后才能发送请求
- avoid deadlocks
- broker start...
user
service hasdependencies: ["post"]
posts
service hasdependencies: ["users"]
- 这就死锁了,按照顺序加载,user 永元无法加载到依赖项 post
- stopping logic
- call
broker.stop
或者停止进程 - 首先,broker 会向远程节点发送一个空的服务列表,所以他们可以将请求路由到其他实例而不是停止服务
- 之后,broker 开始停止所有本地服务,之后 transporter 断开连接
- call
Service lifecycle
created event handler
broker.createService
orbroker.loadService
会触发此事件- 函数内部拿到 broker 实例(this),还可以创建其他模块实例,例如 http 服务器、数据库模块
const http = require("http");
module.exports = { name: "www", created() { // Create HTTP server this.server = http.createServer(this.httpHandler); } }; // created function is sync event handler,can not use async/await
started event handler
- 它被触发的时候,代理会启动所有的本地服务,而 broker 会启动所有的本地服务。使用它连接到数据库,侦听服务器…等
module.exports = { name: "users", async started() { try { await this.db.connect(); } catch(e) { throw new MoleculerServerError("Unable to connect to database.", e.message); } } }; // started function is async handler. you can use async/await
stopped event handler
- 它被触发的时候,
broker.stop
被调用和 broker 开始停止所有的本地服务。使用它关闭数据库连接,关闭套接字…等
module.exports = { name: "users", async stopped() { try { await this.db.disconnect(); } catch(e) { this.logger.warn("Unable to stop database connection gracefully.", e); } } }; // stopped function is async handler. you can use async/await
- 它被触发的时候,
logging
- 在Moleculer框架中,所有核心模块都有一个自定义记录器实例。它们是从Broker记录器实例继承的,该实例可以在Broker选项中进行配置。
Built-in logger
Moleculer有一个内置控制台记录器。这是默认的记录器
const { ServiceBroker } = require("moleculer"); const broker = new ServiceBroker({ nodeID: "node-100", // logger: true, logLevel: "info" });
broker.createService({ name: "posts", actions: { get(ctx) { this.logger.info("Log message via Service logger"); } } });
broker.start() .then(() => broker.call("posts.get")) .then(() => broker.logger.info("Log message via Broker logger")); [2018-06-26T11:38:06.728Z] INFO node-100/POSTS: Log message via Service logger [2018-06-26T11:38:06.728Z] INFO node-100/BROKER: Log message via Broker logger [2018-06-26T11:38:06.730Z] INFO node-100/BROKER: ServiceBroker is stopped. Good bye.
可以使用Broker选项中的logLevel选项更改日志级别。只与内置控制台记录器一起使用
const broker = new ServiceBroker({ logger: true, // the
true
is same asconsole
logLevel: "warn" // only logs the 'warn' & 'error' entries to the console });Available log levels:
fatal
,error
,warn
,info
,debug
,trace
可以为每个Moleculer模块设置日志级别。允许通配符使用
const broker = new ServiceBroker({ logLevel: { "MY.": false, // Disable log "TRANS": "warn", // Only 'warn ' and 'error' log entries "*.GREETER": "debug", // All log entries "": "info", // All other modules use this level } }); // 此设置是从上到下计算的,因此*级别必须是最后一项。
有一些内置的日志格式化程序
- default:[2018-06-26T13:36:05.761Z] INFO node-100/BROKER: Message
- simple:INFO - Message
- short:[13:36:30.968Z] INFO BROKER: Message
可以为内置控制台记录器设置自定义日志格式化程序函数
const broker = new ServiceBroker({ logFormatter(level, args, bindings) { return level.toUpperCase() + " " + bindings.nodeID + ": " + args.join(" "); } }); broker.logger.warn("Warn message"); broker.logger.error("Error message");
WARN dev-pc: Warn message ERROR dev-pc: Error message
自定义对象&数组打印格式化程序
- 设置一个自定义格式化程序函数来打印对象和数组。默认函数将对象和数组打印到一行,以便便于使用外部日志工具进行处理。但是,当您正在开发时,将对象打印成人类可读的多行格式将是有用的。为此,在代理选项中覆盖logObjectPrint函数。
const util = require("util");
const broker = new ServiceBroker({
logObjectPrinter: o => util.inspect(o, { depth: 4, breakLength: 100 }) }); broker.logger.warn(process.release);[2017-08-18T12:37:25.720Z] INFO dev-pc/BROKER: { name: 'node', lts: 'Carbon', sourceUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0.tar.gz', headersUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0-headers.tar.gz' }
External loggers
外部记录器可以与Moleculer一起使用。在这种情况下,将创建者函数设置为LOGER。当一个新模块继承一个新的记录器实例时,ServiceBroker将调用它
// pino const pino = require("pino")({ level: "info" }); const broker = new ServiceBroker({ logger: bindings => pino.child(bindings) });
// bunyan const bunyan = require("bunyan"); const logger = bunyan.createLogger({ name: "moleculer", level: "info" }); const broker = new ServiceBroker({ logger: bindings => logger.child(bindings) });
middlewares
networking
要通信其他节点(ServiceBrokers),您需要配置一个传输程序。大多数传输者连接到中心消息代理服务器,该服务器负责节点之间的消息传输。这些消息代理主要支持发布/订阅消息传递模式
Transporters
如果要在多个节点上运行服务,传输程序是一个重要的模块。传送器与其他节点通信。它传输事件、调用请求和处理响应… 如果一个服务在不同节点上的多个实例上运行,则请求将在活动节点之间实现负载平衡
整个通信逻辑是在传输类之外的。这意味着在不改变代码行的情况下,在传送器之间切换是很容易的。
Moleculer框架中有几个内置的运输机。
NATS
NATS服务器是一个简单、高性能的开源消息传递系统,用于云本机应用程序、物联网消息传递和微服务体系结构。
let { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
nodeID: "server-1",
transporter: "nats://nats.server:4222"
});
使用 nats 传输需要安装 nats 模块 npm install nats
// Connect to 'nats://localhost:4222'
const broker = new ServiceBroker({
transporter: "NATS"
});
// Connect to a remote NATS server
const broker = new ServiceBroker({
transporter: "nats://nats-server:4222"
});
// Connect with options
const broker = new ServiceBroker({
transporter: {
type: "NATS",
options: {
url: "nats://localhost:4222"
user: "admin",
pass: "1234"
}
}
});
// Connect with TLS
const broker = new ServiceBroker({
transporter: {
type: "NATS",
options: {
url: "nats://localhost:4222"
// More info: https://github.com/nats-io/node-nats#tls
tls: {
key: fs.readFileSync('./client-key.pem'),
cert: fs.readFileSync('./client-cert.pem'),
ca: [ fs.readFileSync('./ca.pem') ]
}
}
}
});
Serialization
传输程序需要一个序列化模块来序列化和反序列化传输的数据包。默认的串行化程序是JSONS序列化程序,但是有几个内置的串行化程序。
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
nodeID: "server-1",
transporter: "NATS",
serializer: "ProtoBuf"
});
JSON serializer:这是内置的默认序列化程序。它将数据包序列化为JSON字符串,并将接收到的数据反序列化为数据包。
const broker = new ServiceBroker({ // serializer: "JSON" // don't need to set, because it is the default });
Load balancing
Built-in strategies
若要配置策略,请在注册表属性下设置策略代理选项。它可以是一个名称(在内置策略的情况下),也可以是一个策略类(在自定义策略的情况下)。
Random strategy
const broker = new ServiceBroker({
registry: {
strategy: "Random"
}
});
RoundRobin strategy
const broker = new ServiceBroker({
registry: {
strategy: "RoundRobin"
}
});
CPU usage-based strategy
const broker = new ServiceBroker({
registry: {
strategy: "CpuUsage"
}
});
Fault tolerance
Circuit Breaker
Moleculer有一个内置的断路器解决方案.这是一个基于阈值的实现。它使用一个时间窗口来检查失败的请求率。一旦达到阈值,它就会触发断路器。
电路断路器可以防止应用程序重复尝试执行可能失败的操作。允许它继续,而不等待故障被修复或浪费CPU周期,而它确定故障是长期的。断路器模式还允许应用程序检测故障是否已经解决。如果问题似乎已经解决,应用程序可以尝试调用操作。
如果启用它,所有服务调用都将受到此内置断路器的保护。
在代理选项中启用它
const broker = new ServiceBroker({ circuitBreaker: { enabled: true, threshold: 0.5, minRequestCount: 20, windowTime: 60, // in seconds halfOpenTime: 5 * 1000, // in milliseconds check: err => err && err.code >= 500 } });
settings
- enabled:是否启动此功能,默认是 false
- threshold:阈值,默认0.5,意味着50%的跳闸失败
- minRequestCount:最小请求数,默认20,在它下面,回调函数不会触发
- windowTime:时间窗口的秒数,默认60秒
- halfOpenTime:从打开状态切换到半打开状态的毫秒数,默认10000毫秒
- check:检查失败请求的函数,默认
err && err.code >= 500
如果断路器状态发生更改,ServiceBroker将发送内部事件
这些全局选项也可以在操作定义中重写。
// users.service.js module.export = { name: "users", actions: { create: { circuitBreaker: { // All CB options can be overwritten from broker options. threshold: 0.3, windowTime: 30 }, handler(ctx) {} } } };
Retry
重试解决方案
const broker = new ServiceBroker({ retryPolicy: { enabled: true, retries: 5, delay: 100, maxDelay: 2000, factor: 2, check: err => err && !!err.retryable } });
settings
- enabled:是否启用,默认 false
- retries:重试的次数,默认5次
- delay:第一次延迟以毫秒为单位,默认 100
- maxDelay:最大延迟(以毫秒为单位),默认 2000
- factor:延迟退避系数,默认是 2,表示指数退避
- check:检查失败请求的函数,
err && !!err.retryable
在调用选项中覆盖retry值
broker.call("posts.find", {}, { retries: 3 });
在操作定义中覆盖重试策略值
// users.service.js module.export = { name: "users", actions: { find: { retryPolicy: { // All Retry policy options can be overwritten from broker options. retries: 3, delay: 500 }, handler(ctx) {} }, create: { retryPolicy: { // Disable retries for this action enabled: false }, handler(ctx) {} } } };
Timeout
可以为服务调用设置超时。它可以在代理选项或调用选项中全局设置。如果定义了超时并且请求超时,代理将抛出RequestTimeoutError错误。
const broker = new ServiceBroker({ requestTimeout: 5 * 1000 // in seconds });
覆盖调用选项中的超时值
broker.call("posts.find", {}, { timeout: 3000 });
分布式超时:Moleculer使用分布式超时。在嵌套调用的情况下,超时值会随着时间的推移而递减。如果超时值小于或等于0,则跳过下一个嵌套调用(RequestSkippedError),因为第一个调用已被RequestTimeoutError错误拒绝。
Bulkhead
在Moleculer框架中实现了舱壁特性,以控制动作的并发请求处理。
const broker = new ServiceBroker({ bulkhead: { enabled: true, concurrency: 3, maxQueueSize: 10, } });
settings
enabled:是否启动,默认 false
concurreny:最大限度的并行数量,默认3
maxQueueSize:最大队列大小,默认10
concurreny 值限制并发请求执行
如果 maxQueueSize大于0,则如果所有插槽都被占用,则 broker 将额外的请求存储在队列中
如果队列大小达到maxQueueSize限制或为0,则 Broker 将对每个添加请求抛出QueueIsFull异常
这些全局选项也可以在操作定义中重写
// users.service.js // 在操作定义中覆盖重试策略值 module.export = { name: "users", actions: { find: { bulkhead: { enabled: false }, handler(ctx) {} }, create: { bulkhead: { // Increment the concurrency value // for this action concurrency: 10 }, handler(ctx) {} } } };
Fallback
当您不想将错误返回给用户时,回退功能是非常有用的。相反,调用其他操作或返回一些常见的内容。可以在调用选项或操作定义中设置回退响应。
它应该是一个返回包含任何内容的承诺的函数。borker 将当前 context&Error对象作为参数传递给此函数。
// fallback settings in calling options const result = await broker.call("users.recommendation", { userID: 5 }, { timeout: 500, fallbackResponse(ctx, err) { // Return a common response from cache return broker.cacher.get("users.fallbackRecommendation:" + ctx.params.userID); } });
回退响应也可以在接收端,在 action 中定义
请注意,只有在action 处理程序中发生错误时,才会使用此回退响应。如果从远程节点调用请求,并且请求在远程节点上超时,则不使用回退响应。在这种情况下,在调用选项中使用回退响应。
// fallback as a function module.exports = { name: "recommends", actions: { add: { fallback: (ctx, err) => "Some cached result", //fallback: "fakeResult", handler(ctx) { // Do something } } } };
// fallback as method name module.exports = { name: "recommends", actions: { add: { // Call the 'getCachedResult' method when error occurred fallback: "getCachedResult", handler(ctx) { // Do something } } }, methods: { getCachedResult(ctx, err) { return "Some cached result"; } } };
NATS
帮助文档
Node.js 微服务,这是一本外文书籍,淘宝上有