unbounded chan

Related tags

Network chanx
Overview

chanx

Unbounded chan.

License GoDoc travis Go Report Card coveralls

Refer to the below articles and issues:

  1. https://github.com/golang/go/issues/20352
  2. https://stackoverflow.com/questions/41906146/why-go-channels-limit-the-buffer-size
  3. https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
  4. https://erikwinter.nl/articles/2020/channel-with-infinite-buffer-in-golang/

Usage

ch := NewUnboundedChan(1000)

go func() {
    ...
    ch.In <- ... // send values
    ...

    close(ch.In) // close In channel
}()


for v := range ch.Out { // read values
    fmt.Println(v)
}
Comments
  • data race in Len() and BufLen()

    data race in Len() and BufLen()

    Consider the following test:

    func TestDataRace(t *testing.T) {
    	ch := NewUnboundedChan(1)
    	stop := make(chan bool)
    	for i := 0; i < 100; i++ { // may tweak the number of iterations
    		go func() {
    			for {
    				select {
    				case <-stop:
    					return
    				default:
    					ch.In <- 42
    					<-ch.Out
    				}
    			}
    		}()
    	}
    
    	for i := 0; i < 10000; i++ { // may tweak the number of iterations
    		ch.Len()
    	}
    	close(stop)
    }
    

    The above test results in the following data race:

    $ go test -run=DataRace -race
    ==================
    WARNING: DATA RACE
    Read at 0x00c0001309a8 by goroutine 7:
      github.com/smallnest/chanx.(*RingBuffer).Len()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x118
      github.com/smallnest/chanx.UnboundedChan.Len()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
      github.com/smallnest/chanx.TestChanDataRace()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
    
    Previous write at 0x00c0001309a8 by goroutine 8:
      github.com/smallnest/chanx.(*RingBuffer).Read()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:42 +0x12c
      github.com/smallnest/chanx.(*RingBuffer).Pop()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:51 +0x6c
      github.com/smallnest/chanx.process()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:72 +0x460
    
    Goroutine 7 (running) created at:
      testing.(*T).Run()
          /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
      testing.runTests.func1()
          /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
      testing.runTests()
          /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
      testing.(*M).Run()
          /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
      main.main()
          _testmain.go:55 +0x288
    
    Goroutine 8 (running) created at:
      github.com/smallnest/chanx.NewUnboundedChanSize()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
      github.com/smallnest/chanx.NewUnboundedChan()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
      github.com/smallnest/chanx.TestChanDataRace()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
    ==================
    ==================
    WARNING: DATA RACE
    Read at 0x00c0001309b0 by goroutine 7:
      github.com/smallnest/chanx.(*RingBuffer).Len()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x134
      github.com/smallnest/chanx.UnboundedChan.Len()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
      github.com/smallnest/chanx.TestChanDataRace()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
    
    Previous write at 0x00c0001309b0 by goroutine 8:
      github.com/smallnest/chanx.(*RingBuffer).Write()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:70 +0xec
      github.com/smallnest/chanx.process()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c
    
    Goroutine 7 (running) created at:
      testing.(*T).Run()
          /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
      testing.runTests.func1()
          /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
      testing.runTests()
          /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
      testing.(*M).Run()
          /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
      main.main()
          _testmain.go:55 +0x288
    
    Goroutine 8 (running) created at:
      github.com/smallnest/chanx.NewUnboundedChanSize()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
      github.com/smallnest/chanx.NewUnboundedChan()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
      github.com/smallnest/chanx.TestChanDataRace()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
    ==================
    ==================
    WARNING: DATA RACE
    Read at 0x00c0001309a0 by goroutine 7:
      github.com/smallnest/chanx.(*RingBuffer).Len()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:118 +0x1ac
      github.com/smallnest/chanx.UnboundedChan.Len()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
      github.com/smallnest/chanx.TestChanDataRace()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
    
    Previous write at 0x00c0001309a0 by goroutine 8:
      github.com/smallnest/chanx.(*RingBuffer).grow()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:96 +0x3bc
      github.com/smallnest/chanx.(*RingBuffer).Write()
          /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:77 +0x18c
      github.com/smallnest/chanx.process()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c
    
    Goroutine 7 (running) created at:
      testing.(*T).Run()
          /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
      testing.runTests.func1()
          /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
      testing.runTests()
          /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
      testing.(*M).Run()
          /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
      main.main()
          _testmain.go:55 +0x288
    
    Goroutine 8 (running) created at:
      github.com/smallnest/chanx.NewUnboundedChanSize()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
      github.com/smallnest/chanx.NewUnboundedChan()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
      github.com/smallnest/chanx.TestChanDataRace()
          /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
      testing.tRunner()
          /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
    ==================
    --- FAIL: TestChanDataRace (0.01s)
        testing.go:1135: race detected during execution of test
    FAIL
    exit status 1
    FAIL    github.com/smallnest/chanx      0.194s
    

    I think providing a Len() method in abstracting an unbounded channel needs more careful handling (Mutex or lock-free pattern via an external atomic counter similar to the runtime channel implementation). The self-created internal buffer for an unbounded channel is maintained in a separate goroutine, a read via len() creates a race condition with any writes with respect to that buffer.

    bug 
    opened by changkun 6
  • 当初始大小为1时,报错

    当初始大小为1时,报错

    package main
    
    import (
    	"fmt"
    	"github.com/smallnest/chanx"
    	"time"
    )
    
    func main() {
    	ch := chanx.NewUnboundedChan(1)
    
    	go func() {
    		for i := 0; i < 100; i++ {
    			ch.In <- i
    		}
    		fmt.Println("push ok")
    		close(ch.In) // close In channel
    	}()
    
    	time.Sleep(1 * time.Second)
    	for v := range ch.Out { // read values
    		fmt.Println(v)
    	}
    }
    

    报错

    panic: runtime error: index out of range [1] with length 1
    
    goroutine 6 [running]:
    github.com/smallnest/chanx.(*RingBuffer).Write(0xc000072040, 0x10b04e0, 0x1158910)
    	/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/ringbuffer.go:61 +0x235
    github.com/smallnest/chanx.NewUnboundedChan.func1(0xc00004c1e0, 0xc00004c180, 0xc00004c180, 0xc00004c1e0, 0xc000072040)
    	/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:59 +0x233
    created by github.com/smallnest/chanx.NewUnboundedChan
    	/Users/vearne/Documents/gopath/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:35 +0x112
    exit status 2
    
    opened by vearne 1
  • RingBuffer的Pop方法是不是可以调用Read来实现呢?

    RingBuffer的Pop方法是不是可以调用Read来实现呢?

    // from
    func (r *RingBuffer) Read() (T, error) {
    	if r.r == r.w {
    		return nil, ErrIsEmpty
    	}
    
    	v := r.buf[r.r]
    	r.r++
    	if r.r == r.size {
    		r.r = 0
    	}
    
    	return v, nil
    }
    
    func (r *RingBuffer) Pop() T {
    	if r.r == r.w { // Empty
    		panic(ErrIsEmpty.Error())
    	}
    
    	v := r.buf[r.r]
    	r.r++
    	if r.r == r.size {
    		r.r = 0
    	}
    
    	return v
    }
    
    // to
    func (r *RingBuffer) Pop() T {
            v, err := r.Read()
    	if err == ErrIsEmpty { // Empty
    		panic(ErrIsEmpty.Error())
    	}
    
    	return v
    }
    
    opened by donnol 1
  • potential goroutine leaks

    potential goroutine leaks

    ref: https://github.com/tikv/pd/actions/runs/3776529907/jobs/6419849603

    2022-12-25T16:09:45.9555131Z  Goroutine 179341 in state chan receive, with github.com/smallnest/chanx.process[...] on top of the stack:
    2022-12-25T16:09:45.9555252Z goroutine 179341 [chan receive]:
    2022-12-25T16:09:45.9555427Z github.com/smallnest/chanx.process[...](0xc0880dccc0, 0xc0880dcd80, 0xc08872fb40)
    2022-12-25T16:09:45.9555654Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:55 +0xd8
    2022-12-25T16:09:45.9555842Z created by github.com/smallnest/chanx.NewUnboundedChanSize[...]
    2022-12-25T16:09:45.9556065Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:46 +0x41e
    2022-12-25T16:09:45.9556071Z 
    2022-12-25T16:09:45.9556329Z  Goroutine 179778 in state chan receive, with github.com/smallnest/chanx.process[...] on top of the stack:
    2022-12-25T16:09:45.9556442Z goroutine 179778 [chan receive]:
    2022-12-25T16:09:45.9556690Z github.com/smallnest/chanx.process[...](0xc08a5f0cc0, 0xc08a5f0d20, 0xc08c56a1e0)
    2022-12-25T16:09:45.9556921Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:55 +0xd8
    2022-12-25T16:09:45.9557096Z created by github.com/smallnest/chanx.NewUnboundedChanSize[...]
    2022-12-25T16:09:45.9557323Z 	/home/runner/go/pkg/mod/github.com/smallnest/[email protected]/unbounded_chan.go:46 +0x41e
    2022-12-25T16:09:45.9557397Z ]
    
    opened by lhy1024 0
  • Add Cancel method/Done channel

    Add Cancel method/Done channel

    Although it is feasible to rely on close(ch.In) to cancel the process, it is necessary to ensure that close(ch.In) must be executed after ch.In<-v when using it. It is inconvenient, because the two may be in different goroutine. So I suggest providing additional channels to handle this.

    opened by molon 0
  • Add Cancel method and Done chanel

    Add Cancel method and Done chanel

    Although it is feasible to rely on close(ch.In) to cancel the process, it is necessary to ensure that close(ch.In) must be executed after ch.In<-v when using it. It is inconvenient, because the two may be in different goroutine. So I suggest providing additional channels to handle this.

    opened by molon 0
Owner
smallnest
Author of 《Scala Collections Cookbook》
smallnest
Probabilistic data structures for processing continuous, unbounded streams.

Boom Filters Boom Filters are probabilistic data structures for processing continuous, unbounded streams. This includes Stable Bloom Filters, Scalable

Tyler Treat 1.5k Dec 30, 2022
a unified representation of buffered, unbuffered, and unbounded channels in Go

chann a unified representation of buffered, unbuffered, and unbounded channels in Go import "golang.design/x/chann" This package requires Go 1.18. Us

The golang.design Initiative 23 Oct 31, 2022
Package reservoir samples values uniformly at random from an unbounded sequence of inputs

Package reservoir samples values uniformly at random from an unbounded sequence of inputs

null 1 Oct 5, 2022