X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f4ca9ad94a6bb006d1f3c7ba207837f1736d1247..cad7d333436703d48c2811de8a26caef9fc130ad:/sdk/go/streamer/streamer.go diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go index 2217dd3352..396e311038 100644 --- a/sdk/go/streamer/streamer.go +++ b/sdk/go/streamer/streamer.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + /* AsyncStream pulls data in from a io.Reader source (such as a file or network socket) and fans out to any number of StreamReader sinks. @@ -36,15 +40,19 @@ Alternately, if you already have a filled buffer and just want to read out from package streamer import ( + "errors" "io" ) +var ErrAlreadyClosed = errors.New("cannot close a stream twice") + type AsyncStream struct { buffer []byte requests chan sliceRequest add_reader chan bool subtract_reader chan bool wait_zero_readers chan bool + closed bool } // Reads from the buffer managed by the Transfer() @@ -55,7 +63,13 @@ type StreamReader struct { } func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { - t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: make([]byte, buffersize), + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(source) go t.readersMonitor() @@ -64,7 +78,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { } func AsyncStreamFromSlice(buf []byte) *AsyncStream { - t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: buf, + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(nil) go t.readersMonitor() @@ -115,16 +135,24 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) { // Close the responses channel func (this *StreamReader) Close() error { + if this.stream == nil { + return ErrAlreadyClosed + } this.stream.subtract_reader <- true close(this.responses) this.stream = nil return nil } -func (this *AsyncStream) Close() { +func (this *AsyncStream) Close() error { + if this.closed { + return ErrAlreadyClosed + } + this.closed = true this.wait_zero_readers <- true close(this.requests) close(this.add_reader) close(this.subtract_reader) close(this.wait_zero_readers) + return nil }