1. Overview
在任何时刻,计算资源总是稀缺资源。为了保证我们的服务能够保证服务请求的质量,通常只会允许一部分请求同时处理。对于剩下的请求,直接返回失败的结果。这也是应对故障的一种方式,为了保证部分请求的 SLA (Service Level Agreement),而牺牲掉一部分请求的方式。
ConcurrencyLimiter
这个中间件可以帮助实现这个功能。它借助了 IQueuePolicy
这个组件来实现请求访问量的限制。这个接口包含了如下的方法:
TryEnetryAsync
方法回一个ValueTask
对象,如果得到结果是true
,则该请求能够继续处理下去;如果得到的结果是false
, 则拒绝该请求OnExit
方法会在请求完成之后调用,这样允许后续的请求得到处理的机会。
QueuePolicyOptions
包含下面的属性
MaxConcurrentRequests
指定最大并发数量RequestQueueLimit
如果请求数量大于并发数量,这里并不会直接拒绝掉请求,而是将他们存放到队列中,等待处理,那么队列长度由这个字段指定。
IQueuePolicy
有两种实现:
QueuePolicy
StackPolicy
接下来我们探究一下这两个的实现
2. QueuePolicy
QueuePolicy
借助了 SemaphoreSlim
对象控制了并发执行的数量,在初始化大小就是 maxConcurrentRequests
值。
在 TryEnterAsync
方法中,如果当前的请求量大于 maxConcurrentRequest + requestQueueLimit
的时候,直接返回 False
; 否则通过调用 SemaphoreSlim
的 WaitAsync
方法。
如果当前数量小于 maxConcurrentRequests
时候, WaitAsync
方法会直接返回一个完成的 Task
, 否则返回一个还没有完成的 Task
, 这个 Task
只有等到调用 Release
方法的时候,才会返回。
3. StackPolicy
在 QueuePolicy
的实现中,在 maxConcurrentRequest + requestQueueLimit
之后的请求,都会被舍弃掉。而 StackPolicy
则采取截然不同的策略。假设我么我们现在的 _maxQueueCapacity=3
,当 MaxConcurrency
数量已经填充完毕,我们该如何处理呢?
- 第一个请求: 创建一个
IValueTaskSource<bool>
对象,并且添加到列表中, 此刻queueLength=1
,head=1
- 第二个请求:再创建一个
IValueTaskSource<bool>
对象,再添加到列表中,此刻queueLength=2, head=2
- 第三个请求:再创建要给
IValueTaskSoruce<bool>
对象,再添加到列表中,此刻queueLength=3, head=0
- 第四个请求: 由于当前的
queueLength=maxQueueCapacity
, 则将head=0
指向的IValueTaskSource
设置为 false, 表示这个请求被拒绝。然后将这个创建一个新的IValueTaskSource<bool>
存放到head=0
的位置,并且将head
调整为 1.
从中我们可以看出, StackPolicy
采取的策略是当新的请求过来的时候,用 head
指向的最老的 IValueTaskSouce<bool>
对象直接设置为 false
,也就是取消了该操作。
OnExit
方法的调用时候
首先是 head=0
, 将 head 此刻指向的 IValueTaskSource<bool>
对象设置为 true
,并且修改 _queueLength
的大小。
所以, StackPolicy
是如果有新的请求没有空间,则取消最旧的请求;如果有之前的请求完成,则处理最新加入的请求。