Middleware (4) — ConcurrencyLimiter

Feng Gao
4 min readSep 1, 2021

1. Overview

在任何时刻,计算资源总是稀缺资源。为了保证我们的服务能够保证服务请求的质量,通常只会允许一部分请求同时处理。对于剩下的请求,直接返回失败的结果。这也是应对故障的一种方式,为了保证部分请求的 SLA (Service Level Agreement),而牺牲掉一部分请求的方式。

ConcurrencyLimiter 这个中间件可以帮助实现这个功能。它借助了 IQueuePolicy 这个组件来实现请求访问量的限制。这个接口包含了如下的方法:

  • TryEnetryAsync 方法回一个 ValueTask 对象,如果得到结果是 true ,则该请求能够继续处理下去;如果得到的结果是 false , 则拒绝该请求
  • OnExit 方法会在请求完成之后调用,这样允许后续的请求得到处理的机会。

QueuePolicyOptions 包含下面的属性

  • MaxConcurrentRequests 指定最大并发数量
  • RequestQueueLimit 如果请求数量大于并发数量,这里并不会直接拒绝掉请求,而是将他们存放到队列中,等待处理,那么队列长度由这个字段指定。

IQueuePolicy 有两种实现:

  1. QueuePolicy
  2. StackPolicy

接下来我们探究一下这两个的实现

2. QueuePolicy

QueuePolicy 借助了 SemaphoreSlim 对象控制了并发执行的数量,在初始化大小就是 maxConcurrentRequests 值。

TryEnterAsync 方法中,如果当前的请求量大于 maxConcurrentRequest + requestQueueLimit 的时候,直接返回 False ; 否则通过调用 SemaphoreSlimWaitAsync 方法。

如果当前数量小于 maxConcurrentRequests 时候, WaitAsync 方法会直接返回一个完成的 Task , 否则返回一个还没有完成的 Task , 这个 Task只有等到调用 Release 方法的时候,才会返回。

3. StackPolicy

QueuePolicy 的实现中,在 maxConcurrentRequest + requestQueueLimit 之后的请求,都会被舍弃掉。而 StackPolicy 则采取截然不同的策略。假设我么我们现在的 _maxQueueCapacity=3 ,当 MaxConcurrency 数量已经填充完毕,我们该如何处理呢?

  • 第一个请求: 创建一个 IValueTaskSource<bool> 对象,并且添加到列表中, 此刻 queueLength=1 , head=1
  • 第二个请求:再创建一个 IValueTaskSource<bool> 对象,再添加到列表中,此刻 queueLength=2, head=2
  • 第三个请求:再创建要给 IValueTaskSoruce<bool> 对象,再添加到列表中,此刻 queueLength=3, head=0
  • 第四个请求: 由于当前的 queueLength=maxQueueCapacity , 则将 head=0 指向的 IValueTaskSource 设置为 false, 表示这个请求被拒绝。然后将这个创建一个新的 IValueTaskSource<bool> 存放到 head=0 的位置,并且将 head 调整为 1.

从中我们可以看出, StackPolicy 采取的策略是当新的请求过来的时候,用 head 指向的最老的 IValueTaskSouce<bool> 对象直接设置为 false ,也就是取消了该操作。

OnExit 方法的调用时候

首先是 head=0 , 将 head 此刻指向的 IValueTaskSource<bool> 对象设置为 true ,并且修改 _queueLength 的大小。

所以, StackPolicy 是如果有新的请求没有空间,则取消最旧的请求;如果有之前的请求完成,则处理最新加入的请求。

--

--

Feng Gao

A software developer in Microsoft at Suzhou. Most articles spoken language is Chinese. I will try with English when I’m ready