Encore Messaging Proposal
Created: June 1, 2022 1:18 PM
Last Updated: June 10, 2022 1:52 PM
We’ve been looking into providing a cross-cloud Encore abstraction for messaging. The feature sets of the messaging products across our supported cloud providers differ somewhat and the syntax is often overloaded, but the underlying semantics are relatively similar. The purpose of this proposal is to introduce an abstraction that is powerful enough to cover most use cases while remaining cloud agnostic where possible.
Proposed design
Overview
flowchart LR
Publisher1 --> Topic
Publisher2 --> Topic
Topic --> Subscription1
Topic --> Subscription2
Subscription1 -.-> DLQ1
Subscription2 -.-> DLQ2
Subscription1 --> Subscriber1
Subscription1 --> Subscriber2
Subscription2 --> Subscriber3
Messages are published by publishers to a Topic. The Topic forwards messages to Subscriptions, where messages are queued until they’re consumed by a Subscriber. All published messages are forwarded to all subscriptions. Each message on a subscription is received by one subscriber. If the subscriber returns an error, the message is retried up to MaxRetries times. After MaxRetries retries, the message is forwarded to an opt-out Dead-letter Queue (DLQ), which can be manually inspected by a system admin.
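The flow above can be sketched with a toy in-memory model. This is only an illustration of the described semantics, not the proposed implementation: the subscription and topic types, their fields and runDemo are hypothetical names, delivery is synchronous, and there is no backoff between retries.

```go
package main

import (
	"errors"
	"fmt"
)

// subscription is a toy in-memory queue with a single handler.
type subscription struct {
	name       string
	maxRetries int
	handler    func(msg string) error
	dlq        []string // messages that exhausted their retries
}

// deliver calls the handler once, then retries up to maxRetries times
// before moving the message to the dead-letter queue.
func (s *subscription) deliver(msg string) {
	for attempt := 0; attempt <= s.maxRetries; attempt++ {
		if err := s.handler(msg); err == nil {
			return // acked
		}
	}
	s.dlq = append(s.dlq, msg)
}

// topic forwards every published message to every subscription.
type topic struct {
	subs []*subscription
}

func (t *topic) publish(msg string) {
	for _, s := range t.subs {
		s.deliver(msg)
	}
}

// runDemo publishes one message to a topic with a healthy and a failing
// subscription and reports how often each handler ran.
func runDemo() (okCalls, failCalls int, dlq []string) {
	healthy := &subscription{name: "sub-1", maxRetries: 3, handler: func(string) error {
		okCalls++
		return nil
	}}
	failing := &subscription{name: "sub-2", maxRetries: 3, handler: func(string) error {
		failCalls++
		return errors.New("handler failed")
	}}
	t := &topic{subs: []*subscription{healthy, failing}}
	t.publish("hello")
	return okCalls, failCalls, failing.dlq
}

func main() {
	okCalls, failCalls, dlq := runDemo()
	fmt.Println(okCalls, failCalls, dlq) // prints: 1 4 [hello]
}
```

With MaxRetries set to 3, the failing handler runs four times (one delivery plus three retries) before the message lands on that subscription's DLQ, while the healthy subscription is unaffected.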
Message Definitions
Messages consist of attributes, a body and metadata.
Messages are defined using structs with field tags. Fields are marshalled as JSON and inserted into the Message body, unless they’re tagged with pubsub-attr, in which case they’re added as Message Attributes. The json tag can be used as usual for marshalling hints.
type Email struct {
	Language  string `pubsub-attr:"lang"`      // Also marshalled as msg attribute
	Recipient string `pubsub-attr:"recipient"` // Also marshalled as msg attribute
	Priority  int    `pubsub-attr:"prio"`
	Subject   string
	Body      string
	Signature struct {
		Name    string
		Company string
	}
}
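To make the tag handling concrete, here is a minimal sketch of how a struct could be split into attributes and a JSON body using reflection. splitMessage is a hypothetical helper, not part of the proposed API. It assumes tagged fields are left out of the body (one reading of the text above), converts attribute values to strings with fmt.Sprint, and ignores json tags for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Email mirrors the message type above, trimmed for brevity.
type Email struct {
	Language string `pubsub-attr:"lang"`
	Priority int    `pubsub-attr:"prio"`
	Subject  string
	Body     string
}

// splitMessage is a hypothetical helper. Fields tagged with pubsub-attr
// become string attributes; all other exported fields are marshalled
// into a JSON body keyed by field name.
func splitMessage(msg any) (map[string]string, []byte, error) {
	v := reflect.Indirect(reflect.ValueOf(msg))
	if v.Kind() != reflect.Struct {
		return nil, nil, fmt.Errorf("expected a struct, got %s", v.Kind())
	}
	attrs := make(map[string]string)
	body := make(map[string]any)
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() {
			continue
		}
		if name, ok := f.Tag.Lookup("pubsub-attr"); ok {
			attrs[name] = fmt.Sprint(v.Field(i).Interface())
			continue
		}
		body[f.Name] = v.Field(i).Interface()
	}
	data, err := json.Marshal(body)
	return attrs, data, err
}

func main() {
	attrs, body, err := splitMessage(&Email{Language: "en", Priority: 1, Subject: "Hi", Body: "Hello"})
	if err != nil {
		panic(err)
	}
	fmt.Println(attrs)        // prints: map[lang:en prio:1]
	fmt.Println(string(body)) // prints: {"Body":"Hello","Subject":"Hi"}
}
```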
It’s also possible to publish raw messages by using the RawMessage type. A RawMessage has a []byte body and a map of string attributes.
var topic = pubsub.NewTopic[*pubsub.RawMessage]("rawtopic", nil)

func SendRawMessage(ctx context.Context) error {
	msg := &pubsub.RawMessage{
		Attributes: map[string]string{
			"header1": "value1",
			"header2": "value2",
		},
		Body: []byte("This is a raw body"),
	}
	// Publish returns the message id and an error; only the error is needed here.
	_, err := topic.Publish(ctx, msg)
	return err
}
encore.CurrentRequest() is extended to include a MessageData field which contains Message-specific properties like PublishTime, DeliveryAttempt and ID. Request.Type is set to PubSub if the current goroutine was initiated by a topic subscription.
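func ConsumeMessage(ctx context.Context, email *Email) error {
	req := encore.CurrentRequest()
	// MessageData is only set when the request was initiated by a subscription.
	if req.MessageData != nil {
		fmt.Printf("The current message was published at: %v\n", req.MessageData.PublishTime)
	}
	return nil
}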
Topics and Subscriptions
The Topic is the core entity of the messaging infrastructure. Topics accept messages from publishers, which are then forwarded to Subscriptions. Subscriptions are message queues which store messages until they are consumed by one of their Subscribers.
Topics are created as infrastructure resources and are used to publish messages and create subscriptions. If a Topic is used by only one service, it should be declared in that service’s package. Otherwise it’s recommended to declare the Topic in a shared package. Topics should be typed using the type parameter.
Create a topic named emails consisting of Email messages like so:
var EmailTopic = pubsub.NewTopic[*Email]("emails", &pubsub.TopicConfig{})
Delivery Guarantees
The Encore messaging API allows configuring a Delivery Guarantee per topic. The default DeliveryGuarantee is AtLeastOnce, meaning that each message on a Subscription is guaranteed to be delivered to, and acknowledged by, a Subscriber at least once. The guarantee can be upgraded to ExactlyOnce, which is useful for e.g. email delivery, but will typically incur an extra cost from your cloud provider and limit throughput. A Topic can also be configured to guarantee ordered delivery of messages by setting Ordered to true. This guarantees that all messages sharing the same grouping key (a message attribute defined by GroupingKey) will be delivered in the order they were published.
Create a Topic which delivers Email messages exactly once, in the order they were published for a recipient, like so:
var EmailTopic = pubsub.NewTopic[*Email]("emails", &pubsub.TopicConfig{
	DeliveryPolicy: &pubsub.DeliveryPolicy{
		DeliveryGuarantee: pubsub.ExactlyOnce,
		Ordered:           true,
		GroupingKey:       "recipient",
	},
})
The default TopicConfig is:
TopicConfig{
	DeliveryPolicy: &DeliveryPolicy{
		DeliveryGuarantee: pubsub.AtLeastOnce,
		Ordered:           false,
	},
}
Publishing Messages
Messages are published by calling Publish on a Topic instance. Publish returns the id of the newly published message, or an error signalling that the publish was unsuccessful.
//encore:api public
func SendMessage(ctx context.Context, email *Email) error {
	id, err := EmailTopic.Publish(ctx, email)
	if err != nil {
		return errors.Wrap(err, "failed to publish message")
	}
	fmt.Printf("Published a message with id: %s\n", id)
	return nil
}
Consuming Messages
Messages are consumed by Subscribers. A Subscription is created by using Topic.NewSubscription. The Subscription requires a subscription name (the subscription itself is automatically provisioned by Encore), a Subscriber handler and a SubscriptionConfig. The subscription name must be unique for the topic, otherwise Encore will report a compile error.
Register a Subscriber (and create a Subscription named email-sub) like so:
var _ = EmailTopic.NewSubscription(
	"email-sub",
	EmailService.SendEmail, // func(EmailService, context.Context, *Email) error
	&pubsub.SubscriptionConfig{},
)

func (e *EmailService) SendEmail(ctx context.Context, email *Email) error {
	return e.client.Send(ctx, email)
}
The default SubscriptionConfig is:
SubscriptionConfig{
	RetryPolicy: &RetryPolicy{
		MinRetryDelay: 5 * time.Second,
		MaxRetryDelay: 1 * time.Minute,
		MaxRetries:    5,
	},
	AckDeadline:      30 * time.Second,
	MessageRetention: 7 * 24 * time.Hour, // 7 days
}
Acking, Nacking, Deadlettering
A message is automatically Acked after a successful call to a subscriber. If a pubsub.UnrecoverableError is returned (based on errors.Is), the message will be forwarded directly to the dead-letter topic. If any other error is returned, the message will effectively be Nacked with a backoff delay. The exact implementation of this depends on the provider. For example, AWS doesn’t support backoffs, so Encore modifies the visibility timeout to simulate exponential backoff.
If a Message is not processed within the SubscriptionConfig.AckDeadline, it will be returned to the Subscription and be re-delivered to a Subscriber. When the AckDeadline expires, the context used in the subscriber call will also be cancelled.
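The context cancellation described above could look roughly like the following sketch, which uses context.WithTimeout from the standard library. The deliver, work and timedOut helpers are hypothetical and only illustrate the behaviour; how Encore would actually wire the AckDeadline into the subscriber call is not specified here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// deliver runs handler with a context that is cancelled once ackDeadline
// has passed. It is a hypothetical helper, not part of the proposed API.
func deliver(ctx context.Context, ackDeadline time.Duration, handler func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, ackDeadline)
	defer cancel()
	return handler(ctx)
}

// work simulates a subscriber that needs d to finish but stops early
// when its context is cancelled.
func work(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether a subscriber needing workTime misses ackDeadline.
func timedOut(ackDeadline, workTime time.Duration) bool {
	err := deliver(context.Background(), ackDeadline, work(workTime))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(20*time.Millisecond, time.Second)) // deadline is hit first
	fmt.Println(timedOut(time.Second, time.Millisecond))    // work finishes in time
}
```

A subscriber only benefits from the cancellation if it passes the context on to the calls it makes, as work does with its select statement.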
If not consumed, Messages will be kept on the Subscription for the time specified by SubscriptionConfig.MessageRetention.
Filtering
A Filter can be defined for a Subscription. The Filter is a boolean expression operating on message Attributes. Supported operators are =, !=, && and IN.
Create a Subscription with a Filter selecting messages whose Attribute named prio equals 4 and whose Attribute named lang is either en or es, like so:
var _ = EmailTopic.NewSubscription(
	"email-sub",
	EmailService.SendEmail,
	&pubsub.SubscriptionConfig{
		Filter: `prio = 4 && lang IN ["en", "es"]`,
	},
)
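To illustrate the filter semantics, the sketch below evaluates such an expression against a map of attributes. It is a deliberately naive evaluator under stated assumptions: matches, matchClause and unquote are hypothetical helpers, attribute values are compared as strings, a missing attribute never satisfies = or IN, and values containing &&, = or the word IN are not handled. In practice the filter would most likely be translated to the cloud provider's native filter syntax rather than evaluated like this.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether attrs satisfies every clause of the filter.
// Clauses are joined by && and use =, != or IN.
func matches(filter string, attrs map[string]string) (bool, error) {
	for _, clause := range strings.Split(filter, "&&") {
		ok, err := matchClause(strings.TrimSpace(clause), attrs)
		if err != nil || !ok {
			return false, err
		}
	}
	return true, nil
}

// matchClause evaluates a single `name OP value` clause.
func matchClause(clause string, attrs map[string]string) (bool, error) {
	if name, list, found := strings.Cut(clause, " IN "); found {
		list = strings.TrimSpace(list)
		if !strings.HasPrefix(list, "[") || !strings.HasSuffix(list, "]") {
			return false, fmt.Errorf("malformed IN list in %q", clause)
		}
		got, present := attrs[strings.TrimSpace(name)]
		for _, item := range strings.Split(list[1:len(list)-1], ",") {
			if present && got == unquote(item) {
				return true, nil
			}
		}
		return false, nil
	}
	// Check != before = because "!=" contains "=".
	if name, value, found := strings.Cut(clause, "!="); found {
		return attrs[strings.TrimSpace(name)] != unquote(value), nil
	}
	if name, value, found := strings.Cut(clause, "="); found {
		got, present := attrs[strings.TrimSpace(name)]
		return present && got == unquote(value), nil
	}
	return false, fmt.Errorf("unsupported clause %q", clause)
}

// unquote trims surrounding whitespace and double quotes from a value.
func unquote(s string) string {
	return strings.Trim(strings.TrimSpace(s), `"`)
}

func main() {
	filter := `prio = 4 && lang IN ["en", "es"]`
	fmt.Println(matches(filter, map[string]string{"prio": "4", "lang": "en"}))
	fmt.Println(matches(filter, map[string]string{"prio": "4", "lang": "de"}))
}
```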
Error Handling
The RetryPolicy is applied when a subscriber returns an error while processing a message. The policy backs off retries exponentially, bounded by the MinRetryDelay and the MaxRetryDelay. If MaxRetries is exceeded, the message will be forwarded to a dead-letter topic. Messages on this topic can be manually inspected and republished through the admin webapp.
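As a rough illustration of the bounded exponential backoff, the sketch below computes the delay before a given retry. retryDelay is a hypothetical helper, and the doubling factor is an assumption: the proposal only states that retries back off exponentially between MinRetryDelay and MaxRetryDelay, and the actual schedule will depend on the cloud provider.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay returns the delay before retry number attempt (1-based).
// The delay starts at minDelay and doubles on each retry until it
// reaches maxDelay. The factor of 2 is an assumption of this sketch.
func retryDelay(minDelay, maxDelay time.Duration, attempt int) time.Duration {
	d := minDelay
	for i := 1; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

func main() {
	// Default policy: MinRetryDelay 5s, MaxRetryDelay 1m, MaxRetries 5.
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, retryDelay(5*time.Second, time.Minute, attempt))
	}
}
```

Under these assumptions the default policy would wait 5s, 10s, 20s, 40s and 1m before the five retries.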
Create a Subscription which retries 10 times, starting with an initial delay of 5 seconds and backing off to a maximum of 5 minutes before forwarding to the dead-letter topic, like so:
var _ = EmailTopic.NewSubscription(
	"prio-email-sub",
	EmailService.SendEmail,
	&pubsub.SubscriptionConfig{
		RetryPolicy: &pubsub.RetryPolicy{
			MinRetryDelay: 5 * time.Second,
			MaxRetryDelay: 5 * time.Minute,
			MaxRetries:    10,
		},
		Filter: "prio = 1",
	},
)
A consumer can indicate that a message has an unrecoverable error by wrapping an error in pubsub.UnrecoverableError. This tells Encore to forward the message directly to the dead-letter topic (without retrying). The following Subscriber will forward a message to the dead-letter topic without retry if the email recipient is invalid:
func (e *EmailService) SendEmail(ctx context.Context, email *Email) error {
	err := e.client.Send(ctx, email)
	if errors.Is(err, emails.InvalidAddress) {
		return pubsub.UnrecoverableError(err)
	}
	return err
}
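One way the errors.Is based detection could work is sketched below. All names here (ErrUnrecoverable, unrecoverableError, Unrecoverable, classify) are hypothetical and chosen for illustration; the proposal does not specify how pubsub.UnrecoverableError is implemented. The sketch relies on the standard library behaviour that errors.Is consults an Is method on each error in the chain.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnrecoverable is a hypothetical sentinel used with errors.Is.
var ErrUnrecoverable = errors.New("unrecoverable error")

// errInvalidAddress stands in for emails.InvalidAddress from the example above.
var errInvalidAddress = errors.New("invalid address")

// unrecoverableError marks its cause as not worth retrying.
type unrecoverableError struct{ cause error }

func (e *unrecoverableError) Error() string        { return "unrecoverable: " + e.cause.Error() }
func (e *unrecoverableError) Unwrap() error        { return e.cause }
func (e *unrecoverableError) Is(target error) bool { return target == ErrUnrecoverable }

// Unrecoverable wraps err so that errors.Is(err, ErrUnrecoverable) is true.
func Unrecoverable(err error) error {
	if err == nil {
		return nil
	}
	return &unrecoverableError{cause: err}
}

// outcome is what the runtime would do with a message after the handler returns.
type outcome string

const (
	ack        outcome = "ack"
	retry      outcome = "retry"
	deadLetter outcome = "dead-letter"
)

// classify maps a subscriber's return value to an outcome.
func classify(err error) outcome {
	switch {
	case err == nil:
		return ack
	case errors.Is(err, ErrUnrecoverable):
		return deadLetter
	default:
		return retry
	}
}

func main() {
	fmt.Println(classify(nil))                              // ack
	fmt.Println(classify(errInvalidAddress))                // retry
	fmt.Println(classify(Unrecoverable(errInvalidAddress))) // dead-letter
	// The marker survives further wrapping with %w.
	fmt.Println(classify(fmt.Errorf("send: %w", Unrecoverable(errInvalidAddress)))) // dead-letter
}
```

Because the wrapper implements Unwrap, the original cause stays reachable, so errors.Is(err, errInvalidAddress) also remains true for the wrapped error.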
Cloud Provider Support
GCP implementation
The GCP implementation would be based on GCP Pub/Sub. The Encore abstraction maps pretty well to the GCP implementation and we’re using similar language. However, GCP does not support Ordered messages combined with ExactlyOnce delivery.
AWS implementation
The AWS implementation would be built on top of SNS and SQS. Messages would be published to an SNS topic, and an SQS queue would be created and attached to the Topic for each subscription. ExactlyOnce and Ordering are implemented through FIFO Topics and Queues. Retry backoff is implemented by modifying the visibility timeout of a message.
Azure implementation
The Azure implementation will be built on top of Service Bus Topics and Subscriptions. The Encore syntax matches the Azure functionality quite well. We’ll use FIFO subscriptions for OrderedDelivery and Duplicate Detection for ExactlyOnce delivery. Retry backoff is implemented by extending the lock for a Message.
API Outline
package pubsub
// RawMessage is a special struct used to publish a raw
// slice of bytes in the message body. Attributes can be used
// to set custom message attributes
type RawMessage struct {
	Attributes map[string]string
	Body       []byte
}

// UnrecoverableError can be returned by a subscriber to
// immediately drop or forward a message to a dead-letter topic
func UnrecoverableError(err error) UnrecoverableError

// Subscriber is a function reference
// The signature must be `func(ctx context.Context, msg M) error` where M is either the
// message type of the topic or RawMessage
type Subscriber[T any] func(ctx context.Context, msg T) error

type Subscription[T any] interface{}

// Topic is the entry point for published messages
type Topic[T any] interface {
	// NewSubscription adds a handler to incoming messages on a topic.
	// name is a unique name for the subscription and sub is a reference to a subscription handler.
	// The SubscriptionConfig is used to configure filters, retries, etc.
	NewSubscription(name string, sub Subscriber[T], cfg *SubscriptionConfig) *Subscription[T]

	// Publish publishes messages. The result is the message id
	// or an error signalling a publishing failure
	Publish(ctx context.Context, msg T) (id string, err error)
}
type RetryPolicy struct {
	// If MaxRetryDelay >= MinRetryDelay the first retry will be
	// delayed by MinRetryDelay and the following retries will back off
	// exponentially until reaching MaxRetryDelay
	MinRetryDelay time.Duration
	MaxRetryDelay time.Duration
	// MaxRetries is used to control dead-letter queuing logic
	// n = -1 does not create a DLQ and retries infinitely
	// n >= 0 creates a DLQ and forwards a message to it after n retries
	MaxRetries int
}

// DeliveryGuarantee is used to configure the delivery contract for a topic
type DeliveryGuarantee int

const (
	// AtLeastOnce guarantees that a message for a subscription is delivered to
	// a subscriber at least once
	AtLeastOnce DeliveryGuarantee = iota
	// ExactlyOnce guarantees that a message for a subscription is delivered to
	// a subscriber exactly once
	ExactlyOnce
)

// DeliveryPolicy configures how messages are delivered from Topics
type DeliveryPolicy struct {
	// DeliveryGuarantee is used to configure the delivery guarantee of a Topic
	DeliveryGuarantee DeliveryGuarantee
	// Ordered should be set to true if messages should be grouped by GroupingKey and
	// be delivered in the order they were published
	Ordered bool
	// GroupingKey is the name of the message attribute used to
	// partition messages into groups with guaranteed ordered
	// delivery.
	// GroupingKey must be set if `Ordered` is true
	GroupingKey string
}
// TopicConfig is used when creating a Topic
type TopicConfig struct {
	// AWS does not support mixing FIFO SNS with standard SQS.
	// Therefore, if one subscription is Ordered or ExactlyOnce,
	// all others must be too, which is why the DeliveryPolicy
	// is configured on the topic.
	DeliveryPolicy *DeliveryPolicy
}

// SubscriptionConfig is used when creating a subscription
type SubscriptionConfig struct {
	// Filter is a boolean expression using =, !=, IN, &&
	// It is used to filter which messages are forwarded from the
	// topic to a subscription
	Filter string
	// AckDeadline is the time a consumer has to process a message
	// before it's returned to the subscription
	AckDeadline time.Duration
	// MessageRetention is how long an undelivered message is kept
	// on the subscription before it's purged
	MessageRetention time.Duration
	// RetryPolicy defines how a message should be retried when
	// the subscriber returns an error
	RetryPolicy *RetryPolicy
}

// NewTopic is used to declare a Topic. Encore will use static
// analysis to identify Topics and automatically provision them
// for you.
func NewTopic[T any](name string, cfg ...*TopicConfig) *Topic[T]
package encore
// Request is the existing metadata entity returned by
// encore.CurrentRequest()
type Request struct {
	... // Previous fields
	// MessageData contains fields specific to Messages
	MessageData *MessageData
}

type MessageData struct {
	// ID is the unique ID of the message assigned by the messaging service
	ID string
	// PublishTime is the time the message was first published
	PublishTime time.Time
	// DeliveryAttempt is a counter for how many times delivery of
	// the message has been attempted
	DeliveryAttempt int
	// Error returned last time the message was attempted to be delivered
	LastError error
}
Complete Example
type Email struct {
	Id      string `pubsub-attr:"id" json:"-"`
	From    string
	To      string `pubsub-attr:"to"`
	Subject string
	Body    string
}

type EmailEvent struct {
	Id   string `pubsub-attr:"id"`
	Type string `pubsub-attr:"type"`
}
var emails = pubsub.NewTopic[*Email]("emails", &pubsub.TopicConfig{
	// The delivery policy is configured on the topic (see TopicConfig).
	DeliveryPolicy: &pubsub.DeliveryPolicy{
		DeliveryGuarantee: pubsub.ExactlyOnce,
		Ordered:           true,
		GroupingKey:       "to",
	},
})
var events = pubsub.NewTopic[*EmailEvent]("events", &pubsub.TopicConfig{})

var _ = emails.NewSubscription(
	"email-sub",
	SendEmail, // Function generated by Encore
	&pubsub.SubscriptionConfig{},
)

var _ = events.NewSubscription(
	"event-sub",
	StoreEmailFailures, // Function generated by Encore
	&pubsub.SubscriptionConfig{
		// Filters only support =, !=, && and IN (see Filtering).
		Filter: `type IN ["retry", "failed"]`,
	},
)
type EmailService struct {
	emailClient   EmailClient
	storageClient StorageClient
}

func (s *EmailService) SendEmail(ctx context.Context, msg *Email) error {
	if err := s.emailClient.Send(msg.Body); err != nil {
		log.Error().Err(err).Msg("failed to send email")
		// Record the failure; a publish error must not mask the send error.
		if _, pubErr := events.Publish(ctx, &EmailEvent{Id: msg.Id, Type: "failed"}); pubErr != nil {
			log.Error().Err(pubErr).Msg("failed to publish email event")
		}
		return pubsub.UnrecoverableError(err)
	}
	_, err := events.Publish(ctx, &EmailEvent{Id: msg.Id, Type: "success"})
	return err
}

func (s *EmailService) ScheduleEmail(ctx context.Context, email *Email) error {
	_, err := emails.Publish(ctx, email)
	return err
}

func (s *EmailService) StoreEmailFailures(ctx context.Context, msg *EmailEvent) error {
	return s.storageClient.Store(msg)
}