importtimeimportthreadingclassTokenBucket:def__init__(self,capacity:int,refill_rate:float):self.capacity=capacity# 桶最大容量self.refill_rate=refill_rate# 每秒补充令牌数self.tokens=float(capacity)# 初始满桶self.last_refill=time.monotonic()self.lock=threading.Lock()def_refill(self):now=time.monotonic()elapsed=now-self.last_refilladded=elapsed*self.refill_rateself.tokens=min(self.capacity,self.tokens+added)self.last_refill=nowdefallow(self,cost:int=1)->bool:withself.lock:self._refill()ifself.tokens>=cost:self.tokens-=costreturnTruereturnFalse# 使用示例bucket=TokenBucket(capacity=10,refill_rate=2)# 桶容量10,每秒补充2个defhandle_request():ifbucket.allow():return"200 OK"else:return"429 Too Many Requests"
关键特性:允许短时突发——当桶满时连续请求可以瞬间打出 capacity 个,之后降回到 refill_rate 的吞吐速率。适合对突发流量宽容、对长期速率有约束的场景(如 API 限速)。
importtimeimportcollectionsimportthreadingclassSlidingWindowLimiter:def__init__(self,limit:int,window_seconds:float):self.limit=limitself.window=window_secondsself.timestamps=collections.deque()# 有序队列存时间戳self.lock=threading.Lock()defallow(self)->bool:withself.lock:now=time.monotonic()cutoff=now-self.window# 淘汰窗口外的旧请求(队头是最早的)whileself.timestampsandself.timestamps[0]<=cutoff:self.timestamps.popleft()iflen(self.timestamps)<self.limit:self.timestamps.append(now)returnTruereturnFalse# 使用示例limiter=SlidingWindowLimiter(limit=100,window_seconds=60)# 60秒内最多100次defhandle_request():iflimiter.allow():return"200 OK"else:return"429 Too Many Requests"
classBatchTokenBucket:def__init__(self,redis_client,key,capacity,refill_rate,prefetch_size=20):self.r=redis_clientself.key=keyself.capacity=capacityself.refill_rate=refill_rateself.prefetch_size=prefetch_size# 一次从 Redis 取多少令牌self._local_tokens=0self._lock=threading.Lock()# Lua 脚本:原子地从 Redis 桶里取走 N 个令牌LUA_SCRIPT="""
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local cost = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local data = redis.call('HMGET', key, 'tokens', 'ts')
local tokens = tonumber(data[1]) or capacity
local ts = tonumber(data[2]) or now
tokens = math.min(capacity, tokens + (now - ts) * rate)
if tokens >= cost then
tokens = tokens - cost
redis.call('HMSET', key, 'tokens', tokens, 'ts', now)
redis.call('EXPIRE', key, 3600)
return cost
else
local available = math.floor(tokens)
if available > 0 then
redis.call('HMSET', key, 'tokens', 0, 'ts', now)
return available
end
return 0
end
"""def_fetch_tokens(self,amount)->int:"""一次从 Redis 取走 amount 个令牌,返回实际取到的数量"""script=self.r.register_script(self.LUA_SCRIPT)returnint(script(keys=[self.key],args=[self.capacity,self.refill_rate,amount,time.time()]))defallow(self)->bool:withself._lock:ifself._local_tokens>0:self._local_tokens-=1returnTrue# 纯本地,无 Redis# 批量预取got=self._fetch_tokens(self.prefetch_size)ifgot>0:self._local_tokens=got-1returnTruereturnFalse