automi

A data stream processing API for Go (alpha)

Automi is an API for processing streams of data using idiomatic Go. Using Automi, programs can process streams of data by composing stages of operations that are applied to each element of the stream.

Concept

(figure: Automi streaming concepts)

The Automi API expresses a stream using four primitives:

  • An emitter: an in-memory, network, or file resource that can emit elements for streaming
  • The stream: a conduit within which data elements are streamed
  • Stream operations: code which can be attached to the stream to process streamed elements
  • A collector: an in-memory, network, or file resource that can collect streamed data

Automi streams use Go channels internally to route data. This means Automi streams naturally support features such as buffering, back-pressure queuing, and concurrency safety.
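
Because Go channels are also first-class emitters (see the list of emitters below), a stream can be fed directly from a goroutine. Here is a minimal sketch, assuming the import paths github.com/vladimirvivien/automi/stream and github.com/vladimirvivien/automi/collectors from the project repository:

func main() {
	// feed the stream from a channel; closing the channel ends the stream
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, word := range []string{"hello", "automi", "streams"} {
			ch <- word + "\n"
		}
	}()

	strm := stream.New(ch)
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
	}
}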

Using Automi

Now, let us explore some examples to see how easy it is to use Automi to stream and process data.

See all examples in the ./example directory.

Example: streaming from a slice into stdout

This first example shows how easy it is to compose and express stream operations with Automi. In this example, rune values are emitted from a slice and streamed individually. The Filter operator drops unwanted rune values, Map converts each remaining rune to a string, and Sort orders the batched items. Lastly, a collector writes the result into an io.Writer, which pipes it to stdout.

func main() {
	// create a stream from a slice; each rune is emitted individually
	strm := stream.New([]rune("B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw"))

	// keep uppercase ASCII letters, map each rune to a string,
	// then batch the items so they can be sorted
	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	}).Map(func(item rune) string {
		return string(item)
	}).Batch().Sort()
	strm.Into(collectors.Writer(os.Stdout))

	// open the stream and wait for it to complete
	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See the full source code.

How it works
  1. Create the stream with an emitter source. Automi supports several types of sources including channels, io.Reader, slices, etc. (see the list of emitters below). Each element in the slice will be streamed individually.
strm := stream.New([]rune(`B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw`))
  2. Apply user-provided or built-in stream operations as shown below:
strm.Filter(func(item rune) bool {
    return item >= 65 && item < (65+26)
}).Map(func(item rune) string {
    return string(item)
}).Batch().Sort()
  3. Collect the result. In this example, the result is collected into an io.Writer which further streams the data into standard output:
strm.Into(collectors.Writer(os.Stdout))
  4. Lastly, open the stream once it is properly composed:
if err := <-strm.Open(); err != nil {
    fmt.Println(err)
    return
}
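
Since collectors.Writer accepts any io.Writer, the same pipeline can just as easily collect into an in-memory buffer. A minimal sketch of this variation (the bytes.Buffer is illustrative and requires importing the bytes package):

var buf bytes.Buffer
strm.Into(collectors.Writer(&buf))

if err := <-strm.Open(); err != nil {
	fmt.Println(err)
	return
}
fmt.Println("collected:", buf.String())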

Example: streaming from an io.Reader into collector function

The next example shows how to use Automi to stream data from an io.Reader, emitting buffered string values from an in-memory source in 50-byte chunks. The data is processed with the Map and Filter operator methods, and the result is sent to a user-provided collector function which prints it.

func main() {
	data := `"request", "/i/a", "00:11:51:AA", "accepted"
"response", "/i/a/", "00:11:51:AA", "served"
"response", "/i/a", "00:BB:22:DD", "served"...`

	reader := strings.NewReader(data)

	// create stream from a buffered io.Reader emitter,
	// emitting 50-byte chunks
	stream := stream.New(emitters.Reader(reader).BufferSize(50))
	// convert each []byte chunk to a string
	stream.Map(func(chunk []byte) string {
		return string(chunk)
	})
	// keep only chunks containing "response"
	stream.Filter(func(e string) bool {
		return strings.Contains(e, `"response"`)
	})
	// collect with a user-provided function that prints each item
	stream.Into(collectors.Func(func(data interface{}) error {
		e := data.(string)
		fmt.Println(e)
		return nil
	}))

	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		return
	}
}

See complete example here.
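
For line-oriented input, the io.Scanner emitter (listed under Emitters below) can emit one token per item instead of fixed-size chunks. A minimal sketch, assuming the constructor takes the form emitters.Scanner(reader, splitFunc) and emits each token as []byte:

reader := strings.NewReader(data)

// emit one line per stream item
strm := stream.New(emitters.Scanner(reader, bufio.ScanLines))
strm.Filter(func(line []byte) bool {
	return strings.Contains(string(line), `"response"`)
})
strm.Into(collectors.Func(func(item interface{}) error {
	fmt.Println(string(item.([]byte)))
	return nil
}))

if err := <-strm.Open(); err != nil {
	fmt.Println(err)
}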

Example: streaming using CSV files

The following example streams data from a CSV source file. Each row is mapped to a custom type, filtered, then mapped to a slice of strings which is then collected into another CSV file.

type scientist struct {
	FirstName string
	LastName  string
	Title     string
	BornYear  int
}

func main() {
    // creates a stream using a CSV emitter
    // emits each row as []string
    stream := stream.New("./data.txt")

    // Map each CSV row, []string, to type scientist
    stream.Map(func(cs []string) scientist {
        yr, _ := strconv.Atoi(cs[3]) // parse error ignored for brevity
        return scientist{
            FirstName: cs[1],
            LastName:  cs[0],
            Title:     cs[2],
            BornYear:  yr,
        }
    })
    stream.Filter(func(cs scientist) bool {
        return (cs.BornYear > 1930)
    })
    stream.Map(func(cs scientist) []string {
        return []string{cs.FirstName, cs.LastName, cs.Title}
    })
    stream.Into("./result.txt")

    // open the stream
    if err := <-stream.Open(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    fmt.Println("wrote result to file result.txt")
}

See complete example here.
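
As a variation, a collector function can summarize rather than store the streamed rows. Here is a minimal sketch that counts the filtered rows using the collectors.Func pattern from the previous example (the count variable is illustrative, and this assumes the collector function is invoked sequentially):

count := 0
stream.Into(collectors.Func(func(item interface{}) error {
	count++
	return nil
}))

if err := <-stream.Open(); err != nil {
	fmt.Println(err)
	os.Exit(1)
}
fmt.Printf("found %d scientists born after 1930\n", count)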

Example: streaming HTTP requests and responses

The following example shows how to use Automi to stream and process data using HTTP requests and responses. This HTTP server program streams data from the request body, encodes it using base64, and streams the result into the HTTP response:

func main() {

	http.HandleFunc(
		"/",
		func(resp http.ResponseWriter, req *http.Request) {
			resp.Header().Add("Content-Type", "text/html")
			resp.WriteHeader(http.StatusOK)

			// stream the request body through a base64 encoder
			// directly into the response writer
			strm := stream.New(req.Body)
			strm.Process(func(data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}).Into(resp)

			if err := <-strm.Open(); err != nil {
				// the 200 header has already been sent, so the status
				// cannot be changed here; just log the failure
				log.Printf("Stream error: %s", err)
			}
		},
	)

	log.Println("Server listening on :4040")
	log.Fatal(http.ListenAndServe(":4040", nil))
}

See complete example here.
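
Any HTTP client that posts a body can exercise the server; below is a minimal Go client sketch (the URL and payload are illustrative):

func main() {
	// POST a small payload and print the base64-encoded response
	resp, err := http.Post("http://localhost:4040/", "text/plain", strings.NewReader("hello automi"))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	encoded, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(encoded))
}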

Example: streaming gRPC service payloads

The following example shows how to use Automi to stream data items from a gRPC streaming service. The gRPC client below sets up an Automi emitter to emit time values that are streamed from a gRPC time service:

// setup an Automi emitter function to stream from the gRPC service
func emitStreamFrom(client pb.TimeServiceClient) <-chan []byte {
	source := make(chan []byte)
	timeStream, err := client.GetTimeStream(context.Background(), &pb.TimeRequest{Interval: 3000})
	...
	go func(stream pb.TimeService_GetTimeStreamClient, srcCh chan []byte) {
		defer close(srcCh)
		for {
			t, err := stream.Recv()
			if err != nil {
				// io.EOF or a transport error ends the stream
				return
			}
			srcCh <- t.Value
		}
	}(timeStream, source)

	return source
}

func main() {
	...
	client := pb.NewTimeServiceClient(conn)
	// create automi stream
	stream := stream.New(emitStreamFrom(client))
	stream.Map(func(item []byte) time.Time {
		secs := int64(binary.BigEndian.Uint64(item))
		return time.Unix(secs, 0)
	})
	stream.Into(collectors.Func(func(item interface{}) error {
		t := item.(time.Time)
		fmt.Println(t)
		return nil
	}))

	// open the stream
	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

See complete example here.

More Examples

Examples - View a long list of examples that cover all aspects of using Automi.

Automi components

Automi comes with a set of built-in components to get you started with stream processing, including the following.

Emitters

  • Channel
  • CSV
  • io.Reader
  • io.Scanner
  • Slice

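Emitters can also be constructed explicitly rather than inferred from the value passed to stream.New. For example, a slice emitter might be created as follows (a sketch, assuming an emitters.Slice constructor that mirrors the emitters.Reader shown earlier):

strm := stream.New(emitters.Slice([]string{"a", "b", "c"}))
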
Operators

  • Stream.Filter
  • Stream.Map
  • Stream.FlatMap
  • Stream.Reduce
  • Stream.GroupByKey
  • Stream.GroupByName
  • Stream.GroupByPos
  • Stream.Sort
  • Stream.SortByKey
  • Stream.SortByName
  • Stream.SortByPos
  • Stream.SortWith
  • Stream.Sum
  • Stream.SumByKey
  • Stream.SumByName
  • Stream.SumByPos
  • Stream.SumAllKeys

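Operators such as Sort and Sum act on batched items, so they are typically preceded by Batch() as in the first example. FlatMap, by contrast, maps each item to a slice whose elements are then streamed downstream individually; a minimal sketch under that assumption:

// split each line into words; each word becomes its own stream item
strm.FlatMap(func(line string) []string {
	return strings.Split(line, " ")
})
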
Collectors

  • CSV
  • Func
  • Null
  • Slice
  • Writer

Licence

Apache 2.0
