首页  ·  知识 ·  架构设计
电商应用的订单队列架构思想
林冠宏  博客园  综合  编辑:81192   图片来源:网络
每种方式有各自的特点,因为本文谈的是订单队列的架构思想,所以下面我们来看下如何在订单系统中引入订单队列。

本文所要分享的思路就是电商应用中常用的 订单队列 。

一般的订单流程

电商应用中,简单直观的用户从下单到付款,最终完成整个流程的步骤可以用下图表示:

其中, 订单信息持久化 ,就是存储数据到数据库中。而最终客户端完成支付后的 更新订单状态的操作是由第三方支付平台进行回调设置好的回调链接 NotifyUrl ,来进行的。

补全订单状态的更新流程,如下图表示:

思考瓶颈点

服务端的直接 瓶颈点 ,首先要考虑 TPS 。去除细分点,我们主要看 订单信息持久化 瓶颈点。

在高并发业务场景中,例如 秒杀 、 优惠价抢购 等。短时间内的下单请求数会很多,如果 订单信息持久化 部分,不做优化,而是直接对数据库层进行 频繁的 读写操作,数据库会承受不了,容易成为第一个垮掉的服务,比如下图的所示的常规写单流程:

可以看到,  持久化一个订单信息,一般要经历网络连接操作(链接数据库),以及多个 I/O 操作。

得益于 连接池 技术,我们可以在链接数据库的时候,不用每次都重新发起一次完整的HTTP请求,而可以直接从池中获取已打开了的连接句柄,而直接使用,这点和线程池的原理差不多。

此外,我们还可以在上面的流程中加入更多的优化,例如对于一些需要读取的信息,可以事先存置到内存缓存层,并加于更新维护,这样在使用的时候,可以快速读取。

即使我们都具备了上述的一些优化手段,但是对于 写操作 的 I/O 阻塞耗时,在 高并发请求的时候,依然容易导致数据库承受不住,容易出现 链接多开异常 , 操作超时 等问题。

在该层进行优化的操作,除了上面谈到的之外,还有下面一些手段:

  • 数据库集群,采用读写分离,减少写时压力

  • 分库,不同业务的表放到不同的数据库,会引入分布式事务问题

  • 采用队列模型削峰

每种方式有各自的特点,因为本文谈的是 订单队列 的架构思想,所以下面我们来看下如何在订单系统中引入订单队列。

订单队列

网上有不少文章谈到订单队列的做法,大部分都漏了说明请求与响应的一致性问题。

第一种订单队列 流程图:

上图是大多文章提到的队列模型,有两个没有解析的问题:

notifyUrl

首先,要肯定的是,上面的订单流程图是没有问题的。它有下面的优缺点,所提到的两个问题也是有解决方案的。

优点:

搭配中间件

缺点:

  • 多订单入队时,② 步骤的处理速度跟不上。从而导致第二点问题。

  • 实现较复杂

上面谈及的问题点,我后面都会给出解决方案。下面我们来看下另外一种订单队列流程图。

第二种订单队列 流程图:

第二种订单队列的设计模型,注意它的 同步等待 持久化处理的结果,解决了持久化与响应的一致性问题,但是有个严重的耗时等待问题,它的优缺点如下:

优点:

  1. 持久化与响应的强一致性。

  2. 持久化处理,采用排队的先来先处理,不会像上面谈到的高并发请求一起冲击数据库层面的情况。

  3. 实现简单

缺点:

  1. 多订单入队时,持久化单元处理速度跟不上,造成客户端同步等待响应。

这类订单队列,我下面会放出 Golang 实现的版本代码。

总结

对比上面两种常见的订单模型,如果从 用户体验的角度 去优先考虑,第一种不需要用户等待 持久化处理 结果的是明显优于第二种的。如果技术团队完善,且技术过硬,也应该考虑第一种的实现方式。

如果仅仅想要达到 宁愿用户等待到超时 也不愿意存储层服务被冲垮,那么有限考虑第二种。

实现队列的选择

在这里,我们进一步细分一下,实现队列模块的功能有哪些选择。

相信很多后端开发经验比较老道的同志已经想到了,使用现有的中间件,比如知名的 Redis 、 RocketMQ ,以及 Kafka 等,它们都是一种选择。

此外地,我们还可以直接编写代码,在当前的服务系统中实现一个消息队列来达到目的,下面我用图来分类下队列类型。

不同的队列实现方式,能直接导致不同的功能,也有不同的优缺点:

一级缓存优点:

  1. 一级缓存,最快。无需链接,直接从内存层获取;

  2. 如果不考虑持久化和集群,那么它实现简单。

一级缓存缺点:

  1. 如果考虑持久化和集群,那么它实现比较复杂。

  2. 不考虑持久化情况下,如果服务器断电或其它原因导致服务中断,那么排队中的订单信息将丢失

中间件的优点:

增量

中间件的缺点:

  1. 分布式部署时,需要建立链接通讯,导致读写操作需要走网络通讯。

解答

回到第一种订单模型中:

问题1:

如果订单存在第三方支付情况,① 和 ② 的一致性如何保证?

首先我们看下,不一致性的时候,会产生什么结果:

响应页面

上述的情况,明显地,只有 3 是需要恢复订单信息的,应对的方案有:

  • 当服务端支付回调接口被第三方支付平台访问时,无法找到对应的订单信息。那么先将这类支付了却没订单信息的数据存储起来先,比如存储到 表A 。同时启动一个 定时任务B 专门遍历表A,然后去订单列表寻找是否已经有了对应的订单信息,有则更新,没则继续,或跟随制定的检测策略走。

  • 当 ② 是由于服务端的 非崩溃性原因 而导致失败时:

    • 失败的时候同时将原始订单数据重新插入到 队列头部 ,等待下一次的重新持久化处理。

  • 当 ② 因服务端的 崩溃性 原因而导致失败时:

    • 定时任务B 在进行了多次检测无果后,那么根据第三方支付平台在回调时候传递过来的 订单附属信息 对订单进行恢复。

  • 整个过程订单恢复的过程,用户查看订单信息为空白。

  • 定时任务B 所在服务 最好 和回调链接 notifyUrl 所在的接口服务一致,这样能保证当 B 挂掉的时候,回调服务也跟随挂掉,然后第三方支付平台在调用回调失败的情况下,他们会有 重试逻辑 ,依赖这个,在回调服务重启时,可以完成订单信息恢复。

问题2:

如果订单存在第三方支付情况,① 完成了支付,且三方支付平台回调了 notifyUrl,而此时 ② 还在排队等待处理,这种情况又如何处理?

应对的方案参考 问题1 的 定时任务B 检测修改机制。

第二种队列的 Go 版本例子代码

定义一些常量

const (
   QueueOrderKey   = "order_queue"    
   QueueBufferSize = 1024              // 请求队列大小
   QueueHandleTime = time.Second * 7   // 单个 mission 超时时间)

定义出入队接口,方便多种实现

// 定义出入队接口,方便多种实现type IQueue interface {
   Push(key string,data []byte) error
   Pop(key string) ([]byte,error)
}

定义请求与响应实体

// 定义请求与响应实体type QueueTimeoutResp struct {
   Timeout  bool  // 超时标志位
   Response chan interface{}
}type QueueRequest struct {
   ReqId       string  `json:"req_id"`  // 单次请求 id
   Order       *model.OrderCombine `json:"order"` // 订单信息 bean
   AccessTime  int64   `json:"access_time"` // 请求时间
   ResponseChan *QueueTimeoutResp `json:"-"`}

定义队列实体

// 定义队列实体type Queue struct {
   mapLock sync.Mutex
   RequestChan  chan *QueueRequest // 缓存管道,装载请求    RequestMap   map[string]*QueueTimeoutResp
   Queue IQueue}

实例化队列,接收接口参数

// 实例化队列,接收接口参数func NewQueue(queue IQueue) *Queue {    return &Queue{
       mapLock:     sync.Mutex{},
       RequestChan: make(chan *QueueRequest, QueueBufferSize),
       RequestMap:  make(map[string]*QueueTimeoutResp, QueueBufferSize),
       Queue:       queue,
   }
}

接收请求

// 接收请求func (q *Queue) AcceptRequest(req *QueueRequest) interface{} {    if req.ResponseChan == nil {
       req.ResponseChan = &QueueTimeoutResp{
           Timeout:  false,
           Response: make(chan interface{},1),
       }
   }
   userKey := key(req)  // 唯一 key 生成函数
   req.ReqId = userKey
   q.mapLock.Lock()
   q.RequestMap[userKey] = req.ResponseChan // 内存层存储对应的 req 的 resp 管道指针
   q.mapLock.Unlock()
   q.RequestChan <- req  // 接收请求
   log("userKey : ", userKey)
   ticker := time.NewTicker(QueueHandleTime) // 以超时时间 QueueHandleTime 启动一个定时器
   defer func() {
       ticker.Stop() // 释放定时器
       q.mapLock.Lock()        delete(q.RequestMap,userKey)  // 当处理完一个 req,从 map 中移出
       q.mapLock.Unlock()
   }()    select {    case <-ticker.C:  // 超时
       req.ResponseChan.Timeout = true
       Queue_TimeoutCounter++  // 辅助计数,int 类型
       log("timeout: ",userKey)        return lghError.HandleTimeOut  // 返回超时错误的信息
   case result := <-req.ResponseChan.Response:  // req 被完整处理
       return result
   }
}

从请求管道中取出 req 放入到队列容器中,该函数在 gorutine 中运行

// 从请求管道中取出 req 放入到队列容器中,该函数在 gorutine 中运行func (q *Queue) addToQueue() {    for {
       req := <-q.RequestChan // 取出一个 req
       data, err := json.Marshal(req)        if err != nil {
           log("redis queue parse req failed : ", err.Error())            continue
       }        if err = q.Queue.Push(QueueOrderKey, data);err != nil {  // push 入队,这里有时间消耗
           log("lpush req failed. Error : ", err.Error())            continue
       }
       log("lpush req success. req time: ", req.AccessTime)
   }
}

取出 req 处理,该函数在 gorutine 中运行

// 取出 req 处理,该函数在 gorutine 中运行func (q *Queue) readFromQueue() {    for {
       data, err := q.Queue.Pop(QueueOrderKey) // pop 出队,这里也有时间消耗
       if err != nil {
           log("lpop failed. Error : ", err.Error())            continue
       }        if data == nil || len(data) == 0 {
           time.Sleep(time.Millisecond * 100) // 空数据的 req,停顿下再取
           continue
       }
       req := &QueueRequest{}        if err = json.Unmarshal(data, req);err != nil {
           log("Lpop: json.Unmarshal failed. Error : ", err.Error())            continue
       }
       userKey := req.ReqId
       q.mapLock.Lock()
       resultChan, ok := q.RequestMap[userKey] // 取出对应的 resp 管道指针
       q.mapLock.Unlock()        if !ok {            // 中间件重启时,比如 redis 重启而读取旧 key,会进入这里
           Queue_KeyNotFound ++ // 计数 int 类型
           log("key not found, rollback: ", userKey)            continue
       }
       simulationTimeOutReq4(req) // 模拟出来任务的函数,入参为 req
       if resultChan.Timeout {            // 处理期间,已经超时,这里做可以拓展回滚操作
           Queue_MissionTimeout ++
           log("handle mission timeout: ", userKey)            continue
       }
       log("request result send to chan succeee, userKey : ", userKey)
       ret := util.GetCommonSuccess(req.AccessTime)
       resultChan.Response <- &ret // 输入处理成功
   }
}

启动

func (q *Queue) Start()  {    go q.addToQueue()    go q.readFromQueue()
}

运行例子

func test(){
   ...
   runtime.GOMAXPROCS(4)
   redisQueue := NewQueue(NewFastCacheQueue())
   redisQueue.Start()
   reqNumber := testReqNumber
   wg := sync.WaitGroup{}
   wg.Add(reqNumber)    for i :=0;i<reqNumber;i++ {        go func(index int) {
           combine := model.OrderCombine{}
           ret := AcceptRequest(&QueueRequest{
               UserId:       int64(index),
               Order:        &combine,
               AccessTime:   time.Now().Unix(),
               ResponseChan: nil,
           })
           fmt.Println("ret: ------------- ",ret.String())
           wg.Done()
       }(i)
   }
   wg.Wait()
   time.Sleep(3*time.Second)
   fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout)
}


本文作者:林冠宏 来源:博客园
CIO之家 www.ciozj.com 微信公众号:imciow
    >>频道首页  >>网站首页   纠错  >>投诉
版权声明:CIO之家尊重行业规范,每篇文章都注明有明确的作者和来源;CIO之家的原创文章,请转载时务必注明文章作者和来源;
延伸阅读