Substation is a cloud native toolkit for building modular ingest, transform, and load (ITL) data pipelines


What is Substation?

Substation is a modular ingest, transform, load (ITL) application for moving data between distributed systems. Originally designed to collect, normalize, and enrich security event data, the application provides methods for achieving high quality data through interconnected data pipelines.

Substation also provides Go packages for filtering and modifying JSON data.

Features

As an event-driven ITL application, Substation has these features:

  • real-time event filtering and processing
  • cross-dataset event correlation and enrichment
  • concurrent event routing to downstream systems
  • runs on containers, built for extensibility
    • support for new event filters and processors
    • support for new ingest sources and load destinations
    • supports creation of custom applications (e.g., multi-cloud)

As a Go package, Substation provides functions for filtering and modifying JSON data.

Use Cases

Substation was originally designed to support the mission of achieving high quality data for threat hunting, threat detection, and incident response, but it can be used to move data between many distributed systems and services. Here are some example use cases:

  • data availability: sink data to an intermediary streaming service such as AWS Kinesis, then concurrently sink it to a data lake, a data warehouse, and a SIEM
  • data consistency: normalize data across every dataset using a permissive schema such as the Elastic Common Schema
  • data completeness: enrich data by integrating AWS Lambda functions and building self-populating AWS DynamoDB tables for low-latency, real-time event context

Example Data Pipelines

Simple

The simplest data pipeline is one with a single source (ingest), a single transform, and a single sink (load). The diagram below shows pipelines that ingest data from different sources and sink it unmodified to a data warehouse where it can be used for analysis.

graph TD
    sink(Data Warehouse)

    %% pipeline one
    source_a(HTTPS Source)
    processing_a[Transfer]

    %% flow
    subgraph pipeline X
    source_a ---|Push| processing_a
    end

    processing_a ---|Push| sink

    %% pipeline two
    source_b(Data Lake)
    processing_b[Transfer]

    %% flow
    subgraph pipeline Y
    source_b ---|Pull| processing_b
    end

    processing_b ---|Push| sink

Complex

The complexity of a data pipeline, including its features and how it connects with other pipelines, is up to the user. The diagram below shows two complex data pipelines that have these features:

  • both pipelines write unmodified data to intermediary streaming data storage (e.g., AWS Kinesis) to support concurrent consumers and downstream systems
  • both pipelines transform data by enriching it from their own inter-pipeline metadata lookup (e.g., AWS DynamoDB)
  • pipeline Y additionally transforms data by enriching it from pipeline X's metadata lookup

graph TD

    %% pipeline Y
    source_a_http(HTTPS Source)
    sink_a_streaming(Streaming Data Storage)
    sink_a_metadata(Metadata Lookup)
    sink_a_persistent[Data Warehouse]
    processing_a_http[Transfer]
    processing_a_persistent[Transform]
    processing_a_metadata[Transform]

    %% flow
    subgraph pipeline Y
    source_a_http ---|Push| processing_a_http
    processing_a_http ---|Push| sink_a_streaming
    sink_a_streaming ---|Pull| processing_a_persistent
    sink_a_streaming ---|Pull| processing_a_metadata
    processing_a_persistent---|Push| sink_a_persistent
    processing_a_persistent---|Pull| sink_a_metadata
    processing_a_metadata ---|Push| sink_a_metadata
    end

    processing_a_persistent ---|Pull| sink_b_metadata

    %% pipeline X
    source_b_http(HTTPS Source)
    sink_b_streaming(Streaming Data Storage)
    sink_b_metadata(Metadata Lookup)
    sink_b_persistent(Data Warehouse)
    processing_b_http[Transfer]
    processing_b_persistent[Transform]
    processing_b_metadata[Transform]

    %% flow
    subgraph pipeline X
    source_b_http ---|Push| processing_b_http
    processing_b_http ---|Push| sink_b_streaming
    sink_b_streaming ---|Pull| processing_b_persistent
    sink_b_streaming ---|Pull| processing_b_metadata
    processing_b_persistent---|Push| sink_b_persistent
    processing_b_persistent---|Pull| sink_b_metadata
    processing_b_metadata ---|Push| sink_b_metadata
    end

As a toolkit, Substation makes no assumptions about how data pipelines are configured and connected. We encourage experimentation and outside-the-box thinking when it comes to pipeline design!

Quickstart

Use the steps below to test Substation's functionality. We recommend running them in a Docker container (Visual Studio Code configurations for developing and testing Substation are included in .devcontainer/ and .vscode/).

Step 1: Compile the File Binary

From the project root, run the commands below to compile the Substation file app.

$ cd cmd/file/substation/
$ go build .
$ ./substation -h

Step 2: Compile the quickstart Configuration File

From the project root, run the commands below to compile the quickstart Jsonnet configuration files into a Substation JSON config.

$ sh build/config/compile.sh

Step 3: Test Substation

From the project root, run the commands below to create a sample events file and test Substation. After this, we recommend reviewing the config documentation and running more tests with other event processors to learn how the app works.

$ echo '{"hello":"world"}' >> /tmp/quickstart.json
$ ./cmd/file/substation/substation -input /tmp/quickstart.json -config config/quickstart/config.json

Step 4: Test Substation in AWS

Navigate to the build directory and review the terraform, container, and config documentation. build/terraform/aws/example_pipeline.tf is a fully-featured data pipeline that can be used as an example of how to deploy pipelines in AWS.

Additional Documentation

More documentation about Substation can be found across the project, including:

Licensing

Substation and its associated code are released under the terms of the MIT License.

Comments
  • refactor!: Standardizing Use of io

    refactor!: Standardizing Use of io

    Description

    • Updates functions and methods that handle streaming data to use the io package
      • internal/aws/appconfig
      • internal/aws/s3manager
      • internal/file
      • internal/sink/s3
    • Adds a SetConfig method to the Substation app that accepts an io.Reader
    • Adds an internal package for detecting content (media) type of files
    • Adds an internal package that wraps bufio and Scanner
    • Moves the SUBSTATION_CONFIG handler from app.go to config.go
    • Changes and moves the SUBSTATION_SCAN_METHOD handler from app.go to internal/bufio (breaking)
    • Changes the Substation app to make the package private, except for the creation of a new app (breaking)
    • Changes all published apps (cmd/aws/lambda, cmd/file, examples/service) to support changes made in this PR
      • cmd/file now dynamically retrieves configs from local disk, HTTP(S), or S3
      • cmd/aws/lambda now dynamically retrieves configs from local disk, HTTP(S), S3, or AppConfig
    • Updated CONTRIBUTING.md with design pattern documentation for reading and writing streaming data

    Motivation and Context

Currently the project uses bytes in places where the io package makes more sense, specifically in the handling of files. This is riskiest when downloading content from cloud buckets and the web: with bytes, all of the content is read into memory at once, while io streams the data and is therefore more memory efficient. Since this is a serverless application and some serverless providers (like AWS Lambda) charge based on memory allocation, there's a tangible benefit to making the application more efficient with memory. Beyond that, switching to io decreases the likelihood that we'll need more refactors in the future.

    While refactoring these methods and functions a few other issues and opportunities came up:

    • having a standardized way to create and use a bufio scanner
    • having a standardized way to inspect files by media type
    • making the core application private

    How Has This Been Tested?

    • All unit tests were converted from supporting bytes to io
    • Packages that cannot be easily unit tested are supported by example tests -- these serve the dual function of testing and showing how to use packages
    • Integration tested using the cmd/file app
    • End to end tested using examples/aws in a development AWS account

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [x] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
    opened by jshlbrd 2
  • fix(linter): fix golangci-lint warnings across substation

    fix(linter): fix golangci-lint warnings across substation

This updates the Go linting to use the meta-linter golangci-lint and fixes the related lint warnings.

    Description

    Changes include:

    • fix: handle or explicitly ignore all errors
    • fix: remove redundant function calls (#31) and conversions
    • fix: rename predeclared caps identifier
    • chore: tools: gofmt -> gofumpt, and staticcheck -> golangci-lint
    • docs: update comments to remove grammar mistakes and follow Go comment conventions.

    Motivation and Context

This closes #31 and improves error handling and consistency across the project.

    How Has This Been Tested?

    Local builds, unit tests, and benchmarks all pass.

    Types of changes

    • [x] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [ ] My change requires a change to the documentation.
    • [ ] I have updated the documentation accordingly.
    opened by shellcromancer 2
  • refactor: Update Configs

    refactor: Update Configs

    Description

    • Moves the Substation config so that it is public (can be imported by other Go apps)
      • config.go and Jsonnet functions are now in config/
      • example configurations are now in examples/
    • Adds runtime concurrency setting via SUBSTATION_CONCURRENCY environment variable
      • Default behavior is the same as the current behavior
    • Updated documentation

    Motivation and Context

    • Making the Substation config public was a requirement to make the condition inspectors and processors highly usable in other applications
    • Using SUBSTATION_CONCURRENCY is preferred over pinning concurrency to the number of CPUs -- there are some scenarios when less or more concurrency is required

    How Has This Been Tested?

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
    opened by jshlbrd 2
  • chore(main): release 0.6.1

    chore(main): release 0.6.1

    opened by github-actions[bot] 1
  • chore(main): release 0.6.0

    chore(main): release 0.6.0

    :robot: I have created a release beep boop

    0.6.0 (2022-11-30)

    ⚠ BREAKING CHANGES

    • Standardizing Use of io (#38)

    Features

    Code Refactoring


    This PR was generated with Release Please. See documentation.

    autorelease: tagged 
    opened by github-actions[bot] 1
  • feat: Add gRPC Support

    feat: Add gRPC Support

    Description

    • Adds initial Protobuf definitions to proto/
      • Adds build dependencies in all Dockerfile, including devcontainer
      • Adds build script in build/scripts/proto/compile.sh
    • Adds gRPC server and service package to internal/service
    • Adds gRPC sink to internal/sink
      • Adds internal/file to support loading a server certificate for the sink from one of many locations
    • Adds example application utilizing the gRPC Sink service
    • Updates devcontainer settings, including an install script

    Motivation and Context

    This update is the first step in the project becoming a true "distributed system" (in the architectural sense) and enables users to deploy the system in new ways (described as future work below). This is quite big from an impact point of view, so we should address any open questions before approving the PR.

    In the short term, this gives us the ability to support synchronous (sync) invocation in AWS Lambda. The problem we've had until now is that there was no way for the sink goroutine to send results back to the calling application -- this is a requirement in supporting sync invocations because the processed data must be returned by the Lambda handler. With this change we can send data from the sink to the caller by using a gRPC service for inter-process communication (IPC). This is shown in the diagram below where goroutines are represented by dotted lines and data flow is represented by solid arrows. (Using gRPC for IPC is described in more detail in the new example included in this PR.)

    graph TD
    handler -.- gRPC
    handler -.- transform
    handler -.- sink
    handler --> ingest
    ingest --> transform
    transform --> sink
    sink --> gRPC
    gRPC --> handler
    

    Not supporting sync invocation was a blocker to supporting some AWS services (like data transformation in Kinesis Firehose) and making the system behave like a data enrichment microservice -- it's not yet clear how we'll implement this, but these changes will make it possible to deploy Substation as an "enrichment node" within the context of a larger system that can be invoked using the existing Lambda processor.

    Onto future work, the added benefit of gRPC is that, with little effort on our part, we can extend that functionality to systems beyond serverless AWS services. This PR adds a definition for a Sink service that mimics the internal Sink interface, but we could also add definitions that mimic other components of the system as well. For example, these definitions would turn every processor and inspector into configurable microservices:

    // Applicator mirrors the Applicator interface defined in process
    service Applicator {
      rpc Apply(Capsule) returns (Capsule) {}
    }
    
    // Inspector mirrors the Inspector interface defined in condition
    service Inspector {
      rpc Inspect(Capsule) returns (Decision) {}
    }
    

    Overall, defining Protobuf based on the system's structs and interfaces would be a relatively safe method for making components accessible to external services (including services not written in Go) and letting others build their own distributed data pipelines on non-serverless infrastructure. (I don't anticipate the team at Brex doing this any time soon since it would increase complexity and we're happy with AWS Lambda, but if others are interested, then it's easy to support).

    How Has This Been Tested?

    The new example included in this PR acts as an integration test for all of the new features, including the proto, the internal gRPC service, the internal gRPC server, and the gRPC sink.

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
    opened by jshlbrd 1
  • chore(main): release 0.5.0

    chore(main): release 0.5.0

    :robot: I have created a release beep boop

    0.5.0 (2022-10-04)

    ⚠ BREAKING CHANGES

    • Update App Concurrency Model (#30)
    • Add Forward Compatibility for SNS (#21)

    Features

    • Add Forward Compatibility for SNS (#21) (b93dc1e)
    • Add Initial Support for Application Metrics (#25) (30f103d)
    • AppConfig Script Updates (#28) (5261485)
    • Customizable Kinesis Data Stream Autoscaling (#27) (2dd7ea7)
    • Improvements to JSON Parsing (#29) (98cac69)
    • Improvements to Reading and Decoding Files (#24) (e310cb5)

    Bug Fixes

    • linter: fix golangci-lint warnings across substation (#32) (9b7e077)
    • streamname bug (#23) (da9de62)

    Code Refactoring


    This PR was generated with Release Please. See documentation.

    autorelease: tagged 
    opened by github-actions[bot] 1
  • feat!: Encapsulation

    feat!: Encapsulation

    Description

    • Makes encapsulation the default pattern for handling data in all Substation applications and public APIs (see changes in /config/)
      • Adds backwards compatible functions for handling non-encapsulated data
    • Adds AWS Firehose sink support
    • Adds AWS SQS source and sink support
    • Adds context support condition API
    • Updates code style of public APIs

    This is a non-breaking change for ITL applications and a breaking change for the public APIs.

    Motivation and Context

    The motivation for adding encapsulation was the need to let users simultaneously handle both data (structured or unstructured) and metadata. This has two advantages:

    • sources can add metadata when data is ingested
    • metadata can be tracked alongside unstructured (e.g., binary) data

    This change means that users can now access and interpret information only the source application knows; for example:

    • substation/file: the filename and file size of the source file that data is read from
    • substation/aws/kinesis: the Kinesis stream data is delivered by and approximate arrival time of data
    • substation/aws/s3: the S3 bucket and object that data is read from

    It also provides total separation of metadata from data, which is most useful in sink applications; for example:

    • http: storing HTTP headers as metadata instead of in the data object
    • s3: storing the S3 prefix as metadata instead of in the data object
    • sumologic: storing the category as metadata instead of in the data object

    This gives us some flexibility for growth in the future. For example, instead of relying on configured settings for Kinesis streams and S3 buckets, we can add features to let users dynamically apply these from metadata.

    How Has This Been Tested?

    • All public APIs had their unit tests updated
    • Firehose and SQS features were tested in a development AWS account

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [x] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
    opened by jshlbrd 1
  • chore(main): release 0.4.0

    chore(main): release 0.4.0

    opened by github-actions[bot] 1
  • chore(main): release 0.3.0

    chore(main): release 0.3.0

    :robot: I have created a release beep boop

    0.3.0 (2022-07-18)

    ⚠ BREAKING CHANGES

    • Migrate to Meta Processors (#7)

    Features


    This PR was generated with Release Please. See documentation.

    autorelease: tagged 
    opened by github-actions[bot] 1
  • chore(main): release 0.2.0

    chore(main): release 0.2.0

    :robot: I have created a release beep boop

    0.2.0 (2022-06-17)

    ⚠ BREAKING CHANGES

    • Pre-release Refactor (#5)

    Features

    • Add base64 Processor (#4) (cc76318)
    • Adds Gzip Processor and Content Inspector (#2) (cdd2999)

    Code Refactoring


    This PR was generated with Release Please. See documentation.

    autorelease: tagged 
    opened by github-actions[bot] 1
  • feat: DNS and IP Database Processing

    feat: DNS and IP Database Processing

    Description

    • Adds a Close method to the Applicator and BatchApplicator interfaces
    • Adds an internal package for supporting IP address enrichment from multiple enrichment database providers (internal/ip/database)
    • Adds an IP address database processor that applies enrichment to IP addresses from local databases
    • Adds a DNS processor

    Motivation and Context

    Currently the team at Brex runs DNS and IP address enrichment in a separate sub-system of our threat detection platform, but we identified an opportunity to migrate this into Substation. IP address enrichment is the most complex of these for two reasons:

    • it relies on having the ability to retrieve and load enrichment databases into the system at runtime
    • we don't know what enrichment database providers users may use

    The project's use of lazy loading, configuration through environment variables, and the ability to contextually retrieve files from local disk, HTTP(S), and AWS S3 solve many of these issues. As a starting point, I've added support for the free MaxMind and IP2Location databases.

The handling of open files in the IPDatabase processor required the addition of a Close method to each applicator interface; this allows each processor to define its own teardown process (in the case of IPDatabase, closing the open database readers).

    How Has This Been Tested?

The changes were integration tested using the two new example files. I can provide a link to a privately hosted MaxMind database so that the reviewer(s) can check that the code works as expected.

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
    opened by jshlbrd 0
Releases: v0.6.1

Owner: Brex