限流 (Rate Limiting) 是保护后端服务、API 接口和数据库等资源的重要手段,尤其在处理高并发请求时。通过限制在特定时间窗口内允许的请求数量,限流可以防止系统过载、拒绝服务攻击 (DoS/DDoS) 和资源耗尽,从而保证服务的稳定性和可用性。
核心思想:限流算法通过控制请求的到达速率或处理速率,确保系统的负载在可接受的范围内,避免因突发流量导致服务崩溃。
一、为什么需要限流?
- 防止系统过载:当请求量超出系统处理能力时,限流可以拒绝一部分请求,保证剩余请求能够正常响应,而不是所有请求都失败。
- 避免雪崩效应:在微服务架构中,一个服务过载可能导致其依赖的服务也跟着过载,最终演变成整个系统的瘫痪。限流可以切断这种连锁反应。
- 保护下游资源:数据库、缓存、第三方 API 等资源通常更加脆弱,限流可以保护它们免受过高压力的冲击。
- 资源公平分配:对于多租户或多用户系统,限流可以确保每个用户或租户都能获得公平的资源配额。
- 防止恶意攻击:例如 DoS/DDoS 攻击,通过限制请求速率可以有效缓解攻击对系统的影响。
- 费用控制:对于按请求量付费的第三方服务,限流可以有效控制成本。
二、常用限流算法
本节将详细介绍三种最常用的限流算法:固定窗口计数器、滑动窗口计数器、漏桶算法 和 令牌桶算法,并提供它们的 Go 语言实现。
2.1 固定窗口计数器 (Fixed Window Counter)
2.1.1 算法原理
固定窗口计数器算法是最简单、最容易理解的限流算法。它在一个固定的时间窗口(例如 1 分钟)内统计请求数量。当请求到来时,计数器加一。如果计数器值超过预设的阈值,则拒绝该请求。当时间窗口结束时,计数器清零,开始下一个窗口的计数。
优点:实现简单,易于理解。
缺点:存在“临界点问题”。在窗口的开始和结束交界处,可能会在短时间内涌入双倍于阈值的请求,导致瞬时流量超过系统承载能力。例如,限流 100 QPS,窗口是 1 秒。在第 0.9 秒时来了 100 个请求,在第 1.1 秒时又来了 100 个请求,那么在 0.9 到 1.1 秒这 0.2 秒内,系统处理了 200 个请求,是阈值的两倍。
2.1.2 Go 语言实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| package main
import ( "fmt" "sync" "time" )
type FixedWindowLimiter struct { sync.Mutex windowSize time.Duration threshold int counter int lastReset time.Time }
func NewFixedWindowLimiter(windowSize time.Duration, threshold int) *FixedWindowLimiter { return &FixedWindowLimiter{ windowSize: windowSize, threshold: threshold, counter: 0, lastReset: time.Now(), } }
func (l *FixedWindowLimiter) Allow() bool { l.Lock() defer l.Unlock()
now := time.Now()
if now.Sub(l.lastReset) >= l.windowSize { l.counter = 0 l.lastReset = now }
if l.counter < l.threshold { l.counter++ return true }
return false }
func main() { limiter := NewFixedWindowLimiter(time.Second, 3)
fmt.Println("=== 固定窗口限流器测试 ===") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过\n", i+1) } else { fmt.Printf("请求 %d 拒绝\n", i+1) } }
fmt.Println("\n等待 1 秒后...") time.Sleep(1 * time.Second)
for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过\n", i+1) } else { fmt.Printf("请求 %d 拒绝\n", i+1) } } }
|
运行结果示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| === 固定窗口限流器测试 === 请求 1 允许通过 请求 2 允许通过 请求 3 允许通过 请求 4 拒绝 请求 5 拒绝 请求 6 拒绝 请求 7 拒绝 请求 8 拒绝 请求 9 拒绝 请求 10 拒绝
等待 1 秒后... 请求 1 允许通过 请求 2 允许通过 请求 3 允许通过 请求 4 拒绝 请求 5 拒绝
|
2.2 滑动窗口计数器 (Sliding Window Counter)
2.2.1 算法原理
滑动窗口计数器算法是固定窗口计数器的改进版,旨在解决临界点问题。它将一个大的时间窗口(如 1 分钟)划分为更多小的时间片(如 10 个 6 秒的窗口)。每个小时间片都有独立的计数器。
当请求到来时,它会落入当前的小时间片。我们计算当前大窗口内的请求总数,这个总数是当前小时间片的计数,加上前面若干个完整小时间片的计数,再加上前一个小时间片中未满部分的请求计数。
更经典的实现方式:
存储每个请求的时间戳在一个队列 (或切片) 中。当新请求到来时,删除所有超过当前时间窗口的旧请求。然后判断剩余请求的数量是否小于阈值。
优点:解决了固定窗口的临界点问题,平滑了流量。
缺点:实现相对复杂,需要存储请求的时间戳,占用内存。如果请求量非常大,存储和清理时间戳的开销会比较高。
2.2.2 Go 语言实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| package main
import ( "fmt" "sync" "time" )
type SlidingWindowLimiter struct { sync.Mutex windowSize time.Duration threshold int timestamps []time.Time }
func NewSlidingWindowLimiter(windowSize time.Duration, threshold int) *SlidingWindowLimiter { return &SlidingWindowLimiter{ windowSize: windowSize, threshold: threshold, timestamps: make([]time.Time, 0), } }
func (l *SlidingWindowLimiter) Allow() bool { l.Lock() defer l.Unlock()
now := time.Now() idx := 0 for i := 0; i < len(l.timestamps); i++ { if now.Sub(l.timestamps[i]) < l.windowSize { l.timestamps[idx] = l.timestamps[i] idx++ } } l.timestamps = l.timestamps[:idx]
if len(l.timestamps) < l.threshold { l.timestamps = append(l.timestamps, now) return true }
return false }
func main() { limiter := NewSlidingWindowLimiter(time.Second, 3)
fmt.Println("=== 滑动窗口限流器测试 ===") for i := 0; i < 3; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过\n", i+1) } else { fmt.Printf("请求 %d 拒绝\n", i+1) } } time.Sleep(400 * time.Millisecond)
for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过\n", i+4) } else { fmt.Printf("请求 %d 拒绝\n", i+4) } } fmt.Println("\n等待 1 秒后 (大部分请求已从窗口中移除)...") time.Sleep(1 * time.Second)
for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过\n", i+1) } else { fmt.Printf("请求 %d 拒绝\n", i+1) } } }
|
运行结果示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| === 滑动窗口限流器测试 === 请求 1 允许通过 请求 2 允许通过 请求 3 允许通过 请求 4 拒绝 请求 5 拒绝 请求 6 拒绝 请求 7 拒绝 请求 8 拒绝
等待 1 秒后 (大部分请求已从窗口中移除)... 请求 1 允许通过 请求 2 允许通过 请求 3 允许通过 请求 4 拒绝 请求 5 拒绝
|
可以看到,在高频请求下,滑动窗口能更平滑地拒绝请求,避免固定窗口的瞬时流量峰值问题。
2.3 漏桶算法 (Leaky Bucket)
2.3.1 算法原理
漏桶算法的核心思想是:所有的请求都会先进入一个“桶”中,桶的容量有限。请求以恒定的速率从桶中流出(被处理)。
如果请求到达时桶是满的,那么该请求会被丢弃(拒绝)。
优点:能够平滑突发流量,使输出速率保持恒定。
缺点:无法有效地处理突发流量。即使系统具备处理短时突发的能力,漏桶算法也会将请求均匀化处理,可能导致资源利用率不足。
graph TD
A[请求流入] --> B[漏桶]
B -- 容量满 --> C{丢弃请求}
B -- 固定速率流出 --> D[请求处理]
2.3.2 Go 语言实现
漏桶算法通常使用一个固定大小的缓冲队列和 Goroutine 来模拟漏出过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| package main
import ( "fmt" "time" )
type LeakyBucketLimiter struct { capacity int rate time.Duration bucket chan struct{} closeChan chan struct{} }
func NewLeakyBucketLimiter(capacity int, rate time.Duration) *LeakyBucketLimiter { limiter := &LeakyBucketLimiter{ capacity: capacity, rate: rate, bucket: make(chan struct{}, capacity), closeChan: make(chan struct{}), } go limiter.leak() return limiter }
func (l *LeakyBucketLimiter) leak() { ticker := time.NewTicker(l.rate) defer ticker.Stop()
for { select { case <-ticker.C: select { case <-l.bucket: default: } case <-l.closeChan: fmt.Println("漏桶已关闭.") return } } }
func (l *LeakyBucketLimiter) Allow() bool { select { case l.bucket <- struct{}{}: return true default: return false } }
func (l *LeakyBucketLimiter) Close() { close(l.closeChan) close(l.bucket) }
func main() { limiter := NewLeakyBucketLimiter(3, 200 * time.Millisecond) defer limiter.Close()
fmt.Println("=== 漏桶限流器测试 ===") for i := 0; i < 15; i++ { time.Sleep(50 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过 (进入桶)\n", i+1) } else { fmt.Printf("请求 %d 拒绝 (桶已满)\n", i+1) } }
fmt.Println("\n等待一段时间,看桶中请求是否漏出...") time.Sleep(1 * time.Second)
fmt.Println("\n再次测试漏桶...") for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过 (进入桶)\n", i+1) } else { fmt.Printf("请求 %d 拒绝 (桶已满)\n", i+1) } } }
|
运行结果示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| === 漏桶限流器测试 === 请求 1 允许通过 (进入桶) 请求 2 允许通过 (进入桶) 请求 3 允许通过 (进入桶) 请求 4 拒绝 (桶已满) 请求 5 拒绝 (桶已满) 请求 6 拒绝 (桶已满) ... (直到请求 15 都是拒绝)
等待一段时间,看桶中请求是否漏出... (此处 Goroutine 在后台持续漏出请求)
再次测试漏桶... 请求 1 允许通过 (进入桶) 请求 2 允许通过 (进入桶) 请求 3 允许通过 (进入桶) 请求 4 拒绝 (桶已满) 请求 5 拒绝 (桶已满)
|
可以看到,在突发流量下,漏桶很快就满了,后续请求都被拒绝。但桶内的请求仍然以恒定速率处理。
2.4 令牌桶算法 (Token Bucket)
2.4.1 算法原理
令牌桶算法是目前最常用且最灵活的限流算法之一。它的工作原理是:
- 一个固定容量的“令牌桶”会以恒定的速率往里添加令牌。
- 每个请求到来时,需要从桶中获取一个令牌。
- 如果桶中有足够的令牌,请求就可以通过,并消耗一个令牌。
- 如果桶中没有令牌,请求可以选择等待令牌的生成,或者直接被拒绝。
优点:
- 允许一定程度的突发流量:桶的容量决定了可以累积的最大令牌数,也就是允许通过的最大突发请求数。
- 输出速率可控:令牌生成速率控制了长期来看的平均处理速率。
- 实现灵活:可以很容易地调整桶容量和生成速率。
缺点:实现比计数器复杂,但比漏桶更灵活。
graph TD
A["令牌生成器 (固定速率)"] --> B[令牌桶]
B -- 含有令牌 --> C[请求通过]
D[请求到达] --> B
B -- 无令牌 --> E{请求等待/拒绝}
2.4.2 Go 语言实现
Go 语言标准库 golang.org/x/time/rate 包提供了高度优化和生产可用的令牌桶限流器。这里我们先实现一个简化版的,再介绍标准库的使用。
简化版实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| package main
import ( "fmt" "sync" "time" )
type TokenBucketLimiter struct { sync.Mutex capacity int tokens int rate time.Duration lastRefill time.Time }
func NewTokenBucketLimiter(capacity int, rate time.Duration) *TokenBucketLimiter { return &TokenBucketLimiter{ capacity: capacity, tokens: capacity, rate: rate, lastRefill: time.Now(), } }
func (l *TokenBucketLimiter) refill() { now := time.Now() duration := now.Sub(l.lastRefill) tokensToAdd := int(duration / l.rate) if tokensToAdd > 0 { l.tokens += tokensToAdd if l.tokens > l.capacity { l.tokens = l.capacity } l.lastRefill = now } }
func (l *TokenBucketLimiter) Allow() bool { l.Lock() defer l.Unlock()
l.refill()
if l.tokens >= 1 { l.tokens-- return true }
return false }
func main() { limiter := NewTokenBucketLimiter(5, 200 * time.Millisecond)
fmt.Println("=== 令牌桶限流器测试 ===") for i := 0; i < 15; i++ { time.Sleep(50 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过 (消耗令牌)\n", i+1) } else { fmt.Printf("请求 %d 拒绝 (无可用令牌)\n", i+1) } }
fmt.Println("\n等待 1 秒钟,桶中应补充 5 个令牌...") time.Sleep(1 * time.Second)
for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("请求 %d 允许通过 (消耗令牌)\n", i+16) } else { fmt.Printf("请求 %d 拒绝 (无可用令牌)\n", i+16) } } }
|
运行结果示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| === 令牌桶限流器测试 === 请求 1 允许通过 (消耗令牌) 请求 2 允许通过 (消耗令牌) 请求 3 允许通过 (消耗令牌) 请求 4 允许通过 (消耗令牌) 请求 5 允许通过 (消耗令牌) 请求 6 拒绝 (无可用令牌) 请求 7 拒绝 (无可用令牌) 请求 8 拒绝 (无可用令牌) 请求 9 拒绝 (无可用令牌) 请求 10 拒绝 (无可用令牌) 请求 11 拒绝 (无可用令牌) 请求 12 拒绝 (无可用令牌) 请求 13 拒绝 (无可用令牌) 请求 14 拒绝 (无可用令牌) 请求 15 拒绝 (无可用令牌)
等待 1 秒钟,桶中应补充 5 个令牌... 请求 16 允许通过 (消耗令牌) 请求 17 允许通过 (消耗令牌) 请求 18 允许通过 (消耗令牌) 请求 19 允许通过 (消耗令牌) 请求 20 允许通过 (消耗令牌) 请求 21 拒绝 (无可用令牌) 请求 22 拒绝 (无可用令牌) 请求 23 拒绝 (无可用令牌) 请求 24 拒绝 (无可用令牌) 请求 25 拒绝 (无可用令牌)
|
可以看到,在突发流量下,令牌桶允许了前 5 个请求通过(容量为 5),超出容量的请求则被拒绝。等待一段时间后,桶中再次有了令牌,又能够处理请求。这说明它能有效缓冲突发流量。
使用 golang.org/x/time/rate 标准库
Go 语言官方提供了 golang.org/x/time/rate 包,它实现了令牌桶算法,并且经过了高度优化,是生产环境的首选。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package main
import ( "context" "fmt" "time"
"golang.org/x/time/rate" )
func main() { limiter := rate.NewLimiter(rate.Limit(3), 5)
fmt.Println("=== 标准库 rate.Limiter 测试 ===")
fmt.Println("\n--- Allow (非阻塞) ---") for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond) if limiter.Allow() { fmt.Printf("Allow 请求 %d 允许通过\n", i+1) } else { fmt.Printf("Allow 请求 %d 拒绝\n", i+1) } }
fmt.Println("\n等待 1 秒,桶中应补充令牌...") time.Sleep(time.Second)
fmt.Println("\n--- Wait (阻塞) ---") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel()
for i := 0; i < 10; i++ { start := time.Now() err := limiter.WaitN(ctx, 1) if err != nil { fmt.Printf("Wait 请求 %d 失败: %v (耗时 %v)\n", i+1, err, time.Since(start)) break } fmt.Printf("Wait 请求 %d 允许通过 (耗时 %v)\n", i+1, time.Since(start)) time.Sleep(100 * time.Millisecond) }
fmt.Println("\n--- Burst (容量) 测试 ---") limiter2 := rate.NewLimiter(rate.Limit(3), 5) for i := 0; i < 10; i++ { start := time.Now() err := limiter2.Wait(context.Background()) if err != nil { fmt.Printf("Burst 请求 %d 失败: %v (耗时 %v)\n", i+1, err, time.Since(start)) break } fmt.Printf("Burst 请求 %d 允许通过 (耗时 %v)\n", i+1, time.Since(start)) } }
|
运行结果示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| === 标准库 rate.Limiter 测试 ===
--- Allow (非阻塞) --- Allow 请求 1 允许通过 Allow 请求 2 允许通过 Allow 请求 3 允许通过 Allow 请求 4 允许通过 Allow 请求 5 允许通过 Allow 请求 6 拒绝 Allow 请求 7 拒绝 Allow 请求 8 拒绝 Allow 请求 9 拒绝 Allow 请求 10 拒绝
等待 1 秒,桶中应补充令牌...
--- Wait (阻塞) --- Wait 请求 1 允许通过 (耗时 53.792µs) Wait 请求 2 允许通过 (耗时 333.399625ms) Wait 请求 3 允许通过 (耗时 333.359208ms) Wait 请求 4 允许通过 (耗时 333.35925ms) Wait 请求 5 允许通过 (耗时 333.359208ms) Wait 请求 6 允许通过 (耗时 333.359209ms) Wait 请求 7 允许通过 (耗时 333.359208ms) Wait 请求 8 失败: context deadline exceeded (耗时 333.359167ms) Wait 请求 9 失败: context deadline exceeded (耗时 0s) Wait 请求 10 失败: context deadline exceeded (耗时 0s)
--- Burst (容量) 测试 --- Burst 请求 1 允许通过 (耗时 12.042µs) Burst 请求 2 允许通过 (耗时 6.417µs) Burst 请求 3 允许通过 (耗时 5.75µs) Burst 请求 4 允许通过 (耗时 6.042µs) Burst 请求 5 允许通过 (耗时 6.166µs) Burst 请求 6 允许通过 (耗时 333.359209ms) Burst 请求 7 允许通过 (耗时 333.359209ms) Burst 请求 8 允许通过 (耗时 333.359208ms) Burst 请求 9 允许通过 (耗时 333.359208ms) Burst 请求 10 允许通过 (耗时 333.359209ms)
|
通过 rate.Limiter 的 Allow() 和 Wait() 方法,我们可以灵活地选择非阻塞或阻塞式的限流策略。Wait() 尤其适用于需要平滑输出速率的场景,因为它会主动等待直到服务容量允许。
三、限流算法的选择与实践
3.1 选择哪个算法?
- 固定窗口计数器:实现最简单,但有临界点问题,不推荐用于精确限流。
- 滑动窗口计数器:解决了固定窗口的临界点问题,比固定窗口更平滑,但内存开销较大。适用于对平滑度要求较高,但请求量不是特别巨大的场景。
- 漏桶算法:强制平滑输出速率,适合需要严格控制下游服务压力的场景,不适合处理突发流量。
- 令牌桶算法:最常用和最灵活的算法。它既能控制平均速率,又允许一定程度的突发流量,能更好地利用系统资源。Go 语言的
rate.Limiter 是生产环境的理想选择。
3.2 实践中的考虑
- 限流粒度:
- 接口级别:通常对每个 API 接口进行限流。
- 用户级别:限制每个用户的请求速率,防止单个用户滥用。
- 服务级别:限制整个服务对外请求的总量。
- IP 级别:根据客户端 IP 进行限流,防止特定 IP 的攻击。
- 分布式限流:
上述实现都是单机限流。在分布式系统中,需要借助外部存储(如 Redis)来同步多个限流器的状态。
- 基于 Redis 的计数器:利用 Redis 的
INCR 和 EXPIRE 命令实现固定/滑动窗口计数器。
- 基于 Redis Sorted Set 的滑动窗口:将请求时间戳存入 Sorted Set,通过
ZREMRANGEBYSCORE 移除过期时间戳,ZCARD 获取总数。
- 基于 Redis 的令牌桶(Redisssemphore):利用
SETNX 和 EXPIRE 实现简单的令牌桶。也有更复杂的基于 Lua 脚本的实现,保证原子性。
- 熔断 (Circuit Breaker) 与降级 (Degradation):限流是预防手段,而熔断和降级是系统在出现问题后的恢复手段。它们通常需要配合使用,共同提高系统的韧性。
- 动态配置:生产环境中的限流参数(阈值、窗口大小等)最好能够动态调整,无需重启服务。
- 监控与告警:对限流器的拒绝率、通过率等指标进行监控,并设置告警,及时发现和处理问题。
四、总结
限流是构建高可用、高并发系统的基石。根据不同的业务场景和对流量平滑性、突发处理能力的要求,可以选择合适的限流算法。在 Go 语言中,对于单机限流,golang.org/x/time/rate 包提供的令牌桶算法是功能最强大、最推荐的解决方案。对于分布式限流,则需要结合 Redis 等外部存储来实现。理解并正确应用这些限流策略,能够有效保护系统资源,提升服务的稳定性和用户体验。