Persistent stacks and queues for Go backed by LevelDB

Related tags

Database goque
Overview

Goque GoDoc License Go Report Card Build Status

Goque provides embedded, disk-based implementations of stack and queue data structures.

Motivation for creating this project was the need for a persistent priority queue that remained performant while growing well beyond the available memory of a given machine. While there are many packages for Go offering queues, they all seem to be memory based and/or standalone solutions that are not embeddable within an application.

Instead of using an in-memory heap structure to store data, everything is stored using the Go port of LevelDB. This results in very little memory being used no matter the size of the database, while read and write performance remains near constant.

Features

  • Provides stack (LIFO), queue (FIFO), priority queue, and prefix queue structures.
  • Stacks and queues (but not priority queues or prefix queues) are interchangeable.
  • Persistent, disk-based.
  • Optimized for fast inserts and reads.
  • Goroutine safe.
  • Designed to work with large datasets outside of RAM/memory.

Installation

Fetch the package from GitHub:

go get github.com/beeker1121/goque

Import to your project:

import "github.com/beeker1121/goque"

Usage

Stack

Stack is a LIFO (last in, first out) data structure.

Create or open a stack:

s, err := goque.OpenStack("data_dir")
...
defer s.Close()

Push an item:

item, err := s.Push([]byte("item value"))
// or
item, err := s.PushString("item value")
// or
item, err := s.PushObject(Object{X:1})
// or
item, err := s.PushObjectAsJSON(Object{X:1})

Pop an item:

item, err := s.Pop()
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Key)        // [0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

Peek the next stack item:

item, err := s.Peek()
// or
item, err := s.PeekByOffset(1)
// or
item, err := s.PeekByID(1)

Update an item in the stack:

item, err := s.Update(1, []byte("new value"))
// or
item, err := s.UpdateString(1, "new value")
// or
item, err := s.UpdateObject(1, Object{X:2})
// or
item, err := s.UpdateObjectAsJSON(1, Object{X:2})

Delete the stack and underlying database:

s.Drop()

Queue

Queue is a FIFO (first in, first out) data structure.

Methods

Create or open a queue:

q, err := goque.OpenQueue("data_dir")
...
defer q.Close()

Enqueue an item:

item, err := q.Enqueue([]byte("item value"))
// or
item, err := q.EnqueueString("item value")
// or
item, err := q.EnqueueObject(Object{X:1})
// or
item, err := q.EnqueueObjectAsJSON(Object{X:1})

Dequeue an item:

item, err := q.Dequeue()
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Key)        // [0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

Peek the next queue item:

item, err := q.Peek()
// or
item, err := q.PeekByOffset(1)
// or
item, err := q.PeekByID(1)

Update an item in the queue:

item, err := q.Update(1, []byte("new value"))
// or
item, err := q.UpdateString(1, "new value")
// or
item, err := q.UpdateObject(1, Object{X:2})
// or
item, err := q.UpdateObjectAsJSON(1, Object{X:2})

Delete the queue and underlying database:

q.Drop()

Priority Queue

PriorityQueue is a FIFO (first in, first out) queue with priority levels.

Methods

Create or open a priority queue:

pq, err := goque.OpenPriorityQueue("data_dir", goque.ASC)
...
defer pq.Close()

Enqueue an item:

item, err := pq.Enqueue(0, []byte("item value"))
// or
item, err := pq.EnqueueString(0, "item value")
// or
item, err := pq.EnqueueObject(0, Object{X:1})
// or
item, err := pq.EnqueueObjectAsJSON(0, Object{X:1})

Dequeue an item:

item, err := pq.Dequeue()
// or
item, err := pq.DequeueByPriority(0)
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Priority)   // 0
fmt.Println(item.Key)        // [0 58 0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

Peek the next priority queue item:

item, err := pq.Peek()
// or
item, err := pq.PeekByOffset(1)
// or
item, err := pq.PeekByPriorityID(0, 1)

Update an item in the priority queue:

item, err := pq.Update(0, 1, []byte("new value"))
// or
item, err := pq.UpdateString(0, 1, "new value")
// or
item, err := pq.UpdateObject(0, 1, Object{X:2})
// or
item, err := pq.UpdateObjectAsJSON(0, 1, Object{X:2})

Delete the priority queue and underlying database:

pq.Drop()

Prefix Queue

PrefixQueue is a FIFO (first in, first out) data structure that separates each given prefix into its own queue.

Methods

Create or open a prefix queue:

pq, err := goque.OpenPrefixQueue("data_dir")
...
defer pq.Close()

Enqueue an item:

item, err := pq.Enqueue([]byte("prefix"), []byte("item value"))
// or
item, err := pq.EnqueueString("prefix", "item value")
// or
item, err := pq.EnqueueObject([]byte("prefix"), Object{X:1})
// or
item, err := pq.EnqueueObjectAsJSON([]byte("prefix"), Object{X:1})

Dequeue an item:

item, err := pq.Dequeue([]byte("prefix"))
// or
item, err := pq.DequeueString("prefix")
...
fmt.Println(item.ID)         // 1
fmt.Println(item.Key)        // [112 114 101 102 105 120 0 0 0 0 0 0 0 0 1]
fmt.Println(item.Value)      // [105 116 101 109 32 118 97 108 117 101]
fmt.Println(item.ToString()) // item value

// Decode to object.
var obj Object
err := item.ToObject(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

// Decode to object from JSON.
var obj Object
err := item.ToObjectFromJSON(&obj)
...
fmt.Printf("%+v\n", obj) // {X:1}

Peek the next prefix queue item:

item, err := pq.Peek([]byte("prefix"))
// or
item, err := pq.PeekString("prefix")
// or
item, err := pq.PeekByID([]byte("prefix"), 1)
// or
item, err := pq.PeekByIDString("prefix", 1)

Update an item in the prefix queue:

item, err := pq.Update([]byte("prefix"), 1, []byte("new value"))
// or
item, err := pq.UpdateString("prefix", 1, "new value")
// or
item, err := pq.UpdateObject([]byte("prefix"), 1, Object{X:2})
// or
item, err := pq.UpdateObjectAsJSON([]byte("prefix"), 1, Object{X:2})

Delete the prefix queue and underlying database:

pq.Drop()

Benchmarks

Benchmarks were ran on a Google Compute Engine n1-standard-1 machine (1 vCPU 3.75 GB of RAM):

Go 1.6:

$ go test -bench=.
PASS
BenchmarkPriorityQueueEnqueue     200000              8104 ns/op             522 B/op          7 allocs/op
BenchmarkPriorityQueueDequeue     200000             18622 ns/op            1166 B/op         17 allocs/op
BenchmarkQueueEnqueue             200000              8049 ns/op             487 B/op          7 allocs/op
BenchmarkQueueDequeue             200000             18970 ns/op            1089 B/op         17 allocs/op
BenchmarkStackPush                200000              8145 ns/op             487 B/op          7 allocs/op
BenchmarkStackPop                 200000             18947 ns/op            1097 B/op         17 allocs/op
ok      github.com/beeker1121/goque     22.549s

Go 1.8:

$ go test -bench=.
BenchmarkPrefixQueueEnqueue        20000             60553 ns/op           10532 B/op        242 allocs/op
BenchmarkPrefixQueueDequeue        10000            100727 ns/op           18519 B/op        444 allocs/op
BenchmarkPriorityQueueEnqueue     300000              4781 ns/op             557 B/op          9 allocs/op
BenchmarkPriorityQueueDequeue     200000             11656 ns/op            1206 B/op         19 allocs/op
BenchmarkQueueEnqueue             300000              4625 ns/op             513 B/op          9 allocs/op
BenchmarkQueueDequeue             200000             11537 ns/op            1125 B/op         19 allocs/op
BenchmarkStackPush                300000              4631 ns/op             513 B/op          9 allocs/op
BenchmarkStackPop                 200000              9629 ns/op            1116 B/op         19 allocs/op
PASS
ok      github.com/beeker1121/goque     18.135s

Thanks

syndtr (https://github.com/syndtr) - LevelDB port to Go
bogdanovich (https://github.com/bogdanovich/siberite) - Server based queue for Go using LevelDB
connor4312 (https://github.com/connor4312) - Recommending BoltDB/LevelDB, helping with structure
bwmarrin (https://github.com/bwmarrin) - Recommending BoltDB/LevelDB
zeroZshadow (https://github.com/zeroZshadow) - Code review and optimization
nstafie (https://github.com/nstafie) - Help with structure

Comments
  • Reclaiming disk space

    Reclaiming disk space

    Would it be possible to make goque to reclaim disk space? The algorithm could be that if queue is empty and size of database file is larger than the initial extent (or a configurable limit) then delete it and re-create.

    The motivation for this feature is that we want to use this queue for persisting some telemetry data when network connection goes down. The amount of data is quite substantial but these event do not occur often and application must stay online 24/7/365. So, the problem is that one outage can use a lot of disk space that is never returned back. Once system operates normally the queue will be very shallow.

    The application may use few dozen queues at the same time but outages do not occur at the same time so waste of storage will multiply and can easily take system down if left unattended.

    I suppose we could work around this problem by recreating the queues from outside but it would be nicer to have this feature in goque. Would it be possible to add this feature?

    opened by dtoubelis 4
  • Does not make a distinction between pointer-nil and value-zero

    Does not make a distinction between pointer-nil and value-zero

    if an object contains a pointer and is written to a queue (did only test it with priority queues) then reading the object from the queue can result in a different object when the dereferenced pointer has a zero value. In that case the pointer itself is nil instead of a pointer to a zero-value.

    See the output of the following program:

    package main

    import ( "fmt" "github.com/beeker1121/goque" "log" )

    type Obj struct { A *int B int }

    func (o Obj) String() string { if o.A != nil { return fmt.Sprintf("%v %v %v", o.A, *o.A, o.B) } else { return fmt.Sprintf("%v %v", o.A, o.B) } }

    func main() { queue, err := goque.OpenPriorityQueue("queuetest", goque.ASC) if err != nil { log.Fatal("OpenPriorityQueue()", err) }

    o1 := Obj{new(int), 1}
    *o1.A = 0 // not necessary
        // *o1.A = 10 // this will work
        // o1.A = nil // will work, too
    
    log.Println("Writing to queue:", o1)
    
    _, err = queue.EnqueueObject(0, o1)
    if err != nil {
        log.Println("EnqueueObject()", o1, err)
    }
    
    item, err := queue.Dequeue()
    if err != nil {
        log.Fatal("Dequeue()", err)
    }
    o2 := Obj{}
    err = item.ToObject(&o2)
    if err != nil {
        log.Fatal("ToObject()", err)
    }
    
    log.Println("Read from queue:", o2)
    
    queue.Drop()
    

    }

    opened by wngmnheiko 4
  • PriorityQueue's Length method returns wrong(?) number of elements

    PriorityQueue's Length method returns wrong(?) number of elements

    When a PriorityQueue is already closed or dropped the Length method still returns the number of elements available before close/drop.

    Maybe 0 or an error would be a better return value in this case.

    opened by wngmnheiko 4
  • concurrent access to LevelDB in goque

    concurrent access to LevelDB in goque

    Hi,

    I'm working on a restful microservice and using PrefixQueue.

    pq, err := goque.OpenPrefixQueue(DATADIR) gives error on concurrent access. That is, it fails on trying to access the db on concurrent API calls(or goroutines).

    error message- "resource temporarily unavailable" PS - I'm closing the db after using it.

    Is there any workaround for this? I need to be able to access the DB concurrently

    opened by sp98 3
  • Change module path

    Change module path

    It is better to change following

    https://github.com/golang/go/wiki/Modules#semantic-import-versioning

    If the module is version v2 or higher, the major version of the module must be included as a /vN at the end of the module paths used in go.mod files

    Currently, an error occurs when trying to download with go mod download

    $ go mod download
    go: finding github.com/beeker1121/goque v2.1.1+incompatible
    go: finding github.com/beeker1121/goque v2.1.1+incompatible
    github.com/beeker1121/[email protected]+incompatible: invalid version: +incompatible suffix not allowed: module contains a go.mod file, so semantic import versioning is required
    
    opened by t-asaka 1
  • Not decoding dynamic nested structs

    Not decoding dynamic nested structs

    I've to store a struct type in the queue. I don't have much control on this struct type and it can be a nested json data that I get from a request. For example

    {
    "name": "string",
      "quota": {
        "compute": {},
        "storage": {}
      },
      "regime": "string",
      "request_id": "test",
      "requestor_id": "string",
      "service_group": "string",
      "service_resource_id": "string",
      "tags": [
        "string"
      ],
      "update_uri": "string"
    }
    

    So if I try to add this struct (of type map[string]interface{}) as a prefix queue using pq.EnqueueObject([]byte(k), v) Then its not getting added to the queue.

    I think gob module is not able to decode it.

    Is there any way to be able to store this data ?

    opened by sp98 1
  • Changing the API

    Changing the API

    I'm thinking about changing the API of goque so it's a bit more idiomatic. I wanted to see what people would think of the change, if it's a good idea or not.

    The API change would consist of taking out the Item object as a parameter to the Enqueue, Push, and Update methods. Instead, the value itself you wish to enqueue/push/update will be passed as the parameter, which then returns a new Item.

    For example, instead of: func (q *Queue) Enqueue(item *Item) error {

    the method would be changed to: func (q *Queue) Enqueue(value []byte) (*Item, error) {

    The prior API would remain tagged as v1.0.2 with a legacy branch created and README.md updated, notifying everyone of the changes.

    The Update function was bothering me, as there's no definition of what the Item object passed to it should contain. Do you think this API change would be a more idiomatic approach?

    enhancement 
    opened by beeker1121 1
  • BUG: Update method did not check for item presence

    BUG: Update method did not check for item presence

    The Update method in stacks and queues did not check if the item actually existed within the stack or queue. Commit 303be53f551f1c2aede7b4b09fb4e2e569ce2d25 fixes this.

    opened by beeker1121 0
  • OpenPrefixQueue should return opened object on failure

    OpenPrefixQueue should return opened object on failure

    If checkGoqueType fails within OpenPrefixQueue, there is no way to close the already-constructed object. On Windows, the file lock held will prevent deletion of the path.

    Other types (queue, stack, priority queue) return an object, so prefix queue should, as well.

    opened by philipatl 0
  • Accessing queue via RPC

    Accessing queue via RPC

    I'm looking for exactly this but need to use it with RPC for other processes to access the queue from a daemon. I was just wondering if there's any way of getting a queue item and then confirming it's been stored before deleting it? Just in case the other process crashes before processing the item...

    Thanks!

    opened by abemedia 2
  • Add /v2 path component to module name.

    Add /v2 path component to module name.

    go mod assumes that major versions greater than v1 come with a change to the import path. See https://blog.golang.org/v2-go-modules, and see https://github.com/go-gorp/gorp/releases for an example of another package that adopted Go modules and then had to fix up the module name in order to work.

    This allows other packages that use go mod to import goque.

    opened by jsha 1
  • Fixed race condition

    Fixed race condition

    I have one goroutine that enqueues, and another that dequeues.

    When running my tests I have a race condition. The changes I made should solve the problem.

    Thank you for your work.

    opened by telemac 0
Releases(v2.1.1)
Owner
null
levigo is a Go wrapper for LevelDB

levigo levigo is a Go wrapper for LevelDB. The API has been godoc'ed and is available on the web. Questions answered at [email protected].

Jeff Hodges 412 Jan 5, 2023
a key-value store with multiple backends including leveldb, badgerdb, postgresql

Overview goukv is an abstraction layer for golang based key-value stores, it is easy to add any backend provider. Available Providers badgerdb: Badger

Mohammed Al Ashaal 52 Jan 5, 2023
A simple, fast, embeddable, persistent key/value store written in pure Go. It supports fully serializable transactions and many data structures such as list, set, sorted set.

NutsDB English | 简体中文 NutsDB is a simple, fast, embeddable and persistent key/value store written in pure Go. It supports fully serializable transacti

徐佳军 2.7k Jan 1, 2023
BadgerDB is an embeddable, persistent and fast key-value (KV) database written in pure Go

BadgerDB BadgerDB is an embeddable, persistent and fast key-value (KV) database written in pure Go. It is the underlying database for Dgraph, a fast,

Blizard 1 Dec 10, 2021
A RESTful caching micro-service in Go backed by Couchbase

Couchcache A caching service developed in Go. It provides REST APIs to access key-value pairs stored in Couchbase. You may also consider using couchca

Jerry Zhao 58 Sep 26, 2022
A disk-backed key-value store.

What is diskv? Diskv (disk-vee) is a simple, persistent key-value store written in the Go language. It starts with an incredibly simple API for storin

Peter Bourgon 1.2k Jan 7, 2023
Nipo is a powerful, fast, multi-thread, clustered and in-memory key-value database, with ability to configure token and acl on commands and key-regexes written by GO

Welcome to NIPO Nipo is a powerful, fast, multi-thread, clustered and in-memory key-value database, with ability to configure token and acl on command

Morteza Bashsiz 17 Dec 28, 2022
Owl is a db manager platform,committed to standardizing the data, index in the database and operations to the database, to avoid risks and failures.

Owl is a db manager platform,committed to standardizing the data, index in the database and operations to the database, to avoid risks and failures. capabilities which owl provides include Process approval、sql Audit、sql execute and execute as crontab、data backup and recover .

null 34 Nov 9, 2022
Being played at The Coffee House and try to find and play it on Spotify

The Coffee House Muzik Follow the music that is being played at The Coffee House and try to find and play it on Spotify. Installation Clone this proje

SangND 4 May 25, 2022
Walrus - Fast, Secure and Reliable System Backup, Set up in Minutes.

Walrus is a fast, secure and reliable backup system suitable for modern infrastructure. With walrus, you can backup services like MySQL, PostgreSQL, Redis, etcd or a complete directory with a short interval and low overhead. It supports AWS S3, digitalocean spaces and any S3-compatible object storage service.

Ahmed 452 Jan 5, 2023
🔑A high performance Key/Value store written in Go with a predictable read/write performance and high throughput. Uses a Bitcask on-disk layout (LSM+WAL) similar to Riak.

bitcask A high performance Key/Value store written in Go with a predictable read/write performance and high throughput. Uses a Bitcask on-disk layout

James Mills 10 Sep 26, 2022
BuntDB is an embeddable, in-memory key/value database for Go with custom indexing and geospatial support

BuntDB is a low-level, in-memory, key/value store in pure Go. It persists to disk, is ACID compliant, and uses locking for multiple readers and a sing

Josh Baker 4k Dec 30, 2022
Concurrency-safe Go caching library with expiration capabilities and access counters

cache2go Concurrency-safe golang caching library with expiration capabilities. Installation Make sure you have a working Go environment (Go 1.2 or hig

Christian Muehlhaeuser 1.9k Dec 31, 2022
groupcache is a caching and cache-filling library, intended as a replacement for memcached in many cases.

groupcache Summary groupcache is a distributed caching and cache-filling library, intended as a replacement for a pool of memcached nodes in many case

Go 11.9k Dec 29, 2022
Scalable datastore for metrics, events, and real-time analytics

InfluxDB InfluxDB is an open source time series platform. This includes APIs for storing and querying data, processing it in the background for ETL or

InfluxData 24.7k Jan 5, 2023
Kivik provides a common interface to CouchDB or CouchDB-like databases for Go and GopherJS.

Kivik Package kivik provides a common interface to CouchDB or CouchDB-like databases. The kivik package must be used in conjunction with a database dr

Kivik 252 Dec 29, 2022
The Prometheus monitoring system and time series database.

Prometheus Visit prometheus.io for the full documentation, examples and guides. Prometheus, a Cloud Native Computing Foundation project, is a systems

Prometheus 46.1k Dec 31, 2022
Fast and simple key/value store written using Go's standard library

Table of Contents Description Usage Cookbook Disadvantages Motivation Benchmarks Test 1 Test 4 Description Package pudge is a fast and simple key/valu

Vadim Kulibaba 331 Nov 17, 2022
Fast specialized time-series database for IoT, real-time internet connected devices and AI analytics.

unitdb Unitdb is blazing fast specialized time-series database for microservices, IoT, and realtime internet connected devices. As Unitdb satisfy the

Saffat Technologies 100 Jan 1, 2023