2017年1月24日火曜日

Goで毎分100万リクエストを処理する(一万同時アクセス問題を余裕でクリアします!)

勉強の為に引用しました。

I quoted it for studying
http://postd.cc/handling-1-million-requests-per-minute-with-go/

20158 17
Goで毎分100 万リクエストを処理する


本記事は、原著者の許諾のもとに翻訳・掲載しております。

Malwarebytesは、驚くべき成長を見せています。 1年以上前にこのシリコンバレーの会社に入社して以来、私の主な仕事は急成長するセキュリティ企業の力となるシステムの設計と開発です。日々数百万人が利用する製品をサポートするために必要な、全ての基盤をつくります。私は12 年以上、アンチウイルスとアンチマルウェアに関わるいくつかの会社で働いてきました。毎日処理する膨大なデータのせいで、これらのシステムがどれだけ複雑なものになるかを理解しています。
面白いことに、ここ9年ほどで私が携わったWeb のバックエンド開発のほとんどは、Ruby on Railsが使われていました。誤解されないように言っておきますが、私はRuby on Rails が大好きですし、すばらしい環境だと思っています。しかし、Rubyでシステムを設計し始めると忘れてしまうのは、マルチスレッド化や並列化、高速化、メモリオーバーヘッドの低減に力を入れられれば、ソフトウェアのアーキテクチャが効率的で単純なものになるということです。長年、私は C/C++Delphi、そして C#のデベロッパをやっていますが、適切なツールを使えばジョブの複雑さを減らせることに最近気づきました。
私は設計者の代表として、インターネットで常に行われているプログラミング言語・フレームワーク戦争があまり好きではありません。効率性や生産性、そしてコードの保守性は、どれだけシンプルにソリューションを設計できるかにかかっていると信じています。
問題点
匿名のテレメトリと分析システムを使って仕事をしていたときの目標は、多数のエンドポイントからの大量のPOSTリクエストを処理できるようにすることでした。 Webハンドラは、多くのペイロードのコレクションを含むJSONドキュメントを受け取ります。これは後々 MapReduceをこのデータ上で操作できるように、Amazon S3に書き込む必要があります。
私たちはWorker-Tierのアーキテクチャの作成について調査しており、次のようなものを利用しています。
  • Sidekiq
  • Resque
  • Delayed_job
  • Elastic BeanstalkWorkerTier
  • RabbitMQ
  • その他
2つの異なるクラスタをセットアップするのですが、1つは webのフロントエンドのためのクラスタ、もう1つは Workerのためのクラスタです。これで操作可能なバックグラウンドの仕事を拡張することができます。
しかし当初より、私たちのチームはGoを使用するべきだと思っていました。話し合いの段階で、このシステムのトラフィックは膨大になりそうだったからです。私は 2年ほどGoを使っています。仕事でいくつかのシステムを開発しましたが、これだけの量のロードが可能なものは 1つもありませんでした。
私たちはいくつかの構造体を作成し、POSTの呼び出しを通して受け取る Webリクエストのペイロードと、S3のバケットにアップロードするためのメソッドを定義することから始めました。
  1. type PayloadCollection struct {
  2.     WindowsVersion  string    `json:"version"`
  3.     Token           string    `json:"token"`
  4.     Payloads        []Payload `json:"data"`
  5. }
  6.  
  7. type Payload struct {
  8.     // [redacted]
  9. }
  10.  
  11. func (p *Payload) UploadToS3() error {
  12.     // the storageFolder method ensures that there are no name collision in
  13.     // case we get same timestamp in the key name
  14.     storage_path := fmt.Sprintf("%v/%v" , p.storageFolder, time.Now ().UnixNano())
  15.  
  16.     bucket := S3Bucket
  17.  
  18.     b := new(bytes.Buffer )
  19.     encodeErr := json.NewEncoder(b ).Encode(payload )
  20.     if encodeErr != nil {
  21.         return encodeErr
  22.     }
  23.  
  24.     // Everything we post to the S3 bucket should be marked 'private'
  25.     var acl = s3.Private
  26.     var contentType = "application/octet-stream"
  27.  
  28.     return bucket.PutReader(storage_path , b, int64( b.Len()), contentType, acl, s3.Options{})
  29. }
Goルーチンへの素朴なアプローチ
初めに非常に素朴な方法で、POST ハンドラの実装を行いました。ジョブプロセスをシンプルな goroutineで並行処理します。
  1. func payloadHandler(w http.ResponseWriter , r *http.Request) {
  2. if r.Method != "POST" {
  3.     w.WriteHeader(http.StatusMethodNotAllowed )
  4.         return
  5.     }
  6.  
  7.     // Read the body into a string for json decoding
  8.     var content = &PayloadCollection{}
  9.     err := json.NewDecoder(io.LimitReader (r.Body, MaxLength )).Decode(&content )
  10.     if err != nil {
  11.         w.Header().Set ("Content-Type", "application/json ; charset=UTF-8")
  12.         w.WriteHeader(http.StatusBadRequest )
  13.         return
  14.     }
  15.     // Go through each payload and queue items individually to be posted to S3
  16.     for _, payload := range content.Payloads {
  17.         go payload.UploadToS3()   // <----- DON'T DO THIS
  18.     }
  19.  
  20.     w.WriteHeader(http.StatusOK )
  21. }
  22.  
中程度のロードであれば、ほとんどの人にとっては問題ないはずですが、大きなスケールになると、うまくいかないことは即座に明らかになりました。最初のバージョンをデプロイした際、多くのリクエストが来るとは思っていましたが、それが桁違いの量になるとは想定していませんでした。トラフィック量の見積もりを完全に誤ったのです。
上記のアプローチには、いくつか問題点が挙げられます。Goルーチンをどのくらい生成するかコントロールできないのがひとつ、また、毎分 100万のPOSTリクエストを取得すると、このコードはすぐにクラッシュし、炎上してしまいました。
もう一度チャレンジ
というわけで、別の方法が必要です。私たちはリクエストハンドラの寿命をごく短くすることと、同時にバックグラウンドでプロセスを生成することの重要性を議論し始めました。もちろん、これはRuby on Rails 界においては当然なされるべきことですが、さもなければpumaunicorn あるいはpassengerを用いて、全てのWorker web プロセッサをブロックしているでしょう(JRubyの議論はここでは忘れてください)。そこで、Resque SidekiqSQS といったありふれた解決法にレバレッジを効かせれば良かったのでは、という話になりました。そのやり方であればいくらでも案が出てきます。
2番目のイテレーションは、複数のジョブを待機させ、S3にアップロードさせるバッファチャネルを作ることでした。私たちはジョブの待機列のアイテム最大数をコントロールできた上、全てのジョブをメモリに並べられるだけの十分な RAMを保持していたので、ただジョブをチャネル列にバッファすれば良いだろうと考えました。
  1. var Queue chan Payload
  2.  
  3. func init() {
  4.     Queue = make(chan Payload , MAX_QUEUE)
  5. }
  6.  
  7. func payloadHandler(w http.ResponseWriter , r *http.Request) {
  8.     ...
  9.     // Go through each payload and queue items individually to be posted to S3
  10.     for _, payload := range content.Payloads {
  11.         Queue <- payload
  12.     }
  13.     ...
  14. }
そして、ジョブをデキューしプロセスするのに、下記のような方法を使いました。
  1. func StartProcessor() {
  2.     for {
  3.         select {
  4.         case job := <-Queue:
  5.             job.payload.UploadToS3()  // <-- STILL NOT GOOD
  6.         }
  7.     }
  8. }
正直なところ、何も考えていなかったということです。きっと何本ものレッドブルを共にした深夜だったのでしょう。このアプローチには何も利点がありませんでした。欠陥だらけの同時実行と引き換えに、問題を先延ばしするだけのバッファされたキューを得ました。私たちの同期プロセッサは一度に1 ペイロードだけをS3にアップロードするという代物であり、受信リクエストのレートは1 プロセッサがS3にアップロードする能力を大幅に超えていたので、バッファチャネルはすぐに限界に達し、リクエストハンドラが次のアイテムを並べようとする動作をブロックしてしまいました。
単純に問題を避けようとしたところ、かえってシステム滅亡のカウントダウンを始めてしまいました。この欠陥バージョンをデプロイして以来、レイテンシレートはコンスタントに上がり続けてしまっていました。
cloudwatch-latency.png
より良い解決法
私たちは、Goチャネルを使う際、ある一般的なパターンを活用することに決めました。ジョブを待機させ、 JobQueueにおいて同時に動作するWorkerの数をコントロールする 2-Tierのチャネルシステムを作るためです。
このアイデアは、マシンを故障させず、S3から接続エラーを引き起こさない、ある程度の持続可能なレートまで S3のアップロードを並列化処理することでした。そこでジョブWorkerパターンを作ることを選択しました。 JavaC#などをよく知っている人たちは、 Worker threadのスレッドプールをチャネルで代用して実装するというGoでのやり方だと考えてください。
  1. var (
  2.     MaxWorker = os.Getenv("MAX_WORKERS" )
  3.     MaxQueue  = os.Getenv("MAX_QUEUE" )
  4. )
  5.  
  6. // Job represents the job to be run
  7. type Job struct {
  8.     Payload Payload
  9. }
  10.  
  11. // A buffered channel that we can send work requests on.
  12. var JobQueue chan Job
  13.  
  14. // Worker represents the worker that executes the job
  15. type Worker struct {
  16.     WorkerPool  chan chan Job
  17.     JobChannel  chan Job
  18.     quit        chan bool
  19. }
  20.  
  21. func NewWorker(workerPool chan chan Job ) Worker {
  22.     return Worker{
  23.         WorkerPool: workerPool,
  24.         JobChannel: make(chan Job ),
  25.         quit:       make(chan bool )}
  26. }
  27.  
  28. // Start method starts the run loop for the worker, listening for a quit channel in
  29. // case we need to stop it
  30. func (w Worker) Start() {
  31.     go func() {
  32.         for {
  33.             // register the current worker into the worker queue.
  34.             w.WorkerPool <- w.JobChannel
  35.  
  36.             select {
  37.             case job := <-w.JobChannel:
  38.                 // we have received a work request.
  39.                 if err := job.Payload.UploadToS3(); err != nil {
  40.                     log.Errorf("Error uploading to S3: %s" , err.Error())
  41.                 }
  42.  
  43.             case <-w.quit:
  44.                 // we have received a signal to stop
  45.                 return
  46.             }
  47.         }
  48.     }()
  49. }
  50.  
  51. // Stop signals the worker to stop listening for work requests.
  52. func (w Worker) Stop() {
  53.     go func() {
  54.         w.quit <- true
  55.     }()
  56. }
  57.  
Webリクエストハンドラをモディファイし、ペイロードでJob struct のインスタンスを作って、WorkerがピックアップできるようJobQueue チャネルに送ります。
  1. func payloadHandler(w http.ResponseWriter , r *http.Request) {
  2.  
  3.     if r.Method != "POST" {
  4.         w.WriteHeader(http.StatusMethodNotAllowed )
  5.         return
  6.     }
  7.  
  8.     // Read the body into a string for json decoding
  9.     var content = &PayloadCollection{}
  10.     err := json.NewDecoder(io.LimitReader (r.Body, MaxLength )).Decode(&content )
  11.     if err != nil {
  12.         w.Header().Set ("Content-Type", "application/json ; charset=UTF-8")
  13.         w.WriteHeader(http.StatusBadRequest )
  14.         return
  15.     }
  16.  
  17.     // Go through each payload and queue items individually to be posted to S3
  18.     for _, payload := range content.Payloads {
  19.  
  20.         // let's create a job with the payload
  21.         work := Job{Payload: payload}
  22.  
  23.         // Push the work onto the queue.
  24.         JobQueue <- work
  25.     }
  26.  
  27.     w.WriteHeader(http.StatusOK )
  28. }
  29.  
Webサーバを初期化する間にDispatcherを作り、 Workerのプールの作成とJobQueueに現れるジョブをリスニングし始めるため、 Run()を呼び出します。
  1. dispatcher := NewDispatcher(MaxWorker )
  2. dispatcher.Run()
  3.  
以下はディスパッチャを実行するためのコードです。
  1. type Dispatcher struct {
  2.     // A pool of workers channels that are registered with the dispatcher
  3.     WorkerPool chan chan Job
  4. }
  5.  
  6. func NewDispatcher(maxWorkers int ) *Dispatcher {
  7.     pool := make(chan chan Job , maxWorkers)
  8.     return &Dispatcher{WorkerPool: pool}
  9. }
  10.  
  11. func (d *Dispatcher) Run() {
  12.     // starting n number of workers
  13.     for i := 0; i < d.maxWorkers ; i++ {
  14.         worker := NewWorker(d.pool )
  15.         worker.Start()
  16.     }
  17.  
  18.     go d.dispatch()
  19. }
  20.  
  21. func (d *Dispatcher) dispatch() {
  22.     for {
  23.         select {
  24.         case job := <-JobQueue:
  25.             // a job request has been received
  26.             go func(job Job ) {
  27.                 // try to obtain a worker job channel that is available.
  28.                 // this will block until a worker is idle
  29.                 jobChannel := <-d.WorkerPool
  30.  
  31.                 // dispatch the job to the worker job channel
  32.                 jobChannel <- job
  33.             }(job )
  34.         }
  35.     }
  36. }
  37.  
インスタンスが生成され、Workerのプールに付け加えられる最大の Workerの数を与えることに注意してください。このプロジェクトのために、Docker化された Go環境を用いてAmazon Elasticbeanstalkを利用してきたので、常に成果物の中でシステムのコンフィギュレーションを行うために12-factor の方法論に従うようにしてきました。そして環境変数からこれらの値を読み込みます。Workerの数とジョブキューの最大数をコントロールすることができたので、これらの値に手を加えるのにクラスタの再デプロイする必要がありません。
  1. var (
  2.     MaxWorker = os.Getenv("MAX_WORKERS" )
  3.     MaxQueue  = os.Getenv("MAX_QUEUE" )
  4. )
  5.  
即席の結果
デプロイをした直後に、レイテンシレートがわずかな数値にまで下がり、ハンドラリクエストをコントロールする能力が激しく高まっているのが見られました。
cloudwatch-console.png
Elastic Load Balancersの準備ができた後に、ElasticBeanstalk アプリケーションサーバが毎分100万リクエストにまで近づいていることが分かりました。毎分100 万よりもトラフィックが急上昇する深夜帯の時間があります。
新しいコードをデプロイすると、たちまちサーバの数は100から約 20まで著しく下降しました。
elasticbeanstalk-healthy-hosts.png
クラスタとauto-scalingセッティングに適切なコンフィギュレーションを行った後は、 4つのEC2 c4.Largeにまで下げることができるようになりました。 CPU5分間絶えずに 90%を上回った際には新しいインスタンスを生成するようElastic Auto-Scalingを設定しました。
elasticbeanstalk-production-dashboard.png
結論
私は、シンプルさこそが常に正解だと考えています。多くのキュー、バックグラウンドのWorker、複雑なデプロイを用いて複雑なシステムを構築することができますが、代わりに Elasticbeanstalk auto-scalingの力と、Golangがもたらす即使用可能な並行処理に対する効率かつシンプルなアプローチを利用することに決めました。
私が現在使っているMacBook Proよりもよほど貧弱なマシンたった 4台が、毎分100万回 Amazon S3バケットに書き込んでいるPOSTリクエストを処理するなんて、そうそうあることではありません。
ジョブのための正しいツールは常に存在します。時折Ruby on Railシステムがとても強力な Webハンドラを必要とする時は、単純だけれど強力な代替可能なソリューションのためにrubyのエコシステムから少し離れて考えてみてください。
//block-text-content social
// social
翻訳に対するフィードバックがございましたら、メール または GitHubIssue よりお寄せください。

0 コメント:

コメントを投稿