X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b9d2799dfebae724dda3b3e28641116ca5daf5c7..318c49002aea966128a9d37ab29e601a104d79bb:/sdk/go/streamer/streamer.go diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go index 0c4d208348..a46ca4cc55 100644 --- a/sdk/go/streamer/streamer.go +++ b/sdk/go/streamer/streamer.go @@ -36,8 +36,8 @@ Alternately, if you already have a filled buffer and just want to read out from package streamer import ( - "io" "errors" + "io" ) var ErrAlreadyClosed = errors.New("cannot close a stream twice") @@ -48,6 +48,7 @@ type AsyncStream struct { add_reader chan bool subtract_reader chan bool wait_zero_readers chan bool + closed bool } // Reads from the buffer managed by the Transfer() @@ -58,7 +59,13 @@ type StreamReader struct { } func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { - t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: make([]byte, buffersize), + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(source) go t.readersMonitor() @@ -67,7 +74,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { } func AsyncStreamFromSlice(buf []byte) *AsyncStream { - t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: buf, + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(nil) go t.readersMonitor() @@ -127,10 +140,15 @@ func (this *StreamReader) Close() error { return nil } -func (this *AsyncStream) Close() { +func (this *AsyncStream) Close() error { + if this.closed { + return ErrAlreadyClosed + } + this.closed = true this.wait_zero_readers <- true close(this.requests) close(this.add_reader) close(this.subtract_reader) close(this.wait_zero_readers) + return nil }