2798: Tracks opening and closing of readers, will block closing AsyncStream
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 02:23:18 +0000 (22:23 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 02:23:18 +0000 (22:23 -0400)
until all readers are closed.  Additionally, will block if too many readers are
created.

sdk/go/src/arvados.org/streamer/streamer.go
sdk/go/src/arvados.org/streamer/streamer_test.go
sdk/go/src/arvados.org/streamer/transfer.go

index 0b8755fe4042070e1b3cac6b74f9adba272a6979..ba49fb341a9e0a923039a38f64d1a5ed32fc040d 100644 (file)
@@ -28,42 +28,48 @@ import (
 )
 
 type AsyncStream struct {
-       requests      chan readRequest
-       Reader_status chan error
+       requests          chan readRequest
+       add_reader        chan bool
+       subtract_reader   chan bool
+       wait_zero_readers chan bool
+       Reader_status     chan error
 }
 
 // Reads from the buffer managed by the Transfer()
 type StreamReader struct {
        offset    int
-       requests  chan<- readRequest
+       stream    *AsyncStream
        responses chan readResult
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
        buf := make([]byte, buffersize)
 
-       t := &AsyncStream{make(chan readRequest), make(chan error)}
+       t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
 
        go transfer(buf, source, t.requests, t.Reader_status)
+       go t.readersMonitor()
 
        return t
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{make(chan readRequest), nil}
+       t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
 
        go transfer(buf, nil, t.requests, nil)
+       go t.readersMonitor()
 
        return t
 }
 
 func (this *AsyncStream) MakeStreamReader() *StreamReader {
-       return &StreamReader{0, this.requests, make(chan readResult)}
+       this.add_reader <- true
+       return &StreamReader{0, this, make(chan readResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
 func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.requests <- readRequest{this.offset, len(p), this.responses}
+       this.stream.requests <- readRequest{this.offset, len(p), this.responses}
        rr, valid := <-this.responses
        if valid {
                this.offset += len(rr.slice)
@@ -77,7 +83,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
        // Record starting offset in order to correctly report the number of bytes sent
        starting_offset := this.offset
        for {
-               this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+               this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
                rr, valid := <-this.responses
                if valid {
                        this.offset += len(rr.slice)
@@ -99,12 +105,17 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 
 // Close the responses channel
 func (this *StreamReader) Close() error {
+       this.stream.subtract_reader <- true
        close(this.responses)
        return nil
 }
 
 func (this *AsyncStream) Close() {
+       this.wait_zero_readers <- true
        close(this.requests)
+       close(this.add_reader)
+       close(this.subtract_reader)
+       close(this.wait_zero_readers)
        if this.Reader_status != nil {
                close(this.Reader_status)
        }
index 9e24cfb52b26a4ba44aa5cdace4ce6a8f86a759a..33f84b809623f4e48c3f4144aa6dcc8b105af004 100644 (file)
@@ -329,8 +329,10 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
        }
 
        tr := AsyncStreamFromSlice(buffer)
+       defer tr.Close()
 
        br1 := tr.MakeStreamReader()
+       defer br1.Close()
 
        reader, writer := io.Pipe()
 
index ab8f941af77760620dcf25e7bd6cacb09e1dd1f3..77242f13a6a7553de402c20741730c0674153686 100644 (file)
@@ -2,9 +2,10 @@ package streamer
 
 import (
        "io"
-       "log"
 )
 
+const MAX_READERS = 100
+
 // A slice passed from readIntoBuffer() to transfer()
 type readerSlice struct {
        slice        []byte
@@ -102,7 +103,6 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
 func handleReadRequest(req readRequest, body []byte, complete bool) bool {
-       log.Printf("HandlereadRequest %d %d %d", req.offset, req.maxsize, len(body))
        if req.offset < len(body) {
                var end int
                if req.offset+req.maxsize < len(body) {
@@ -218,3 +218,49 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
                }
        }
 }
+
+func (this *AsyncStream) readersMonitor() {
+       var readers int = 0
+
+       for {
+               if readers == 0 {
+                       select {
+                       case _, ok := <-this.wait_zero_readers:
+                               if ok {
+                                       // nothing, just implicitly unblock the sender
+                               } else {
+                                       return
+                               }
+                       case _, ok := <-this.add_reader:
+                               if ok {
+                                       readers += 1
+                               } else {
+                                       return
+                               }
+                       }
+               } else if readers > 0 && readers < MAX_READERS {
+                       select {
+                       case _, ok := <-this.add_reader:
+                               if ok {
+                                       readers += 1
+                               } else {
+                                       return
+                               }
+
+                       case _, ok := <-this.subtract_reader:
+                               if ok {
+                                       readers -= 1
+                               } else {
+                                       return
+                               }
+                       }
+               } else if readers == MAX_READERS {
+                       _, ok := <-this.subtract_reader
+                       if ok {
+                               readers -= 1
+                       } else {
+                               return
+                       }
+               }
+       }
+}