【译文】原文地址
问题
从事匿名遥测和分析系统,我们的目标是能够处理来自大量客户端的POST请求。我们的web服务将接收JSON文档内容包括很多的负载需要发送到亚马逊S3存储,为了后续使用map-reduce来处理这些数据。
传统方式我们将创建worker-tier架构,使用包含如下中间件:
- SideKiq
- Resque
- DelayedJob
- Elasticbeanstalk worker tier
- RabbitMQ
搭建两个集群,一个部署前端另一个用于workers,这样就可以通过扩展来应对大量后端任务。自开始,团队就知道应该使用Go,因为在讨论的过程中就看出来可能是个很大流量的系统。作者使用go大概两年,也开发了一些系统但是没有处理过这种大流量的。
起初我们创建一些struct来定义web服务POST请求接收的负载和上传数据到S3桶的方法。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// 待实现
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
简单的Go协程方法
开始我们使用一个很简单的POST handler来实现,仅将任务放进一个简单goroutine中来并行处理。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 将body读取到字符串并使用json解码
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
//迭代每个payload并逐个上传到S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
对于流量不是很大的情况,可以应对大多数人请求,但是在大规模情况下很快上面的方法就被证明不是很好了。我们预期有很多的请求,但和我们部署第一个版本到生产环境中所看到的不一样。我们完全低估了流量。以上方法有很多不足地方。无法控制goroutine的数量。当达到每分钟1百万POST请求的时候,代码直接瘫痪了。
再次优化
需要另找方法。一开始我们就讨论如何保持处理请求生命周期非常短,并在后台处理。当然这个在Ruby中是必须做的,否则将阻塞所有可用的worker。然后我们就使用常规解决方案来做,比如Resque、Sidekiq、SQS等。很多处理这种问题的方法。
因此第二个版本通过创建带缓冲的channel,这样就可以缓存一些jobs,并逐步上传到S3,而且可以控制缓存队列的长度,有足够内存也能够存放这些job。我们认为将job存放到channel队列中是可以的。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
然后消费队列处理jobs,使用类似如下方式:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
老实说,我对我们所想的一点底都没有。这个方法并没有让我们轻松,我们用缓存来应对并发只是延缓了问题的爆发。同步处理机制,每次上传一个负载到S3,因为接受请求的速度太快,远比一个处理器上传到S3点速度快,导致缓存很快就挤满了,导致后面来的请求直接阻塞。我们只是在回避这个问题,直到倒计时我们的系统最终死亡。在我们部署了这个有缺陷的版本之后,我们的延迟率在几分钟内以恒定的速率不断增加。
更好的方法
我们决定在使用go channel时利用一个通用的模式,创建一个两层的channel系统,一个用来存放jobs另一个用于控制并发处理job队列的workers的数量。为了保持一定层度的并发上传数据到S3,一方面不会使系统拖垮,另一方面不会出现连接S3错误。因此我们选择创建一个job/worker模式。这在java、C#等中经常使用。考虑以golang方式实现可以通过使用channels来代替worker线程池。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
//Start方法通过循环监听任务请求和停止信号。
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
//注册当前worker到worker队列
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 接收到一个工作请求
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// 接收到停止工作信号
return
}
}
}()
}
//Stop方法通知worker停止监听工作请求
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
修改web请求处理函数,创建一个Job结构体实例并将Job实例发送到JobQenue channel供worker处理。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
//将body读取到字符串并使用json解码
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// 迭代每个payload并逐个上传到S3
for _, payload := range content.Payloads {
//创建Job实例
work := Job{Payload: payload}
//将worker发送到队列
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
在web服务器初始化的时候创建一个Dispather和调用Run()来创建workers池,并开始监听JobQueue中jobs。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
以下是dispatcher实现:
type Dispatcher struct {
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
注意我们设置了workers的实例最大数量,并添加到worker池中。因为我们项目在docker环境中使用了亚马逊Elasticbeanstalk,所以在生产环境下我们总是遵循12-factor原则来配置我们系统,通过环境变量的方式读取配置。这种方式我们可以控制workers的数量和JobQueue的长度,因此可以快速调整这些值不需要重新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
中间结果
在部署上面的优化方案之后,很快我们就发现延时下降到可接受范围之内,并且可以处理波动很大的请求量。
在几分钟后当弹性负载均衡起作用后,看到ElasticBeanstalk应用服务处理近1百度请求每分钟。经常在早上几小时流量还能飚升到超过一百万每分钟。
新代码部署后,服务器的数量很快从100多个降20个。
总结
在这里使用了简单的方法。本来我们可以设计复杂的系统包含很多的队列、后台workers、复杂的部署,但是我们决定使用Elasticbeanstalk自动扩展能力和go提供的高效简单的并发方法。