多租户后台作业建议做法
作业特点
这类任务有2个明显特点:
- 对所有租户执行相同的操作,例如:执行SQL脚本更新一些数据表
- 执行时间长(因为第1点)
不建议的实现方式
以下用伪码举例
// 获取所有租户ID清单
List<string> list = GetAllTenantId();
// 用StringBuilder来存储所有租房的处理过程
StringBuilder sb = new ();
foreach(string tenantId in list) {
// 在循环中一次处理一个租户
string text = ProcessTenant(tenantId);
sb.AppendLine(text);
}
// 给外界发送通知
SendNotify(sb.ToString());
这样设计有以下缺点:
- 不利于任务重试!
- 重试成本过高,重试只能从头开始
- 不能中断!
- 不利于性能评估!
- 因为循环所有租户耗费的时间太长,不知道在某个租户上的具体耗时。
- 带来一个新问题:由于没有性能监控,也 没法做性能优化!
推荐的实现方式
核心原则:能方便地评估代码的执行性能!
实现过程:
- 拆分成2个部分
- 主任务只做分裂处理(不做业务处理)
- 如果是周期性的任务,则用 BackgroudTask 来实现
- 如果是临时性的用户触发导致,可以直接直接由 HttpAction 实现
- 每次执行时,按租户拆分成多个子任务
- 每个子任务由一条消息表示,其中包含一个租户ID,然后将消息发送到MQ
- 子任务(一定是后台运行)
- 用 MessageHandler 来实现(有日志和性能监控)
- 根据消息中的租户ID,完成对一个租户的业务处理(任务足够小,性能评估才有意义)
以下是实现步骤的伪码。
1,先定义消息类型
public class TenantMssage {
public string TenantId {get; set;}
public string 其它所需的参数 {get; set;}
// 用于记录执行过程中产生的消息(此属性不是必需)
public StringBuilder Messages;
// 用于记录执行了多少次(此属性不是必需)
public ValueCounter ExecCounter;
}
2,程序初始化时注册消息订阅者
private static readonly MemoryMesssageQueue<TenantMssage> s_asyncMMQ
= new MemoryMesssageQueue<TenantMssage>(MmqWorkMode.Async);
MmqSubscriber.StartAsync<TenantMssage, TenantMssageHandler>(new MmqSubscriberArgs {
Queue = s_asyncMMQ,
SubscriberCount = LocalSettings.GetUInt("TenantMssageHandler_Subscriber_Count", 1)
});
3-1,临时性主任务的实现-HttpAction
[Route("test1.aspx")]
public int Test1()
{
// 获取要处理的租户ID清单
List<string> list = GetAllTenantId();
foreach(string tenantId in list) {
TenantMssage msg = new TenantMssage{ TenantId = tenantId };
client.SendMessage(msg);
}
}
3-2,周期性主任务的实现-BackgroudTask
public class TenantTask1 : BackgroundTask
{
public override int? SleepSeconds => 3600; // 假设1小时执行一次
public override void Execute()
{
// 获取要处理的租户ID清单
List<string> list = GetAllTenantId();
using( RabbitClient client = new RabbitClient("SettingName", "Application111") ) {
foreach(string tenantId in list) {
TenantMssage msg = new TenantMssage{ TenantId = tenantId };
client.SendMessage(msg);
}
}
}
}
4,执行租户处理逻辑-MessageHandler
public class TenantMssageHandler : BaseMessageHandler<TenantMssage>
{
public override void ProcessMessage(PipelineContext<TenantMssage> context)
{
// 获取消息对象
TenantMssage message = context.MessageData;
ProcessTenant(message);
}
}
任务的整体执行进度
如果需要获取任何的整体执行进度,可以参考下面的实现方式。
1,在消息结构中,增加一个总数,例如:
public class TenantMssage {
public string TenantId {get; set;}
public int TenantCount {get; set;}
public string 其它所需的参数 {get; set;}
}
2,开始执行任务时
[Route("test1.aspx")]
public int Test1()
{
// 获取要处理的租户ID清单
List<string> list = GetAllTenantId();
// ========================================= 注意下面这行代码
// 创建一个计数器,用于累加执行进度
Redis.GetDatabase().StringSet("XXX业务场景_key", 0);
// =========================================
using( RabbitClient client = new RabbitClient("SettingName", "Application111") ) {
foreach(string tenantId in list) {
TenantMssage msg = new TenantMssage{ TenantId = tenantId };
client.SendMessage(msg);
}
}
}
3,在消息处理时,用Redis的计数器功能累加进度:
public class TenantMssageHandler : BaseMessageHandler<TenantMssage>
{
public override void ProcessMessage(PipelineContext<TenantMssage> context)
{
// 获取消息对象
TenantMssage message = context.MessageData;
ProcessTenant(message);
// 更新执行进度
Redis.GetDatabase().StringIncrement("XXX业务场景_key");
}
}
4,获取整体执行进度
int count = Redis.GetDatabase().StringGet("XXX业务场景_key");
int 百分比 = count / TenantCount;
// 如果到了 100%,可以把这个 Redis 计数器删除。