12475: Rewrite streamer -> asyncbuf.
[arvados.git] / sdk / go / streamer / streamer.go
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
deleted file mode 100644 (file)
index 396e311..0000000
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
-  reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
-  reader.Close()
-
-When you're done with the stream:
-  stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
-  stream := AsyncStreamFromSlice(buf)
-
-  r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
-       "errors"
-       "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
-       buffer            []byte
-       requests          chan sliceRequest
-       add_reader        chan bool
-       subtract_reader   chan bool
-       wait_zero_readers chan bool
-       closed            bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
-       offset    int
-       stream    *AsyncStream
-       responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            make([]byte, buffersize),
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(source)
-       go t.readersMonitor()
-
-       return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            buf,
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(nil)
-       go t.readersMonitor()
-
-       return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
-       this.add_reader <- true
-       return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
-       rr, valid := <-this.responses
-       if valid {
-               this.offset += len(rr.slice)
-               return copy(p, rr.slice), rr.err
-       } else {
-               return 0, io.ErrUnexpectedEOF
-       }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
-       // Record starting offset in order to correctly report the number of bytes sent
-       starting_offset := this.offset
-       for {
-               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
-               rr, valid := <-this.responses
-               if valid {
-                       this.offset += len(rr.slice)
-                       if rr.err != nil {
-                               if rr.err == io.EOF {
-                                       // EOF is not an error.
-                                       return int64(this.offset - starting_offset), nil
-                               } else {
-                                       return int64(this.offset - starting_offset), rr.err
-                               }
-                       } else {
-                               dest.Write(rr.slice)
-                       }
-               } else {
-                       return int64(this.offset), io.ErrUnexpectedEOF
-               }
-       }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
-       if this.stream == nil {
-               return ErrAlreadyClosed
-       }
-       this.stream.subtract_reader <- true
-       close(this.responses)
-       this.stream = nil
-       return nil
-}
-
-func (this *AsyncStream) Close() error {
-       if this.closed {
-               return ErrAlreadyClosed
-       }
-       this.closed = true
-       this.wait_zero_readers <- true
-       close(this.requests)
-       close(this.add_reader)
-       close(this.subtract_reader)
-       close(this.wait_zero_readers)
-       return nil
-}