Tomoki Ota's Blog

article icon

【Go言語】レートリミッターを実装する

作成日 

負荷テスト

負荷テストにはtsenart/vegetaを使用します。 インストールはgo installもしくはbrewでできます。

go install github.com/tsenart/vegeta@latest

使い方は以下のように-rateに1秒で送られるリクエスト数、-durationに負荷をかける時間を指定します。

$ echo "GET http://127.0.0.1:8080/" | vegeta attack -rate=5 -duration=1s | tee results.bin | vege
ta report

フレームワークの実装例

ここではGoのフレームワークのfiberで30秒ごとの最大5リクエストのレートリミッターを導入してみます。

func main() {
    app := fiber.New()
    app.Use(limiter.New(limiter.Config{
        Expiration: 10 * time.Second,
        Max:        5,
    }))
 
    app.Get("/", func(c *fiber.Ctx) error {
        return c.JSON(fiber.Map{
            "status": "OK!",
        })
    })
 
    app.Listen(":8080")
}
$ echo "GET http://127.0.0.1:8080/" | vegeta attack -rate=5 -duration=1s | tee results.bin | vege
ta report
Requests      [total, rate, throughput]  5, 6.25, 0.00
Duration      [total, attack, wait]      800.146501ms, 799.780751ms, 365.75µs
Latencies     [mean, 50, 95, 99, max]    679.075µs, 578.917µs, 1.3525ms, 1.3525ms, 1.3525ms
Bytes In      [total, mean]              85, 17.00
Bytes Out     [total, mean]              0, 0.00
Success       [ratio]                    0.00%
Status Codes  [code:count]               429:5  
Error Set:
429 Too Many Requests
$ echo "GET http://127.0.0.1:8080/" | vegeta attack -rate=50 -duration=1s | tee results.bin | vegeta re
port
Requests      [total, rate, throughput]  50, 50.84, 5.08
Duration      [total, attack, wait]      983.887584ms, 983.383917ms, 503.667µs
Latencies     [mean, 50, 95, 99, max]    406.565µs, 371.792µs, 854.582µs, 1.469874ms, 1.469875ms
Bytes In      [total, mean]              845, 16.90
Bytes Out     [total, mean]              0, 0.00
Success       [ratio]                    10.00%
Status Codes  [code:count]               200:5  429:45  
Error Set:
429 Too Many Requests

config struct の実装は以下のようになっている

Config struct

deprecatedのものは削除している。

type Config struct {
    // Next defines a function to skip this middleware when returned true.
    //
    // Optional. Default: nil
    Next func(c *fiber.Ctx) bool
 
    // Max number of recent connections during `Expiration` seconds before sending a 429 response
    //
    // Default: 5
    Max int
 
    // KeyGenerator allows you to generate custom keys, by default c.IP() is used
    //
    // Default: func(c *fiber.Ctx) string {
    //   return c.IP()
    // }
    KeyGenerator func(*fiber.Ctx) string
 
    // Expiration is the time on how long to keep records of requests in memory
    //
    // Default: 1 * time.Minute
    Expiration time.Duration
 
    // LimitReached is called when a request hits the limit
    //
    // Default: func(c *fiber.Ctx) error {
    //   return c.SendStatus(fiber.StatusTooManyRequests)
    // }
    LimitReached fiber.Handler
 
    // When set to true, requests with StatusCode >= 400 won't be counted.
    //
    // Default: false
    SkipFailedRequests bool
 
    // When set to true, requests with StatusCode < 400 won't be counted.
    //
    // Default: false
    SkipSuccessfulRequests bool
 
    // Store is used to store the state of the middleware
    //
    // Default: an in memory store for this process only
    Storage fiber.Storage
 
    // LimiterMiddleware is the struct that implements a limiter middleware.
    //
    // Default: a new Fixed Window Rate Limiter
    LimiterMiddleware LimiterHandler
}

ConfigのLimiterMiddlewareというフィールドでレートリミッタのミドルウェアを指定する。デフォルトでは、Fixed Window Rate Limiter固定ウィンドカウンタのレートリミッタが使用されている。

Sliding Windowが指定できる。スライディングウィンドウログアルゴリズム

app.Use(limiter.New(limiter.Config{
    LimiterMiddleware: limiter.SlidingWindow{},
}))

以下のようにすると、IPアドレスごとにレートリミットを適応することができる。

app.Use(limiter.New(limiter.Config{
      KeyGenerator: func(c *fiber.Ctx) string {
      return c.IP()
  },
}))

固定ウィンドウカウンタ

fiberの固定ウィンドウカウンタの実装
package limiter
 
import (
    "strconv"
    "sync"
    "sync/atomic"
 
    "github.com/gofiber/fiber/v2"
    "github.com/gofiber/fiber/v2/utils"
)
 
type FixedWindow struct{}
 
// 固定ウィンドウ方式のレートリミッターを実装するミドルウェアの生成
func (FixedWindow) New(cfg Config) fiber.Handler {
    var (
        // 同時アクセスを管理するためのミューテックス
        mux        = &sync.RWMutex{}
        // 最大リクエスト数を文字列に変換(レスポンスヘッダー用)
        max        = strconv.Itoa(cfg.Max)
        // 有効期限(秒単位)を計算
        expiration = uint64(cfg.Expiration.Seconds())
    )
 
    // ストレージ操作を簡略化するためのマネージャーを生成
    manager := newManager(cfg.Storage)
 
    // タイムスタンプを毎秒更新する(グローバルタイムスタンプを使用)
    utils.StartTimeStampUpdater()
 
    // ミドルウェアとして使用される関数を返す
    return func(c *fiber.Ctx) error {
        // cfg.Next が設定されており、その条件を満たす場合は次の処理を実行
        if cfg.Next != nil && cfg.Next(c) {
            return c.Next()
        }
 
        // リクエストから一意のキーを生成(通常はクライアントのIPアドレス)
        key := cfg.KeyGenerator(c)
 
        // ミューテックスをロックして並列処理を防ぐ
        mux.Lock()
 
        // 該当するキーのエントリを取得
        e := manager.get(key)
 
        // 現在のタイムスタンプを取得
        ts := uint64(atomic.LoadUint32(&utils.Timestamp))
 
        // エントリが存在しない場合、または有効期限が切れている場合は初期化
        if e.exp == 0 {
            // 新しい有効期限を設定
            e.exp = ts + expiration
        } else if ts >= e.exp {
            // 有効期限切れの場合、カウンターをリセットし、新しい有効期限を設定
            e.currHits = 0
            e.exp = ts + expiration
        }
 
        // 現在のリクエスト数をインクリメント
        e.currHits++
 
        // ウィンドウリセットまでの残り時間を計算
        resetInSec := e.exp - ts
 
        // 残りの許可されたリクエスト数を計算
        remaining := cfg.Max - e.currHits
 
        // エントリをストレージに更新
        manager.set(key, e, cfg.Expiration)
 
        // ミューテックスを解除
        mux.Unlock()
 
        // 残りのリクエスト数が0を下回る場合、制限に達したと判断
        if remaining < 0 {
            // Retry-After ヘッダーを設定し、制限解除までの時間を通知
            c.Set(fiber.HeaderRetryAfter, strconv.FormatUint(resetInSec, 10))
 
            // 制限に達した場合のハンドラーを呼び出す
            return cfg.LimitReached(c)
        }
 
        // 制限を超えていない場合は、次のミドルウェアまたはハンドラーを実行
        err := c.Next()
 
        // 成功/失敗に応じたリクエストのカウント調整
        if (cfg.SkipSuccessfulRequests && c.Response().StatusCode() < fiber.StatusBadRequest) ||
            (cfg.SkipFailedRequests && c.Response().StatusCode() >= fiber.StatusBadRequest) {
            // ミューテックスを再びロックしてカウンターを調整
            mux.Lock()
            e = manager.get(key)
            e.currHits-- // カウンターをデクリメント
            remaining++
            manager.set(key, e, cfg.Expiration)
            mux.Unlock()
        }
 
        // レートリミット関連のヘッダーをレスポンスに設定
        c.Set(xRateLimitLimit, max)
        c.Set(xRateLimitRemaining, strconv.Itoa(remaining))
        c.Set(xRateLimitReset, strconv.FormatUint(resetInSec, 10))
 
        return err
    }
}
読みやすくした固定ウィンドウカウンタの実装(gptなので動作未検証)
package limiter
 
import (
    "strconv"
    "sync"
    "sync/atomic"
 
    "github.com/gofiber/fiber/v2"
    "github.com/gofiber/fiber/v2/utils"
)
 
type FixedWindow struct{}
 
// New は固定ウィンドウ方式のレートリミッターを実装する
func (FixedWindow) New(cfg Config) fiber.Handler {
    // ミューテックスと設定値を初期化
    mux := &sync.RWMutex{}
    max := strconv.Itoa(cfg.Max)
    expiration := uint64(cfg.Expiration.Seconds())
 
    // ストレージ管理用のマネージャーを作成
    manager := newManager(cfg.Storage)
 
    // タイムスタンプの更新をスタート
    utils.StartTimeStampUpdater()
 
    // レートリミッターの処理を持つ関数を宣言
    handler := func(c *fiber.Ctx) error {
        // 次のミドルウェアをスキップする条件を確認
        if cfg.Next != nil && cfg.Next(c) {
            return c.Next()
        }
 
        // リクエストから一意のキーを生成
        key := cfg.KeyGenerator(c)
 
        // ミューテックスをロック
        mux.Lock()
 
        // キーに対応するエントリを取得
        e := manager.get(key)
 
        // 現在のタイムスタンプを取得
        ts := uint64(atomic.LoadUint32(&utils.Timestamp))
 
        // 新しいエントリを初期化、または期限切れのエントリをリセット
        if e.exp == 0 || ts >= e.exp {
            e.currHits = 0
            e.exp = ts + expiration
        }
 
        // リクエストカウントをインクリメント
        e.currHits++
 
        // 残りのリクエスト数とリセットまでの時間を計算
        resetInSec := e.exp - ts
        remaining := cfg.Max - e.currHits
 
        // エントリを更新
        manager.set(key, e, cfg.Expiration)
 
        // ミューテックスを解除
        mux.Unlock()
 
        // 許可されたリクエスト数を超えた場合の処理
        if remaining < 0 {
            c.Set(fiber.HeaderRetryAfter, strconv.FormatUint(resetInSec, 10))
            return cfg.LimitReached(c)
        }
 
        // 次のハンドラーに処理を渡す
        err := c.Next()
 
        // 成功または失敗リクエストをスキップする設定を確認し、カウントを調整
        if (cfg.SkipSuccessfulRequests && c.Response().StatusCode() < fiber.StatusBadRequest) ||
            (cfg.SkipFailedRequests && c.Response().StatusCode() >= fiber.StatusBadRequest) {
            mux.Lock()
            e = manager.get(key)
            e.currHits-- // カウントをデクリメント
            remaining++
            manager.set(key, e, cfg.Expiration)
            mux.Unlock()
        }
 
        // レートリミット情報をレスポンスヘッダーに追加
        c.Set(xRateLimitLimit, max)
        c.Set(xRateLimitRemaining, strconv.Itoa(remaining))
        c.Set(xRateLimitReset, strconv.FormatUint(resetInSec, 10))
 
        return err
    }
 
    return handler
}

フレームワークを使わずに実装する

ここでは練習のためフレームワークを使わずに標準ライブラリだけで実装していきます。 簡単のためリーキーバケットアルゴリズムで実装していきます。

type LeakyBucket struct {
    capacity  int
    rate      time.Duration
    water     int
    lastCheck time.Time
    mu        sync.Mutex
}
 
func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket {
    return &LeakyBucket{
        capacity:  capacity,
        rate:      rate,
        lastCheck: time.Now(),
    }
}
 
func (b *LeakyBucket) Allow() bool {
    b.mu.Lock()
    defer b.mu.Unlock()
 
    now := time.Now()
    elapsed := now.Sub(b.lastCheck)
    b.lastCheck = now
 
    leakAmount := int(elapsed / b.rate)
    if leakAmount > 0 {
        b.water -= leakAmount
        if b.water < 0 {
            b.water = 0
        }
    }
 
    if b.water < b.capacity {
        b.water++
        return true
    }
    return false
}
 
var bucket = NewLeakyBucket(1, time.Second)
 
func handler(w http.ResponseWriter, r *http.Request) {
    if !bucket.Allow() {
        http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
        return
    }
    fmt.Fprintf(w, "Hello, World!")
}
 
func main() {
    serverAddr := "0.0.0.0:8080"
    http.HandleFunc("/", handler)
    fmt.Println("Starting server on", serverAddr)
    err := http.ListenAndServe(serverAddr, nil)
    if err != nil {
        fmt.Println("Error starting server:", err)
    }
}
この記事をシェアするx icon
アイコン画像
Tomoki Ota

フルスタックエンジニア。Goが好き。趣味はカメラと旅行です📷