Merge branch '10211-double-close-crash' closes #10211
authorTom Clegg <tom@curoverse.com>
Mon, 17 Oct 2016 12:50:57 +0000 (08:50 -0400)
committerTom Clegg <tom@curoverse.com>
Mon, 17 Oct 2016 12:50:57 +0000 (08:50 -0400)
sdk/go/streamer/streamer.go
sdk/go/streamer/streamer_test.go

index 187c800392257b0ade10cb2a837175a2a57437f6..a46ca4cc55aa5c3faa8ccdbe5b73339aacfa50ef 100644 (file)
@@ -48,6 +48,7 @@ type AsyncStream struct {
        add_reader        chan bool
        subtract_reader   chan bool
        wait_zero_readers chan bool
+       closed            bool
 }
 
 // Reads from the buffer managed by the Transfer()
@@ -58,7 +59,13 @@ type StreamReader struct {
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            make([]byte, buffersize),
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(source)
        go t.readersMonitor()
@@ -67,7 +74,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+       t := &AsyncStream{
+               buffer:            buf,
+               requests:          make(chan sliceRequest),
+               add_reader:        make(chan bool),
+               subtract_reader:   make(chan bool),
+               wait_zero_readers: make(chan bool),
+       }
 
        go t.transfer(nil)
        go t.readersMonitor()
@@ -127,10 +140,15 @@ func (this *StreamReader) Close() error {
        return nil
 }
 
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error {
+       if this.closed {
+               return ErrAlreadyClosed
+       }
+       this.closed = true
        this.wait_zero_readers <- true
        close(this.requests)
        close(this.add_reader)
        close(this.subtract_reader)
        close(this.wait_zero_readers)
+       return nil
 }
index 80aeb268975d8acbc7f7d1c2e771c17e7adfa719..f5333c37c175be1774eab5318824bb018f47f31f 100644 (file)
@@ -365,3 +365,13 @@ func (s *StandaloneSuite) TestManyReaders(c *C) {
        writer.Write([]byte("baz"))
        writer.Close()
 }
+
+func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
+       buffer := make([]byte, 100)
+       tr := AsyncStreamFromSlice(buffer)
+       sr := tr.MakeStreamReader()
+       c.Check(sr.Close(), IsNil)
+       c.Check(sr.Close(), Equals, ErrAlreadyClosed)
+       c.Check(tr.Close(), IsNil)
+       c.Check(tr.Close(), Equals, ErrAlreadyClosed)
+}