X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df2a5463aa313dde8e0dba33c357e3eb4272ba35..cad7d333436703d48c2811de8a26caef9fc130ad:/sdk/go/streamer/streamer.go diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go index 187c800392..396e311038 100644 --- a/sdk/go/streamer/streamer.go +++ b/sdk/go/streamer/streamer.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + /* AsyncStream pulls data in from a io.Reader source (such as a file or network socket) and fans out to any number of StreamReader sinks. @@ -48,6 +52,7 @@ type AsyncStream struct { add_reader chan bool subtract_reader chan bool wait_zero_readers chan bool + closed bool } // Reads from the buffer managed by the Transfer() @@ -58,7 +63,13 @@ type StreamReader struct { } func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { - t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: make([]byte, buffersize), + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(source) go t.readersMonitor() @@ -67,7 +78,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { } func AsyncStreamFromSlice(buf []byte) *AsyncStream { - t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: buf, + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(nil) go t.readersMonitor() @@ -127,10 +144,15 @@ func (this *StreamReader) Close() error { return nil } -func (this *AsyncStream) Close() { +func (this *AsyncStream) Close() error { + if this.closed { + return ErrAlreadyClosed + } + this.closed = true this.wait_zero_readers <- true close(this.requests) close(this.add_reader) close(this.subtract_reader) close(this.wait_zero_readers) + return nil }