
gev



gev is a lightweight, fast non-blocking TCP network library based on Reactor mode.

It supports custom protocols, making it quick and easy to build high-performance servers.

Features

  • High-performance event loop based on epoll and kqueue
  • Multi-core, multi-threaded support
  • Dynamically expanding read and write buffers implemented with a ring buffer
  • Asynchronous reads and writes
  • SO_REUSEPORT port reuse
  • Automatic cleanup of idle connections
  • WebSocket and Protobuf support
  • Scheduled and delayed tasks
  • Custom protocol support

Network model

gev uses only a few goroutines: one listens for new connections, and the others (worker goroutines) handle the read and write events of connected clients. The number of worker goroutines is configurable and defaults to the number of host CPU cores.

Performance Test

📈 Test chart

Test environment: Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

Throughput Test

Limit GOMAXPROCS=1 (single thread), 1 worker goroutine

(throughput chart)

Limit GOMAXPROCS=4, 4 worker goroutines

(throughput chart)

Other Test

Speed Test

A simple performance comparison with similar libraries; the benchmarking method is the same as in the evio project.

  • gnet
  • eviop
  • evio
  • net (StdLib)

Limit GOMAXPROCS=1, 1 worker goroutine

(benchmark chart)

Limit GOMAXPROCS=1, 4 worker goroutines

(benchmark chart)

Limit GOMAXPROCS=4, 4 worker goroutines

(benchmark chart)

Install

go get -u github.com/Allenxuxu/gev

Getting started

echo demo

package main

import (
	"flag"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	//log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}

Handler is an interface that programs must implement.

type Handler interface {
	OnConnect(c *connection.Connection)
	OnMessage(c *connection.Connection, ctx interface{}, data []byte) []byte
	OnClose(c *connection.Connection)
}

func NewServer(handler Handler, opts ...Option) (server *Server, err error)

OnMessage is called back when a complete data frame arrives. Users can get the data, process business logic, and return the data to be sent.

When data arrives, gev does not call back OnMessage immediately; instead it calls an UnPacket function first. The execution logic is roughly as follows:

ctx, receivedData := c.protocol.UnPacket(c, buffer)
if ctx != nil || len(receivedData) != 0 {
	sendData := c.OnMessage(c, ctx, receivedData)
	if len(sendData) > 0 {
		return c.protocol.Packet(c, sendData)
	}
}

Protocol

The UnPacket function checks whether the data in the ring buffer forms a complete data frame. If it does, the data is unpacked and the payload is returned; if not, UnPacket returns immediately.

The return values of UnPacket (interface{}, []byte) are passed to the OnMessage callback as its ctx interface{} and data []byte arguments. ctx is designed to carry special information produced while parsing the data frame in UnPacket (useful for complex framing protocols), while data carries the payload.

type Protocol interface {
	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
	Packet(c *Connection, data []byte) []byte
}

type DefaultProtocol struct{}

func (d *DefaultProtocol) UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
	ret := buffer.Bytes()
	buffer.RetrieveAll()
	return nil, ret
}

func (d *DefaultProtocol) Packet(c *Connection, data []byte) []byte {
	return data
}

As shown above, gev provides a default Protocol implementation that fetches all the data in the receive buffer (ringbuffer). In practice, applications usually have their own data frame protocol, which can be plugged into gev via a variadic option when creating the Server.

s, err := gev.NewServer(handler, gev.Protocol(&ExampleProtocol{}))

Check out the Protocol example for details.

There is also a Send method for sending data. Note that Send does not write the data immediately; it queues the data in the event loop, which then performs the actual write.

Check out the Server timing push example for details.

func (c *Connection) Send(buffer []byte) error

ShutdownWrite sets the connected status to false and shuts down the write side of the connection.

Check out the Maximum connections example for details.

func (c *Connection) ShutdownWrite() error

RingBuffer is a dynamically expanding implementation of a circular buffer.

WebSocket

The WebSocket protocol is built on top of TCP, so gev does not need built-in support; instead, WebSocket support is provided as a plugin, in the plugins/websocket directory.

code
type Protocol struct {
	upgrade *ws.Upgrader
}

func New(u *ws.Upgrader) *Protocol {
	return &Protocol{upgrade: u}
}

func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
	upgraded := c.Context()
	if upgraded == nil {
		var err error
		out, _, err = p.upgrade.Upgrade(buffer)
		if err != nil {
			log.Println("Websocket Upgrade :", err)
			return
		}
		c.SetContext(true)
	} else {
		header, err := ws.VirtualReadHeader(buffer)
		if err != nil {
			log.Println(err)
			return
		}
		if buffer.VirtualLength() >= int(header.Length) {
			buffer.VirtualFlush()

			payload := make([]byte, int(header.Length))
			_, _ = buffer.Read(payload)

			if header.Masked {
				ws.Cipher(payload, header.Mask, 0)
			}

			ctx = &header
			out = payload
		} else {
			buffer.VirtualRevert()
		}
	}
	return
}

func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
	return data
}

The detailed implementation can be found in the plugin source; see the websocket example for usage.

Example

echo server
package main

import (
	"flag"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	//log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}
Automatically clean up idle connections
package main

import (
	"flag"
	"strconv"
	"time"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/gev/log"
)

type example struct {
}

func (s *example) OnConnect(c *connection.Connection) {
	log.Info(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Infof("OnMessage from : %s", c.PeerAddr())
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Info("OnClose: ", c.PeerAddr())
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.IdleTime(5*time.Second))
	if err != nil {
		panic(err)
	}

	s.Start()
}
Maximum connections
package main

import (
	"log"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/toolkit/sync/atomic"
)

// Server example
type Server struct {
	clientNum     atomic.Int64
	maxConnection int64
	server        *gev.Server
}

// New server
func New(ip, port string, maxConnection int64) (*Server, error) {
	var err error
	s := new(Server)
	s.maxConnection = maxConnection
	s.server, err = gev.NewServer(s,
		gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}

	return s, nil
}

// Start server
func (s *Server) Start() {
	s.server.Start()
}

// Stop server
func (s *Server) Stop() {
	s.server.Stop()
}

// OnConnect callback
func (s *Server) OnConnect(c *connection.Connection) {
	s.clientNum.Add(1)
	log.Println(" OnConnect : ", c.PeerAddr())

	if s.clientNum.Get() > s.maxConnection {
		_ = c.ShutdownWrite()
		log.Println("Refused connection")
		return
	}
}

// OnMessage callback
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage")
	out = data
	return
}

// OnClose callback
func (s *Server) OnClose(c *connection.Connection) {
	s.clientNum.Add(-1)
	log.Println("OnClose")
}

func main() {
	s, err := New("", "1833", 1)
	if err != nil {
		panic(err)
	}
	defer s.Stop()

	s.Start()
}
Server timing push
package main

import (
	"container/list"
	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"log"
	"sync"
	"time"
)

// Server example
type Server struct {
	conn   *list.List
	mu     sync.RWMutex
	server *gev.Server
}

// New server
func New(ip, port string) (*Server, error) {
	var err error
	s := new(Server)
	s.conn = list.New()
	s.server, err = gev.NewServer(s,
		gev.Address(ip+":"+port))
	if err != nil {
		return nil, err
	}

	return s, nil
}

// Start server
func (s *Server) Start() {
	s.server.RunEvery(1*time.Second, s.RunPush)
	s.server.Start()
}

// Stop server
func (s *Server) Stop() {
	s.server.Stop()
}

// RunPush push message
func (s *Server) RunPush() {
	var next *list.Element

	s.mu.RLock()
	defer s.mu.RUnlock()

	for e := s.conn.Front(); e != nil; e = next {
		next = e.Next()

		c := e.Value.(*connection.Connection)
		_ = c.Send([]byte("hello\n"))
	}
}

// OnConnect callback
func (s *Server) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())

	s.mu.Lock()
	e := s.conn.PushBack(c)
	s.mu.Unlock()
	c.SetContext(e)
}

// OnMessage callback
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage")
	out = data
	return
}

// OnClose callback
func (s *Server) OnClose(c *connection.Connection) {
	log.Println("OnClose")
	e := c.Context().(*list.Element)

	s.mu.Lock()
	s.conn.Remove(e)
	s.mu.Unlock()
}

func main() {
	s, err := New("", "1833")
	if err != nil {
		panic(err)
	}
	defer s.Stop()

	s.Start()
}
WebSocket
package main

import (
	"flag"
	"log"
	"math/rand"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/gev/plugins/websocket/ws"
	"github.com/Allenxuxu/gev/plugins/websocket/ws/util"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, data []byte) (messageType ws.MessageType, out []byte) {
	log.Println("OnMessage:", string(data))
	messageType = ws.MessageBinary
	switch rand.Int() % 3 {
	case 0:
		out = data
	case 1:
		msg, err := util.PackData(ws.MessageText, data)
		if err != nil {
			panic(err)
		}
		if err := c.Send(msg); err != nil {
			msg, err := util.PackCloseData(err.Error())
			if err != nil {
				panic(err)
			}
			if e := c.Send(msg); e != nil {
				panic(e)
			}
		}
	case 2:
		msg, err := util.PackCloseData("close")
		if err != nil {
			panic(err)
		}
		if e := c.Send(msg); e != nil {
			panic(e)
		}
	}
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := NewWebSocketServer(handler, &ws.Upgrader{},
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops))
	if err != nil {
		panic(err)
	}

	s.Start()
}
package main

import (
	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/plugins/websocket"
	"github.com/Allenxuxu/gev/plugins/websocket/ws"
)

// NewWebSocketServer creates a WebSocket Server
func NewWebSocketServer(handler websocket.WebSocketHandler, u *ws.Upgrader, opts ...gev.Option) (server *gev.Server, err error) {
	opts = append(opts, gev.Protocol(websocket.New(u)))
	return gev.NewServer(websocket.NewHandlerWrap(u, handler), opts...)
}
protobuf
package main

import (
	"flag"
	"log"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
	pb "github.com/Allenxuxu/gev/example/protobuf/proto"
	"github.com/Allenxuxu/gev/plugins/protobuf"
	"github.com/golang/protobuf/proto"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	msgType := ctx.(string)

	switch msgType {
	case "msg1":
		msg := &pb.Msg1{}
		if err := proto.Unmarshal(data, msg); err != nil {
			log.Println(err)
		}
		log.Println(msgType, msg)
	case "msg2":
		msg := &pb.Msg2{}
		if err := proto.Unmarshal(data, msg); err != nil {
			log.Println(err)
		}
		log.Println(msgType, msg)
	default:
		log.Println("unknown msg type")
	}

	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.Protocol(&protobuf.Protocol{}))
	if err != nil {
		panic(err)
	}

	log.Println("server start")
	s.Start()
}
package main

import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"net"
	"os"

	pb "github.com/Allenxuxu/gev/example/protobuf/proto"
	"github.com/Allenxuxu/gev/plugins/protobuf"
	"github.com/golang/protobuf/proto"
)

func main() {
	conn, e := net.Dial("tcp", ":1833")
	if e != nil {
		log.Fatal(e)
	}
	defer conn.Close()

	var buffer []byte
	for {
		reader := bufio.NewReader(os.Stdin)
		fmt.Print("Text to send: ")
		text, _ := reader.ReadString('\n')
		name := text[:len(text)-1]

		switch rand.Int() % 2 {
		case 0:
			msg := &pb.Msg1{
				Name: name,
				Id:   1,
			}

			data, err := proto.Marshal(msg)
			if err != nil {
				panic(err)
			}
			buffer = protobuf.PackMessage("msg1", data)
		case 1:
			msg := &pb.Msg2{
				Name:  name,
				Alias: "big " + name,
				Id:    2,
			}

			data, err := proto.Marshal(msg)
			if err != nil {
				panic(err)
			}
			buffer = protobuf.PackMessage("msg2", data)
		}

		_, err := conn.Write(buffer)
		if err != nil {
			panic(err)
		}
	}
}

Thanks

Thanks to JetBrains for the free open source license.
