多租户后台作业建议做法

作业特点

这类任务有2个明显特点:

  • 对所有租户执行相同的操作,例如:执行SQL脚本更新一些数据表
  • 执行时间长(因为第1点)




不建议的实现方式

以下用伪码举例

// 获取所有租户ID清单
List<string> list = GetAllTenantId();

// 用StringBuilder来存储所有租房的处理过程
StringBuilder sb = new ();

foreach(string tenantId in list) {

    // 在循环中一次处理一个租户
    string text = ProcessTenant(tenantId);
    sb.AppendLine(text);
}

// 给外界发送通知
SendNotify(sb.ToString());

这样设计有以下缺点:

  • 不利于任务重试!
    • 重试成本过高,重试只能从头开始
  • 不能中断!
  • 不利于性能评估!
    • 因为循环所有租户耗费的时间太长,不知道在某个租户上的具体耗时。
    • 带来一个新问题:由于没有性能监控,也 没法做性能优化!




推荐的实现方式

核心原则:能方便地评估代码的执行性能!

实现过程:

  • 拆分成2个部分
  • 主任务只做分裂处理(不做业务处理)
    • 如果是周期性的任务,则用 BackgroudTask 来实现
    • 如果是临时性的用户触发导致,可以直接直接由 HttpAction 实现

    • 每次执行时,按租户拆分成多个子任务
    • 每个子任务由一条消息表示,其中包含一个租户ID,然后将消息发送到MQ
  • 子任务(一定是后台运行)
    • 用 MessageHandler 来实现(有日志和性能监控
    • 根据消息中的租户ID,完成对一个租户的业务处理(任务足够小,性能评估才有意义)

以下是实现步骤的伪码。

1,先定义消息类型

public class TenantMssage {
    public string TenantId {get; set;}
    public string 其它所需的参数 {get; set;}


    // 用于记录执行过程中产生的消息(此属性不是必需)
    public StringBuilder Messages;

    // 用于记录执行了多少次(此属性不是必需)
    public ValueCounter ExecCounter;
}

2,程序初始化时注册消息订阅者

private static readonly MemoryMesssageQueue<TenantMssage> s_asyncMMQ 
                            = new MemoryMesssageQueue<TenantMssage>(MmqWorkMode.Async);

MmqSubscriber.StartAsync<TenantMssage, TenantMssageHandler>(new MmqSubscriberArgs {
    Queue = s_asyncMMQ,
    SubscriberCount = LocalSettings.GetUInt("TenantMssageHandler_Subscriber_Count", 1)
});

3-1,临时性主任务的实现-HttpAction

[Route("test1.aspx")]
public int Test1()
{
    // 获取要处理的租户ID清单
    List<string> list = GetAllTenantId();

    foreach(string tenantId in list) {
            TenantMssage msg = new TenantMssage{ TenantId = tenantId };
            client.SendMessage(msg);        
        }
}

3-2,周期性主任务的实现-BackgroudTask

public class TenantTask1 : BackgroundTask
{
    public override int? SleepSeconds => 3600;    // 假设1小时执行一次

    public override void Execute()
    {
        // 获取要处理的租户ID清单
        List<string> list = GetAllTenantId();

        using( RabbitClient client = new RabbitClient("SettingName", "Application111") ) {
            foreach(string tenantId in list) {
                TenantMssage msg = new TenantMssage{ TenantId = tenantId };
                client.SendMessage(msg);        
            }
        }
    }
}

4,执行租户处理逻辑-MessageHandler

public class TenantMssageHandler : BaseMessageHandler<TenantMssage>
{
    public override void ProcessMessage(PipelineContext<TenantMssage> context)
    {
        // 获取消息对象
        TenantMssage message = context.MessageData;
        
        ProcessTenant(message);
    }
}




任务的整体执行进度

如果需要获取任何的整体执行进度,可以参考下面的实现方式。

1,在消息结构中,增加一个总数,例如:

public class TenantMssage {
    public string TenantId {get; set;}
    public int TenantCount {get; set;}
    public string 其它所需的参数 {get; set;}
}

2,开始执行任务时

[Route("test1.aspx")]
public int Test1()
{
    // 获取要处理的租户ID清单
    List<string> list = GetAllTenantId();

    // ========================================= 注意下面这行代码
    // 创建一个计数器,用于累加执行进度
    Redis.GetDatabase().StringSet("XXX业务场景_key", 0);
    // =========================================

    using( RabbitClient client = new RabbitClient("SettingName", "Application111") ) {
        foreach(string tenantId in list) {
            TenantMssage msg = new TenantMssage{ TenantId = tenantId };
            client.SendMessage(msg);        
        }
    }
}

3,在消息处理时,用Redis的计数器功能累加进度:

public class TenantMssageHandler : BaseMessageHandler<TenantMssage>
{
    public override void ProcessMessage(PipelineContext<TenantMssage> context)
    {
        // 获取消息对象
        TenantMssage message = context.MessageData;
        
        ProcessTenant(message);

        // 更新执行进度
        Redis.GetDatabase().StringIncrement("XXX业务场景_key");
    }
}

4,获取整体执行进度

int count = Redis.GetDatabase().StringGet("XXX业务场景_key");

int 百分比 = count / TenantCount;

// 如果到了 100%,可以把这个 Redis 计数器删除。




任务的整体执行进度