A pluggable backend API that enforces the Event Sourcing Pattern for persisting & broadcasting application state changes

Overview

stategate

A pluggable "Application State Gateway" that enforces the Event Sourcing Pattern for securely persisting & broadcasting application state changes


API Services/Methods

// EntityService serves API methods to clients that modify/query the current state of an entity
// An Entity is a single object with a type, domain, key, and k/v values
service EntityService {
  // Set sets the current state value of an entity, adds it to the event log, then broadcasts the event to all interested consumers(EventService.Stream)
  rpc Set(Entity) returns(google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/api/entity/ref/{domain}/{type}/{key}"
    };
  }
  // Edit overwrites the k/v pairs present in the entity request without replacing the entire entity.
  // It then adds the state change to the event log and broadcasts the event to all interested consumers(EventService.Stream)
  // Edit returns the current state of the Entity after the mutation.
  rpc Edit(Entity) returns(Entity){
    option (google.api.http) = {
      patch: "/api/entity/ref/{domain}/{type}/{key}"
    };
  }
  // Revert reverts an Entity to a previous version of itself
  // Reverting an entity dispatches another event since it is a state change
  rpc Revert(EventRef) returns(Entity) {
    option (google.api.http) = {
      put: "/api/entity/ref/{domain}/{type}/{key}/revert"
    };
  }
  // Get gets an entity's current state
  rpc Get(EntityRef) returns(Entity) {
    option (google.api.http) = {
      get: "/api/entity/ref/{domain}/{type}/{key}"
    };
  }
  // Del hard deletes an entity from the current state store, adds its state prior to deletion to the event log, then broadcasts the event to all interested consumers(EventService.Stream)
  // An Entity may be recovered by querying the Event store for historical records of the deleted Entity.
  rpc Del(EntityRef) returns(google.protobuf.Empty) {
    option (google.api.http) = {
      delete: "/api/entity/ref/{domain}/{type}/{key}"
    };
  }
  // Search queries the current state of entities
  rpc Search(SearchEntityOpts) returns(Entities) {
    option (google.api.http) = {
      get: "/api/entity/search"
    };
  }
}

// EventService serves API methods related to stategate Event Consumers
// Events are automatically emitted whenever the EntityService mutates the state of an Entity
service EventService {
  // Stream creates an event stream/subscription to state changes to entities in real time. Glob matching is supported.
  rpc Stream(StreamEventOpts) returns(stream Event) {
    option (google.api.http) = {
      get: "/api/events/stream"
    };
  }
  // Search queries historical events - every historical state change to an entity may be queried.
  rpc Search(SearchEventOpts) returns(Events) {
    option (google.api.http) = {
      get: "/api/events/search"
    };
  }
  // Get gets a single event
  rpc Get(EventRef) returns(Event) {
    option (google.api.http) = {
      get: "/api/events/ref/{domain}/{type}/{key}/{id}"
    };
  }
}

// PeerService provides a means for clients to communicate directly with one another WITHOUT making any state changes.
// Please note that all messages transported via the PeerService are not persisted in any way.
service PeerService {
  // Broadcast broadcasts a message to N subscribers(clients calling Stream)
  rpc Broadcast(Message) returns(google.protobuf.Empty){
    option (google.api.http) = {
      post: "/api/peers/broadcast"
      body: "*"
    };
  }
  // Stream consumes/streams messages from message producers(clients calling broadcast)
  rpc Stream(StreamMessageOpts) returns(stream PeerMessage){
    option (google.api.http) = {
      get: "/api/peers/stream"
    };
  }
}

// CacheService is for persisting short lived values in memory for performance-critical operations
service CacheService {
  // Set sets a value in the cache
  rpc Set(Cache) returns(google.protobuf.Empty){
    option (google.api.http) = {
      put: "/api/cache/ref/{domain}/{key}"
    };
  }
  // Get gets a value from the cache
  rpc Get(CacheRef) returns(Cache) {
    option (google.api.http) = {
      get: "/api/cache/ref/{domain}/{key}"
    };
  }
  // Del deletes a value from the cache
  rpc Del(CacheRef) returns(google.protobuf.Empty) {
    option (google.api.http) = {
      delete: "/api/cache/ref/{domain}/{key}"
    };
  }
}

// MutexService offers distributed locking capabilities for clients that need to coordinate with peer services.
service MutexService {
  // Lock locks a value for a period of time if it is not locked already.
  // If it is already locked, an error will be returned
  // It is best practice for clients to call Unlock when the distributed lock operation is completed instead of relying on the TTL
  rpc Lock(Mutex) returns(google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/api/mutex/ref/{domain}/{key}/lock"
    };
  }
  // Unlock unlocks the key(if it's currently locked) so that it may be locked again.
  // It is best practice for clients to call Unlock when the distributed lock operation is completed instead of relying on the TTL
  rpc Unlock(MutexRef) returns(google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/api/mutex/ref/{domain}/{key}/unlock"
    };
  }
}
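
The difference between Set and Edit described in the EntityService comments can be modelled with plain Go maps. This is only an illustrative sketch, not stategate's implementation: `setValues` and `editValues` are hypothetical helper names, and the sketch assumes Edit performs a shallow, top-level merge of the k/v pairs.

```go
package main

import "fmt"

// setValues models Set: the incoming values replace the stored values entirely.
func setValues(_ map[string]any, incoming map[string]any) map[string]any {
	out := make(map[string]any, len(incoming))
	for k, v := range incoming {
		out[k] = v
	}
	return out
}

// editValues models Edit: only keys present in the request are overwritten;
// every other stored key is left untouched (shallow merge, an assumption).
func editValues(stored, incoming map[string]any) map[string]any {
	out := make(map[string]any, len(stored)+len(incoming))
	for k, v := range stored {
		out[k] = v
	}
	for k, v := range incoming {
		out[k] = v
	}
	return out
}

func main() {
	stored := map[string]any{"first_name": "Coleman", "last_name": "Word"}
	patch := map[string]any{"last_name": "Smith"}

	fmt.Println(setValues(stored, patch))  // only last_name survives
	fmt.Println(editValues(stored, patch)) // first_name kept, last_name overwritten
}
```

In both cases the service comments say the resulting state is appended to the event log and broadcast, so either call produces exactly one event.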

Features

  • Capture all changes to an application's state(entities) as a sequence of events - Event Sourcing(EntityService/EventService)
  • High Performance Pubsub Interface Service(PeerService)
  • High Performance Caching Interface(CacheService)
  • High Performance Distributed Locking Interface(MutexService)
  • Stateless & horizontally scalable
  • Native gRPC support
  • Embedded REST support / (transcoding)
  • Embedded GraphQL support /api/graphql (transcoding)
  • Embedded grpcweb support (transcoding)
  • Metrics Server(prometheus/pprof)
  • Authentication - JWT/OAuth with remote JWKS verification
  • Authorization - Rego based Authorization engine
  • Autogenerated Client gRPC SDKs
  • Structured JSON Logs
  • Sample Kubernetes Manifest
  • Sample Docker Compose
  • Pluggable "Storage" Providers
    • MongoDB
      • fully-tested
    • PostgreSQL
    • MySQL
    • Cassandra
  • Pluggable "Cache" Providers
    • Redis
      • fully-tested

Goals

  • Create a simple API interface for storing state(entities) and subscribing to state changes(events) using pluggable cache & storage providers
  • Capture all changes to an application's state/entities as a sequence of events.
  • Safe to swap backend providers without changing client-side code
  • Type-safe clients generated in many languages
  • Safe to expose to the public internet due to a fine-grained authentication/authorization model.
  • Capture a persistent, immutable historical record of all state changes to entities using a pluggable storage provider
  • Revert/Rollback an entity to any previous version of itself at any point in time
  • Store identity(jwt.claims) & timestamp in event logs to capture who is changing what & when
  • Easy deployment model - fully configurable via environmental variables
  • Create complex client applications with stategate as their only dependency
  • Create serverless deployment model for stategate client applications

Design

stategate was designed with Event Sourcing in mind.

What is Event Sourcing?

Event sourcing persists the state of a business entity such as an Order or a Customer as a sequence of state-changing events. Whenever the state of a business entity changes, a new event is appended to the list of events. Since saving an event is a single operation, it is inherently atomic. The application reconstructs an entity’s current state by replaying the events.

Applications persist events in an event store, which is a database of events. The store has an API for adding and retrieving an entity’s events. The event store also behaves like a message broker. It provides an API that enables services to subscribe to events. When a service saves an event in the event store, it is delivered to all interested subscribers.
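
The append-and-replay idea described above can be sketched in a few lines of Go. This is a minimal in-memory illustration under stated assumptions, not how stategate stores data: `event`, `eventLog`, `add`, `current`, and `at` are hypothetical names, and each event is assumed to carry the full state of the entity after the change.

```go
package main

import "fmt"

// event is a hypothetical, simplified stand-in for a state-change record.
type event struct {
	key    string
	values map[string]string // full state of the entity after the change
}

// eventLog is an append-only, in-memory list of events.
type eventLog struct {
	events []event
}

// add records a new event; existing events are never modified.
func (l *eventLog) add(e event) {
	l.events = append(l.events, e)
}

// current replays the log and returns the latest state recorded for key.
func (l *eventLog) current(key string) (map[string]string, bool) {
	var state map[string]string
	found := false
	for _, e := range l.events {
		if e.key == key {
			state, found = e.values, true
		}
	}
	return state, found
}

// at returns the state recorded by the n-th event (0-based) for key,
// which is the kind of lookup a revert to a previous version needs.
func (l *eventLog) at(key string, n int) (map[string]string, bool) {
	i := 0
	for _, e := range l.events {
		if e.key != key {
			continue
		}
		if i == n {
			return e.values, true
		}
		i++
	}
	return nil, false
}

func main() {
	log := &eventLog{}
	log.add(event{"order-1", map[string]string{"status": "created"}})
	log.add(event{"order-1", map[string]string{"status": "paid"}})

	now, _ := log.current("order-1")
	first, _ := log.at("order-1", 0)
	fmt.Println(now["status"], first["status"])
}
```

Because nothing in the log is ever overwritten, looking up an older version is just a read; reverting would append one more event that carries the older state.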


Primitives

Entity

An entity represents a single record (k/v pairs) identified by a key that is unique within its type, belonging to a particular domain.

    // Entity represents a single record(k/v pairs) with a unique key with a given [type](https://en.wikipedia.org/wiki/Type_system), belonging to a particular [domain](https://en.wikipedia.org/wiki/Domain-driven_design)
    // EventService clients should use the EntityService to persist & interact with the current state of an entity.
    message Entity {
      // the entity's business domain(ex: accounting)
      // must not be empty or contain spaces
      string domain =1[(validator.field) = {regex : "^\\S+$"}];
      // the entity's type (ex: user)
      // must not be empty or contain spaces
      string type =2[(validator.field) = {regex : "^\\S+$"}];
      // the entity's key (unique within type). 
      // must not be empty or contain spaces
      string key =3[(validator.field) = {regex : "^\\S+$"}];
      // the entity's values (k/v pairs)
      google.protobuf.Struct values = 4[(validator.field) = {msg_exists : true}];
    }
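
The `^\S+$` rule on domain, type, and key can be checked on the client before a request is sent. The sketch below applies the same regular expression in Go; `entityRef` and `validate` are hypothetical names used for illustration and are not part of the generated stategate SDK.

```go
package main

import (
	"fmt"
	"regexp"
)

// noSpaces is the same pattern used by the validator annotations:
// one or more non-whitespace characters, so empty strings are rejected too.
var noSpaces = regexp.MustCompile(`^\S+$`)

// entityRef is a hypothetical stand-in for an entity's domain/type/key triple.
type entityRef struct {
	Domain, Type, Key string
}

// validate returns an error naming the first field that is empty or contains whitespace.
func (r entityRef) validate() error {
	fields := []struct{ name, value string }{
		{"domain", r.Domain}, {"type", r.Type}, {"key", r.Key},
	}
	for _, f := range fields {
		if !noSpaces.MatchString(f.value) {
			return fmt.Errorf("%s must not be empty or contain whitespace: %q", f.name, f.value)
		}
	}
	return nil
}

func main() {
	fmt.Println(entityRef{"accounting", "user", "u-123"}.validate())
	fmt.Println(entityRef{"accounting", "user", "john doe"}.validate())
}
```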

Events

An Event is a primitive that represents a single state change to an entity

    // Event is a primitive that represents a single state change to an entity
    // Events are persisted to history & broadcast to interested consumers(Stream) any time an entity is created/modified/deleted
    // Events are immutable after creation and may be searched.
    // EventService clients may search events to query the previous state of one or more entities
    message Event {
      // identifies the event(uuid v4).
      string id = 1[(validator.field) = {uuid_ver : 4}];
      // state of an Entity after it has been mutated
      Entity entity = 2[(validator.field) = {msg_exists : true}];
      // the invoked method that triggered the event
      string method =5[(validator.field) = {string_not_empty : true}];
      // the authentication claims of the event producer.
      google.protobuf.Struct claims =3[(validator.field) = {msg_exists : true}];
      // timestamp(ns) of when the event was received.
      int64 time =4[(validator.field) = {int_gt : 0}];
    }
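
EventService.Stream is documented as supporting glob matching when subscribing to events. The sketch below shows one way such a filter could behave, using Go's standard `path.Match`. The `streamFilter` type and its field names are assumptions made for illustration, and stategate's actual glob syntax may differ from `path.Match`.

```go
package main

import (
	"fmt"
	"path"
)

// streamFilter is a hypothetical subscription filter; the field names are assumptions.
type streamFilter struct {
	Domain, Type string // glob patterns, e.g. "accounting" or "*"
}

// matches reports whether an event's entity domain and type satisfy both patterns.
// A malformed pattern is treated as a non-match.
func (f streamFilter) matches(domain, typ string) bool {
	okDomain, err := path.Match(f.Domain, domain)
	if err != nil || !okDomain {
		return false
	}
	okType, err := path.Match(f.Type, typ)
	return err == nil && okType
}

func main() {
	f := streamFilter{Domain: "accounting", Type: "*"}
	fmt.Println(f.matches("accounting", "user"))
	fmt.Println(f.matches("billing", "user"))
}
```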

Messages

Message is a non-persisted message passed between Peers as a means of communication

// Message is an arbitrary message to be delivered to a Peer
// Messages are NOT persisted and should only be used to communicate with other Peers
message Message {
  // the message's business domain(ex: accounting)
  // must not be empty or contain spaces
  string domain =1[(validator.field) = {regex : "^\\S+$"}];
  // the message's channel(ex: general)
  // must not be empty or contain spaces
  string channel =2[(validator.field) = {regex : "^\\S+$"}];
  // message's type (ex: comment)
  // must not be empty or contain spaces
  string type =3[(validator.field) = {regex : "^\\S+$"}];
  // the body of the message(k/v values).
  google.protobuf.Struct body =4[(validator.field) = {msg_exists : true}];
}

Environmental Variables

.env files are loaded if found in the same directory as stategate

# port to serve on (optional). defaults to 8080
STATEGATE_PORT=8080
# enable debug logs (optional)
STATEGATE_DEBUG=true
# disable all authentication & authorization(jwks, request policies, response policies) (optional)
STATEGATE_AUTH_DISABLED=false
# tls cert file (optional)
STATEGATE_TLS_CERT_FILE=/tmp/certs/stategate.cert
# tls key file (optional)
STATEGATE_TLS_KEY_FILE=/tmp/certs/stategate.key

# JSON Web Key Set remote URI used for fetching jwt signing keys for verification/validation (optional)
STATEGATE_JWKS_URI=https://www.googleapis.com/oauth2/v3/certs

# base64 encoded OPA rego policy executed on inbound requests from clients (optional)
STATEGATE_REQUEST_POLICY=cGFja2FnZSBzdGF0ZWdhdGUuYXV0aHoKCmRlZmF1bHQgYWxsb3cgPSB0cnVl
# base64 encoded OPA rego policy executed on responses sent to clients (optional)
STATEGATE_RESPONSE_POLICY=cGFja2FnZSBzdGF0ZWdhdGUuYXV0aHoKCmRlZmF1bHQgYWxsb3cgPSB0cnVl

# storage provider configuration(JSON) options: [mongo] REQUIRED
STATEGATE_STORAGE_PROVIDER={ "name": "mongo", "database": "testing", "addr": "mongodb://localhost:27017/testing" }

# cache provider configuration(JSON) options: [redis] REQUIRED
STATEGATE_CACHE_PROVIDER={ "name": "redis", "addr": "localhost:6379", "user": "xxx", "password": "xxxxxxxxxx" }

# CORS options for accessing stategate from the browser
STATEGATE_CORS_ALLOW_ORIGINS=*
STATEGATE_CORS_ALLOW_METHODS=POST,GET,PUT,DELETE
STATEGATE_CORS_ALLOW_HEADERS=*
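
The request and response policy variables hold base64-encoded Rego source. The following sketch decodes the sample value shown above and encodes it back, which is a quick way to prepare a policy of your own. `encodePolicy` and `decodePolicy` are hypothetical helper names, and standard (not URL-safe) base64 is assumed; the sample value is valid under either alphabet.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodePolicy returns the base64 form of a Rego policy (standard alphabet, an assumption).
func encodePolicy(rego string) string {
	return base64.StdEncoding.EncodeToString([]byte(rego))
}

// decodePolicy reverses encodePolicy.
func decodePolicy(encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", fmt.Errorf("decode policy: %w", err)
	}
	return string(raw), nil
}

func main() {
	// Sample value taken from the configuration listing above.
	const sample = "cGFja2FnZSBzdGF0ZWdhdGUuYXV0aHoKCmRlZmF1bHQgYWxsb3cgPSB0cnVl"

	rego, err := decodePolicy(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(rego)
	fmt.Println(encodePolicy(rego) == sample)
}
```

The sample decodes to a policy in package `stategate.authz` whose only rule is `default allow = true`, so it permits every request.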

Implementation Details

Storage Providers

supported providers: [mongo]

  • A stategate storage provider is a pluggable, 3rd party database storage service.
  • Storage providers provide persistence for all current entities/events and should be scaled independently of stategate instances.

interface

// EntityProvider provides logic for querying/persisting entities
type EntityProvider interface {
	SetEntity(ctx context.Context, entity *stategate.Entity) *errorz.Error
	EditEntity(ctx context.Context, entity *stategate.Entity) (*stategate.Entity, *errorz.Error)
	SearchEntities(ctx context.Context, ref *stategate.SearchEntityOpts) (*stategate.Entities, *errorz.Error)
	DelEntity(ctx context.Context, ref *stategate.EntityRef) *errorz.Error
	GetEntity(ctx context.Context, ref *stategate.EntityRef) (*stategate.Entity, *errorz.Error)
}

// EventProvider provides logic for querying/persisting events
type EventProvider interface {
	SaveEvent(ctx context.Context, event *stategate.Event) *errorz.Error
	SearchEvents(ctx context.Context, ref *stategate.SearchEventOpts) (*stategate.Events, *errorz.Error)
	GetEvent(ctx context.Context, ref *stategate.EventRef) (*stategate.Event, *errorz.Error)
}

// Provider is an event & entity provider
type Provider interface {
	EventProvider
	EntityProvider
	Close() error
}

Cache Providers

supported providers: [redis]

  • A stategate cache provider is a pluggable, 3rd party caching & message-queue service.
  • Cache providers provide a way for stategate to store ephemeral data & broadcast events to itself while scaling horizontally.

interface

// ChannelProvider acts as dependency injection for broadcasting messages to stategate instances as they fan out
type ChannelProvider interface {
	PublishEvent(ctx context.Context, event *stategate.Event) *errorz.Error
	GetEventChannel(ctx context.Context) (chan *stategate.Event, *errorz.Error)
	PublishMessage(ctx context.Context, message *stategate.PeerMessage) *errorz.Error
	GetMessageChannel(ctx context.Context) (chan *stategate.PeerMessage, *errorz.Error)
}

// CacheProvider acts as dependency injection for caching ephemeral data 
type CacheProvider interface {
	Get(ctx context.Context, ref *stategate.CacheRef) (*stategate.Cache, *errorz.Error)
	Set(ctx context.Context, value *stategate.Cache) *errorz.Error
	Del(ctx context.Context, value *stategate.CacheRef) *errorz.Error
}

// MutexProvider acts as dependency injection for distributed mutex operations
type MutexProvider interface {
	Lock(ctx context.Context, ref *stategate.Mutex) *errorz.Error
	Unlock(ctx context.Context, value *stategate.MutexRef) *errorz.Error
}

// Provider is a channel, cache, & mutex provider
type Provider interface {
	CacheProvider
	ChannelProvider
	MutexProvider
	Close() error
}
  • Cache providers should be scaled independently of stategate instances.
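
The Lock/Unlock contract of the MutexProvider (fail if already locked, release on Unlock or when the TTL runs out) can be illustrated with a single-process stand-in. This is a sketch under stated assumptions, not the Redis-backed provider: `memMutex`, `lock`, `unlock`, and `errLocked` are hypothetical names, and a real distributed lock needs a shared store rather than a local map.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errLocked is returned when a key is held by an unexpired lock.
var errLocked = errors.New("key is already locked")

// memMutex is a single-process stand-in for a distributed lock store.
type memMutex struct {
	mu      sync.Mutex
	expires map[string]time.Time // key -> lock expiry
}

func newMemMutex() *memMutex {
	return &memMutex{expires: map[string]time.Time{}}
}

// lock acquires key for ttl, failing if an unexpired lock already exists.
func (m *memMutex) lock(key string, ttl time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	now := time.Now()
	if exp, ok := m.expires[key]; ok && now.Before(exp) {
		return errLocked
	}
	m.expires[key] = now.Add(ttl)
	return nil
}

// unlock releases key; unlocking a key that is not locked is a no-op.
func (m *memMutex) unlock(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.expires, key)
}

func main() {
	m := newMemMutex()
	fmt.Println(m.lock("accounting/report", time.Minute)) // first caller acquires the lock
	fmt.Println(m.lock("accounting/report", time.Minute)) // second caller is rejected
	m.unlock("accounting/report")
	fmt.Println(m.lock("accounting/report", time.Minute)) // free again after unlock
}
```

Calling unlock explicitly, as the MutexService comments recommend, frees the key immediately instead of leaving other callers blocked until the TTL expires.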

Authorization

Request Authorization Policies

TODO

Response Authorization Policies

TODO

Authentication

Remote JWKS URI

https://auth0.com/docs/tokens/json-web-tokens/json-web-key-sets

TODO

Sample GraphQL queries

mutation {
  setEntity(input: {
    domain: "accounting",
    type: "user"
    key: "[email protected]",
    values: {
      first_name: "Coleman"
      last_name: "Word"
    }
  })
}

FAQ
