本文以Eosc(一個(gè)高性能中間件開發(fā)框架)中的代碼為例子,看看如何在我們的實(shí)際項(xiàng)目中,實(shí)現(xiàn)這樣的功能。
 
背景
在實(shí)際項(xiàng)目中,我們經(jīng)常需要異步處理事件與數(shù)據(jù)。比如MVC模型中處理請(qǐng)求的Filter鏈,又如在nginx中或是linux的iptables中,都會(huì)有一個(gè)處理鏈條,來一步步的順序處理一個(gè)請(qǐng)求。此外基于集中存儲(chǔ)與分發(fā)的模式,實(shí)現(xiàn)事件與數(shù)據(jù)的異步處理,對(duì)于提升系統(tǒng)響應(yīng)程度,實(shí)現(xiàn)業(yè)務(wù)處理的解耦至關(guān)重要。本文以eosc(一個(gè)高性能中間件開發(fā)框架)中的代碼為例子,看看如何在我們的實(shí)際項(xiàng)目中,實(shí)現(xiàn)這樣的功能
代碼
eosc提供了關(guān)于dispatcher的關(guān)鍵實(shí)現(xiàn)的兩個(gè)文件,分別是dispatch.go和data-dispatch.go,具體的代碼地址是https://github.com/eolinker/eosc/tree/main/common/dispatcher。
這兩個(gè)文件中實(shí)現(xiàn)的結(jié)構(gòu)體與接口的關(guān)系如圖所示:

dispatcher關(guān)鍵接口與結(jié)構(gòu)體的關(guān)系
dispatch.go文件
在dispatch.go文件中,esco提供了IEvent、CallBackHandler、IListener三個(gè)重要的接口。
同時(shí)通過CallBackFunc來實(shí)現(xiàn)接口CallBackHandler, tListener來實(shí)現(xiàn)IListener。
//2個(gè)接口
type CallBackHandler interface {
   DataEvent(e IEvent) error
}
type IListener interface {
   Leave()
   Event() <-chan IEvent
}
/*CallBackFunc實(shí)現(xiàn)了CallBackHandler,同時(shí)CallBackFunc又是一個(gè)接受IEvent為參數(shù),
返回error的函數(shù)*/
type CallBackFunc func(e IEvent) error
func (f CallBackFunc) DataEvent(e IEvent) error {
   return f(e)
}
//實(shí)現(xiàn)了IListener接口
func (t *tListener) Leave() {
   t.Once.Do(func() {
      atomic.StoreUint32(&t.closed, 1)
      close(t.c)
   })
}
func (t *tListener) Event() <-chan IEvent {
   return t.c
}
注意:tListener還提供了一個(gè)Handler方法,這個(gè)方法的參數(shù)與返回結(jié)果與CallBackFunc一樣,也就是說它實(shí)現(xiàn)的Handler方法是一種CallBackFunc,這個(gè)在后面的分發(fā)處理邏輯的注冊(cè)中會(huì)用到。
func (t *tListener) Handler(e IEvent) error {
   if atomic.LoadUint32(&t.closed) == 0 {
      t.c <- e
      return nil
   }
   return ErrorIsClosed
}data-dispatch.go文件
該文件提供了兩種dispatcher創(chuàng)建方法,分別是NewDataDispatchCenter、NewEventDispatchCenter。這兩個(gè)方法都是創(chuàng)建了DataDispatchCenter結(jié)構(gòu)體(這個(gè)結(jié)構(gòu)體后面會(huì)講到),但是啟動(dòng)的處理協(xié)程不同,NewDataDispatchCenter啟動(dòng)的是doDataLoop,NewEventDispatchCenter啟動(dòng)的是doEventLoop。
//兩種DispatchCenter創(chuàng)建方法
func NewDataDispatchCenter() IDispatchCenter {
   ctx, cancelFunc := context.WithCancel(context.Background())
   center := &DataDispatchCenter{
      ctx:          ctx,
      cancelFunc:   cancelFunc,
      addChannel:   make(chan *_CallbackBox, 10),
      eventChannel: make(chan IEvent),
   }
   go center.doDataLoop()
   return center
}
func NewEventDispatchCenter() IDispatchCenter {
   ctx, cancelFunc := context.WithCancel(context.Background())
   center := &DataDispatchCenter{
      ctx:          ctx,
      cancelFunc:   cancelFunc,
      addChannel:   make(chan *_CallbackBox, 10),
      eventChannel: make(chan IEvent),
   }
   go center.doEventLoop()
   return center
}
//DataDispatchCenter 數(shù)據(jù)廣播中心
type DataDispatchCenter struct {
   addChannel   chan *_CallbackBox
   eventChannel chan IEvent
   ctx        context.Context
   cancelFunc context.CancelFunc
}
DataDispatchCenter這個(gè)結(jié)構(gòu)體中有兩個(gè)chan,一個(gè)是addChannel,一個(gè)是eventChannel。
addChannel  | 接受_CallbackBox,這個(gè)_CallbackBox提供了邏輯處理Handler  | 
eventChannel  | 接受IEvent,觸發(fā)  | 
doEventLoop邏輯:
NewEventDispatchCenter方法中啟動(dòng)的doEventLoop,邏輯相對(duì)簡單,創(chuàng)建的channels用于存儲(chǔ)addChannel發(fā)送過來的_CallbackBox,即事件處理Handler.當(dāng)eventChannel收到事件后,遍歷channels中的每一個(gè)_CallbackBox,并調(diào)用相應(yīng)的Handler處理。

doEventLoop狀態(tài)圖
具體代碼可以查看https://github.com/eolinker/eosc/blob/main/common/dispatcher/data-dispatch.go#L48。
doDataLoop邏輯:
NewDataDispatchCenter方法中啟動(dòng)的doDataLoop,這個(gè)邏輯稍微復(fù)雜點(diǎn)。其實(shí)它的大致流程和doEventLoop,不同的是每個(gè)新增加的_CallbackBox,需要對(duì)當(dāng)前接收并緩存的所有Event鍵值對(duì)進(jìn)行處理。而doEventLoop是不會(huì)的,新增加的_CallbackBox,只會(huì)對(duì)在它之后接收的Event生效。下面的代碼InitEvent(data.GET())很有意思。
- 首先InitEvent實(shí)現(xiàn)了IEvent接口,是一種IEvent。
 - type InitEvent map[string]map[string][]byte 
(代碼鏈接:https://github.com/eolinker/eosc/blob/main/common/dispatcher/data.go#L88)InitEvent是一個(gè)map,可以通過InitEvent(data.GET())初始化。
 
func (d *DataDispatchCenter) doDataLoop() {
   data := NewMyData(nil)
   channels := make([]*_CallbackBox, 0, 10)
   isInit := false
   for {
      select {
      case event, ok := <-d.eventChannel:
         if ok {
            isInit = true
            data.DoEvent(event)
            next := channels[:0]
            for _, c := range channels {
               if err := c.handler(event); err != nil {
                  close(c.closeChan)
                  continue
               }
               next = append(next, c)
            }
            channels = next
         }
      case hbox, ok := <-d.addChannel:
         {
            if ok {
               if !isInit {
                  channels = append(channels, hbox)
               } else {
                  if err := hbox.handler(InitEvent(data.GET())); err == nil {
                     channels = append(channels, hbox)
                  }
               }
            }
         }
      }
   }
}應(yīng)用
創(chuàng)建EventServer。
type EventServer struct {
   IDispatchCenter
}
func NewEventServer() *EventServer {
   es := &EventServer{
      IDispatchCenter: NewDataDispatchCenter(),
   }
   return es
}定義事件。
type MyEvent struct {
  namespace string
  key       string
  event     string
  data      []byte
}
func (m *MyEvent) Namespace() string {
  return m.namespace
}
func (m *MyEvent) Event() string {
  return m.event
}
func (m *MyEvent) Key() string {
  return m.key
}
func (m *MyEvent) Data() []byte {
  return m.data
}定義Handler并注冊(cè)。
func Handler(e IEvent) error {
   //根據(jù)自己的業(yè)務(wù)要求
}
es.Register(Handler)發(fā)送事件。
es.Send(&MyEvent{
   namespace: "a",
   key:       "b",
   event:     "set",
   data:      []byte(fmt.Sprint(index)),
})