Featured image of post Golang实现支持中间件的简易TCP框架

Golang实现支持中间件的简易TCP框架

Golang实现支持中间件的简易TCP框架

在最近写的一个项目中需要用到一个TCP反向代理的模块,但是在golang的标准库中没有为tcp直接提供像http那样简单易用的服务框架,本篇则简单实现一个易用的TCP服务器

主体思路

我们的实现的主题思路分为以下四个内容

  1. 监听服务
  2. 获取构建新连接对象并设置超时时间及keepalive
  3. 设置方法退出时连接关闭
  4. 调用回调接口 TcpHandler

主要结构体和接口

首先是TCPServer的结构体,我们希望用户可以自由构建TcpServer并设置超时时间等自定义选项

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type TcpServer struct {
   Addr    string
   Handler TCPHandler  <- 对外提供的服务方法接口
   err     error
   BaseCtx context.Context

   WriteTimeout     time.Duration
   ReadTimeout      time.Duration
   KeepAliveTimeout time.Duration

   mu         sync.Mutex
   inShutdown int32
   doneChan   chan struct{}
   l          *onceCloseListener
}

像httpHandler一样,对外提供抽象的ServeTCP方法

1
2
3
type TCPHandler interface {
   ServeTCP(ctx context.Context, conn net.Conn)
}

服务启动方法

用户可以通过自行构建TcpServer实例再通过ListenAndServe()调用服务,或通过tcp.ListenAndServe(":8080", handler) 使用默认的TcpServer实例快速启动服务。

ListenAndServe() 方法中,进行参数的校验和初始化操作

Serve(l net.Listener) 方法中,通过 l.Accept() 接收信息,包装接收到的conn并另起一个协程处理服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func ListenAndServe(addr string, handler TCPHandler) error {
	server := &TcpServer{Addr: addr, Handler: handler, doneChan: make(chan struct{})}
	return server.ListenAndServe()
}

func (s *TcpServer) ListenAndServe() error {
   if s.shuttingDown() {
      return ErrServerClosed
   }
   if s.doneChan == nil {
      s.doneChan = make(chan struct{})
   }
   addr := s.Addr
   if addr == "" {
      return errors.New("need addr")
   }
   ln, err := net.Listen("tcp", addr)
   if err != nil {
      return err
   }
   return s.Serve(tcpKeepAliveListener{
      ln.(*net.TCPListener)})
}

func (s *TcpServer) Serve(l net.Listener) error {
	s.l = &onceCloseListener{Listener: l}
	defer s.l.Close() //执行listener关闭
	if s.BaseCtx == nil {
		s.BaseCtx = context.Background()
	}
	baseCtx := s.BaseCtx
	ctx := context.WithValue(baseCtx, ServerContextKey, s) <- 将TcpServer实例存入context中
	for {
		rw, e := l.Accept()
		if e != nil {
			select {
			case <-s.getDoneChan():
				return ErrServerClosed
			default:
			}
			fmt.Printf("accept fail, err: %v\n", e)
			continue
		}
		c := s.newConn(rw)
		go c.serve(ctx)
	}
	return nil
}

包装 net.Conntcp.conn

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type conn struct {
   	server     *TcpServer 	// 反引用TcpServer
   	remoteAddr string 		// 发送端地址
	rwc        net.Conn
}

func (s *TcpServer) newConn(rwc net.Conn) *conn {
   c := &conn{
      server: s,
      rwc:    rwc,
   }
   // 设置参数
   if d := c.server.ReadTimeout; d != 0 {
      c.rwc.SetReadDeadline(time.Now().Add(d))
   }
   if d := c.server.WriteTimeout; d != 0 {
      c.rwc.SetWriteDeadline(time.Now().Add(d))
   }
   if d := c.server.KeepAliveTimeout; d != 0 {
      if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
         tcpConn.SetKeepAlive(true)
         tcpConn.SetKeepAlivePeriod(d)
      }
   }
   return c
}

tcp.conn.Server(ctx) 调用回调函数进行服务处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (c *conn) serve(ctx context.Context) {
   defer func() {
      if err := recover(); err != nil && err != ErrAbortHandler {
         const size = 64 << 10
         buf := make([]byte, size)
         buf = buf[:runtime.Stack(buf, false)]
         fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
      }
      c.close()
   }()
   c.remoteAddr = c.rwc.RemoteAddr().String()
   ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
   if c.server.Handler == nil {
      panic("handler empty")
   }
   c.server.Handler.ServeTCP(ctx, c.rwc)
}

这样,一个简单易用的TCP服务框架就搭建完成了,其中一些像 close() 等方法在此处没有展示出来,更多详细代码可在我的代码仓库中查看:https://github.com/Kirov7/fayUtils/tree/master/net/tcp

扩展中间件的实现

扩展中间件功能的实现思路

  • 方法构建
    • 构建中间件URL路由
    • 构建URL的中间件方法数组
    • 使用Use方法整合路由与方法数组
  • 方法调用
    • 构建方法请求逻辑
    • 封装TCPHandler接口与TcpServer整合

TcpSliceRouter.Group(path) 方法,初始化路由分组(默认只能全局)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 创建 Group
func (g *TcpSliceRouter) Group(path string) *TcpSliceGroup {
   if path != "/" {
      panic("only accept path=/")
   }
   return &TcpSliceGroup{
      TcpSliceRouter: g,
      path:           path,
   }
}

TcpSliceGroup.Use(middlewares ...TcpHandlerFunc) 构造回调方法

调用 Use 方法传入中间件集合,添加到切片 c.handlers

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 构造回调方法
func (g *TcpSliceGroup) Use(middlewares ...TcpHandlerFunc) *TcpSliceGroup {
   g.handlers = append(g.handlers, middlewares...)
   existsFlag := false
   for _, oldGroup := range g.TcpSliceRouter.groups {
      if oldGroup == g {
         existsFlag = true
      }
   }
   if !existsFlag {
      g.TcpSliceRouter.groups = append(g.TcpSliceRouter.groups, g)
   }
   return g
}

通过 NewTcpSliceRouterHandler 方法传入最后调用的逻辑方法coreFunc并传入已经 Use 了中间件的, TcpSliceRouter

1
2
3
4
5
6
func NewTcpSliceRouterHandler(coreFunc func(*TcpSliceRouterContext) tcp_server.TCPHandler, router *TcpSliceRouter) *TcpSliceRouterHandler {
   return &TcpSliceRouterHandler{
      coreFunc: coreFunc,
      router:   router,
   }
}

最终的回调函数 ServeTCP((ctx context.Context, conn net.Conn),初始化 context 之后将 coreFunc 追加到 c.handlers,重置执行光标,从第一个 c.handlers 开始执行中间件

1
2
3
4
5
6
7
8
func (w *TcpSliceRouterHandler) ServeTCP(ctx context.Context, conn net.Conn) {
   c := newTcpSliceRouterContext(conn, w.router, ctx)
   c.handlers = append(c.handlers, func(c *TcpSliceRouterContext) {
      w.coreFunc(c).ServeTCP(ctx, conn)
   })
   c.Reset()
   c.Next()
}

在中间件中自行调用Next()Abort()等中间件逻辑,最后所有中间件执行完毕之后执行 coreFunc(已经被追加到c.handlers的最后位置)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 从最先加入中间件开始回调
func (c *TcpSliceRouterContext) Next() {
   c.index++
   for c.index < int8(len(c.handlers)) {
      c.handlers[c.index](c)
      c.index++
   }
}

// 跳出中间件方法
func (c *TcpSliceRouterContext) Abort() {
	c.index = abortIndex
}
Built with Hugo
主题 StackJimmy 设计