Machine is a zero dependency library for highly concurrent Go applications.

Overview

Machine GoDoc

concurrency

import "github.com/autom8ter/machine/v2"

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • In memory Publish Subscribe for asynchronously broadcasting & consuming messages in memory
  • Asynchronous worker groups similar to errgroup.Group
  • Throttled max active goroutine count
  • Asynchronous error handling(see WithErrorHandler to override default error handler)
  • Asynchronous cron jobs- Cron()

Use Cases

Machine is meant to be completely agnostic and dependency free- its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

// Machine is an interface for highly asynchronous Go applications
type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel,  executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
	Cron(ctx context.Context, interval time.Duration, fn CronFunc)
	// Loop asynchronously executes the given function repeatedly UNTIL the context cancels OR false is returned by the LoopFunc
	Loop(ctx context.Context, fn LoopFunc)
	// Wait blocks until all active async functions(Loop, Go, Cron) exit
	Wait()
	// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptions
	Close()
}

Example

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  	defer cancel()
  	var (
  		m       = machine.New()
  		results []string
  		mu      sync.RWMutex
  	)
  	defer m.Close()
  
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	<-time.After(1 * time.Second)
  	m.Publish(ctx, machine.Msg{
  		Channel: "human_resources.chat_room6",
  		Body:    "hello world human resources",
  	})
  	m.Publish(ctx, machine.Msg{
  		Channel: "accounting.chat_room2",
  		Body:    "hello world accounting",
  	})
  	m.Publish(ctx, machine.Msg{
  		Channel: "engineering.chat_room1",
  		Body:    "hello world engineering",
  	})
  	m.Wait()
  	sort.Strings(results)
  	for _, res := range results {
  		fmt.Print(res)
  	}
  	// Output:
  	//(accounting.chat_room2) received msg: hello world accounting
  	//(accounting.chat_room2) received msg: hello world accounting
  	//(engineering.chat_room1) received msg: hello world engineering
  	//(engineering.chat_room1) received msg: hello world engineering
  	//(human_resources.chat_room6) received msg: hello world human resources
  	//(human_resources.chat_room6) received msg: hello world human resources

Extended Examples

All examples are < 500 lines of code(excluding code generation)

You might also like...
 CDK - Zero Dependency Container Penetration Toolkit
CDK - Zero Dependency Container Penetration Toolkit

CDK is an open-sourced container penetration toolkit, offering stable exploitation in different slimmed containers without any OS dependency. It comes with penetration tools and many powerful PoCs/EXPs helps you to escape container and takeover K8s cluster easily.

alog is a dependency free, zero/minimum memory allocation JSON logger with extensions
alog is a dependency free, zero/minimum memory allocation JSON logger with extensions

Alog (c) 2020-2021 Gon Y Yi. https://gonyyi.com. MIT License Version 1.0.0 Intro Alog was built with a very simple goal in mind: Support Tagging (and

Web-based, zero-config, dependency-free database schema change and version control tool for teams
Web-based, zero-config, dependency-free database schema change and version control tool for teams

Live Demo • Install • Help • Development • Design Doc Bytebase is a web-based, zero-config, dependency-free database schema change and version control

Golang flags parser with zero dependency

flags Golang flags parser with zero dependency. Usage See simple.go for basic usage. Concept flags gives a simple way to get flag's value from argumen

Goworkers - Zero dependency Golang worker pool

Golang Worker Pool Zero dependency golang goroutines pool library. It is useful

wazero: the zero dependency WebAssembly runtime for Go developers

wazero: the zero dependency WebAssembly runtime for Go developers WebAssembly is a way to safely run code compiled in other languages. Runtimes execut

Consul is a distributed, highly available, and data center aware solution to connect and configure applications across dynamic, distributed infrastructure.

Consul Website: https://www.consul.io Tutorials: HashiCorp Learn Forum: Discuss Consul is a distributed, highly available, and data center aware solut

Go-Web-Dev - Golang helps the developer to develop highly scalable applications

Go-Web-Dev Golang helps the developer to develop highly scalable applications. T

An in-memory key:value store/cache (similar to Memcached) library for Go, suitable for single-machine applications.

go-cache go-cache is an in-memory key:value store/cache similar to memcached that is suitable for applications running on a single machine. Its major

An in-memory key:value store/cache (similar to Memcached) library for Go, suitable for single-machine applications.

go-cache go-cache is an in-memory key:value store/cache similar to memcached that is suitable for applications running on a single machine. Its major

An in-memory key:value store/cache (similar to Memcached) library for Go, suitable for single-machine applications.

go-cache go-cache is an in-memory key:value store/cache similar to memcached that is suitable for applications running on a single machine. Its major

Nada is a JS runtime, just like Nodejs. The difference is that Nada allows JS developers to easily achieve millions of concurrent applications.

Nada is a JS runtime, just like Nodejs. The difference is that Nada allows JS developers to easily achieve millions of concurrent applications. It also adds some new enhancements to THE JS syntax (types, interfaces, generics) that fundamentally address JS's perennial complaints.

Sparse matrix formats for linear algebra supporting scientific and machine learning applications

Sparse matrix formats Implementations of selected sparse matrix formats for linear algebra supporting scientific and machine learning applications. Co

Sparse matrix formats for linear algebra supporting scientific and machine learning applications

Sparse matrix formats Implementations of selected sparse matrix formats for linear algebra supporting scientific and machine learning applications. Co

Easy to use Raft library to make your app distributed, highly available and fault-tolerant
Easy to use Raft library to make your app distributed, highly available and fault-tolerant

An easy to use customizable library to make your Go application Distributed, Highly available, Fault Tolerant etc... using Hashicorp's Raft library wh

SigNoz helps developer monitor applications and troubleshoot problems in their deployed applications
SigNoz helps developer monitor applications and troubleshoot problems in their deployed applications

SigNoz helps developers monitor their applications & troubleshoot problems, an open-source alternative to DataDog, NewRelic, etc. 🔥 🖥

An simple, easily extensible and concurrent health-check library for Go services
An simple, easily extensible and concurrent health-check library for Go services

Healthcheck A simple and extensible RESTful Healthcheck API implementation for Go services. Health provides an http.Handlefunc for use as a healthchec

Ristretto is a fast, concurrent cache library built with a focus on performance and correctness.
concurrent caching proxy and decoder library for collections of PMTiles

go-pmtiles A caching proxy for the serverless PMTiles archive format. Resolves several of the limitations of PMTiles by running a minimalistic, single

Comments
  • How to stop subscriber goroutine when publisher goroutine is canceled ?

    How to stop subscriber goroutine when publisher goroutine is canceled ?

    Hello 👋,

    Firstly, I would like to thank @autom8ter for bringing out the Machine. This package is super helpful in building highly concurrent Go applications.

    In the below code snippet, how can I close subscribers when all the messages are consumed? Currently, the subscriber goroutine is hanging indefinitely.

    numRange := []int{1,2,3,4,5,6,7,8,9}
    
    m := machine.New(context.Background(),
    		// functions are added to a FIFO channel that will block when active routines == max routines.
    		machine.WithMaxRoutines(d.MaxConcurrentSqlExecutions),
    		// every function executed by machine.Go will recover from panics
    		machine.WithMiddlewares(machine.PanicRecover()),
    	)
    	defer m.Close()
    
    	channelName := "dedupe"
    
    	// start a goroutine that subscribes to all messages sent to the target channel
    	m.Go(func(routine machine.Routine) {
    		for {
    			select {
    			case <-routine.Context().Done():
    				return
    			default:
    				err := routine.Subscribe(channelName, func(obj interface{}) {
    					log.Infof("%v | subscription msg received! channel = %v msg = %v stats = %s\n",
    						routine.PID(), channelName, obj, m.Stats().String())
    				})
    				if err != nil {
    					log.Error("failed to start goroutines to consume jobs ", err)
    					routine.Cancel()
    					return
    				}
    			}
    		}
    	},
    		machine.GoWithTags("subscribe"))
    
    	// start another goroutine that publishes to the target channel every second
    	m.Go(func(routine machine.Routine) {
    		defer routine.Cancel()
    		log.Infof("%v | streaming msg to channel = %v stats = %s\n", routine.PID(), channelName, routine.Machine().Stats().String())
    		// publish message to channel
    		for _, interval := range numRange {
    			err := routine.Publish(channelName, interval)
    			if err != nil {
    				log.Error("failed to start goroutines to consume jobs ", err)
    			}
    		}
    	},
    		machine.GoWithTags("publish"),
    	)
    
    	m.Wait()
    
    

    Thank you!

    opened by vikramarsid 1
Owner
Coleman Word
Full stack(Go/Typescript) + Devops engineer
Coleman Word
A dead simple, highly performant, highly customizable sessions middleware for go http servers.

If you're interested in jwt's, see my jwt library! Sessions A dead simple, highly performant, highly customizable sessions service for go http servers

Adam Hanna 68 Nov 11, 2022
BlobStore is a highly reliable,highly available and ultra-large scale distributed storage system

BlobStore Overview Documents Build BlobStore Deploy BlobStore Manage BlobStore License Overview BlobStore is a highly reliable,highly available and ul

CubeFS 14 Oct 10, 2022
Highly concurrent drop-in replacement for bufio.Writer

concurrent-writer Highly concurrent drop-in replacement for bufio.Writer. concurrent.Writer implements highly concurrent buffering for an io.Writer ob

Alin Sinpalean 47 Nov 20, 2022
This is an concurrent-queue and concurrent-stack lib for the go.

This is an concurrent-queue and concurrent-stack lib for the go. Getting Started Pull in the dependency go get github.com/boobusy/vector Add the impor

白衣画甲 0 Jan 2, 2022
Simple, zero-dependency scheduling library for Go

go-quartz Simple, zero-dependency scheduling library for Go. About Inspired by the Quartz Java scheduler. Library building blocks Job interface. Any t

Eugene R. 1.1k Nov 28, 2022
MNA - stands for mobile number assignment - a small zero external dependency golang library that is used to identify mobile number assignment in tanzania

MNA - stands for mobile number assignment - a small zero external dependency golang library that is used to identify mobile number assignment in tanzania

TECHCRAFT TECHNOLOGIES LIMITED 8 Nov 29, 2021
Scheduler - Scheduler package is a zero-dependency scheduling library for Go

Scheduler Scheduler package is a zero-dependency scheduling library for Go Insta

Javad Rajabzade 4 Jan 14, 2022
A zero-dependency cache library for storing data in memory with generics.

Memory Cache A zero-dependency cache library for storing data in memory with generics. Requirements Golang 1.18+ Installation go get -u github.com/rod

Rodrigo Brito 11 May 26, 2022
A powerful zero-dependency json logger.

ZKits Logger Library About This package is a library of ZKits project. This is a zero-dependency standard JSON log library that supports structured JS

Qingshan Luo 23 Oct 8, 2022
A zero dependency asset embedder for Go

Mewn A zero dependency asset embedder for Go. About Mewn is perhaps the easiest way to embed assets in a Go program. Here is an example: package main

Lea Anthony 86 Oct 23, 2022