本系列

前言

这篇文章分析下go开源限流组件juju-ratelimit的使用方式和源码实现细节

源码地址:https://github.com/juju/ratelimit 版本:v1.0.2

其提供了一种高效的令牌桶限流实现

令牌桶相比于其他限流算法(如漏桶算法)的一个显著优势,就是在突发流量到来时,可以短时间内提供更多的处理能力,以应对这些额外的请求

直观上来说,令牌桶算法可以实现为:桶用channel实现

  • 在后台每隔一段固定的时间向桶中发放令牌
  • 要获取令牌时,从channel取数据

后台定时填充令牌:

 func Start(bucket chan struct{}, interval time.Duration) {
 	go func() {
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
         // 每隔interval时间往channel中塞一个令牌
 		for range ticker.C {
 			select {
                 // 放令牌
 			case bucket <- struct{}{} :
 				// 桶满了,丢弃
 			default:
 				
 			}
 		}
 	}()
 }

业务请求要获取获取令牌时(非阻塞式):

func GetToken(bucket chan struct{}) bool {
	select {
	case <- bucket:
		return true
	default:
		return false
	}
}

但这样有多少容量就要开多少空间,对内存不友好

更好的方式是只用一个int变量availableTokens维护桶中有多少token:

  1. 每次有请求要获取令牌时,先根据当前时间now减去上次获取令牌的时间last,计算因为这段时间流逝,应该给桶中补充多少令牌,并加到availableTokens
  2. 如果availableTokens < 本次请求的令牌数 request,说明令牌不够。否则令牌够,放行请求,并根据本次需求的令牌数在桶中扣减 :availableTokens -= request

数据结构

type Bucket struct {
	clock Clock

	// bucket创建时间,仅初始化一次,用于计算时间相对偏移量
	startTime time.Time

	// 令牌桶最大容量
	capacity int64

	// 每次放入多少个令牌
	quantum int64

	// 每次放入令牌桶的间隔
	fillInterval time.Duration

	mu sync.Mutex

	// 当前桶中有多少token,注意:可能为负值
	availableTokens int64

	// 上一次访问令牌桶的tick
	latestTick int64
}

注意:桶中剩余令牌availableTokens可能为负值,至于为啥在下文分析扣减令牌流程时说明

对外提供接口

初始化令牌桶

其提供了几种根据不同需求初始化令牌桶的方法:

fillInterval时间放1个令牌,桶容量为capacity

func NewBucket(fillInterval time.Duration, capacity int64) *Bucket 

fillInterval时间放入quantum个令牌

func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket 

这两个方法都是简单设置Bucket的字段,比较简单

每秒放入rate个令牌:

func NewBucketWithRate(rate float64, capacity int64) *Bucket 

其构造方法如下:主要是需要根据参数rate推算出tb.fillInterval以及tb.quantum

func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket {
    tb := NewBucketWithQuantumAndClock(1, capacity, 1, clock)

    // 每次放入令牌数quantum从1开始,每轮 * 1.1
    for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
            // 假设quantum=1, rate=10,即每次放1个,每秒放10个 => 则放令牌的间隔是0.1s
            fillInterval := time.Duration(1e9 * float64(quantum) / rate)
            if fillInterval <= 0 {
                    continue
            }
            tb.fillInterval = fillInterval
            tb.quantum = quantum

            // 误差小于0.01就返回tb
            if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
                    return tb
            }
    }

}

虽然这里用了for循环寻找最合适的fillInterval和quantum,但只要rate小于10亿,都会在执行第一轮循环后退出

所以可以近似看做quantum=1fillInterval=1e9 / rate

例如:quantum=1,当要求 rate=10,即每次放1个,每秒放10个时,计算出来放令牌的时间间隔时是 0.1s

nextQuantum方法:

// 每次增大1.1倍
func nextQuantum(q int64) int64 {
   q1 := q * 11 / 10
   if q1 == q {
      q1++
   }
   return q1
}

Rate方法:根据quantumfillInterval,计算每秒放入多少个令牌

func (tb *Bucket) Rate() float64 {
    // 举个例子:假设每次放入2个,每500ms放一个 => 每秒就放4个
    return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}

获取令牌

非阻塞获取count个令牌,返回的Duration表示调用者需要等待的时间才能获得令牌

func (tb *Bucket) Take(count int64) time.Duration 

如果在maxWait时间内能获取到count个令牌就获取,否则不获取

返回获取成功时需要等待的时间,是否获取成功

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) 

非阻塞获取最多count个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)

func (tb *Bucket) TakeAvailable(count int64) int64

获取count个令牌,并在方法内部等待直到获取到令牌

func (tb *Bucket) Wait(count int64) 

只有当最多阻塞等待maxWait时间,能获取到count个令牌时才等待,并在内部等待。如果不能返回false

func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool

这5个方法中,业务上实用的是WaitMaxDuration,毕竟被限流的请求不能无限等下去

核心方法

adjustavailableTokens

将桶中的令牌数调整为tick时间的令牌数,相当于给桶补充token

  • 计算在tick与上一次lastTick之间能够生产多少个令牌

    • 一般是计算从上次请求到本次请求中产生的令牌数
  • 追加令牌到桶中

func (tb *Bucket) adjustavailableTokens(tick int64) {
    lastTick := tb.latestTick
    tb.latestTick = tick
    if tb.availableTokens >= tb.capacity {
            return
    }
    // 补充令牌
    tb.availableTokens += (tick - lastTick) * tb.quantum
    if tb.availableTokens > tb.capacity {
            tb.availableTokens = tb.capacity
    }
    return
}

其参数tick如何计算的?调currentTick方法

currentTick

计算当前时间与开始时间(桶的初始化时间)之间,需要放入多少次令牌

func (tb *Bucket) currentTick(now time.Time) int64 {
    return int64(now.Sub(tb.startTime) / tb.fillInterval)
}

用于计算当前到哪个tick了

take

Take系列的方法,底层会调到take方法:

Take:

func (tb *Bucket) Take(count int64) time.Duration {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    // 可以等待无限大的时间
    d, _ := tb.take(tb.clock.Now(), count, infinityDuration)
    return d
}

TakeMaxDuration:

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    return tb.take(tb.clock.Now(), count, maxWait)
}

差别在于Take会等待无限大的时间,直到拿到token。TakeMaxDuration最多只会等待maxWait时间


take是获取令牌的核心方法,其流程如下:

  1. 计算出当前时刻可用的令牌数,并补充到令牌桶中
  2. 如果当前令牌桶存量够,在桶中扣减令牌
  3. 如果当前令牌桶存量不够,在桶中预扣减令牌,并返回需要等待的时间waitTime
/** 
当前时间是now
要获取count个令牌
最多等待maxWait时间
*/
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
    if count <= 0 {
            return 0, true
    }

    tick := tb.currentTick(now)
    // 给桶补充token
    tb.adjustavailableTokens(tick)
    avail := tb.availableTokens - count
    // 当前桶中的令牌就能满足需求,直接返回
    if avail >= 0 {
            tb.availableTokens = avail
            return 0, true
    }

    // 到这avail是负数,-avail就是还需要产生多少个令牌,
    // 要产生avail个令牌,还需要多少个tick(向上取整)
    // 再加上当前tick,就是能满足需求的tick
    endTick := tick + (-avail+tb.quantum-1)/tb.quantum
    // 需要等到这个时间才有令牌
    endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
    // 需要等待的时间
    waitTime := endTime.Sub(now)

    // 如果需要等待的时间超过了最大等待时间,则不等待,也不扣减,直接返回
    if waitTime > maxWait {
            return 0, false
    }

    // 这里将availableTokens更新为负值
    tb.availableTokens = avail
    // 返回要等待的时间,且已经在桶里预扣减了令牌
    return waitTime, true
}

怎么理解桶中的availableTokens变为负值?表示有请求已经提前预支了令牌,相当于欠账

  • 之后有请求要获取令牌时,需要先等时间流逝,把这些账还了,才能获取成功

    • TakeAvailable方法
  • 当然也可以在之前已欠账的基础上继续欠账,这样要等待更久的时间才能获取令牌成功

    • Take,TakeMaxDuration方法

例如:令牌桶每秒产生1个令牌,假设在**t1时刻**桶中已经没有令牌了
  1. 请求A在t1时刻调Take()获取5个令牌,将availableTokens更新为-5,并返回5s,表示需要等待5s才能让请求放行
  2. 时间过去1s,此时时刻t2 = t1 + 1s
  3. 请求B在t2时刻调Take()获取5个令牌,首先因为时间流逝,将availableTokens更新为-4。再将availableTokens更新为-4 - 5 = -9,也就是继续欠账。返回9s,表示要等待9s才能让请求放行
    在这里插入图片描述

TakeAvailable

非阻塞获取最多count个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)

func (tb *Bucket) TakeAvailable(count int64) int64 {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	return tb.takeAvailable(tb.clock.Now(), count)
}

// 返回可用的令牌数(可能小于count),如果没可用的令牌,将返回0,不会阻塞
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
	if count <= 0 {
		return 0
	}
    // 基于当前时间,给桶补充令牌
	tb.adjustavailableTokens(tb.currentTick(now))
	if tb.availableTokens <= 0 {
		return 0
	}

    // 获取max(count, availableTokens)个令牌,也就是有多少就获取多少
	if count > tb.availableTokens {
		count = tb.availableTokens
	}
	tb.availableTokens -= count
	return count
}

Wait系列

Wait系列的两个方法Wait和WaitMaxDuration就是对Take的封装,也就是如果需要等待,在Wait方法内部等待

func (tb *Bucket) Wait(count int64) {
	if d := tb.Take(count); d > 0 {
        // 在内部等待
		tb.clock.Sleep(d)
	}
}

func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {
	d, ok := tb.TakeMaxDuration(count, maxWait)
	if d > 0 {
		tb.clock.Sleep(d)
	}
	return ok
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部