Apache AVRO for go

Overview

avro

Build Status codecov Go Report Card

The purpose of this package is to facilitate the use of AVRO with Go's strong typing.

Features

github.com/khezen/avro

GoDoc

github.com/khezen/avro/sqlavro

GoDoc

github.com/khezen/avro/redshiftavro

GoDoc

What is AVRO

Apache AVRO is a data serialization system which relies on JSON schemas.

It provides:

  • Rich data structures
  • A compact, fast, binary data format
  • A container file, to store persistent data
  • Remote procedure call (RPC)

AVRO binary encoded data comes together with its schema and therefore is fully self-describing.

When AVRO data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small.

When AVRO data is stored in a file, its schema is stored with it, so that files may be processed later by any program. If the program reading the data expects a different schema this can be easily resolved, since both schemas are present.

Examples

Schema Marshal/Unmarshal

package main

import (
  "encoding/json"
  "fmt"

  "github.com/khezen/avro"
)

// main round-trips an AVRO record schema: it decodes the JSON definition into
// an avro.AnySchema, takes the Schema it holds, re-encodes that Schema as JSON
// and prints the result.
func main() {
  const rawSchema = `{
      "type": "record",
      "namespace": "test",
      "name": "LongList",
      "aliases": [
        "LinkedLongs"
      ],
      "doc": "linked list of 64 bits integers",
      "fields": [
        {
          "name": "value",
          "type": "long"
        },
        {
          "name": "next",
          "type": [
            "null",
            "LongList"
          ]
        }
      ]
    }`

  // Decode the JSON document into the AnySchema wrapper.
  var parsed avro.AnySchema
  if err := json.Unmarshal([]byte(rawSchema), &parsed); err != nil {
    panic(err)
  }

  // Encode the wrapped Schema back to JSON and print it.
  encoded, err := json.Marshal(parsed.Schema())
  if err != nil {
    panic(err)
  }
  fmt.Println(string(encoded))
}
{
    "type": "record",
    "namespace": "test",
    "name": "LongList",
    "aliases": [
        "LinkedLongs"
    ],
    "doc": "linked list of 64 bits integers",
    "fields": [
        {
            "name": "value",
            "type": "long"
        },
        {
            "name": "next",
            "type": [
                "null",
                "LongList"
            ]
        }
    ]
}

Convert SQL Table to AVRO Schema

package main
import (
  "database/sql"
  "encoding/json"
  "fmt"

  "github.com/khezen/avro/sqlavro"
)

// main creates the posts table in the blog database, asks sqlavro for the
// AVRO schemas of that database, and prints them as JSON.
func main() {
  // The data source name below is a placeholder in go-sql-driver/mysql form
  // ([user[:password]@][protocol[(address)]]/dbname); substitute your own
  // credentials and address.
  // NOTE(review): no MySQL driver import is visible in this example's import
  // block; sql.Open needs a driver registered under the name "mysql"
  // (presumably a blank import of github.com/go-sql-driver/mysql) — confirm.
  db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/blog")
  if err != nil {
    panic(err)
  }
  defer db.Close()
  _, err = db.Exec(
    `CREATE TABLE posts(
      ID INT NOT NULL,
      title VARCHAR(128) NOT NULL,
      body LONGBLOB NOT NULL,
      content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
      post_date DATETIME NOT NULL,
      update_date DATETIME,
      reading_time_minutes DECIMAL(3,1),
      PRIMARY KEY(ID)
    )`,
  )
  if err != nil {
    panic(err)
  }
  // Derive the schemas for the "blog" database.
  schemas, err := sqlavro.SQLDatabase2AVRO(db, "blog")
  if err != nil {
    panic(err)
  }
  schemasBytes, err := json.Marshal(schemas)
  if err != nil {
    panic(err)
  }
  fmt.Println(string(schemasBytes))
}
[
    {
        "type": "record",
        "namespace": "blog",
        "name": "posts",
        "fields": [
            {
                "name": "ID",
                "type": "int"
            },
            {
                "name": "title",
                "type": "string"
            },
            {
                "name": "body",
                "type": "bytes"
            },
            {
                "name": "content_type",
                "type": [
                    "string",
                    "null"
                ],
                "default": "text/markdown; charset=UTF-8"
            },
            {
                "name": "post_date",
                "type": {
                    "type": "int",
                    "doc":"datetime",
                    "logicalType": "timestamp"
                }
            },
            {
                "name": "update_date",
                "type": [
                    "null",
                    {
                        "type": "int",
                        "doc":"datetime",
                        "logicalType": "timestamp"
                    }
                ]
            },
            {
                "name": "reading_time_minutes",
                "type": [
                    "null",
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 3,
                        "scale": 1
                    }
                ]
            }
        ]
    }
]

Query records from SQL into AVRO or CSV binary

package main

import (
	"database/sql"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/khezen/avro"
	"github.com/khezen/avro/sqlavro"
)

// main creates and populates the posts table, derives its AVRO schema, queries
// the rows whose post_date matches the criterion into AVRO binary, writes the
// bytes to /tmp/blog_posts.avro and prints the criteria returned by the query.
func main() {
	// The data source name below is a placeholder in go-sql-driver/mysql form
	// ([user[:password]@][protocol[(address)]]/dbname); substitute your own
	// credentials and address.
	// NOTE(review): no MySQL driver import is visible in this example's import
	// block; sql.Open needs a driver registered under the name "mysql"
	// (presumably a blank import of github.com/go-sql-driver/mysql) — confirm.
	db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/blog")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	_, err = db.Exec(
		`CREATE TABLE posts(
			ID INT NOT NULL,
			title VARCHAR(128) NOT NULL,
			body LONGBLOB NOT NULL,
			content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
			post_date DATETIME NOT NULL,
			update_date DATETIME,
			reading_time_minutes DECIMAL(3,1),
			PRIMARY KEY(ID)
		)`,
	)
	if err != nil {
		panic(err)
	}
	_, err = db.Exec(
		// statement
		`INSERT INTO posts(ID,title,body,content_type,post_date,update_date,reading_time_minutes)
		 VALUES (?,?,?,?,?,?,?)`,
		// values
		42,
		"lorem ispum",
		[]byte(`Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.`),
		"text/markdown; charset=UTF-8",
		"2009-04-10 00:00:00",
		"2009-04-10 00:00:00",
		"4.2",
	)
	if err != nil {
		panic(err)
	}
	schema, err := sqlavro.SQLTable2AVRO(db, "blog", "posts")
	if err != nil {
		panic(err)
	}
	limit := 1000
	order := avro.Ascending
	// Go layouts are spelled with the reference time 2006-01-02 15:04:05, i.e.
	// "01" is the month and "02" the day. The layout must be year-month-day so
	// that "2009-04-10" is read as 10 April 2009, the post_date inserted above,
	// and not as 4 October 2009.
	from, err := time.Parse("2006-01-02 15:04:05", "2009-04-10 00:00:00")
	if err != nil {
		panic(err)
	}
	avroBytes, updatedCriteria, err := sqlavro.Query(sqlavro.QueryConfig{
		DB:     db,
		DBName: "blog",
		Schema: schema,
		Limit:  limit,
		Criteria: []sqlavro.Criterion{
			*sqlavro.NewCriterionDateTime("post_date", &from, order),
		},
		Output: "avro",
	})
	if err != nil {
		panic(err)
	}
	err = ioutil.WriteFile("/tmp/blog_posts.avro", avroBytes, 0644)
	if err != nil {
		panic(err)
	}
	fmt.Println(updatedCriteria)
}

Notes

  • When a record field has aliases, the first alias is used in the query instead of the field name.

Types

Avro Go     SQL
null nil NULL
bytes []byte BLOB,MEDIUMBLOB,LONGBLOB
fixed []byte       CHAR,NCHAR
string,enum string VARCHAR, NVARCHAR,TEXT,TINYTEXT,MEDIUMTEXT,LONGTEXT,ENUM,SET
float float32 FLOAT
double float64 DOUBLE
long int64 BIGINT
int int32   TINYINT,SMALLINT,INT,YEAR
decimal *big.Rat DECIMAL
time int32 TIME
timestamp int32 TIMESTAMP,DATETIME
date time.Time DATE
array []interface{} N/A
map,record map[string]interface{} N/A
union see below    any type nullable

Because of the encoding rules for Avro unions, when a union's value is null, a simple Go nil is returned. However, when a union's value is non-nil, a Go map[string]interface{} with a single key is returned for the union. The map's single key is the Avro type name and its value is the datum's value.

Produce Redshift create statement from AVRO schema

package main

import (
	"encoding/json"
	"fmt"

	"github.com/khezen/avro"
	"github.com/khezen/avro/redshiftavro"
)

// main decodes an AVRO record schema from JSON, passes it to redshiftavro
// together with sort keys and the IfNotExists flag, and prints the statement
// returned by CreateTableStatement.
func main() {
	rawSchema := []byte(`
	{
        "type": "record",
        "namespace": "blog",
        "name": "posts",
        "fields": [
            {
                "name": "ID",
                "type": "int"
            },
            {
                "name": "title",
                "type": "string"
            },
            {
                "name": "body",
                "type": "bytes"
            },
            {
                "name": "content_type",
                "type": [
                    "string",
                    "null"
                ],
                "default": "text/markdown; charset=UTF-8"
            },
            {
                "name": "post_date",
                "type": {
                    "type": "int",
                    "doc":"datetime",
                    "logicalType": "timestamp"
                }
            },
            {
                "name": "update_date",
                "type": [
                    "null",
                    {
                        "type": "int",
                        "doc":"datetime",
                        "logicalType": "timestamp"
                    }
                ]
            },
            {
                "name": "reading_time_minutes",
                "type": [
                    "null",
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 3,
                        "scale": 1
                    }
                ]
            }
        ]
	}`)

	// Decode the JSON document into the AnySchema wrapper.
	var parsed avro.AnySchema
	if err := json.Unmarshal(rawSchema, &parsed); err != nil {
		panic(err)
	}

	// The example's schema is a record, so the wrapped Schema is asserted to
	// *avro.RecordSchema; the assertion panics for any other schema kind.
	record := parsed.Schema().(*avro.RecordSchema)

	ddl, err := redshiftavro.CreateTableStatement(redshiftavro.CreateConfig{
		Schema:      *record,
		SortKeys:    []string{"post_date", "title"},
		IfNotExists: true,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(ddl)
}
CREATE TABLE IF NOT EXISTS posts(
	ID INTEGER ENCODE LZO NOT NULL,
	title VARCHAR(65535) ENCODE RAW NOT NULL,
	body VARCHAR(65535) ENCODE ZSTD NOT NULL,
	content_type VARCHAR(65535) ENCODE ZSTD NULL,
	post_date TIMESTAMP WITHOUT TIME ZONE ENCODE RAW NOT NULL,
	update_date TIMESTAMP WITHOUT TIME ZONE ENCODE LZO NULL,
	reading_time_minutes DECIMAL(3,1) ENCODE RAW NULL
)
SORTKEY(
	post_date,
	title
)

Issues

If you have any problems or questions, please ask for help through a GitHub issue.

Contributions

Help is always welcome! For example, documentation (like the text you are reading now) can always use improvement. There's always code that can be improved. If you ever see something you think should be fixed, you should own it. If you have no idea what to start on, you can browse the issues labeled with help wanted.

As a potential contributor, your changes and ideas are welcome at any hour of the day or night, weekdays, weekends, and holidays. Please do not ever hesitate to ask a question or send a pull request.

Code of conduct.

You might also like...
Apache Pulsar Go Client Library

Apache Pulsar Go Client Library A Go client library for the Apache Pulsar project. Goal This projects is developing a pure-Go client library for Pulsa

The DataStax Kubernetes Operator for Apache Cassandra

Cass Operator The DataStax Kubernetes Operator for Apache Cassandra®. This repository replaces the old datastax/cass-operator for use-cases in the k8s

Apache H2 Database Go Driver

Apache H2 Database Go Driver This driver is VERY experimental state NOT use for production yet Introduction Apache H2 Database is a very-low footprint

CLI Tool to Stress Apache Kafka Clusters

Kafka Stress - Stress Test Tool for Kafka Clusters, Producers and Consumers Tunning Installation Docker docker pull fidelissauro/kafka-stress:latest d

Paster 服务端核心模块,使用字节跳动开源的微服务 RPC 框架 KiteX ,以 Apache Thrift 作为通信协议
Paster 服务端核心模块,使用字节跳动开源的微服务 RPC 框架 KiteX ,以 Apache Thrift 作为通信协议

paster_core Paster 服务端核心模块,使用字节跳动开源的微服务 RPC 框架 KiteX ,以 Apache Thrift 作为通信协议。 Todo: 实现 KiteX 服务注册扩展接口,使用 Consul 服务注册 新增 frame 层,通过 PreProcessor, PostP

franz-go - A complete Apache Kafka client written in Go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 2.8.0+. Producing, consuming, transacting, administrating, etc.

Apache Traffic Control is an Open Source implementation of a Content Delivery Network

Apache Traffic Control Apache Traffic Control is an Open Source implementation of a Content Delivery Network. Documentation Intro CDN Basics Traffic C

Study project that uses Apache Kafka as syncing mechanism between two databases, with producers and consumers written in Go.

Kafka DB Sync Study project that uses Apache Kafka as syncing mechanisms between a monolith DB and a microservice. The main purpose of this project is

A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.
A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.

Apache Kafka in 6 minutes A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application. In thi

apache dubbo gateway,L7 proxy,virtual host,k8s ingress controller.
apache dubbo gateway,L7 proxy,virtual host,k8s ingress controller.

apache dubbo gateway,L7 proxy,virtual host,k8s ingress controller.

Command Line Tool for managing Apache Kafka

kafkactl A command-line interface for interaction with Apache Kafka | Features command auto-completion for bash, zsh, fish shell including dynamic com

A High Performance Object Storage released under Apache License
A High Performance Object Storage released under Apache License

MinIO Quickstart Guide MinIO is a High Performance Object Storage released under Apache License v2.0. It is API compatible with Amazon S3 cloud storag

Testing Apache Kafka using Go.

Apache Kafka Go Testing Apache Kafka using Go. Instructions Provision the single node Kafka cluster using Docker: docker-compose -p apache-kafka-go up

Message relay written in golang for PostgreSQL and Apache Kafka

Message Relay Message relay written in golang for PostgreSQL and Apache Kafka Requirements Docker and Docker Compose Local installation and using dock

Traefik plugin to proxy requests to owasp/modsecurity-crs:apache container
Traefik plugin to proxy requests to owasp/modsecurity-crs:apache container

Traefik Modsecurity Plugin Traefik plugin to proxy requests to owasp/modsecurity-crs:apache Traefik Modsecurity Plugin Demo Full Configuration with do

NJ Meetup - Build an event-driven architecture with Apache Pulsar

Meetup-YourFirstEventDrivenApp NJ Meetup - Build an event-driven architecture with Apache Pulsar Code Along bin/pulsar-admin tenants create meetup bi

Apachedist-resource - A concourse resource to track updates of an apache distribution, e.g. tomcat

Apache Distribution Resource A concourse resource that can track information abo

The NiFiKop NiFi Kubernetes operator makes it easy to run Apache NiFi on Kubernetes.
The NiFiKop NiFi Kubernetes operator makes it easy to run Apache NiFi on Kubernetes.

The NiFiKop NiFi Kubernetes operator makes it easy to run Apache NiFi on Kubernetes. Apache NiFI is a free, open-source solution that support powerful and scalable directed graphs of data routing, transformation, and system mediation logic.

Comments
  • Fix default conversion logic

    Fix default conversion logic

    In the case of a Timestamp with a default of CURRENT_TIMESTAMP this default conversion was failing because the enclosing switch wasn't capturing Timestamp at all

    opened by adi 1
  • add support for logical types

    add support for logical types

    Logical Types

    A logical type is an Avro primitive or complex type with extra attributes to represent a derived type. The attribute logicalType must always be present for a logical type, and is a string with the name of one of the logical types listed later in this section. Other attributes may be defined for particular logical types.

    A logical type is always serialized using its underlying Avro type so that values are encoded in exactly the same way as the equivalent Avro type that does not have a logicalType attribute. Language implementations may choose to represent logical types with an appropriate native type, although this is not required.

    Language implementations must ignore unknown logical types when reading, and should use the underlying Avro type. If a logical type is invalid, for example a decimal with scale greater than its precision, then implementations should ignore the logical type and use the underlying Avro type.

    Decimal

    The decimal logical type represents an arbitrary-precision signed decimal number of the form $\text{unscaled} \times 10^{-\text{scale}}$.

    A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two's-complement representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified using an attribute.

    The following attributes are supported:

      • scale, a JSON integer representing the scale (optional). If not specified the scale is 0.
      • precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required).

    For example, the following schema represents decimal numbers with a maximum precision of 4 and a scale of 2:

    {
      "type": "bytes",
      "logicalType": "decimal",
      "precision": 4,
      "scale": 2
    }
    

    Precision must be a positive integer greater than zero. If the underlying type is a fixed, then the precision is limited by its size. An array of length $n$ can store at most $\lfloor \log_{10}(2^{8n - 1} - 1) \rfloor$ base-10 digits of precision.

    Scale must be zero or a positive integer less than or equal to the precision.

    For the purposes of schema resolution, two schemas that are decimal logical types match if their scales and precisions match.

    Date

    The date logical type represents a date within the calendar, with no reference to a particular time zone or time of day.

    A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1 January 1970 (ISO calendar).

    Time (millisecond precision)

    The time-millis logical type represents a time of day, with no reference to a particular calendar, time zone or date, with a precision of one millisecond.

    A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds after midnight, 00:00:00.000.

    Time (microsecond precision)

    The time-micros logical type represents a time of day, with no reference to a particular calendar, time zone or date, with a precision of one microsecond.

    A time-micros logical type annotates an Avro long, where the long stores the number of microseconds after midnight, 00:00:00.000000.

    Timestamp (millisecond precision)

    The timestamp-millis logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond.

    A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC.

    Timestamp (microsecond precision)

    The timestamp-micros logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one microsecond.

    A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds from the unix epoch, 1 January 1970 00:00:00.000000 UTC.

    Duration

    The duration logical type represents an amount of time defined by a number of months, days and milliseconds. This is not equivalent to a number of milliseconds, because, depending on the moment in time from which the duration is measured, the number of days in the month and number of milliseconds in a day may differ. Other standard periods such as years, quarters, hours and minutes can be expressed through these basic periods.

    A duration logical type annotates Avro fixed type of size 12, which stores three little-endian unsigned integers that represent durations at different granularities of time. The first stores a number in months, the second stores a number in days, and the third stores a number in milliseconds.

    opened by khezen 0
Releases(v1.1.7)
Owner
Guillaume Simonneau
Opensource & knowledge sharing enthusiast.
Guillaume Simonneau
Fixed column file to avro/kafka

shredder shredds Fixed column file to avro/kafka . Implementation uses Avro schema and multicore Speed around 220mb/sec per Core using 4 core on a 1Gb

Rickard 3 Dec 20, 2021
k6 extension supporting avro textual and binary representations

xk6-avro This extension wraps the goavro library into a k6 extension. You can build the extension using: xk6 build --with github.com/xvzf/xk6-avro Exa

Matthias Riegler 0 Dec 8, 2021
Mirror of Apache Calcite - Avatica Go SQL Driver

Apache Avatica/Phoenix SQL Driver Apache Calcite's Avatica Go is a Go database/sql driver for the Avatica server. Avatica is a sub-project of Apache C

The Apache Software Foundation 103 Nov 3, 2022
Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache KafkaTM confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform. Features: Hi

Confluent Inc. 3.7k Dec 30, 2022
Sarama is a Go library for Apache Kafka 0.8, and up.

sarama Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later). Getting started API documentation and examples are availa

Shopify 9.5k Jan 1, 2023
Go(lang) client library for accessing information of an Apache Mesos cluster.

megos Go(lang) client library for accessing an Apache Mesos cluster. Features Determine the Mesos leader Get the current state of every mesos node (ma

Andy Grunwald 55 Sep 27, 2022
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

CloudHut 2.9k Jan 3, 2023
Modern CLI for Apache Kafka, written in Go.

Kaf Kafka CLI inspired by kubectl & docker Install Install from source: go get -u github.com/birdayz/kaf/cmd/kaf Install binary: curl https://raw.git

Johannes Brüderl 1.8k Dec 31, 2022
Apache RocketMQ go client

RocketMQ Client Go A product ready RocketMQ Client in pure go, which supports almost the full features of Apache RocketMQ, such as pub and sub message

The Apache Software Foundation 1k Jan 4, 2023
Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Black Square Media 1k Dec 28, 2022