加入收藏 | 设为首页 | 会员中心 | 我要投稿 温州站长网 (https://www.0577zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 建站资源 > 优化 > 正文

使用Go处理每分钟百万请求

发布时间:2019-07-03 07:35:29 所属栏目:优化 来源:MarcioCastilho
导读:副标题#e# 这篇文章在medium上很火,作者以实际案例来分析,讲得很好。 我们经常听说使用Go的goroutine和channel很容易实现高并发,那是不是全部代码都放在goroutine中运行就可以实现高并发程序了呢?很显然并不是。这篇文章将教大家如何一步一步写出一个简

我们需要找到另一种的方法。从一开始我们就开始讨论如何让请求处理程序的生命周期尽可能的短,并在后台产生处理。当然,这是在 RubyonRails必须要做的事情,否则,不管你是使用puma,unicorn还是 passenger,你的所有的可用的web worker都将阻塞。

那么我们就需要利用常见的解决方案来完成这项工作,比如Resque,Sidekiq, SQS等。当然还有其他工具,因为有很多方法可以实现。

因此,我们第二次改进是创建一个buffer channel,我们可以将一些作业请求扔进队列并将它们上传到S3,由于我们可以控制队列的最大长度,并且有足够的RAM来排队处理内存中的作业,因此我们认为只要在通道队列中缓冲作业就行了。

  1. var Queue chan Payload 
  2.  
  3. func init() { 
  4.     Queue = make(chan Payload, MAX_QUEUE) 
  5.  
  6. func payloadHandler(w http.ResponseWriter, r *http.Request) { 
  7.     ... 
  8.     // Go through each payload and queue items individually to be posted to S3 
  9.     for _, payload := range content.Payloads { 
  10.         Queue <- payload 
  11.     } 
  12.     ... 

然后,为了将任务从buffer channel中取出并处理它们,我们正在使用这样的方式:

  1. func StartProcessor() { 
  2.     for { 
  3.         select { 
  4.         case job := <-Queue: 
  5.             job.payload.UploadToS3()  // <-- STILL NOT GOOD 
  6.         } 
  7.     } 

说实话,我不知道我们在想什么,这肯定是一个难熬的夜晚。这种方法并没有给我们带来什么提升,我们用一个缓冲的队列替换了有缺陷的并发,也只是推迟了问题的产生时间而已。我们的同步处理器每次只向S3上传一个有效载荷,由于传入请求的速率远远大于单个处理器上传到S3的能力,因此我们的buffer channel迅速达到极限,队列已经阻塞并且无法再往里边添加作业。

我们只是简单的绕过了这个问题,最终导致我们的系统完全崩溃。在我们部署这个有缺陷的版本后,我们的延迟持续的升高。

使用Go处理每分钟百万请求

更好的解决方案

我们决定在Go channel上使用一个通用模式来创建一个 2-tier(双重)channel系统,一个用来处理排队的job,一个用来控制有多少worker在 JobQueue上并发工作。

这个想法是将上传到S3的并行速度提高到一个可持续的速度,同时不会造成机器瘫痪,也不会引发S3的连接错误。

所以我们选择创建一个 Job/Worker模式。对于那些熟悉Java,C#等的人来说,可以将其视为Golang使用channel来实现WorkerThread-Pool的方式。

  1. var ( 
  2.     MaxWorker = os.Getenv("MAX_WORKERS") 
  3.     MaxQueue  = os.Getenv("MAX_QUEUE") 
  4.  
  5. // Job represents the job to be run 
  6. type Job struct { 
  7.     Payload Payload 
  8.  
  9. // A buffered channel that we can send work requests on. 
  10. var JobQueue chan Job 
  11.  
  12. // Worker represents the worker that executes the job 
  13. type Worker struct { 
  14.     WorkerPool  chan chan Job 
  15.     JobChannel  chan Job 
  16.     quit        chan bool 
  17.  
  18. func NewWorker(workerPool chan chan Job) Worker { 
  19.     return Worker{ 
  20.         WorkerPool: workerPool, 
  21.         JobChannel: make(chan Job), 
  22.         quit:       make(chan bool)} 
  23.  
  24. // Start method starts the run loop for the worker, listening for a quit channel in 
  25. // case we need to stop it 
  26. func (w Worker) Start() { 
  27.     go func() { 
  28.         for { 
  29.             // register the current worker into the worker queue. 
  30.             w.WorkerPool <- w.JobChannel 
  31.  
  32.             select { 
  33.             case job := <-w.JobChannel: 
  34.                 // we have received a work request. 
  35.                 if err := job.Payload.UploadToS3(); err != nil { 
  36.                     log.Errorf("Error uploading to S3: %s", err.Error()) 
  37.                 } 
  38.  
  39.             case <-w.quit: 
  40.                 // we have received a signal to stop 
  41.                 return 
  42.             } 
  43.         } 
  44.     }() 
  45.  
  46. // Stop signals the worker to stop listening for work requests. 
  47. func (w Worker) Stop() { 
  48.     go func() { 
  49.         w.quit <- true 
  50.     }() 

(编辑:温州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读