Featured image of post 限流的实现和问题

限流的实现和问题

浅谈两种限流算法的实现和特点以及一些思考

限流(Rate Limiting)是服务端的核心防护机制,主要有两种经典策略:令牌桶(Token Bucket)滑动窗口(Sliding Window)

先来看 Claude 生成的一个交互式演示 HTML,形象地体现出了上述两种方案的区别(推荐使用 Firefox、 Google Chrome 等浏览器查看):

令牌桶(Token Bucket)

以固定速率往桶里补充令牌;每次请求消耗一枚令牌,桶空则拒绝。允许短时突发(桶满时)。

8 2/s
8
当前令牌数
0
通过请求
0
拒绝请求
令牌桶(蓝=有令牌,灰=空位)
请求历史 · ●通过 ●拒绝

滑动窗口(Sliding Window)

维护一段时间内的请求记录;窗口随时间向右滑动,超出的旧请求自动丢弃,只统计窗口内的计数。

5s 5
0
窗口内请求数
0
通过请求
0
拒绝请求
滑动窗口(蓝色区域 = 窗口范围)
请求历史 · ●通过 ●拒绝

令牌桶

核心思路:一个有容量上限的桶,系统以固定速率向桶中补充令牌。每次请求到来时取走一枚令牌,桶空则拒绝。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import threading

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity        # 桶最大容量
        self.refill_rate = refill_rate  # 每秒补充令牌数
        self.tokens = float(capacity)   # 初始满桶
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        added = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + added)
        self.last_refill = now

    def allow(self, cost: int = 1) -> bool:
        with self.lock:
            self._refill()
            if self.tokens >= cost:
                self.tokens -= cost
                return True
            return False

# 使用示例
bucket = TokenBucket(capacity=10, refill_rate=2)  # 桶容量10,每秒补充2个

def handle_request():
    if bucket.allow():
        return "200 OK"
    else:
        return "429 Too Many Requests"

关键特性:允许短时突发——当桶满时连续请求可以瞬间打出 capacity 个,之后降回到 refill_rate 的吞吐速率。适合对突发流量宽容、对长期速率有约束的场景(如 API 限速)。

滑动窗口

核心思路:维护一个时间窗口,记录窗口内每个请求的时间戳。每次新请求到来前,先淘汰所有"已过期"的旧记录,再判断当前窗口内的请求数是否超限。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import time
import collections
import threading

class SlidingWindowLimiter:
    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self.timestamps = collections.deque()  # 有序队列存时间戳
        self.lock = threading.Lock()

    def allow(self) -> bool:
        with self.lock:
            now = time.monotonic()
            cutoff = now - self.window

            # 淘汰窗口外的旧请求(队头是最早的)
            while self.timestamps and self.timestamps[0] <= cutoff:
                self.timestamps.popleft()

            if len(self.timestamps) < self.limit:
                self.timestamps.append(now)
                return True
            return False

# 使用示例
limiter = SlidingWindowLimiter(limit=100, window_seconds=60)  # 60秒内最多100次

def handle_request():
    if limiter.allow():
        return "200 OK"
    else:
        return "429 Too Many Requests"

与固定窗口计数器的区别:固定窗口在两个窗口交界处会出现"双倍流量"漏洞(窗口末尾 + 新窗口开头各来一批),滑动窗口通过精确追踪时间戳彻底解决了这个问题。

一些思考

分布式场景下的性能问题

单机内存实现在多实例部署时会失效,需要借助 Redis 做共享状态。

令牌桶用 Redis:用 Lua 脚本保证原子性,避免 TOCTOU 竞争:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-- token_bucket.lua
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local cost = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now

-- 补充令牌
local elapsed = now - last_refill
tokens = math.min(capacity, tokens + elapsed * refill_rate)

if tokens >= cost then
    tokens = tokens - cost
    redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
    redis.call('EXPIRE', key, 3600)
    return 1  -- 允许
else
    redis.call('HMSET', key, 'last_refill', now)
    return 0  -- 拒绝
end

滑动窗口用 Redis ZSet:利用有序集合天然按时间戳排序的特性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import redis
import time

r = redis.Redis()

def sliding_window_redis(user_id: str, limit: int, window: int) -> bool:
    key = f"ratelimit:{user_id}"
    now = time.time()
    cutoff = now - window

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, '-inf', cutoff)   # 清理过期
    pipe.zadd(key, {str(now): now})              # 记录当前请求
    pipe.zcard(key)                              # 计数
    pipe.expire(key, window + 1)                 # 设置 TTL
    results = pipe.execute()

    count = results[2]
    if count > limit:
        r.zrem(key, str(now))  # 超限则回滚
        return False
    return True

但是每次请求都需要至少一次 Redis 往返(RTT),在高并发场景下都可能成为瓶颈,两个方案分别可以这样优化:

令牌桶可以一次请求预申请多个令牌:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class BatchTokenBucket:
    def __init__(self, redis_client, key, capacity, refill_rate,
                 prefetch_size=20):
        self.r = redis_client
        self.key = key
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.prefetch_size = prefetch_size   # 一次从 Redis 取多少令牌
        
        self._local_tokens = 0
        self._lock = threading.Lock()
    
    # Lua 脚本:原子地从 Redis 桶里取走 N 个令牌
    LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local rate = tonumber(ARGV[2])
    local cost = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])
    local data = redis.call('HMGET', key, 'tokens', 'ts')
    local tokens = tonumber(data[1]) or capacity
    local ts = tonumber(data[2]) or now
    tokens = math.min(capacity, tokens + (now - ts) * rate)
    if tokens >= cost then
        tokens = tokens - cost
        redis.call('HMSET', key, 'tokens', tokens, 'ts', now)
        redis.call('EXPIRE', key, 3600)
        return cost
    else
        local available = math.floor(tokens)
        if available > 0 then
            redis.call('HMSET', key, 'tokens', 0, 'ts', now)
            return available
        end
        return 0
    end
    """
    
    def _fetch_tokens(self, amount) -> int:
        """一次从 Redis 取走 amount 个令牌,返回实际取到的数量"""
        script = self.r.register_script(self.LUA_SCRIPT)
        return int(script(keys=[self.key],
                          args=[self.capacity, self.refill_rate,
                                amount, time.time()]))
    
    def allow(self) -> bool:
        with self._lock:
            if self._local_tokens > 0:
                self._local_tokens -= 1
                return True  # 纯本地,无 Redis
            
            # 批量预取
            got = self._fetch_tokens(self.prefetch_size)
            if got > 0:
                self._local_tokens = got - 1
                return True
            return False

也可以把一个大桶拆成多个小桶,每个实例随机打到一个分片,分散写压力,即令牌桶分片:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import random

class ShardedTokenBucket:
    def __init__(self, redis_client, key, capacity, refill_rate, shards=10):
        self.r = redis_client
        self.base_key = key
        self.shards = shards
        # 每个分片分配 capacity/shards 个令牌,速率也均分
        self.shard_capacity = capacity // shards
        self.shard_rate = refill_rate / shards
    
    def allow(self) -> bool:
        shard = random.randint(0, self.shards - 1)
        key = f"{self.base_key}:shard:{shard}"
        # 对单个分片执行 Lua 脚本,分散了 Redis 的写热点
        return self._lua_allow(key, self.shard_capacity, self.shard_rate)

而滑动窗口可以在每个服务实例内存里维护一个本地小窗口,批量向 Redis 预申请配额,而不是每次请求都打 Redis:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import threading
import time

class LocalBatchSlidingWindow:
    def __init__(self, redis_client, key, global_limit, window, 
                 local_quota=50, sync_interval=0.1):
        self.r = redis_client
        self.key = key
        self.global_limit = global_limit
        self.window = window
        self.local_quota = local_quota       # 每次向 Redis 预申请的配额
        self.sync_interval = sync_interval   # 多久同步一次
        
        self._local_remaining = 0            # 本地剩余配额
        self._lock = threading.Lock()
        self._last_sync = 0

    def _fetch_quota_from_redis(self) -> int:
        """向 Redis 申请一批配额,返回实际获得的数量"""
        now = time.time()
        cutoff = now - self.window
        
        pipe = self.r.pipeline()
        pipe.zremrangebyscore(self.key, '-inf', cutoff)
        pipe.zcard(self.key)
        results = pipe.execute()
        current_count = results[1]
        
        available = self.global_limit - current_count
        quota = min(self.local_quota, max(0, available))
        
        if quota > 0:
            # 批量预占:插入 quota 个未来时间戳作为占位符
            batch = {f"_pre_{now}_{i}": now for i in range(quota)}
            self.r.zadd(self.key, batch)
            self.r.expire(self.key, int(self.window) + 1)
        
        return quota

    def allow(self) -> bool:
        with self._lock:
            if self._local_remaining > 0:
                self._local_remaining -= 1
                return True   # 纯内存操作,无 Redis 开销
            
            # 本地配额耗尽,去 Redis 补充
            quota = self._fetch_quota_from_redis()
            if quota > 0:
                self._local_remaining = quota - 1
                return True
            return False

当然,无论用哪种限流策略,都可以在流量进入服务之前就拦截,用 lua-resty-limit-traffic 在 Nginx 里做内存级令牌桶,完全不走应用层和 Redis,延迟降到微秒级:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# nginx.conf
lua_shared_dict my_limit_req_store 100m;  # 共享内存,所有 worker 共用

location /api/ {
    access_by_lua_block {
        local limit_req = require "resty.limit.req"
        local lim = limit_req.new("my_limit_req_store", 2000, 1000)
        -- 2000 req/s,允许 1000 的突发
        local delay, err = lim:incoming(ngx.var.binary_remote_addr, true)
        if not delay then
            if err == "rejected" then
                ngx.exit(429)
            end
        end
    }
    proxy_pass http://backend;
}

那么问题来了。

为什么不直接在 Nginx 侧用 OpenResty 实现令牌桶或滑动窗口限流,而要在后端服务中做呢?

原因有二:

1. Nginx 层访问 Redis 的代价

Nginx 本来是纯内存、零阻塞的热路径。一旦每个请求都要等 Redis,Nginx 的延迟优势就消失了。而且 Nginx 的 cosocket(非阻塞网络)虽然不阻塞 event loop,但连接池管理、错误处理、Redis 超时降级的复杂度全部需要用 Lua 手写,运维成本很高。

这里我个人还在尝试中遇到一个坑,OpenResty 是无法像 Python、Java 等程序一样直接读取环境变量的。所以不想在 nginx.conf 或者 systemd 的配置中明文配置 Redis 口令的话,需要在配置中配置加密后的口令,并配合外部解密脚本使用,非常麻烦。

2. 限流逻辑往往依赖业务语义,Nginx 层拿不到

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
-- Nginx 能拿到的信息
ngx.var.remote_addr        -- IP
ngx.req.get_headers()      -- HTTP 头
ngx.var.uri                -- 路径

-- 但限流规则经常需要
user_id          -- 需要解析 JWT / Session,Nginx 层做很重
account_tier     -- 需要查数据库(免费用户100次/天,付费用户10000次/天)
api_key_quota    -- 需要关联业务数据
request_cost     -- 不同操作消耗不同令牌数(上传视频 vs 查询列表)

解析 JWT、查用户等级这些事放在 Nginx Lua 里做,实际上是把业务逻辑下沉到了基础设施层,职责严重混乱。

两层限流不是替代关系,而是各司其职。所以工程实践上的共识是:Nginx 做防御性的粗粒度限流(不访问任何外部存储),应用层做业务语义的精细限流(Redis 协调全局状态),两层互补,而不是用一层替代另一层。

鉴权应该在限流前做吗?

一般情况下,不建议把鉴权放在限流之前,顺序通常是:

先做基础限流 → 再做鉴权 → 再做精细限流(按用户)

其原因是鉴权本身有计算成本,而限流的目的之一就是保护这些昂贵操作。

鉴权通常涉及:

  • 解析并验证 JWT 签名(RSA/ECDSA 非对称加密,CPU 密集)
  • 查询 Session Store / Redis
  • 查数据库验证 API Key 是否有效、是否被吊销

如果攻击者用 10 万 QPS 打过来,先做鉴权意味着每个请求都要跑一次 JWT 验证或 Redis 查询,限流反而成了摆设。先限流把绝大多数请求在入口拦截,鉴权只面对漏过来的合法流量。

那问题又来了。

如果攻击者用 10 万 QPS 打过来,那正常用户的请求也会被限流吧?

关键在于让正常用户尽早建立身份。攻击者打的是匿名流量,而已认证的正常用户带着身份,两个配额池完全独立。攻击者把匿名池打满,不影响已登录用户的配额。同时配合分层漏斗、特征识别等手段,而非直接粗暴根据 IP 限流(公司、学校、运营商 NAT 后面可能有成千上万人,共享出口 IP 的正常用户也被拦)。

当攻击规模极大时,识别已经来不及,要在架构层面做隔离。例如大厂(CloudFlare、AWS Shield等)对抗 DDoS 的核心思路就是流量按身份路由到不同集群,把攻击流量和正常流量物理隔离。攻击者打垮匿名集群,登录用户集群以及 API Key 集群(按合同配额)完全不受影响。

Licensed under CC BY-NC-SA 4.0
Built with Hugo
Theme Stack designed by Jimmy