Distributed WebSocket broker

dSock

dSock is a distributed WebSocket broker (in Go, using Redis).

Clients can authenticate & connect, and you can send text/binary messages to them through an API.

Features

Multiple clients per user & authentication

dSock can broadcast a message to all clients for a certain user (identified by user ID and optionally session ID) or a certain connection (by ID). Users can be authenticated using claims or JWTs (see below).

Distributed

dSock can be scaled up easily as it uses Redis as a central database & pub/sub, with clients connecting to workers. It's designed to run in the cloud on scalable platforms such as Kubernetes or Cloud Run.

Text & binary messaging

dSock is designed for text and binary messaging, enabling JSON (UTF-8), Protocol Buffers, or any custom protocol.

Lightweight & fast

dSock utilizes Go's concurrency for great performance at scale, with easy distribution and safety. It is available as Docker images for convenience.

Disconnects

Disconnect clients in response to an external event (such as a logout), either for a specific session ID or for all of a user's connections.

Uses

The main use case for dSock is having stateful WebSocket connections act as a stateless API.

This lets you stop worrying about connection handling and simply send messages to all (or some) of a user's clients, as you would with any other HTTP API.

Chat service

Clients connect to dSock, and your back-end can broadcast messages to a specific user's clients.

More!

Clients

Use a client to interact with the dSock API easily. Is your language missing? Open a ticket!

Architecture

dSock is separated into 2 main services:

  • dSock Worker: This is the main server clients connect to. The worker distributes the messages to the clients ("last mile").

  • dSock API: The API receives messages and distributes them to the workers for the target clients.

This allows the worker (connections) and API (gateway) to scale independently and horizontally.

dSock uses Redis as a backend data store, to store connection locations and claims.

Terminology

Word        Description
WebSocket   Sockets "over" HTTP(S)
JWT         JSON Web Token
Claim       dSock authentication mechanism using a pre-registered claim ("token")
Redis       Open-source in-memory key-value database

Flow

  • Authentication:
    • The client makes a request to your API, and you either:
      • Hit the dSock API to create a claim for the user
      • Generate & sign a JWT for the user
    • You return the claim or JWT to client
  • Connection:
    • User connects to a worker with claim or JWT
  • Sending:
    • You hit the dSock API (POST /send) with the target (user, session optionally) and the message as body
    • Message sent to target(s)

Setup

Installation

dSock is published as binaries and as Docker images.

Binaries

Binaries are available on the releases page.

You can simply run the binary for your architecture/OS.

You can configure dSock using environment variables or a config file (see below).

Docker images

Docker images are published on Docker Hub.

The images are small (~15MB) and listen on port 80 by default (controllable by setting the PORT environment variable).

It is recommended to use the environment variables to configure dSock instead of a config when using the images. Configs are still supported (can be mounted to /config.toml or /config.$EXT, see below).

Options

dSock can be configured using a config file or using environment variables.

  • PORT (port, integer, or DSOCK_PORT environment variable): Port to listen to. Defaults to 6241
  • DSOCK_ADDRESS (address, string, deprecated): Address to listen to. Defaults to :6241. Uses port if empty.
  • Redis:
    • DSOCK_REDIS_HOST (redis_host, string): Redis host. Defaults to localhost:6379
    • DSOCK_REDIS_PASSWORD (redis_password, string): Redis password. Defaults to no password
    • DSOCK_REDIS_DB (redis_db, integer): Redis database. Defaults to 0
    • DSOCK_REDIS_MAX_RETRIES (redis_max_retries, integer): Maximum retries before failing Redis connection. Defaults to 10
    • DSOCK_REDIS_TLS (redis_tls, boolean): Whether to enable TLS for Redis. Defaults to false
  • DSOCK_DEFAULT_CHANNELS (default_channels, comma-delimited string, optional): When set, clients will be automatically subscribed to these channels
  • Authentication:
    • DSOCK_TOKEN (token, string): Authentication token to do requests to the API
    • DSOCK_JWT_SECRET (jwt_secret, string, optional): When set, enables JWT authentication
  • DSOCK_DEBUG (debug, boolean): Enables debugging, useful for development. Defaults to false
  • DSOCK_LOG_REQUESTS (log_requests, boolean): Enables request logging. Defaults to false
  • DSOCK_MESSAGING_METHOD (messaging_method, string): The messaging method for communication from API to worker. Can be: redis, direct. Defaults to redis

Worker only

  • DSOCK_DIRECT_MESSAGE_HOSTNAME (direct_message_hostname, string, worker only): If messaging_method is set to direct, this is the hostname of the worker accessible from the API. Defaults to first local non-loopback IPv4
  • DSOCK_DIRECT_MESSAGE_PORT (direct_message_port, string, worker only): If messaging_method is set to direct, this is the port that the worker is listening on. Defaults to port

You can write your config file in TOML (recommended), JSON, YAML, or any format supported by viper.

Configs are loaded from (in order):

  • $PWD/config.$EXT
  • $HOME/.config/dsock/config.$EXT
  • /etc/dsock/config.$EXT

A default config will be created at $PWD/config.toml if no config is found.

Usage

All API calls return a success boolean. If it is false, the response also includes error (message) and errorCode (constant from common/errors.go).

All API calls (excluding the /connect endpoint) require authentication, with either a token query parameter or an Authorization header in the format: Bearer $TOKEN.

Having an invalid or missing token will result in the INVALID_AUTHORIZATION error code.

Most errors starting with ERROR_ are downstream errors, usually from Redis. Check if your Redis connection is valid!

When targeting, the precedence order is: id, channel, user.

Client authentication

Claims

Claims are the recommended way to authenticate with dSock. Before a client connects, it should hit your API (where you can use your usual authentication), and your API then asks the dSock API to create a "claim", which you return to the client.

Once a client has a claim, it can then connect to the worker using the claim query parameter.

You can create them by accessing the API as POST /claim with the following query options:

  • user (required, string): The user ID
    • session (optional, string): The session ID (scoped per user)
  • channels (optional, comma-delimited string): Channels to subscribe on join (merged with default_channels)
  • Time-related (not required, default expiration is 1 minute after the claim is created, only one used):
    • expiration (integer, seconds from epoch): Time the claim expires (takes precedence over duration)
    • duration (integer, seconds): Duration of the claim
  • token (required, string): Authorization token for API set in config. Can also be an Authorization Bearer token
  • id (optional, string): The claim ID to use. This should not be guessed, so long random string or UUIDv4 is recommended. If not set, it will generate a random string (recommended to let dSock generate the ID)

The returned body will contain the following keys:

  • claim: The claim data
    • id: The claim ID
    • expiration: The expiration in seconds from epoch
    • user: The user for the claim
    • session (if session is provided): The user session for the claim
    • channels: The channels to subscribe on join (excludes defaults)

A claim is single-use, so once a client connects, it will instantly expire.

Examples

Create a claim for a user (1) expiring in 10 seconds, with 2 channels:

POST /claim?token=abcxyz&user=1&duration=10&channels=group-1,group-2

Create a claim for a user (1) with a session (a) with a claim ID (a1b2c3) expiring at some time:

POST /claim?user=1&session=a&expiration=1588473164&id=a1b2c3
Authorization: Bearer abcxyz

Errors

Creating a claim has the following possible errors:

  • USER_ID_REQUIRED: If the user parameter is not set
  • INVALID_EXPIRATION: If the expiration is invalid (not parsable as integer)
  • NEGATIVE_EXPIRATION: If the expiration is negative
  • INVALID_DURATION: If the duration is invalid (not parsable as integer)
  • NEGATIVE_DURATION: If the duration is negative
  • ERROR_CHECKING_CLAIM: If an error occurred while checking if a claim exists (Redis error)
  • CLAIM_ID_ALREADY_USED: If the claim ID is set and is already used
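As a rough sketch of how a back-end might build the POST /claim request described above, the following Go code assembles the query parameters and Authorization header without sending anything. The helper name newClaimRequest, the base URL http://localhost:6241, and the token value are assumptions for illustration; only the endpoint and parameter names come from this document.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"strings"
)

// newClaimRequest (hypothetical helper) builds, but does not send, a
// POST /claim request. apiBase and token are placeholders for your setup.
func newClaimRequest(apiBase, token, user, session string, durationSec int, channels []string) (*http.Request, error) {
	q := url.Values{}
	q.Set("user", user)
	if session != "" {
		q.Set("session", session)
	}
	if durationSec > 0 {
		q.Set("duration", strconv.Itoa(durationSec))
	}
	if len(channels) > 0 {
		// Channels are passed as a comma-delimited string.
		q.Set("channels", strings.Join(channels, ","))
	}
	req, err := http.NewRequest(http.MethodPost, apiBase+"/claim?"+q.Encode(), nil)
	if err != nil {
		return nil, err
	}
	// The token can be sent as a Bearer header instead of a query parameter.
	req.Header.Set("Authorization", "Bearer "+token)
	return req, nil
}

func main() {
	req, err := newClaimRequest("http://localhost:6241", "abcxyz", "1", "a", 10, []string{"group-1", "group-2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
}
```

The request can then be passed to an http.Client, and the claim ID read from the claim key of the response body.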

JWT

To authenticate a client, you can also create a JWT and deliver it to the client before connecting. To enable this, set jwt_secret to your JWT secret (HMAC signature secret).

Payload options:

  • sub (required, string): The user ID
  • sid (optional, string): The session ID (scoped per user)
  • channels (optional, array of string): Channels to subscribe on join (merged with default_channels)
  • Time-related (one is required):
    • iat (integer, in seconds from epoch): Time the JWT is issued (expires 1 minute after this time)
    • exp (integer, in seconds from epoch): Expiration time for the JWT, takes precedence over iat

Client connections

Connect using a WebSocket to ws://worker/connect with one of the following query parameters:

  • claim: The authentication claim created previously (takes precedence over jwt)
  • jwt: JWT created previously

You can load-balance a cluster of workers, as long as the load-balancer supports WebSockets.
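A small Go sketch of building the connection URL follows. The helper name connectURL and the host worker:6241 are assumptions for illustration; actually dialing the URL is left to whichever WebSocket client library you use.

```go
package main

import (
	"fmt"
	"net/url"
)

// connectURL (hypothetical helper) builds the worker URL for a client
// holding a claim or a JWT. param must be "claim" or "jwt".
func connectURL(workerHost, param, value string) (string, error) {
	if param != "claim" && param != "jwt" {
		return "", fmt.Errorf("unsupported auth parameter %q", param)
	}
	u := url.URL{
		Scheme:   "ws",
		Host:     workerHost,
		Path:     "/connect",
		RawQuery: url.Values{param: {value}}.Encode(),
	}
	return u.String(), nil
}

func main() {
	u, err := connectURL("worker:6241", "claim", "a1b2c3")
	if err != nil {
		panic(err)
	}
	fmt.Println(u) // ws://worker:6241/connect?claim=a1b2c3
}
```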

Errors

The following errors can happen during connection:

  • ERROR_GETTING_CLAIM: If an error occurred during fetching the claim (Redis error)
  • MISSING_CLAIM: If the claim ID doesn't exist. This can also happen if the claim has expired
  • INVALID_EXPIRATION: If the claim has an invalid expiration (shouldn't happen unless Redis error)
  • EXPIRED_CLAIM: If the claim has expired, but Redis hasn't expired the claim on its own
  • INVALID_JWT: If the JWT is malformed (bad JSON/JWT format) or is not signed with proper key
  • MISSING_AUTHENTICATION: If no authentication is provided (no claim/JWT)

Sending message

Sending a message is done through the POST /send API endpoint.

Query param options:

  • Targeting (one is required):
    • user (string): The user ID to target
      • session (optional, string, when user is set): The specific session(s) to target from the user
    • id (string UUID): The specific internal connection ID
    • channel (string): The channel to target
  • type (required, string): Message (body) type. Can be text (UTF-8 text) or binary. This becomes the WebSocket message type.
  • token (required, string): Authorization token for API set in config. Can also be an Authorization Bearer token

The body of the request is used as the message. This can be text/binary, and the Content-Type header is not used internally (only type is used).

Examples

Send a JSON message to a user (1):

POST /send?token=abcxyz&user=1&type=text

{"message":"Hello world!","from":"Charles"}

Send a text value to a user (1) with a session (a):

POST /send?user=1&session=a&type=text
Authorization: Bearer abcxyz

<Cretezy> Hey!

Send a binary value to all clients subscribed in a channel:

POST /send?channel=group-1&type=binary
Authorization: Bearer abcxyz

# Binary...

Errors

The following errors can happen during sending a message:

  • INVALID_AUTHORIZATION: Invalid authentication (token). See errors section under usage
  • ERROR_GETTING_CONNECTION: If could not fetch connection(s) (Redis error)
  • ERROR_GETTING_USER: If user is set and could not fetch user (Redis error)
  • ERROR_GETTING_CHANNEL: If channel is set and could not fetch channel (Redis error)
  • MISSING_TARGET: If target is not provided
  • INVALID_MESSAGE_TYPE: If the type is invalid
  • ERROR_READING_MESSAGE: If an error occurred during reading the request body
  • ERROR_MARSHALLING_MESSAGE: If an error occurred during preparing to send the message to the workers (shouldn't happen)

Disconnecting

You can disconnect a client by user (and optionally session) ID.

This is useful when logging out a user, to make sure it also disconnects any connections. Make sure to include a session in your claim/JWT to be able to disconnect only some of a user's connections.

The API endpoint is POST /disconnect, with the following query params:

  • Targeting (one is required):
    • user (string): The user ID to target
      • session (optional, string, when user is set): The specific session(s) to target from the user
    • id (string UUID): The specific internal connection ID
    • channel (string): The channel to target
  • token (required, string): Authorization token for API set in config. Can also be an Authorization Bearer token
  • keepClaims (optional, boolean): When set to true, keeps active claims for the target. By default, dSock will remove claims for the target to prevent race conditions

Examples

Disconnect a user (1) with a session (a):

POST /disconnect?token=abcxyz&user=1&session=a

Errors

The following errors can happen during disconnection:

  • INVALID_AUTHORIZATION: Invalid authentication (token). See errors section under usage
  • ERROR_GETTING_CONNECTION: If could not fetch connection(s) (Redis error)
  • ERROR_GETTING_USER: If user is set and could not fetch user (Redis error)
  • ERROR_GETTING_CHANNEL: If channel is set and could not fetch channel (Redis error)
  • MISSING_TARGET: If target is not provided
  • ERROR_GETTING_CLAIM: If an error occurred during fetching the claim(s) (Redis error)
  • ERROR_MARSHALLING_MESSAGE: If an error occurred during preparing to send the message to the workers (shouldn't happen)

Info

You can access info about connections and claims using the GET /info API endpoint. The following query params are supported:

  • Targeting (one is required):
    • user (string): The user ID to query
      • session (optional, string, when user is set): The specific session(s) to query from the user
    • id (string UUID): The specific internal connection ID
    • channel (string): The channel to query
  • token (required, string): Authorization token for API set in config. Can also be an Authorization Bearer token

The API will return all opened connections and non-expired claims for the target.

The returned object contains:

  • connections (array of objects): List of open connections for the target
    • id: Internal connection ID
    • worker: Internal worker holding the connection
    • lastPing: Last ping from client in seconds from epoch
    • user: The connection's user
    • session (optional): The connection's session
    • channels: The connection's subscribed channels (includes default_channels)
  • claims (array of objects): List of non-expired claims for the target:
    • id: Claim ID (what a client would connect with)
    • expiration: Claim expiration in seconds from epoch
    • user: The claim's user
    • session (optional): The claim's session

Examples

Get info for a user (1) with a session (a):

GET /info?token=abcxyz&user=1&session=a

Errors

The following errors can happen during getting info:

  • INVALID_AUTHORIZATION: Invalid authentication (token). See errors section under usage
  • ERROR_GETTING_CLAIM: If an error occurred during fetching the claim(s) (Redis error)
  • ERROR_GETTING_CONNECTION: If could not fetch connection(s) (Redis error)
  • ERROR_GETTING_USER: If user is set and could not fetch user (Redis error)
  • ERROR_GETTING_CHANNEL: If channel is set and could not fetch channel (Redis error)
  • MISSING_TARGET: If target is not provided
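The returned object from GET /info could be decoded in Go with types along these lines. The struct and function names are hypothetical, the sample payload is made up to match the field list above, and treating lastPing and expiration as integers is an assumption based on "seconds from epoch".

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connection mirrors one entry of the documented "connections" array.
type connection struct {
	ID       string   `json:"id"`
	Worker   string   `json:"worker"`
	LastPing int64    `json:"lastPing"` // assumed to be an integer
	User     string   `json:"user"`
	Session  string   `json:"session,omitempty"`
	Channels []string `json:"channels"`
}

// claim mirrors one entry of the documented "claims" array.
type claim struct {
	ID         string `json:"id"`
	Expiration int64  `json:"expiration"` // assumed to be an integer
	User       string `json:"user"`
	Session    string `json:"session,omitempty"`
}

type infoResponse struct {
	Success     bool         `json:"success"`
	Connections []connection `json:"connections"`
	Claims      []claim      `json:"claims"`
}

// parseInfo decodes a GET /info response body.
func parseInfo(data []byte) (infoResponse, error) {
	var info infoResponse
	err := json.Unmarshal(data, &info)
	return info, err
}

func main() {
	// Made-up sample payload shaped after the documented fields.
	sample := []byte(`{"success":true,
		"connections":[{"id":"c1","worker":"w1","lastPing":1588473164,"user":"1","session":"a","channels":["group-1"]}],
		"claims":[{"id":"a1b2c3","expiration":1588473224,"user":"1"}]}`)
	info, err := parseInfo(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(info.Connections), info.Connections[0].Worker, len(info.Claims))
}
```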

Channels

You can subscribe/unsubscribe clients to a channel using POST /channel/subscribe/$CHANNEL or POST /channel/unsubscribe/$CHANNEL.

This will subscribe the connections and claims (optional) for the target provided.

The following query parameters are accepted:

  • Targeting (one is required):
    • user (string): The user ID to target
      • session (optional, string, when user is set): The specific session(s) to target from the user
    • id (string UUID): The specific internal connection ID
    • channel (string): The channel to target
  • ignoreClaims (optional, boolean): When set to true, doesn't add channel to claims for target. By default, dSock will add the channel to the target claims (for when the client does join)
  • token (required, string): Authorization token for API set in config. Can also be an Authorization Bearer token

Examples

Subscribe a user (1) to a channel (a):

POST /channel/subscribe/a?token=abcxyz&user=1

Unsubscribe all clients in a channel (a) from that channel:

POST /channel/unsubscribe/a?token=abcxyz&channel=a

Errors

The following errors can happen during channel subscription/unsubscription:

  • INVALID_AUTHORIZATION: Invalid authentication (token). See errors section under usage
  • ERROR_GETTING_CONNECTION: If could not fetch connection(s) (Redis error)
  • ERROR_GETTING_USER: If user is set and could not fetch user (Redis error)
  • ERROR_GETTING_CHANNEL: If channel is set and could not fetch channel (Redis error)
  • MISSING_TARGET: If target is not provided
  • ERROR_MARSHALLING_MESSAGE: If an error occurred during preparing to send the message to the workers (shouldn't happen)

Internals

dSock uses Redis as its database (for claims and connection information) and for its publish/subscribe capabilities. Redis was chosen because it is widely used, is performant, and supports all required features.

Claims

When creating a claim, dSock does the following operations:

  • Set claim:$id to the claim information (user, session, expiration)
  • Add the claim ID to claim-user:$user (to be able to lookup all of a user's claims)
  • Add the claim ID to claim-user-session:$user-$session if session is passed (to be able to lookup all of a user session's claims)
  • Add the claim ID to claim-channel:$channel if channel is passed (to be able to lookup all of a channel's claims)

When a user connects, dSock retrieves the claim by ID and validates its expiration. It then removes the claim from the user and user session storages.

When getting information or disconnecting, it retrieves or deletes the claim(s).
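To make the key layout concrete, the following Go sketch lists the Redis keys that creating a claim would touch, following the naming given above. The function claimKeys is hypothetical and only formats key names; it does not talk to Redis, and the Redis data types behind each key are not covered here.

```go
package main

import "fmt"

// claimKeys (hypothetical helper) returns the Redis keys involved when a
// claim is created, using the key patterns described above.
func claimKeys(id, user, session string, channels []string) []string {
	keys := []string{
		"claim:" + id,        // claim information
		"claim-user:" + user, // lookup of a user's claims
	}
	if session != "" {
		// Only present when a session is passed.
		keys = append(keys, "claim-user-session:"+user+"-"+session)
	}
	for _, ch := range channels {
		keys = append(keys, "claim-channel:"+ch)
	}
	return keys
}

func main() {
	for _, k := range claimKeys("a1b2c3", "1", "a", []string{"group-1"}) {
		fmt.Println(k)
	}
}
```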

Connections

When a user connects and authenticates, dSock does the following operations:

  • Set conn:$id to the connection's information (using a random UUID, with user, session, worker ID, and last ping)
  • Add the connection ID to user:$user (to be able to lookup all of a user's connections)
  • Add the connection ID to user-session:$user-$session (if session was in authentication, to be able to lookup all of a user session's connections)
  • Add the connection ID to channel:$channel (for each channel in authentication, to be able to lookup all of a channel's connections)

When receiving a ping or pong from the client, it updates the last ping time. A ping is sent from the server every minute.

Connections are kept alive until a client disconnects, or is forcibly disconnected using POST /disconnect.

Sending

When sending a message, the API resolves all of the workers that hold connections for the target user/session/connection, and sends the message through Redis to that worker's channel (worker:$id).

API to worker messages are encoded using Protocol Buffers for efficiency; they are fast to encode/decode, and binary messages do not need to be encoded as strings during communication.
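The worker-resolution step can be sketched in Go as follows. This is a simplified illustration, not dSock's implementation: the connection type and workerChannels function are hypothetical, and the actual publish to Redis (and the Protocol Buffers encoding) is left out.

```go
package main

import (
	"fmt"
	"sort"
)

// connection is a pared-down view of a stored connection.
type connection struct {
	ID     string
	Worker string
}

// workerChannels returns the distinct Redis channels (worker:$id) of the
// workers that hold at least one of the given connections, so each worker
// receives the message once regardless of how many target connections it holds.
func workerChannels(conns []connection) []string {
	seen := make(map[string]bool)
	var channels []string
	for _, c := range conns {
		if !seen[c.Worker] {
			seen[c.Worker] = true
			channels = append(channels, "worker:"+c.Worker)
		}
	}
	sort.Strings(channels) // stable order for display
	return channels
}

func main() {
	conns := []connection{{"c1", "w1"}, {"c2", "w2"}, {"c3", "w1"}}
	fmt.Println(workerChannels(conns)) // [worker:w1 worker:w2]
}
```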

Channels

Channels are associated with claims/JWTs (before a client connects) and connections.

When (un)subscribing a target to a channel, it looks up all of the target's claims and adds the channel to them (if ignoreClaims is not set), and broadcasts to workers holding connections for the target through the $workerId:channel Redis channel.

The worker then resolves all connections for the target and adds them to the channel.

Channels are found under channel:$channel and contain the list of connection IDs which are subscribed.

Claim channels are found under claim-channel:$channel and contain the list of claim IDs which will become subscribed. The channels are also stored under channels in the claim.

FAQ

Why is built-in HTTPS not supported?

To remove complexity inside dSock, TLS is not implemented. It is expected that the API and worker nodes are behind load-balancers, which would be able to do TLS termination.

If you need TLS, you can either add a TLS-terminating load-balancer, or a reverse proxy (such as nginx or Caddy).

How can I do a health-check on dSock?

You can use the /ping endpoint on the API & worker to monitor if the service is up. It will respond with pong.

Development

Setup

Protocol Buffers

If making changes to the Protocol Buffer definitions (under protos), make sure you have the protoc compiler and protoc-gen-go.

Once changes are done to the definitions, run task build:protos to generate the associated Go code.

Docker

You can build the Docker images by running task build:docker. This will create the dsock-worker and dsock-api images.

Tests

dSock has multiple types of tests to ensure stability and maximum coverage.

You can run all tests by running task tests. You can also run individual test suites (see below).

End-to-end (E2E)

You can run the E2E tests by running task tests:e2e. The E2E tests are located inside the e2e directory.

Unit

You can run the unit tests by running task tests:unit. The unit tests are located inside the common, api, and worker directories.

Contributing

Pull requests are encouraged!

License & support

dSock is MIT licensed (see license).

Community support is available through GitHub issues. For professional support, please contact [email protected].

Credit

Icon made by Freepik from flaticon.com.

Project was created & currently maintained by Charles Crete.

Releases: v0.4.1