Declarative streaming ETL for mundane tasks, written in Go

Overview

Benthos

Benthos is a high-performance and resilient stream processor, able to connect various sources and sinks in a range of brokering patterns and perform hydration, enrichment, transformation and filtering on payloads.

It comes with a powerful mapping language, is easy to deploy and monitor, and ready to drop into your pipeline either as a static binary, docker image, or serverless function, making it cloud native as heck.

Benthos is fully declarative, with stream pipelines defined in a single config file, allowing you to specify connectors and a list of processing stages:

input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - bloblang: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20

Delivery Guarantees

Yep, we got 'em. Benthos implements transaction-based resiliency with back pressure. When connecting to at-least-once sources and sinks it guarantees at-least-once delivery without needing to persist messages during transit.

Supported Sources & Sinks

AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (pub/sub), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS Streaming, NSQ, AMQP 0.9.1 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), SQL (MySQL, PostgreSQL, ClickHouse), Stdin/Stdout, TCP & UDP, sockets and ZMQ4.

Connectors are added constantly; if something you want is missing then open an issue.

Documentation

If you want to dive fully into Benthos then don't waste your time in this dump, check out the documentation site.

For guidance on how to configure more advanced stream processing concepts such as stream joins, enrichment workflows, etc, check out the cookbooks section.

For guidance on building your own custom plugins check out this example repo.

Install

Grab a binary for your OS from here. Or use this script:

curl -Lsf https://sh.benthos.dev | bash

Or pull the docker image:

docker pull jeffail/benthos

Benthos can also be installed via Homebrew:

brew install benthos

For more information check out the getting started guide.

Run

benthos -c ./config.yaml

Or, with docker:

# Send HTTP /POST data to Kafka:
docker run --rm \
	-e "INPUT_TYPE=http_server" \
	-e "OUTPUT_TYPE=kafka" \
	-e "OUTPUT_KAFKA_ADDRESSES=kafka-server:9092" \
	-e "OUTPUT_KAFKA_TOPIC=benthos_topic" \
	-p 4195:4195 \
	jeffail/benthos

# Using your own config file:
docker run --rm -v /path/to/your/config.yaml:/benthos.yaml jeffail/benthos

Monitoring

Health Checks

Benthos serves two HTTP endpoints for health checks:

  • /ping can be used as a liveness probe as it always returns a 200.
  • /ready can be used as a readiness probe as it serves a 200 only when both the input and output are connected, otherwise a 503 is returned.
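
Both endpoints are served from the general-purpose HTTP server, whose bind address is set in config. A minimal sketch (assuming the default 0.0.0.0:4195, which matches the port used in the Docker examples below):

http:
  address: 0.0.0.0:4195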

Metrics

Benthos exposes lots of metrics, either to StatsD, Prometheus, or, for debugging purposes, an HTTP endpoint that returns a JSON-formatted object. The target can be specified via config.
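
As a hedged example (exact field names can differ between Benthos versions), a config exposing metrics to Prometheus under a benthos prefix might look like:

metrics:
  prometheus:
    prefix: benthos

Swapping the prometheus block for a statsd one (which flushes to a configured address) would push metrics to StatsD instead.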

Tracing

Benthos also emits OpenTracing events to a tracer of your choice (currently only Jaeger is supported), which can be used to visualise the processors within a pipeline.
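
For example, sending traces to a local Jaeger agent could be configured roughly like the following sketch (field names such as agent_address and sampler_type depend on the Benthos version):

tracer:
  jaeger:
    agent_address: localhost:6831
    sampler_type: const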

Configuration

Benthos provides lots of tools for making configuration discovery, debugging and organisation easy. You can read about them here.

Environment Variables

It is possible to select fields inside a configuration file to be set via environment variables. The docker image, for example, is built with a config file where all common fields can be set this way.
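Field values can use the ${VAR} syntax, optionally with a default after a colon. A sketch that reads broker details from the environment (KAFKA_BROKER and KAFKA_TOPIC are hypothetical variable names chosen for illustration):

input:
  kafka:
    addresses:
      - ${KAFKA_BROKER:localhost:9092}
    topics:
      - ${KAFKA_TOPIC:benthos_stream}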

Build

Build with Go (1.15 or later):

git clone git@github.com:Jeffail/benthos
cd benthos
make

Lint

Benthos uses golangci-lint for linting, which you can install with:

curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.35.2

And then run it with make lint.

Plugins

It's pretty easy to write your own custom plugins for Benthos, take a look at this repo for examples and build instructions.

Docker Builds

There's a multi-stage Dockerfile for creating a Benthos docker image which results in a minimal image from scratch. You can build it with:

make docker

Then use the image:

docker run --rm \
	-v /path/to/your/benthos.yaml:/config.yaml \
	-v /tmp/data:/data \
	-p 4195:4195 \
	benthos -c /config.yaml

There are a few examples here that show you some ways of setting up Benthos containers using docker-compose.

ZMQ4 Support

Benthos supports ZMQ4 for both data input and output. To add this you need to install libzmq4 and use the compile time flag when building Benthos:

make TAGS=ZMQ4

Or to build a docker image using CGO, which includes ZMQ:

make docker-cgo

Contributing

Contributions are welcome; please read the guidelines, come and chat (links are on the community page), and watch your back.

Issues
  • Roadmap planning - USE CASES WANTED

    TL;DR: Please share your use cases here or send them privately to [email protected]; we will use these to help steer the roadmap towards Benthos version 4.

    Hey everyone,

    Benthos has been at major version 3 for over a year now, and I consider it to be a pretty cool achievement that given all the dope features added we've managed to keep both the Benthos config spec and APIs fully backwards compatible.

    However, eventually it would be nice to cut a new major release and prune all of the dead weight that has accumulated during this time. In order to do that I want to be sure that we've fully thought through any potential breaking changes that could be bundled along with it, and that requires a roadmap.

    Therefore I'm hoping to hear from you on how you use Benthos, what works well for you and what doesn't, and any features you'd hope to see eventually that would make your lives easier.

    Some features that might come into play are:

    • Improved plugin APIs (need more use cases to help shape this)
    • HTTP poll APIs for the dynamic components as well as streams mode, allowing Benthos to pull configs
    • Configuration file reloading
    • Expanded logging, metrics and tracing options

    Please let us know how you use (or intend to use) Benthos, either by posting here or getting in touch privately at [email protected]. You can also find us on the Discord channel at https://discord.gg/6VaWjzP; more chat options can be found on the community page.

    Once planning starts I'm going to be looking into organising public roadmap planning sessions.

    PSA v4 feedback wanted 
    opened by Jeffail 28
  • Tell me what annoys you

    If you're using Benthos and find something about it frustrating, but not worthy of raising an issue, please give me a brief description here. It could be anything: features, documentation, performance, me.

    Also, if the thing that annoys you is already posted then give it a thumbs up.

    question 
    opened by Jeffail 26
  • RFC: Benthos Lambda changes

    I've been tinkering with the Lambda binary and had a few things I wanted to run by folks and get thoughts on. I'm happy to split this into multiple issues for multiple threads if we need it. To demo things, I've put together https://github.com/kconwayatlassian/benthos-ext that implements each of the ideas below:

    • Ability to produce to one or more outputs in addition to returning the processed message. This enables us to use any combination of outputs and brokers but still return the message as the Lambda response.

    • Offering the Lambda function as a code export instead of only a binary for custom builds outside of Benthos and code reuse in things that adapt Lambda to other runtimes.

    • Adding some test coverage of the Lambda functionality.

    I'm looking for feedback on 1) whether any of the ideas have value beyond my team that would justify having them in Benthos, and 2) whether the demo code would be acceptable as a Benthos contribution or how I'd need to refactor it to meet the project standards/style.

    enhancement serverless 
    opened by kconwayatlassian 24
  • Output `gcp_pubsub` attributes

    Is it possible to promote properties of a JSON input to attributes in gcp_pubsub?

    https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

    I would like to promote things like event timestamps to attributes.time.

    It looks like the gcp_pubsub output puts the data into Message Body on the GCP side.

    We are using the attributes to sort the data in Cloud Dataflow:

    read = PubsubIO
        .readMessagesWithAttributes()
        .withTimestampAttribute("time")
        .fromTopic(defaultOptions.getTopic());
    

    EDIT: In the pubsub GO lib the attributes would go in the attributes field of the message struct here: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.2.0/pubsub/message.go#L36

    You could likely pull off fields from the JSON object using jmespath and store them in an additional input to the gcp_pubsub output.

    Likely this would require code updates. But I wanted to confirm it was not possible via some other means and whether the parsing of attributes is possible.

    Thanks!

    documentation 
    opened by gregorskii 19
  • AMQP 1.0 (beta): amqp:link:detach-forced

    Using Service Bus from Azure (which uses AMQP 1.0)

    Messages are received correctly, but after a while I get this message:

    Failed to read message: link detached, reason: *Error{Condition: amqp:link:detach-forced, Description: The link 'G19:151274741:MB9xjRjuCfMcXqGvOKbKyGeG9fJypgtW-sjO4ErGrubNtxbFpGcCxx' is force detached. Code: consumer(link11331556). Details: InnerMessageReceiver was closed. TrackingId:e33664c70000930b00ace7e45f28340d_G19_B11, SystemTracker:queue-name-dev:Queue:queue-name, Timestamp:2020-08-03T16:01:10, Info: map[]}
    

    I will analyze it more tomorrow and update the ticket if I find anything helpful

    My config:

    input:
      broker:
        inputs:
        - amqp_1:
            url: "myqueueurl"
            source_address: "my-queue-name"
        batching:
          count: 180
          period: "50s"
    # processors etc.....
    
    bug 
    opened by ianaz 18
  • Can't output from kinesis-balanced or kinesis

    I have the following config and benthos 3.11.0:

    input:
      kinesis_balanced:
        stream: what-a-great-stream
        start_from_oldest: false
        dynamodb_table: table-for-benthos
        dynamodb_billing_mode: PAY_PER_REQUEST
        region: us-east-1
        credentials:
          profile: what-a-great-profile
    output:
      file:
        path: outfile.txt
        delimiter: ""

    Using the same AWS profile, on us-east-1, I'm able to put records on this stream, get records from it, and when I scan dynamodb table-for-benthos I see leases are being updated by benthos as long as it's running. The file outfile.txt is getting created, and if I swap the input to stdin it works.

    I'm not getting any errors, and I'm not getting any output from kinesis.

    I'm so close to living the dream, I can taste it.

    bug 
    opened by avdempsey 18
  • Add MSSQL driver

    Fixes #797.

    I tested the output like so (might be worth adding some integration tests for this too, since only Postgres is covered under lib/test/integration):

    localhost> docker run --rm -it -e ACCEPT_EULA=Y -e SA_PASSWORD=ins4n3lyStrongP4ssword -p1433:1433 microsoft/mssql-server-linux
    
    container> /opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -h-1 -V1 -U sa -P ins4n3lyStrongP4ssword -Q "create table footable(data varchar(50) not null);"
    
    input:
      stdin:
        codec: lines
    output:
      sql:
        driver: mssql
        data_source_name: "sqlserver://sa:ins4n3lyStrongP4ssword@localhost:1433?database=master"
        query: "insert into footable(data) values(?);"
        args:
          - ${! json("data") }
    
    localhost> ./target/bin/benthos -c sql.yaml
    {"data":"foo"}
    
    container> /opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -h-1 -V1 -U sa -P ins4n3lyStrongP4ssword -Q "select * from footable;"
    foo
    
    (1 rows affected)
    
    opened by mihaitodor 17
  • adding syslog processor

    Hey @Jeffail, I have a little something for Benthos.

    I used github.com/influxdata/go-syslog to parse messages and decided to implement just rfc5424 support. It shouldn't be a real problem to include another RFC under the processor's wing, but this is quite good for a start. I am diabolically sad that rfc3164 (one of the popular ones) can't be parsed because the library hasn't implemented it yet, but then again I can't hold a grudge for longer than a few minutes, so..

    I don't think there's a need for config options that let users omit parts of the final JSON structure (not all of us need those fields, and some might be better off in the metadata), but every use case is unique and it's not fair to throw everyone in the same barrel, especially if space inside is limited. So, overall, it should be easy to unset some of those JSON fields afterwards.

    I also used the bestEffort option, which means the processor won't crash during parsing if there's some problem in the original message, as long as it conforms to the RFC to a certain degree.

    opened by DpoBoceka 17
  • Kafka Input only gets the topic messages, does not get keys

    Currently the Kafka input only gets the message from a given topic; is there any plan to also enable getting the keys?

    My use case is very simply that I need to back up the _schemas topic, but then in order to restore I need to be able to produce those messages with the same key they were originally produced with.

    Would you accept a PR for this?

    question 
    opened by chris-vest 16
  • Improvements to the avro processor

    Hi again,

    I am working with the Avro processor. We are trying to use it to process Avro textual records (or pure JSON) as the event input, and marshal them into an Avro record to send to GCP as bytes.

    Currently the process we have been debugging in #377 is working, but we are trying to move Avro validation and processing closer to our event processing side (after GCP).

    In our ideal flow, our event publishers would send either pure JSON or textual Avro records to Benthos; Benthos would process those records into Avro records and transmit them to our pipeline as Avro byte objects. The pipeline would be able to validate the bytes with the Avro reader and writer.

    Is this possible with Benthos and the Avro processor currently, or am I misunderstanding what the Avro processor is doing?

    I have added it to a test pipeline with a schema defined and it seems to have no effect.

    A second question is whether the Avro schema property can be loaded from a file dynamically, or whether it has to come from a single inline Avro schema definition.

    Here is our example Avro schema:

    {
      "type" : "record",
      "name" : "Event_TestEvent",
      "fields" : [ {
        "name" : "event_type",
        "type" : {
          "type" : "enum",
          "name" : "EventType",
          "symbols" : [ "TestEvent" ]
        }
      }, {
        "name" : "metadata",
        "type" : {
          "type" : "record",
          "name" : "Metadata",
          "fields" : [ {
            "name" : "schema_version",
            "type" : "string"
          } ]
        }
      }, {
        "name" : "event_properties",
        "type" : {
          "type" : "record",
          "name" : "TestEventProperties",
          "fields" : [ {
            "name" : "source",
            "type" : [ "null", "string" ],
            "default" : null
          } ]
        }
      }, {
        "name" : "event_id",
        "type" : "string"
      }, {
        "name" : "timestamp",
        "type" : {
          "type" : "long",
          "logicalType" : "timestamp-millis"
        }
      } ]
    }
    

    Benthos config:

    http:
      address: 0.0.0.0:1337
      read_timeout: 5s
      root_path: /benthos
      debug_endpoints: true
    
    input:
      type: broker
      broker:
        copies: 25
        inputs:
        - type: http_server
          http_server:
            address: ""
            cert_file: ""
            key_file: ""
            path: /track
            timeout: 5s
            ws_path: /track/ws
    
    buffer:
      type: memory
      memory:
        limit: 1_000_000_000
    
    pipeline:
      processors:
        - avro:
            operator: from_json
            encoding: textual
            schema: "{\"type\":\"record\",\"name\":\"Event_TestEvent\",\"fields\":[{\"name\":\"event_type\",\"type\":{\"type\":\"enum\",\"name\":\"EventType\",\"symbols\":[\"TestEvent\"]}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"schema_version\",\"type\":\"string\"}]}},{\"name\":\"event_properties\",\"type\":{\"type\":\"record\",\"name\":\"TestEventProperties\",\"fields\":[{\"name\":\"source\",\"type\":[\"null\",\"string\"],\"default\":null}]}},{\"name\":\"event_id\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
    
        - log:
            level: INFO
            message: "DATA ${!content}"
    
    output:
      type: stdout
    
    logger:
      prefix: benthos
      level: INFO
      add_timestamp: true
      json_format: true
      static_fields:
        '@service': benthos
    
    shutdown_timeout: 20s
    

    Thanks!

    enhancement processors 
    opened by gregorskii 15
  • Add new encrypt/decrypt processors

    Hi,

    Thanks for open sourcing this, it's pretty awesome. I couldn't see any scope for custom processors; is that a case of forking and adding them? I'm planning an encrypt/decrypt processor, would that be of interest in the main project?

    Paul

    enhancement processors 
    opened by mintbridge 15
  • Make service.RunWithOpts public again

    On V3, we could start the service with RunWithOpts. On V4, we can only start it passing options if we use the StreamBuilder. Although StreamBuilder is great, RunWithOpts is more suitable for most of our use cases, so it would be nice to get this public again :)

    P.S.: This was already discussed on the Discord channel a while ago.

    enhancement plugin api 
    opened by joao3101 2
  • amqp_0_9 deadlock waiting for ack

    During load testing, I found that Benthos can get stuck in a deadlock on the amqp_0_9 output waiting for an ack here: https://github.com/benthosdev/benthos/blob/main/internal/impl/amqp09/output.go#L282

    This can lead to a full deadlock of the Benthos processing, since the message is not released and no error is triggered.

    It happens randomly after processing millions of messages and may be related to a bug on the AMQP side, but it may be possible to add a (configurable) timeout on the Benthos side. That would trigger a timeout error and allow Benthos's usual error handling, such as resubmitting the message.

    Stack trace of the locked goroutine:

    goroutine 261 [semacquire, 19 minutes]:
    sync.runtime_Semacquire(0x101d7c0?)
    	/usr/local/go/src/runtime/sema.go:56 +0x25
    sync.(*WaitGroup).Wait(0xc00126fb90?)
    	/usr/local/go/src/sync/waitgroup.go:136 +0x52
    github.com/rabbitmq/amqp091-go.(*DeferredConfirmation).Wait(...)
    	/go/pkg/mod/github.com/rabbitmq/[email protected]/confirms.go:178
    github.com/benthosdev/benthos/v4/internal/impl/amqp09.(*amqp09Writer).WriteWithContext.func1(0xc004b6ea70?, 0x0?)
    	/go/src/github.com/benthosdev/benthos/internal/impl/amqp09/output.go:282 +0x487
    github.com/benthosdev/benthos/v4/internal/component/output.IterateBatchedSend(0x440c20?, 0xc00126fb60?)
    	/go/src/github.com/benthosdev/benthos/internal/component/output/batched_send.go:34 +0x10d
    github.com/benthosdev/benthos/v4/internal/impl/amqp09.(*amqp09Writer).WriteWithContext(0xc00067d180, {0x36c9ce0?, 0xc0000d4000?}, 0xc003a72eb8)
    	/go/src/github.com/benthosdev/benthos/internal/impl/amqp09/output.go:237 +0x10b
    github.com/benthosdev/benthos/v4/internal/component/output.(*AsyncWriter).latencyMeasuringWrite(0xc001905e30, 0xf?)
    	/go/src/github.com/benthosdev/benthos/internal/component/output/async_writer.go:104 +0xbe
    github.com/benthosdev/benthos/v4/internal/component/output.(*AsyncWriter).loop.func4()
    	/go/src/github.com/benthosdev/benthos/internal/component/output/async_writer.go:254 +0x35d
    created by github.com/benthosdev/benthos/v4/internal/component/output.(*AsyncWriter).loop
    	/go/src/github.com/benthosdev/benthos/internal/component/output/async_writer.go:292 +0x5dc
    

    benthos-stack.txt

    bug inputs waiting for upstream 
    opened by sapk 2
  • Whether the output batch supports multi threading

    Hello, I read the code related to the batcher and found that it seems to be single-threaded. Is there any way to make this parquet file conversion faster? https://github.com/benthosdev/benthos/blob/811c58786a46085861a828f7fd606e659f872253/internal/component/output/batcher/batcher.go#L63-L156

        batching:
          byte_size: 125829120
          count: 20000
          period: 30s
          processors:
            - parquet:
                compression: snappy
                operator: from_json
                schema: ''
    
    question 
    opened by skyoct 1
  • Update kafka franz logging to be able to treat debug logs like trace logs

    This is kind of a crazy PR and not great code imo. Having one component have a different log level than the rest would be a much better alternative but I don't see a way to do that...

    opened by ekeric13 1
  • bloblang: hostname() errors without Benthos

    I'm looking at using bloblang in another app. The only issue I've found so far is that hostname doesn't seem to work.

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"log"
    
    	"github.com/benthosdev/benthos/v4/public/bloblang"
    )
    
    func main() {
    	mapping := `
    		root.host = hostname()
    		root.ksuid = ksuid()
    	`
    	exe, err := bloblang.Parse(mapping)
    	if err != nil {
    		log.Fatal(err)
    	}
    	src := "{}"
    	var object interface{}
    	err = json.Unmarshal([]byte(src), &object)
    	if err != nil {
    		log.Fatal(err)
    	}
    	res, err := exe.Query(object)
    	if err != nil {
    		log.Fatal(err)
    	}
    	jsonBytes, err := json.MarshalIndent(res, "", "  ")
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Println(string(jsonBytes))
    }
    

    The output is below. If you comment out hostname, then ksuid works just fine.

    2022/06/14 08:56:46 unrecognised function 'hostname': hostn
    exit status 1
    

    Bloblang is fantastic! This is a great little scripting tool and works really well for what you're doing -- and hopefully what I'm doing. My only other concern is that it pulls A LOT of dependencies into my application when I include it.

    enhancement documentation bloblang 
    opened by billgraziano 4
  • Allow multiple batch input in tests

    Hi, firstly, thanks for this great and neat tool.

    I would like to be able to test how a processor behaves across multiple batches, and this requires being able to define multiple input batches, like the output batches.

    My use case is verifying that my processors work per batch and not across batches, by skipping some messages in one batch but not in another. Like deduplication, but per batch rather than across batches.

    This would mean a breaking change, switching the configuration from tests[].input_batch[] to tests[].input_batch[][], similar to the output batch structure.

    I would gladly contribute this change, but since it is a breaking change I would need your go-ahead first.

    enhancement testing 
    opened by sapk 1