LOADING...

加载过慢请开启缓存(浏览器默认开启)

loading

令牌桶限流算法-Go实现

2025/8/7 golang

1. 🐱令牌桶算法是什么

目前做服务端的比较熟悉的一个词是限流,是作为对于拥有非常大QPS的重要接口做容灾处理的一个必不可少的部分。
令牌桶算法就是一种限流算法,核心组成部分为:

  • 令牌桶:一个容器,用于存放令牌。
  • 令牌添加速率:系统按照固定的速率(例如,每秒添加r个令牌)向桶中添加令牌。
  • 桶的容量:桶中最多可以存放的令牌数量(记为b)。当桶满时,新添加的令牌将被丢弃。
  • 请求处理:每个请求需要消耗一定数量的令牌(通常为1个,但也可以根据请求的权重消耗多个)。如果桶中有足够的令牌,则请求被处理,并消耗相应数量的令牌;否则,请求被拒绝(或等待直到有足够的令牌)。

2. Go的实现

  • 令牌桶用有缓冲区的channel实现,并且并发进行按照固定速率向令牌桶添加令牌,以及请求来了之后消耗令牌两个任务。
  • 用户自定义路由限流规则,使用正则过滤不合法的接口,带/*为模糊匹配。
  • 使用gin请求中间件来实现对接口的拦截。
  • 监控CPU负载, 动态调整桶大小

1.结构体定义

type LimitRate struct {
    Rate           int64    //限速,最大的QPS
    Resource       []string //拦截的路由,支持/*
    RejectStrategy int      //拒绝策略
    TimeInterval   int64    //令牌桶每隔多少毫秒放置令牌,默认100ms
    /***以上用户自定义*/
    /**以下内部的变量*/
    buffer       int64         //桶大小
    bucket       chan struct{} //令牌桶
    limitNumber  int64         //每TimeInterval往桶里放的令牌数
    waitQueue    []*WaitQueue  //等待队列
    waitMaxSize  int           //等待队列最大长度
    waitMaxTime  int           //等待队列中等待最长时间,ms
    maxQueueSize int           //等待队列最大长度
}

2.初始化变量

  • 桶大小定义为最大QPS大小
  • 令牌桶间隔默认为100毫秒
  • 令牌数= Rate / 100ms

3.令牌桶放置令牌

func (lm *LimitRate) PutToken() {
    ticker := time.NewTicker(time.Duration(lm.TimeInterval) * time.Millisecond)
    for {
        select {
        case <-ticker.C:
            for i := 0; i < lm.limitNumber && len(lm.bucket) < cap(lm.bucket); i++ {
                go func() { lm.bucket <- struct{}{} }()
            }
        }
    }
}

4.令牌消费&监控CPU负载

请求到达,消费令牌

func (lm *LimitRate) IsAllowRequest() bool {
    select {
    case <-lm.bucket:
        return true
    default:
        //拒绝策略
        return false
    }
}

监控CPU负载,第三方库:"github.com/shirou/gopsutil/v3/cpu"

  • 负载75%~100%时线性缩放
  • 25是为了归一化存在,因为percent[0]范围在[75,100]。
func (lm *LimitRate) stateCpu() {
    ticker := time.NewTicker(time.Second)
    for {
        select {
        case <-ticker.C:
            // 系统整体 CPU 使用率
            percent, _ := cpu.Percent(1*time.Second, false)
            log.Printf("Total CPU Usage: %.2f%%\n", percent[0])
            if percent[0] >= 75 {
                // 负载75%~100%时线性缩放
                scaleFactor := 1.0 - (percent[0]-75)/25
                newBuffer := int64(scaleFactor * float64(atomic.LoadInt64(&lm.Rate)))
                atomic.StoreInt64(&lm.buffer, newBuffer)
                lm.resizeBucket(lm.buffer)
            } else {
                atomic.StoreInt64(&lm.buffer, atomic.LoadInt64(&lm.Rate))
                lm.resizeBucket(lm.buffer)
            }
        }
    }
}

5.中间件定义

只监听用户定义的资源,其余接口直接放行。超过限速则返回http429错误码。

func (lm *LimitRate) ListenRequestMiddleWare() gin.HandlerFunc {
    return func(c *gin.Context) {
        //只限制指定接口
        if lm.analyseRoutes(c.Request.URL.Path) {
            if lm.IsAllowRequest() {
                c.Next()
            } else {
                c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": "Too many requests"})
                return
            }
        } else {
            c.Next()
        }
    }
}

6.示例

package main

import (
    "net/http"

    "github.com/D-Watson/limit_rate/token_bucket"
    "github.com/gin-gonic/gin"
)

func Hello(c *gin.Context) {
    name := c.Param("name")
    action := c.Param("action")
    c.String(http.StatusOK, name+" is "+action)
}

func InitRoute() *gin.Engine {
    r := gin.Default()
    v1 := r.Group("/v1")
    {
        v1.GET("/hello", Hello)
    }
    return r
}

func main() {
    r := InitRoute()
    tb := &token_bucket.LimitRate{
        Resource: []string{"/hello"},
        Rate:     1e6,
    }
    r.Use(tb.ListenRequestMiddleWare())
    r.Run(":8080")
}

7.完整代码

🐱👋 http://github.com/D-Watson/limit_rate

img_show