10990: Use AsyncStream to minimize store-and-forward latency
[arvados.git] / sdk / go / streamer / streamer.go
index 2217dd3352eae69255b74b4faa5a74425efca0ee..c8d012f76591470551e51dcb8faea87eea2aecea 100644 (file)
@@ -36,15 +36,20 @@ Alternately, if you already have a filled buffer and just want to read out from
 package streamer
 
 import (
+       "errors"
+       "fmt"
        "io"
 )
 
+var ErrAlreadyClosed = errors.New("cannot close a stream twice")
+
 type AsyncStream struct {
        buffer            []byte
        requests          chan sliceRequest
        add_reader        chan bool
        subtract_reader   chan bool
        wait_zero_readers chan bool
+       closed            bool
 }
 
 // Reads from the buffer managed by the Transfer()
@@ -55,7 +60,13 @@ type StreamReader struct {
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            make([]byte, buffersize),
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(source)
        go t.readersMonitor()
@@ -64,7 +75,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            buf,
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(nil)
        go t.readersMonitor()
@@ -115,16 +132,50 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 
 // Close the responses channel
 func (this *StreamReader) Close() error {
+       if this.stream == nil {
+               return ErrAlreadyClosed
+       }
        this.stream.subtract_reader <- true
        close(this.responses)
        this.stream = nil
        return nil
 }
 
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error {
+       if this.closed {
+               return ErrAlreadyClosed
+       }
+       this.closed = true
        this.wait_zero_readers <- true
        close(this.requests)
        close(this.add_reader)
        close(this.subtract_reader)
        close(this.wait_zero_readers)
+       return nil
+}
+
+func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
+       var want int64
+       switch whence {
+       case io.SeekStart:
+               want = offset
+       case io.SeekCurrent:
+               want = int64(this.offset) + offset
+       case io.SeekEnd:
+               want = int64(this.Len()) + offset
+       default:
+               return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
+       }
+       if want < 0 {
+               return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
+       }
+       if want > int64(this.Len()) {
+               want = int64(this.Len())
+       }
+       this.offset = int(want)
+       return want, nil
+}
+
+func (this *StreamReader) Len() uint64 {
+       return uint64(len(this.stream.buffer))
 }