More effective network communication, two-way calling, notify and broadcast supported.

Overview

ARPC - More Effective Network Communication

Mentioned in Awesome Go

GoDoc MIT licensed Build Status Go Report Card Coverage Statusd

Contents

Features

  • Two-Way Calling
  • Two-Way Notify
  • Sync and Async Calling
  • Sync and Async Response
  • Batch Write | Writev | net.Buffers
  • Broadcast
  • Middleware
  • Pub/Sub
  • Opentracing
Pattern Interactive Directions Description
call two-way:
c -> s
s -> c
request and response
notify two-way:
c -> s
s -> c
request without response

Performance

  • simple echo load testing
Framework Protocol Codec Configuration Connection Num Number of Goroutines Per Connection Qps
arpc tcp/localhost encoding/json os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G
8 10 80-100k
grpc http2/localhost protobuf os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G
8 10 20-30k

Header Layout

  • LittleEndian
bodyLen reserved cmd flag methodLen sequence method body
4 bytes 1 byte 1 byte 1 bytes 1 bytes 8 bytes methodLen bytes bodyLen-methodLen bytes

Installation

  1. Get and install arpc
$ go get -u github.com/lesismal/arpc
  1. Import in your code:
import "github.com/lesismal/arpc"

Quick Start

package main

import "github.com/lesismal/arpc"

func main() {
	server := arpc.NewServer()

	// register router
	server.Handler.Handle("/echo", func(ctx *arpc.Context) {
		str := ""
		if err := ctx.Bind(&str); err == nil {
			ctx.Write(str)
		}
	})

	server.Run("localhost:8888")
}
package main

import (
	"log"
	"net"
	"time"

	"github.com/lesismal/arpc"
)

func main() {
	client, err := arpc.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	defer client.Stop()

	req := "hello"
	rsp := ""
	err = client.Call("/echo", &req, &rsp, time.Second*5)
	if err != nil {
		log.Fatalf("Call failed: %v", err)
	} else {
		log.Printf("Call Response: \"%v\"", rsp)
	}
}

API Examples

Register Routers

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

// message would be default handled one by one  in the same conn reader goroutine
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })

// this make message handled by a new goroutine
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)

Router Middleware

See router middleware, it's easy to implement middlewares yourself

import "github.com/lesismal/arpc/extension/middleware/router"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })

Coder Middleware

  • Coder Middleware is used for converting a message data to your designed format, e.g encrypt/decrypt and compress/uncompress
import "github.com/lesismal/arpc/extension/middleware/coder/gzip"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) { ... })

Client Call, CallAsync, Notify

  1. Call (Block, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  1. CallAsync (Nonblock, with callback and timeout/context)
request := &Echo{...}

timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
	response := &Echo{}
	ctx.Bind(response)
	...	
}, timeout)
  1. Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)

Server Call, CallAsync, Notify

  1. Get client and keep it in your application
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
	client = ctx.Client
	// release client
	client.OnDisconnected(func(c *arpc.Client){
		client = nil
	})
})

go func() {
	for {
		time.Sleep(time.Second)
		if client != nil {
			client.Call(...)
			client.CallAsync(...)
			client.Notify(...)
		}
	}
}()
  1. Then Call/CallAsync/Notify

Broadcast - Notify

var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

func broadcast() {
	var svr *arpc.Server = ... 
	msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
	mux.RLock()
	for client := range clientMap {
		client.PushMsg(msg, arpc.TimeZero)
	}
	mux.RUnlock()
}

Async Response

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

asyncResponse := true // default is true, or set false
handler.Handle("/echo", func(ctx *arpc.Context) {
	req := ...
	err := ctx.Bind(req)
	if err == nil {
		ctx.Write(data)
	}
}, asyncResponse)

Handle New Connection

// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

Handle Disconnected

// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

Handle Client's send queue overstock

// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

Custom Net Protocol

// server
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)

// client
dialer := func() (net.Conn, error) { 
	return ... 
}
client, err := arpc.NewClient(dialer)

Custom Codec

import "github.com/lesismal/arpc/codec"

var codec arpc.Codec = ...

// package
codec.Defaultcodec = codec

// server
svr := arpc.NewServer()
svr.Codec = codec

// client
client, err := arpc.NewClient(...)
client.Codec = codec

Custom Logger

import "github.com/lesismal/arpc/log"

var logger arpc.Logger = ...
log.SetLogger(logger) // log.DefaultLogger = logger

Custom operations before conn's recv and send

arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) {
	// ...
})

arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) {
	// ...
})

Custom arpc.Client's Reader by wrapping net.Conn

arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) {
	// ...
})

Custom arpc.Client's send queue capacity

arpc.DefaultHandler.SetSendQueueSize(4096)

JS Client

Web Chat Examples

Pub/Sub Examples

  • start a server
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	s := pubsub.NewServer()
	s.Password = password

	// server publish to all clients
	go func() {
		for i := 0; true; i++ {
			time.Sleep(time.Second)
			s.Publish(topicName, fmt.Sprintf("message from server %v", i))
		}
	}()

	s.Run(address)
}
  • start a subscribe client
import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func onTopic(topic *pubsub.Topic) {
	log.Info("[OnTopic] [%v] \"%v\", [%v]",
		topic.Name,
		string(topic.Data),
		time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	// subscribe topic
	if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
		panic(err)
	}

	<-make(chan int)
}
  • start a publish client
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	for i := 0; true; i++ {
		if i%5 == 0 {
			// publish msg to all clients
			client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		} else {
			// publish msg to only one client
			client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		}
		time.Sleep(time.Second)
	}
}

More Examples

Issues
  • 问一下 server 调用 client代码的样例 感觉我没用对, 又不知道哪里没对, 最终导致异常了

    问一下 server 调用 client代码的样例 感觉我没用对, 又不知道哪里没对, 最终导致异常了

    我的想问问 当 client 和 server已经连通之后, 在什么样的时机或者机制可以 通过ctx.Client.Call(....) 我的实际场景是, 当server拿到客户端传过来的数据之后, 需要经过耗时的运算, 然后将server的数据再通过同一个tcp连接 传回去, 但是不太方便直接在 在 server的handler中直接 ctx.Write(). 所以才想到用server端去"call client"

    client端

    
    client, err := arpc.NewClient(func() (net.Conn, error) {
        return net.DialTimeout("tcp", "localhost:10001", time.Second*3)
    })
    	
    
    payload := Payload{}
    client.Call("/send_payload", &payload, &rsp, time.Second*2)
    
    client.Handler.Handle("/receive_payload", func(ctx *arpc.Context) {
    
        log.Print("server call received")
        var payload protocol.Payload
        
        if err := ctx.Bind(&payload); err == nil {
    	    log.Print("req payload: ", payload.ToString())
    	    ctx.Write([]byte("ok"))
        }
    })
    

    server端 出错的的案例

    server := arpc.NewServer()
        
        ready_chan := make(chan bool)
        server.Handler.Handle("/send_payload", func(ctx *arpc.Context) {
        var payload protocol.Payload
        
        if err := ctx.Bind(&payload); err == nil {
    	    log.Print("req payload: ", payload.ToString())
    	    client = ctx.Client
    	    ctx.Write([]byte("ok"))
    	    ready_chan <- true
        }
        })
        
        go func() {
        <-ready_chan
        log.Print("ctx.Client", client) // 这一行注释掉就崩了, 异常参见下一个段落
        res := ""
        
        payload := protocol.NewPayload(
    	    1,
    	    1,
    	    1,
    	    []byte("server call client"),
        )
        client.Call("/receive_payload", &payload, &res, time.Second*20)
        log.Print(fmt.Sprintf("server call client, req:%s, res:%s", payload.ToString(), res))
        
        }()
        
        server.Run("localhost:10001")
    
    
    

    上面写法异常也看不到程序本身的行数, 应该是有地方没 recover到

    2021/09/09 15:16:13.832 [ERR] runtime error: runtime error: invalid memory address or nil pointer dereference
    traceback:
    goroutine 25 [running]:
    runtime/debug.Stack(0xc000107c20, 0x1b6980, 0x2bf960)
            c:/go/src/runtime/debug/stack.go:24 +0xa5
    github.com/lesismal/arpc/util.Recover()
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/util/util.go:21 +0x5e
    panic(0x1b6980, 0x2bf960)
            c:/go/src/runtime/panic.go:969 +0x1c7
    github.com/lesismal/arpc.(*handler).OnMessage(0xc0000bc000, 0xc0000ba000, 0xc00001c200)
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/handler.go:596 +0x2f4
    github.com/lesismal/arpc.(*Client).recvLoop(0xc0000ba000)
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/client.go:735 +0x2bf
    github.com/lesismal/arpc/util.Safe(0xc00003e520)
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/util/util.go:28 +0x50
    created by github.com/lesismal/arpc.(*Client).run
            C:/Users/go/pkg/mod/github.com/lesismal/[email protected]/client.go:680 +0x108
    
    opened by shoaly 22
  • Benchmark test between v1.1.9 and v1.1.8

    Benchmark test between v1.1.9 and v1.1.8

    **Test environment **

    On the same machine, the Windows/Linux test results are relatively consistent.

    Use official documents:

    • arpc/examples/bench/client/client.go
    • arpc/examples/bench/server/server.go

    arpc v1.1.9 client.Call

    2021/08/17 09:47:32 [qps: 25756], [avg: 25796 / s], [total: 644904, 25 s]
    2021/08/17 09:47:33 [qps: 25208], [avg: 25773 / s], [total: 670112, 26 s]
    2021/08/17 09:47:34 [qps: 24706], [avg: 25734 / s], [total: 694818, 27 s]
    2021/08/17 09:47:35 [qps: 26389], [avg: 25757 / s], [total: 721207, 28 s]
    2021/08/17 09:47:36 [qps: 24529], [avg: 25715 / s], [total: 745736, 29 s]
    2021/08/17 09:47:37 [qps: 21087], [avg: 25560 / s], [total: 766823, 30 s]
    

    arpc v1.1.8 client.Call

    2021/08/17 09:48:59 [qps: 35669], [avg: 35430 / s], [total: 885762, 25 s]
    2021/08/17 09:49:00 [qps: 33920], [avg: 35372 / s], [total: 919682, 26 s]
    2021/08/17 09:49:01 [qps: 34806], [avg: 35351 / s], [total: 954488, 27 s]
    2021/08/17 09:49:02 [qps: 34576], [avg: 35323 / s], [total: 989064, 28 s]
    2021/08/17 09:49:03 [qps: 33994], [avg: 35277 / s], [total: 1023058, 29 s]
    2021/08/17 09:49:04 [qps: 34774], [avg: 35261 / s], [total: 1057832, 30 s]
    

    v1.1.9 client.Notify

    for k := 0; true; k++ {
    	rand.Read(data)
    	req := &HelloReq{Msg: base64.RawStdEncoding.EncodeToString(data)}
    	// rsp := &HelloRsp{}
    	err = client.Notify(method, req, time.Second*5)
    	if err != nil {
    		log.Printf("Call failed: %v", err)
    	// } else if rsp.Msg != req.Msg {
    	// 	log.Fatal("Call failed: not equal")
    	} else {
    		atomic.AddUint64(&qpsSec, 1)
    	}
    }
    
    2021/08/17 09:54:04 [qps: 6568], [avg: 63304 / s], [total: 1582624, 25 s]
    2021/08/17 09:54:05 [qps: 52315], [avg: 62882 / s], [total: 1634939, 26 s]
    2021/08/17 09:54:08 [qps: 51121], [avg: 62446 / s], [total: 1686060, 27 s]
    2021/08/17 09:54:08 [qps: 96364], [avg: 63658 / s], [total: 1782424, 28 s]
    2021/08/17 09:54:08 [qps: 15375], [avg: 61993 / s], [total: 1797799, 29 s]
    2021/08/17 09:54:09 [qps: 43706], [avg: 61383 / s], [total: 1841505, 30 s]
    

    v1.1.8 client.Notify

    for k := 0; true; k++ {
    	req := &HelloReq{Msg: "hello from client.Call"}
    	// rsp := &HelloRsp{}
    	err = client.Notify(method, req, time.Second*5)
    	if err != nil {
    		log.Printf("Call failed: %v", err)
    	} else {
    		//log.Printf("Call Response: \"%v\"", rsp.Msg)
    		atomic.AddUint64(&qpsSec, 1)
    	}
    }
    
    2021/08/17 09:51:12 [qps: 464933], [avg: 515004 / s], [total: 12875112, 25 s]
    2021/08/17 09:51:13 [qps: 340793], [avg: 508304 / s], [total: 13215905, 26 s]
    2021/08/17 09:51:14 [qps: 445335], [avg: 505971 / s], [total: 13661240, 27 s]
    2021/08/17 09:51:15 [qps: 498246], [avg: 505695 / s], [total: 14159486, 28 s]
    2021/08/17 09:51:16 [qps: 385566], [avg: 501553 / s], [total: 14545052, 29 s]
    2021/08/17 09:51:17 [qps: 504710], [avg: 501658 / s], [total: 15049762, 30 s]
    

    v1.1.9-server, v1.1.8-client client.Notify

    2021/08/17 09:55:59 [qps: 349016], [avg: 327355 / s], [total: 8183891, 25 s]
    2021/08/17 09:56:00 [qps: 335811], [avg: 327680 / s], [total: 8519702, 26 s]
    2021/08/17 09:56:01 [qps: 354877], [avg: 328688 / s], [total: 8874579, 27 s]
    2021/08/17 09:56:02 [qps: 302469], [avg: 327751 / s], [total: 9177048, 28 s]
    2021/08/17 09:56:03 [qps: 364375], [avg: 329014 / s], [total: 9541423, 29 s]
    2021/08/17 09:56:04 [qps: 341278], [avg: 329423 / s], [total: 9882701, 30 s]
    
    opened by fufuok 22
  • issue with client

    issue with client

    你好,请问如果在连接成功的时候,客户端需要发送一条消息给服务端,是可以像以下代码这样发送消息么?

    client.Handler.HandleConnected(func(connectedClient *arpc.Client) {
    		req := ""
    		err := connectedClient.Call("/callAfterConnected","", &req, time.Second * 5)
    		if err == nil {
    			fmt.Println("Call After Connected Success.")
    		}else {
    			fmt.Println("Call After Connected Failed. ", err, "\n", req)
    		}
    	})
    

    我尝试了一下,发现很奇怪的现象,有时候这个 Handler 不会触发,如果触发该 Handler ,则必定错误,原因均为 timeout,此时 ARPC 会报 [WRN] [ARPC CLI] OnMessage: session not exist or expired 错误。

    备注:当前 client 并不需要执行 Stop 操作,所以确定不是因为 Stop 触发的。

    opened by spxvszero 14
  • 可以增加一个最大重试次数限制或者回调么?

    可以增加一个最大重试次数限制或者回调么?

    我在使用的过程中,发现通过 Client 建立的请求,当 Server 端意外失联之后,Client 端会不断的尝试重连,重试间隔 1s,无法中断,查看 client.go 的代码后发现有如下逻辑:

    https://github.com/lesismal/arpc/blob/b7b66250afb12b445295063631cb8072a35f0788/client.go#L752-L771

    期望: 是否可以在此处增加重连次数的回调?可以让调用者决定多少次之后放弃重新建立连接,转而处理其他业务? 如果考虑到 Handler 的调用次数可能过于频繁的话,那是否可以在 Client 的配置中增加个限制最大重试次数,超过之后直接走 Disconnected 的 Handler,这个 Disconnected 的 Handler 目前似乎只有在 Client 端主动 Stop 的时候才会执行。

    我尝试 fork 增加了部分相关代码,目前使用上是满足我的业务了,不过没有经过测试,也不敢提 pull request,所以在这里提出这个改进建议,谢谢阅读。

    opened by spxvszero 11
  • pubsub not removing disconnected clients

    pubsub not removing disconnected clients

    When subscribing client disconnects from server without unsubscribing first pupsub.Server.Publish() gives error: 2021/09/22 19:04:49.612 [ERR] [Publish] [topic: 'Broadcast'] failed client stopped, from Server to 127.0.0.1:35306

    From reading the code regarding pupsub.NewServer() svr.Handler.HandleDisconnected(svr.deleteClient) Publish() should no longer be trying to send to a disconnected client but this does not seem to be working?

    I do see the client disconnecting in log: 2021/09/22 19:04:30.206 [ERR] [APS SVR] 127.0.0.1:35306 Disconnected: EOF

    I tried also to do this manually but when I add to code:

    srv.Handler.HandleDisconnected(func (c *arpc.Client) {
        log.Error().Msg("HANDLE_DISCONNECTED_TEST")
    })
    

    Nothing happens either..

    opened by sebstyle 8
  • Bugs For arpc.js

    Bugs For arpc.js

    您好,请问一下,有关 websocket 的消息截断的问题,在 onMessage 函数下,通过 offset 去循环处理一段消息,这里使用 offset 的原因是什么?

    https://github.com/lesismal/arpc/blob/9ff9485288edf8f855b16f113061712e5737cf51/extension/jsclient/arpc.js#L179-L188

    我在使用的时候,发现 bodyLen 的长度计算不正确,例如我从服务器推送一条 Notify 的消息,在这里断点,得到的 event.data.byteLength 长度是 645,然而通过 bodyLen 计算出来的结果是 119,即使这是在 while 循环中,offset 会在下一次循环开始前偏移,但是消息已经在第一次循环里就发送给 handle 了

    https://github.com/lesismal/arpc/blob/9ff9485288edf8f855b16f113061712e5737cf51/extension/jsclient/arpc.js#L211-L214

    还有另一个问题是,第二次循环中,header 的内容并不存在,计算的 method 或者其他信息都是原来消息体的一部分信息,即使想要组装数据,似乎也没法完成。

    https://github.com/lesismal/arpc/blob/9ff9485288edf8f855b16f113061712e5737cf51/extension/jsclient/arpc.js#L190-L192

    以下是我使用的部分代码:

           //JS:
           client = new ArpcClient("ws://localhost:8888/ws", null)
           //订阅模式的 Handler
           client.handle("/broadcast/info", function(ctx) {
                console.log("[Info] ", ctx)
           });
           //请求订阅,成功之后通过上面的 Handler 获取服务器推送的消息
           client.call("/registerBroadcast", "xxxx", 5000, function(resp) {
                console.log("response ", resp)
           })
    
    
    //Server:
    func InitServer(port string)  {
    	ln, _ := websocket.Listen("localhost:"+port, nil)
    	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    		fmt.Println("url: %v", r.URL.String())
    		if r.URL.Path == "/" {
    			http.ServeFile(w, r, "chat.html")
    		} else if r.URL.Path == "/arpc.js" {
    			http.ServeFile(w, r, "arpc.js")
    		} else {
    			http.NotFound(w, r)
    		}
    	})
    
    	http.HandleFunc("/ws", ln.(*websocket.Listener).Handler)
    	go func() {
    		err := http.ListenAndServe("localhost:"+port, nil)
    		if err != nil {
    			fmt.Println("ListenAndServe: ", err)
    			panic(err)
    		}
    	}()
    
    	server := arpc.NewServer()
    
    	/*
    		Broadcast auth
    
    		request  :
    			AccessKey	string
    		response :
    			true or false
    	*/
    	server.Handler.Handle(pinApi.AddressForApi(pinApi.BROADCAST_AUTH), func(context *arpc.Context) {
    		var accessKey string
    		if err := context.Bind(&accessKey); err != nil {
    			context.Write(code.Failed)
    			return
    		}
    		if len(accessKey) > 0 && accessKey == default.AccessKey  {
    			BroadcastClients = append(BroadcastClients, context.Client)
    			context.Write(define.RPC_SUCCESS)
    		}else {
    			context.Write(code.Failed)
    		}
    	})
    
    	//broadcast info
    	go broadcastInfo()
    
    	server.Serve(ln)
    }
    func broadcastServerInfo() {
    	for true {
    		if len(BroadcastClients) > 0 {
    			log.Println(len(BroadcastClients), " clients listen Broad casting...")
                            // jsonData 是个较长的 json 数据
    			msg := RPCServer.NewMessage(arpc.CmdNotify,"/broadcast/info", jsonData)
    			for _, client := range BroadcastClients {
    				client.PushMsg(msg, arpc.TimeZero)
    			}
    		}
    		time.Sleep(time.Second * 1)
    	}
    }
    
    
    bug 
    opened by spxvszero 8
  • 例子里的micro下面的client.go的serviceManager.ClientBy获取不到服务

    例子里的micro下面的client.go的serviceManager.ClientBy获取不到服务

    package main
    
    import (
    	"net"
    	"time"
    
    	"github.com/lesismal/arpc/extension/micro"
    	"github.com/lesismal/arpc/extension/micro/etcd"
    	"github.com/lesismal/arpc/log"
    )
    
    func dialer(addr string) (net.Conn, error) {
    	return net.DialTimeout("tcp", addr, time.Second*3)
    }
    
    func main() {
    	var (
    		appPrefix = "app"
    		service   = "echo"
    
    		endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
    
    		serviceManager = micro.NewServiceManager(dialer)
    	)
    	discovery, err := etcd.NewDiscovery(endpoints, appPrefix, serviceManager)
    	if err != nil {
    		log.Error("NewDiscovery failed: %v", err)
    		panic(err)
    	}
    	defer discovery.Stop()
    
    		for {
    		time.Sleep(time.Second * 1)
    		client, err := serviceManager.ClientBy(service)
                    // 这里会报错,获取不到服务,必须要在上面先停留一段时间才能获取到。
    		if err != nil {
    			log.Error("get Client failed: %v", err)
    		} else {
    			req := "arden"
    			rsp := ""
    			err = client.Call("/echo", &req, &rsp, time.Second*5)
    			if err != nil {
    				log.Info("Call /echo failed: %v", err)
    			} else {
    				log.Info("Call /echo Response: \"%v\"", rsp)
    			}
    		}
    		time.Sleep(time.Second)
    	}
    }
    

    time.Sleep(time.Second * 1) client, err := serviceManager.ClientBy(service)' // 这里会报错,获取不到服务,必须要在上面先停留一段时间才能获取到。

    opened by arden 6
  • extension/micro: feature request

    extension/micro: feature request

    Thanks for your work, arpc is awesome.

    1, In game scenario, sometimes a request from the frontend server must be transferred to a special server, not a random one.

    ref: https://github.com/lesismal/arpc/blob/master/extension/micro/service.go#L210

    2, RPCX has metadata called state which is very useful in production.

    ref: https://github.com/rpcxio/rpcx-examples/blob/master/state/client/client.go#L21

    Are you considering adding these above?

    Thanks!

    opened by landbed 6
  • Missing SetLogLevel

    Missing SetLogLevel

    Hello,

    Just performed an update to the latest version and noticed SetLogLevel function has disappeared. This code no longer compiles arpc.SetLogLevel(arpc.LogLevelError).

    opened by ghost 6
  • Is there an elegant way to handle multiple client sessions?

    Is there an elegant way to handle multiple client sessions?

    I would like to work with arpc because the benchmark numbers of tests I made are very good. It has a great performance. Fantastic work @lesismal !

    In my test environment I have replication of containers with different IP addresses within one network. One container should connect to multiple clients. And if a connection is lost it should start to retry to connect. A simplification of the code would look like:

    ...
    var clients = make(map[string]*arpc.Client)
    ...
    func connectClients() {
    	// find all IP addresses of the the host (e.g. in docker, tasks.foo)
    	// Lets say we'll find 10.0.1.3 and 10.0.1.4
    	ips, err := net.LookupIP(addrClientLookup) 
    	...
    	for _, ip := range ips {
    		if _, inList := clients[ip.String()]; !inList {
    			client, err := arpc.NewClient(func() (net.Conn, error) {
    				return net.DialTimeout("tcp", ip.String()+":8888", time.Second*1)
    			}, 1)
    			if err != nil {
    				...
    				return
    			}
    			...
    			client.Handler.HandleDisconnected(func(c *arpc.Client) {
    				...
    				delete(clients, host)
    			})
    			...
    			clients[ip.String()] = client
    		}
    	}
    }
    

    connectClients is called every second because it can happen that a new container has been added. My issue is if a connection gets lost. In some cases it starts a Reconnect Trying x-loop but in other cases it replaces the target IP address from c.Conn with one from an other client. Like

    Map[10.0.1.3][ 10.0.1.3:44108 -> 10.0.1.3:8888 ]
    Map[10.0.1.4][ 10.0.1.3:45780 -> 10.0.1.4:8888 ] // correct
    

    to somewhat like

    Map[10.0.1.3][ 10.0.1.3:44108 -> 10.0.1.3:8888 ]
    Map[10.0.1.4][ 10.0.1.3:44120 -> 10.0.1.3:8888 ] // wrong
    

    without even touching the HandleDisconnected handler. Map[10.0.1.4] should be deleted because the connection is lost. Anyway is there a better way to handle multiple clients in a way that they don't interfere with each other in a failure situation?

    opened by randree 5
  • panic: unaligned 64-bit atomic operation on 32 bit

    panic: unaligned 64-bit atomic operation on 32 bit

    goroutine 38 [running]:
    runtime/internal/atomic.panicUnaligned()
    	C:/Program Files/Go/src/runtime/internal/atomic/unaligned.go:8 +0x2d
    runtime/internal/atomic.Xadd64(0x1212a09c, 0x1)
    	C:/Program Files/Go/src/runtime/internal/atomic/atomic_386.s:125 +0x11
    github.com/lesismal/arpc.(*Client).newRequestMessage(0x1212a060, 0x1, {0x7946f350, 0x10}, {0x79457880, 0x11c68000}, 0x0, 0x1, {0x0, 0x0, ...})
    	F:/Go/pkg/mod/github.com/lesismal/[email protected]/client.go:564 +0x10d
    github.com/lesismal/arpc.(*Client).CallAsync(0x1212a060, {0x7946f350, 0x10}, {0x79457880, 0x11c68000}, 0x7947da14, 0x12a05f200, {0x0, 0x0, 0x0})
    

    貌似原子操作 32 位要对齐?

    opened by Invincibl-e 5
Releases(v1.2.9)
Owner
lesismal
less is more.
lesismal
An implementation of a distributed KV store backed by Raft tolerant of node failures and network partitions 🚣

barge A simple implementation of a consistent, distributed Key:Value store which uses the Raft Concensus Algorithm. This project launches a cluster of

Shehjad Khan 0 Nov 24, 2021
Parallel Digital Universe - A decentralized identity-based social network

Parallel Digital Universe Golang implementation of PDU. What is PDU? Usage Development Contributing PDU PDU is a decentralized identity-based social n

PDU.PUB 39 Jun 16, 2022
Like Go channels over the network

libchan: like Go channels over the network Libchan is an ultra-lightweight networking library which lets network services communicate in the same way

Docker 2.4k Jun 18, 2022
A Golang implementation of the Umee network, a decentralized universal capital facility in the Cosmos ecosystem.

Umee A Golang implementation of the Umee network, a decentralized universal capital facility in the Cosmos ecosystem. Umee is a Universal Capital Faci

null 139 Jun 27, 2022
Network connecter for storage.

database Quick fast connection to database use gorm. Installation $ go get -u github.com/coolstina/connecter Example package main import ( "fmt" "

coolstina 0 Dec 4, 2021
Golang client library for adding support for interacting and monitoring Celery workers, tasks and events.

Celeriac Golang client library for adding support for interacting and monitoring Celery workers and tasks. It provides functionality to place tasks on

Stefan von Cavallar 72 Jun 24, 2022
dht is used by anacrolix/torrent, and is intended for use as a library in other projects both torrent related and otherwise

dht Installation Install the library package with go get github.com/anacrolix/dht, or the provided cmds with go get github.com/anacrolix/dht/cmd/....

Matt Joiner 238 Jun 20, 2022
AppsFlyer 483 Jun 23, 2022
Take control of your data, connect with anything, and expose it anywhere through protocols such as HTTP, GraphQL, and gRPC.

Semaphore Chat: Discord Documentation: Github pages Go package documentation: GoDev Take control of your data, connect with anything, and expose it an

Jexia.com 74 May 22, 2022
A feature complete and high performance multi-group Raft library in Go.

Dragonboat - A Multi-Group Raft library in Go / 中文版 News 2021-01-20 Dragonboat v3.3 has been released, please check CHANGELOG for all changes. 2020-03

lni 4.3k Jun 23, 2022
High performance, distributed and low latency publish-subscribe platform.

Emitter: Distributed Publish-Subscribe Platform Emitter is a distributed, scalable and fault-tolerant publish-subscribe platform built with MQTT proto

emitter 3.3k Jun 26, 2022
Fast, efficient, and scalable distributed map/reduce system, DAG execution, in memory or on disk, written in pure Go, runs standalone or distributedly.

Gleam Gleam is a high performance and efficient distributed execution system, and also simple, generic, flexible and easy to customize. Gleam is built

Chris Lu 3.1k Jun 28, 2022
Simple, fast and scalable golang rpc library for high load

gorpc Simple, fast and scalable golang RPC library for high load and microservices. Gorpc provides the following features useful for highly loaded pro

Aliaksandr Valialkin 651 Jun 20, 2022
🌧 BitTorrent client and library in Go

rain BitTorrent client and library in Go. Running in production at put.io. Features Core protocol Fast extension Magnet links Multiple trackers UDP tr

Cenk Altı 742 Jun 24, 2022
A Go library for master-less peer-to-peer autodiscovery and RPC between HTTP services

sleuth sleuth is a Go library that provides master-less peer-to-peer autodiscovery and RPC between HTTP services that reside on the same network. It w

null 352 Jun 21, 2022
Full-featured BitTorrent client package and utilities

torrent This repository implements BitTorrent-related packages and command-line utilities in Go. The emphasis is on use as a library from other projec

Matt Joiner 4.4k Jun 22, 2022
Go Open Source, Distributed, Simple and efficient Search Engine

Go Open Source, Distributed, Simple and efficient full text search engine.

ego 6.1k Jun 25, 2022