1. 🐱令牌桶算法是什么
目前做服务端的比较熟悉的一个词是限流,是作为对于拥有非常大QPS的重要接口做容灾处理的一个必不可少的部分。
令牌桶算法就是一种限流算法,核心组成部分为:
- 令牌桶:一个容器,用于存放令牌。
- 令牌添加速率:系统按照固定的速率(例如,每秒添加r个令牌)向桶中添加令牌。
- 桶的容量:桶中最多可以存放的令牌数量(记为b)。当桶满时,新添加的令牌将被丢弃。
- 请求处理:每个请求需要消耗一定数量的令牌(通常为1个,但也可以根据请求的权重消耗多个)。如果桶中有足够的令牌,则请求被处理,并消耗相应数量的令牌;否则,请求被拒绝(或等待直到有足够的令牌)。
2. Go的实现
- 令牌桶用有缓冲区的channel实现,并且
并发进行按照固定速率向令牌桶添加令牌,以及请求来了之后消耗令牌两个任务。 - 用户自定义路由限流规则,使用正则过滤不合法的接口,带/*为模糊匹配。
- 使用gin请求中间件来实现对接口的拦截。
- 监控
CPU负载, 动态调整桶大小
1.结构体定义
type LimitRate struct {
Rate int64 //限速,最大的QPS
Resource []string //拦截的路由,支持/*
RejectStrategy int //拒绝策略
TimeInterval int64 //令牌桶每隔多少毫秒放置令牌,默认100ms
/***以上用户自定义*/
/**以下内部的变量*/
buffer int64 //桶大小
bucket chan struct{} //令牌桶
limitNumber int64 //每TimeInterval往桶里放的令牌数
waitQueue []*WaitQueue //等待队列
waitMaxSize int //等待队列最大长度
waitMaxTime int //等待队列中等待最长时间,ms
maxQueueSize int //等待队列最大长度
}
2.初始化变量
- 桶大小定义为最大QPS大小
- 令牌桶间隔默认为100毫秒
- 令牌数= Rate / 100ms
3.令牌桶放置令牌
func (lm *LimitRate) PutToken() {
ticker := time.NewTicker(time.Duration(lm.TimeInterval) * time.Millisecond)
for {
select {
case <-ticker.C:
for i := 0; i < lm.limitNumber && len(lm.bucket) < cap(lm.bucket); i++ {
go func() { lm.bucket <- struct{}{} }()
}
}
}
}
4.令牌消费&监控CPU负载
请求到达,消费令牌
func (lm *LimitRate) IsAllowRequest() bool {
select {
case <-lm.bucket:
return true
default:
//拒绝策略
return false
}
}
监控CPU负载,第三方库:"github.com/shirou/gopsutil/v3/cpu"
- 负载75%~100%时线性缩放
- 25是为了归一化存在,因为percent[0]范围在[75,100]。
func (lm *LimitRate) stateCpu() {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
// 系统整体 CPU 使用率
percent, _ := cpu.Percent(1*time.Second, false)
log.Printf("Total CPU Usage: %.2f%%\n", percent[0])
if percent[0] >= 75 {
// 负载75%~100%时线性缩放
scaleFactor := 1.0 - (percent[0]-75)/25
newBuffer := int64(scaleFactor * float64(atomic.LoadInt64(&lm.Rate)))
atomic.StoreInt64(&lm.buffer, newBuffer)
lm.resizeBucket(lm.buffer)
} else {
atomic.StoreInt64(&lm.buffer, atomic.LoadInt64(&lm.Rate))
lm.resizeBucket(lm.buffer)
}
}
}
}
5.中间件定义
只监听用户定义的资源,其余接口直接放行。超过限速则返回http429错误码。
func (lm *LimitRate) ListenRequestMiddleWare() gin.HandlerFunc {
return func(c *gin.Context) {
//只限制指定接口
if lm.analyseRoutes(c.Request.URL.Path) {
if lm.IsAllowRequest() {
c.Next()
} else {
c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": "Too many requests"})
return
}
} else {
c.Next()
}
}
}
6.示例
package main
import (
"net/http"
"github.com/D-Watson/limit_rate/token_bucket"
"github.com/gin-gonic/gin"
)
func Hello(c *gin.Context) {
name := c.Param("name")
action := c.Param("action")
c.String(http.StatusOK, name+" is "+action)
}
func InitRoute() *gin.Engine {
r := gin.Default()
v1 := r.Group("/v1")
{
v1.GET("/hello", Hello)
}
return r
}
func main() {
r := InitRoute()
tb := &token_bucket.LimitRate{
Resource: []string{"/hello"},
Rate: 1e6,
}
r.Use(tb.ListenRequestMiddleWare())
r.Run(":8080")
}
7.完整代码
🐱👋 http://github.com/D-Watson/limit_rate
