Go Lib ratelimit 的悲观锁与乐观锁

2020-10-18

在 Gin 的官方示例中有这么一个限流示例:

func leakBucket() gin.HandlerFunc {
	prev := time.Now()
	return func(ctx *gin.Context) {
    limit.Take()
    // ...
	}
}

完整示例见: https://github.com/gin-gonic/examples/blob/master/ratelimiter/

leakBucket 是一个 Gin 的 middleware, limit.Take() 是由 go.uber.org/ratelimit 提供的限流方法.

如果设置 RPS 为 100: ratelimit.New(100), 那么每个请求之间的间隔为 1s / 100 = 10ms, limit.Take() 会保证请求间隔不小于 10ms.

ratelimiter 的文档里说这实现了 leaky-bucket rate limit algorithm, 但读了源码之后发现…

源码结构

https://github.com/uber-go/ratelimit

.
├── CHANGELOG.md
├── LICENSE
├── Makefile
├── README.md
├── go.mod
├── go.sum
├── internal
│   └── clock
│       ├── clock.go
│       ├── interface.go
│       ├── real.go
│       └── timers.go
├── mutexbased.go
├── ratelimit.go
├── ratelimit_bench_test.go
├── ratelimit_test.go
└── tools
    ├── go.mod
    └── go.sum

package clock

我们先看一下 internal/clock/, 其中 clock.go 是一堆 Mock, 可以暂时忽略, timers.go 没啥用也忽略.

interface.go 声明了 clock 的接口:

type Clock interface {
	AfterFunc(d time.Duration, f func())
	Now() time.Time
	Sleep(d time.Duration)
}

作者的意图是让 Clock 可以由其他实现替换, 他自己写的实现在 real.go 中:

package clock

import "time"

// clock implements a real-time clock by simply wrapping the time package functions.
type clock struct{}

// New returns an instance of a real-time clock.
func New() Clock {
	return &clock{}
}

func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) }

func (c *clock) AfterFunc(d time.Duration, f func()) {
	// TODO maybe return timer interface
	time.AfterFunc(d, f)
}

func (c *clock) Now() time.Time { return time.Now() }

func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }

clock 类型是空 struct, 作用只是充当实现接口的骨架, 接口方法代理给 time 包中的方法.

对这个库来说, 有用的方法只有 Now()Sleep(), 你肯定也猜到了, 限流的核心就是让后面的请求睡觉😴

mutexbased.go

这是库包含了两种实现, mutexbased.go 是基于 mutex 的版本, 更容易理解.

type mutexLimiter struct {
	sync.Mutex
	last       time.Time // 上一个请求的到达时刻
	sleepFor   time.Duration // 需要等待的时间
	perRequest time.Duration // 由 RPS 计算得出的请求间隔
	maxSlack   time.Duration // 强制设置的请求间隔
	clock      Clock // 计时器: 获得当前时间 / 休眠一段时间
}

使用 mutex 的作用是, 当多个携程同时要求限流时, 通过抢夺mutex锁来串行化, 从而达到排队的效果.

Take() 限流的原理是比较当前时间跟上次请求的时间差:

  • 如果大于限流间隔, Take() 立即返回, 也就不影响新请求的执行;
  • 如果小于限流间隔, 通过 sleep 把间隔补齐到要求的间隔, 然后再返回.

ratelimit.go

ratelimit.go 是 Gin 示例使用的版本, 是基于 sync/atomic 实现的.

type state struct {
	last     time.Time
	sleepFor time.Duration
}

type limiter struct {
	state unsafe.Pointer
	//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
	// of this rate limiter in case of collocation with other frequently accessed memory.
	padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.

	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}

这里把 lastsleepFor 属性单独拆到一个 state 结构体中, 便于在 sync.atom 中存取.

限流思路跟上面一样, 区别是锁的控制.

mutexbased.go 使用 mutex 是一种悲观锁, 无论请求频率如何, 都有 mutex 的操作.

ratelimit.go 利用 sync.atom, 使用 CompareAndSwap 的方式, 先计算时间间隔, 在给赋值前检查state:

  • 如果没更新过, 说明没有其他携程的并发干扰, 可以赋值;
  • 如果被更新过, 那么就重新计算时间间隔, 重走当前流程.