Reflex stream client for redis streams

Overview

rredis

A reflex stream client for a redis streams using the radix client implementation.

It provides an API for inserting data into a stream and for consuming data from a stream with at-least-once semantics. It also provides a reflex.CursorStore implementation for storing cursors in redis.

Usage

// Define your consumer business logic
fn := func(ctx context.Context, fate fate.Fate, e *reflex.Event) error {
  fmt.Print("Consuming redis stream event", e)
  return fate.Tempt() // Fate injects application errors at runtime, enforcing idempotent logic.
}

// Define some more variables
var namespace, streamName, redisAddr, consumerName string
var ctx context.Context  

// Connect to redis
con, _ := radix.Dial(ctx, "tcp", redisAddr)

// Setup rredis and reflex
stream := rredis.NewStream(con, namespace, stream)
cstore := rredis.NewCursorStore(con, namespace)

consumer := reflex.NewConsumer(consumerName, fn)
spec := reflex.NewSpec(stream.Stream, cstore, consumer)

// Insert some data concurrently
go func() {
  for {
    _ = stream.Insert(ctx, []byte(fmt.Sprint(time.Now())))
    time.Sleep(time.Second)
  }
}()

// Stream forever!
// Progress is stored in the cursor store, so restarts or any error continue where it left off.
for {
  err := reflex.Run(context.Backend(), spec)
  if err != nil { // Note Run always returns non-nil error
    log.Printf("stream error: %v", err)
  }
}

Notes

  • Since reflex events have specific fields (type, foreignID, timestamp, data) the redis stream entries need to adhere to a specific format. It is therefore advised to use Stream.Insert to ensure the correct format.
  • At-least-once semantics are also provided by redis consumer groups, but it doesn't provide strict ordering. rredis maintains strict ordering and can provide sharding using reflex/rpatterns.Parralel.
wire protocol for multiplexing connections or streams into a single connection, based on a subset of the SSH Connection Protocol

qmux qmux is a wire protocol for multiplexing connections or streams into a single connection. It is based on the SSH Connection Protocol, which is th

Jeff Lindsay 189 Jun 24, 2022
Walrus 🕑 Real-time event streaming platform built on top of gRPC streams

Walrus ?? Real-time event streaming platform built on top of gRPC streams Table of Contents About the project Built With How it works Getting Started

Matheus Mosca 13 Feb 14, 2022
Eventproc - A proof-of-concept for exploring event based architecture utilizing bi-directional gRPC streams

eventproc A proof-of-concept for exploring event based architecture utilizing bi

null 0 Jan 25, 2022
A library to simplify writing applications using TCP sockets to stream protobuff messages

BuffStreams Streaming Protocol Buffers messages over TCP in Golang What is BuffStreams? BuffStreams is a set of abstraction over TCPConns for streamin

Sean Kelly 249 Aug 1, 2022
A toy project to stream from a Remarkable2

goMarkableStream I use this toy project to stream my remarkable 2 (firmware 2.5) on my laptop using the local wifi. video/demo here Quick start You ne

Olivier Wulveryck 112 Aug 11, 2022
A lightweight stream processing library for Go

go-streams A lightweight stream processing library for Go. go-streams provides a simple and concise DSL to build data pipelines. Wiki In computing, a

Eugene R. 1k Aug 9, 2022
V3IO Frames ("Frames") is a Golang based remote data frames access (over gRPC or HTTP stream)

V3IO Frames ("Frames") is a multi-model open-source data-access library that provides a unified high-performance DataFrame API for working with different types of data sources (backends). The library was developed by Iguazio to simplify working with data in the Iguazio Data Science Platform ("the platform"), but it can be extended to support additional backend types.

null 23 Mar 3, 2022
Stream Camera based on TCP

streamera Term Project of Computer Networking streamera is a Stream Camera based on TCP, which contains client mode and server mode. Features Client M

null 7 Jul 25, 2022
Stream processing stuff for Go

GoStream Type safe Stream processing library inspired in the Java Streams API. Table of contents Requirements Usage examples Limitations Performance C

Mario Macias 50 Aug 8, 2022
Totem - A Go library that can turn a single gRPC stream into bidirectional unary gRPC servers

Totem is a Go library that can turn a single gRPC stream into bidirectional unar

Joe Kralicky 2 Jan 10, 2022
A simple Go server that broadcasts any data/stream

broadcast A simple Go server that broadcasts any data/stream usage data You can POST data. curl -X POST --data-binary "@111.png" localhost:9222/test.p

Zack 11 Aug 6, 2022
Reads MAWS formatted data and converts it into JSON output stream.

maws2json Usage examples Over serial line (stdin pipe) Lets assume that Vaisala weather station is connected via RS232 to USB serial dongle in /dev/tt

Sääsivu 0 Feb 6, 2022
Reads JSON object (stream) from file/stdin and routes it/them to GCP Pub/Sub topics.

json2pubsub Publish JSON object (stream) into GCP Pub/Sub topic based on a field value. Usage: json2pubsub --project=STRING <mapping> ... Reads JSON

Sääsivu 1 Feb 16, 2022
Broadcast-server - A simple Go server that broadcasts any data/stream

broadcast A simple Go server that broadcasts any data/stream usage data You can

Zack 53 Jul 29, 2022
A go module supply Java-Like generic stream programming (while do type check at runtime)

gostream A go module supplying Java-Like generic stream programming (while do type check at runtime) Using Get a Stream To get a Stream, using SliceSt

Sad_Dog 0 Jan 16, 2022
Library for directly interacting and controlling an Elgato Stream Deck on Linux.

Stream Deck Library for directly interacting and controlling an Elgato Stream Deck on Linux. This library is designed to take exclusive control over a

Matthew Penner 3 Jan 23, 2022
Rabbitio - Rabbit stream cipher package RFC 4503 for Go

rabbitio rabbitio is a rabbit stream cipher packge based on RFC 4503 for golang

Sina Ghaderi 7 Feb 8, 2022
Sstreamcry - Shadowsocks stream bomb

ShadowStreamCry A Shadowsocks stream bomb. Credits DuckSoft Qv2ray/rc4md5cry v2f

秋のかえで 7 Feb 24, 2022
Moviefetch: a simple program to search and download for movies from websites like 1337x and then stream them

MovieFetch Disclaimer I am NOT responisble for any legal issues or other you enc

Hashm 2 May 12, 2022