WebHook服务 - Nebula.Ceres

设计背景:增强产品开放性。
例如:工单创建或者状态改变后及时通知第三方程序实现数据同步。



通常来说,如果希望应用程序具备较好的开放性(易于集成),应该做到:

  • 提供丰富&开放的WebAPI,供第三方程序主动获取数据以及推送数据。
  • 提供WebHook事件机制,供第三方订阅关键事件,实现业务联动。

本文所说的WebHook服务,就是为了解决第2点而设计,
它是一个通用服务,有以下优点:

  • 解耦合:应用程序只管发布事件,不用关注事件如何发送给第三方,以及有多少个第三方
  • 易用性:应用程序只负责发布事件,所有注册/订阅过程全都不需要关注
  • 通用性:Ceres接受多个应用程序的事件,并将事件发送到匹配的第三方程序订阅者
  • 可靠性:内置复杂且可靠的重试机制,尽量保证事件能成功发送给第三方

对于WebHook架构而言,会有3个参与对象

  • 事件发布者: 发布Web事件的应用程序,通常是【内部程序】
  • 事件订阅者: 订阅事件的应用程序,通用是【外部程序】,并且一个事件可能有多个不同的订阅者
  • 服务本身: WebHook服务 - Nebula.Ceres

如下图:

xx



事件发布者

事件发布者在使用WebHook服务过程中需要有2个动作:

  • 事件注册
    • 应用程序在发送事件前,需要先注册事件(可在 AdminUI 中配置)
  • 事件发布
    • 应用程序代码发布事件,例如:创建工单之后



事件发布者-发布事件

使用客户端的示例代码

// 准备事件相关的参数
string eventName = "TaskOrder.Create";
object eventData = new { name1, name2 };   // 也可以是一个DTO或者实体对象

// 调用WebHook服务端
await new CeresClient(eventName, eventData)
            .SetRetry(2, 3600)  // 设置重试参数,这个调用是可选的
            .GetRequest()
            .SendAsync();

使用HTTP协议的调用示例:

POST http://localhost:8504/v20/api/ceres/event/publish?sender=xxxx&eventName=xxxx&tenantId=xxxx HTTP/1.1
Content-Type: application/json; chartset=utf-8
x-Retry-MaxCount: 2
x-Retry-Expiration: 60

{
   "name1": "aaa"
   "name2": "bbb"
}



事件订阅者(第三方程序)

事件订阅者在使用WebHook服务过程中需要有2个动作:

  • 订阅事件
    • 在接收WebHook事件前,必须先订阅事件(可在 AdminUI 中配置)
      订阅事件时,必须提供【事件回调模板】来接收事件数据,示例如下。
  • 处理回调事件
    • WebHook服务根据订阅时提供的【事件回调模板】,调用第三方应用

事件回调模板示例:
POST http://www.3rd-app.com/webhook/callback?v={rand} HTTP/1.1
x-header1: aaaaaaaaaa
x-header2: bbbbbbbbb
x-event-name: TaskOrder.Create
x-event-src: Nebula.TestApp
x-xx1: {data.name1}
x-xx2: {data.name2}
Content-Type: application/json; charset=utf-8

{data}

事件回调模板的用法可参考:文本模板



WebHook服务内部实现


事件处理流程

  • 接收事件
    • 根据 sender+eventname 查找事件定义
    • 根据 evnetId+tenantId 匹配第三方订阅者
    • 构造事件消息 HookEvent,每个订阅都有一个消息对象
    • 将消息对象存入MQ -- Ceres_WebHook_Events
  • 执行推送
    • 根据回调模板和事件数据,构造回调的HTTP请求
    • 发起回调请求
    • 记录成功日志
  • 执行推送-错误处理
    • 检查消息是否需要重试
    • 如果需要则
      • 累加重试次数
      • 将消息发到MQ -- Ceres_WebHook_WaitRetry
      • 没有订阅者,一直会等到 “到期” 被移到 “Ceres_WebHook_Retry”
    • 如果不需要重试,就结束处理
    • 如果所有重试全部失败:将消息写入MQ -- Ceres_WebHook_DeadMsg
  • 执行失败重试
    • 记录【重试操作】日志
    • 执行重试,与 执行推送 过程一致





重试处理

  • 当出现消息处理失败时,将启动消息重试机制
  • MaxRetryCount: 如果超过这个次数,将不再重试
  • Expiration: 过期时间,单位:秒。从服务端接收到调用算起,如果超过这个时间,将不再重试
  • 每次消息处理过程,不论是第一次还是后续重试,都产生一条执行日志-HookEventLog
  • 重试失败的消息将写入一个死信队列-Ceres_WebHook_DeadMsg



重试间隔时间:

  • 固定间隔:3 秒
  • 变长间隔:请参考下图
  • 控制参数:本地参数 Nebula_LongRetryUtils_Mode = 1,启用【变长间隔】,否则使用 【固定间隔】

xx







执行日志

在回调第三方的WebHook时,

  • 不论成功还是失败,都是会记录一条日志
  • 如果是重试执行,会单独再记录一条日志,RetryCount 会不同

所有日志会写入 ElasticSearch,索引文件名称:hookeventlog-yyyyMMdd

建议在部署Ceres时在Kibana中执行以下配置

  • 新建 "Index Lifecycle Policie",假设名为:applog_policy
  • 新建 "Index Management", Index pattern: hookeventlog-* ,并按下面的示例来配置Settings
  • 新建 "Index pattern",时间字段选 serverTime
{
  "index": {
    "format": "1",
    "lifecycle": {
      "name": "applog_policy"
    }
  }
}