X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/244159419c42341baeb388236ad29cc546b7eca1..1b16fe46cccf4af94349ddad730b5c7f3bc03718:/sdk/go/streamer/streamer.go diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go index 2217dd3352..c8d012f765 100644 --- a/sdk/go/streamer/streamer.go +++ b/sdk/go/streamer/streamer.go @@ -36,15 +36,20 @@ Alternately, if you already have a filled buffer and just want to read out from package streamer import ( + "errors" + "fmt" "io" ) +var ErrAlreadyClosed = errors.New("cannot close a stream twice") + type AsyncStream struct { buffer []byte requests chan sliceRequest add_reader chan bool subtract_reader chan bool wait_zero_readers chan bool + closed bool } // Reads from the buffer managed by the Transfer() @@ -55,7 +60,13 @@ type StreamReader struct { } func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { - t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: make([]byte, buffersize), + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(source) go t.readersMonitor() @@ -64,7 +75,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { } func AsyncStreamFromSlice(buf []byte) *AsyncStream { - t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)} + t := &AsyncStream{ + buffer: buf, + requests: make(chan sliceRequest), + add_reader: make(chan bool), + subtract_reader: make(chan bool), + wait_zero_readers: make(chan bool), + } go t.transfer(nil) go t.readersMonitor() @@ -115,16 +132,50 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) { // Close the responses channel func (this *StreamReader) Close() error { + if this.stream == nil { + return ErrAlreadyClosed + } this.stream.subtract_reader <- true close(this.responses) this.stream = nil return nil } -func (this *AsyncStream) Close() { +func (this *AsyncStream) Close() error { + if this.closed { + return ErrAlreadyClosed + } + this.closed = true this.wait_zero_readers <- true close(this.requests) close(this.add_reader) close(this.subtract_reader) close(this.wait_zero_readers) + return nil +} + +func (this *StreamReader) Seek(offset int64, whence int) (int64, error) { + var want int64 + switch whence { + case io.SeekStart: + want = offset + case io.SeekCurrent: + want = int64(this.offset) + offset + case io.SeekEnd: + want = int64(this.Len()) + offset + default: + return int64(this.offset), fmt.Errorf("invalid whence %d", whence) + } + if want < 0 { + return int64(this.offset), fmt.Errorf("attempted seek to %d", want) + } + if want > int64(this.Len()) { + want = int64(this.Len()) + } + this.offset = int(want) + return want, nil +} + +func (this *StreamReader) Len() uint64 { + return uint64(len(this.stream.buffer)) }