12475: Rewrite streamer -> asyncbuf.
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 23 Nov 2017 04:11:43 +0000 (23:11 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 23 Nov 2017 06:27:10 +0000 (01:27 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

build/run-tests.sh
sdk/go/asyncbuf/buf.go [new file with mode: 0644]
sdk/go/asyncbuf/buf_test.go [new file with mode: 0644]
sdk/go/keepclient/hashcheck.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
sdk/go/streamer/streamer.go [deleted file]
sdk/go/streamer/streamer_test.go [deleted file]
sdk/go/streamer/transfer.go [deleted file]

index 433685c5a86d9cf2cc61b884e21c16d91a67e932..3cfc692aaec5bc1f595b9333fa6be9179164f200 100755 (executable)
@@ -100,7 +100,7 @@ sdk/go/health
 sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
 sdk/go/stats
 sdk/go/crunchrunner
 sdk/cwl
@@ -829,7 +829,7 @@ gostuff=(
     sdk/go/health
     sdk/go/httpserver
     sdk/go/manifest
-    sdk/go/streamer
+    sdk/go/asyncbuf
     sdk/go/crunchrunner
     sdk/go/stats
     lib/crunchstat
diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go
new file mode 100644 (file)
index 0000000..b3b9bf2
--- /dev/null
@@ -0,0 +1,101 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+       "bytes"
+       "io"
+       "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+       io.WriteCloser
+
+       // NewReader() returns an io.Reader that reads all data
+       // written to the Buffer.
+       NewReader() io.Reader
+
+       // Close, but return the given error (instead of io.EOF) to
+       // all readers when they reach the end of the buffer.
+       //
+       // CloseWithError(nil) is equivalent to
+       // CloseWithError(io.EOF).
+       CloseWithError(error) error
+}
+
+type buffer struct {
+       data *bytes.Buffer
+       cond sync.Cond
+       err  error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+       return &buffer{
+               data: bytes.NewBuffer(buf),
+               cond: sync.Cond{L: &sync.Mutex{}},
+       }
+}
+
+func (b *buffer) Write(p []byte) (int, error) {
+       defer b.cond.Broadcast()
+       b.cond.L.Lock()
+       defer b.cond.L.Unlock()
+       if b.err != nil {
+               return 0, b.err
+       }
+       return b.data.Write(p)
+}
+
+func (b *buffer) Close() error {
+       return b.CloseWithError(nil)
+}
+
+func (b *buffer) CloseWithError(err error) error {
+       defer b.cond.Broadcast()
+       b.cond.L.Lock()
+       defer b.cond.L.Unlock()
+       if err == nil {
+               b.err = io.EOF
+       } else {
+               b.err = err
+       }
+       return nil
+}
+
+func (b *buffer) NewReader() io.Reader {
+       return &reader{b: b}
+}
+
+type reader struct {
+       b    *buffer
+       read int // # bytes already read
+}
+
+func (r *reader) Read(p []byte) (int, error) {
+       r.b.cond.L.Lock()
+       defer r.b.cond.L.Unlock()
+       for {
+               if r.b.data.Len() > r.read || len(p) == 0 {
+                       n := copy(p, r.b.data.Bytes()[r.read:])
+                       r.read += n
+                       return n, nil
+               }
+               if r.b.err != nil {
+                       return 0, r.b.err
+               }
+               r.b.cond.Wait()
+       }
+}
diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go
new file mode 100644 (file)
index 0000000..845853b
--- /dev/null
@@ -0,0 +1,159 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+       "crypto/md5"
+       "errors"
+       "io"
+       "io/ioutil"
+       "math/rand"
+       "testing"
+       "time"
+
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestNoWrites(c *check.C) {
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       r2 := b.NewReader()
+       b.Close()
+       s.checkReader(c, r1, []byte{}, nil, nil)
+       s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+func (s *Suite) TestNoReaders(c *check.C) {
+       b := NewBuffer(nil)
+       n, err := b.Write([]byte("foobar"))
+       err2 := b.Close()
+       c.Check(n, check.Equals, 6)
+       c.Check(err, check.IsNil)
+       c.Check(err2, check.IsNil)
+}
+
+func (s *Suite) TestWriteReadClose(c *check.C) {
+       done := make(chan bool, 2)
+       b := NewBuffer(nil)
+       n, err := b.Write([]byte("foobar"))
+       c.Check(n, check.Equals, 6)
+       c.Check(err, check.IsNil)
+       r1 := b.NewReader()
+       r2 := b.NewReader()
+       go s.checkReader(c, r1, []byte("foobar"), nil, done)
+       go s.checkReader(c, r2, []byte("foobar"), nil, done)
+       time.Sleep(time.Millisecond)
+       c.Check(len(done), check.Equals, 0)
+       b.Close()
+       <-done
+       <-done
+}
+
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+       done := make(chan bool, 2)
+       b := NewBuffer([]byte("baz"))
+       n, err := b.Write([]byte("waz"))
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+       b.Close()
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+       r2 := b.NewReader()
+       go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+       <-done
+       <-done
+}
+
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+       done := make(chan bool, 1)
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+       b.Write([]byte("bazwaz"))
+
+       r2 := b.NewReader()
+       r2.Read(make([]byte, 3))
+
+       b.Write([]byte("qux"))
+       b.Close()
+
+       s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+       <-done
+}
+
+func (s *Suite) TestCloseWithError(c *check.C) {
+       errFake := errors.New("it's not even a real error")
+
+       done := make(chan bool, 1)
+       b := NewBuffer(nil)
+       r1 := b.NewReader()
+       go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+       b.Write([]byte("bazwaz"))
+
+       r2 := b.NewReader()
+       r2.Read(make([]byte, 3))
+
+       b.Write([]byte("qux"))
+       b.CloseWithError(errFake)
+
+       s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+       <-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+       const n = 256
+
+       b := NewBuffer(nil)
+
+       expectSum := make(chan []byte)
+       go func() {
+               hash := md5.New()
+               buf := make([]byte, n)
+               for i := 0; i < n; i++ {
+                       time.Sleep(10 * time.Nanosecond)
+                       rand.Read(buf)
+                       b.Write(buf)
+                       hash.Write(buf)
+               }
+               expectSum <- hash.Sum(nil)
+               b.Close()
+       }()
+
+       gotSum := make(chan []byte)
+       for i := 0; i < n; i++ {
+               go func(bufSize int) {
+                       got := md5.New()
+                       io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+                       gotSum <- got.Sum(nil)
+               }(i + n/2)
+       }
+
+       expect := <-expectSum
+       for i := 0; i < n; i++ {
+               c.Check(expect, check.DeepEquals, <-gotSum)
+       }
+}
+
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+       buf, err := ioutil.ReadAll(r)
+       c.Check(err, check.Equals, expectError)
+       c.Check(buf, check.DeepEquals, expectData)
+       if done != nil {
+               done <- true
+       }
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
index 726b81362ca6d4e9b1db43f6f8677111039911b3..9295c14cc24a47cd38479e19a8aa57dc91c1c42a 100644 (file)
@@ -72,19 +72,16 @@ func (this HashCheckingReader) Close() (err error) {
        _, err = io.Copy(this.Hash, this.Reader)
 
        if closer, ok := this.Reader.(io.Closer); ok {
-               err2 := closer.Close()
-               if err2 != nil && err == nil {
-                       return err2
+               closeErr := closer.Close()
+               if err == nil {
+                       err = closeErr
                }
        }
        if err != nil {
                return err
        }
-
-       sum := this.Hash.Sum(nil)
-       if fmt.Sprintf("%x", sum) != this.Check {
-               err = BadChecksum
+       if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+               return BadChecksum
        }
-
-       return err
+       return nil
 }
index cbfad8177da775337bf2b528a99ff9a0757cbaa0..37d651e31fbd971defa3217d6c11e883c4a073cc 100644 (file)
@@ -21,7 +21,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
+       "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
 )
 
 // A Keep "block" is 64MB.
@@ -156,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
                bufsize = BLOCKSIZE
        }
 
-       t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
-       defer t.Close()
-
-       return kc.putReplicas(hash, t, dataBytes)
+       buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+       go func() {
+               _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+               buf.CloseWithError(err)
+       }()
+       return kc.putReplicas(hash, buf.NewReader, dataBytes)
 }
 
 // PutHB writes a block to Keep. The hash of the bytes is given in
@@ -167,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
-       t := streamer.AsyncStreamFromSlice(buf)
-       defer t.Close()
-       return kc.putReplicas(hash, t, int64(len(buf)))
+       newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+       return kc.putReplicas(hash, newReader, int64(len(buf)))
 }
 
 // PutB writes a block to Keep. It computes the hash itself.
index 3ce4e7425aa273e2753ddcaab516a86ecd34d59a..055141cbe88165cbe93a79d903d0b69a3731c672 100644 (file)
@@ -5,6 +5,7 @@
 package keepclient
 
 import (
+       "bytes"
        "crypto/md5"
        "errors"
        "fmt"
@@ -20,7 +21,6 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
        . "gopkg.in/check.v1"
 )
 
@@ -172,18 +172,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                make(chan string)}
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
-                       writer io.WriteCloser, upload_status chan uploadStatus) {
-
-                       tr := streamer.AsyncStreamFromReader(512, reader)
-                       defer tr.Close()
-
-                       br1 := tr.MakeStreamReader()
-
-                       go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
-                       writer.Write([]byte("foo"))
-                       writer.Close()
+               func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+                       go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
 
                        <-st.handled
 
index 49ef543d872f94d169c2e76b422cffee76ef86ed..37912506a2cb6ab7c014a0edac13e922c20526d6 100644 (file)
@@ -17,7 +17,6 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // Function used to emit debug messages. The easiest way to enable
@@ -57,7 +56,7 @@ type uploadStatus struct {
        response        string
 }
 
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
        upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
 
        var req *http.Request
@@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
                DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
                upload_status <- uploadStatus{err, url, 0, 0, ""}
-               body.Close()
                return
        }
 
        req.ContentLength = expectedLength
        if expectedLength > 0 {
-               // Do() will close the body ReadCloser when it is done
-               // with it.
-               req.Body = body
+               req.Body = ioutil.NopCloser(body)
        } else {
-               // "For client requests, a value of 0 means unknown if Body is
-               // not nil."  In this case we do want the body to be empty, so
-               // don't set req.Body.  However, we still need to close the
-               // body ReadCloser.
-               body.Close()
+               // "For client requests, a value of 0 means unknown if
+               // Body is not nil."  In this case we do want the body
+               // to be empty, so don't set req.Body.
        }
 
        req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
@@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
 
 func (this *KeepClient) putReplicas(
        hash string,
-       tr *streamer.AsyncStream,
+       getReader func() io.Reader,
        expectedLength int64) (locator string, replicas int, err error) {
 
        // Generate an arbitrary ID to identify this specific
@@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas(
                                // Start some upload requests
                                if next_server < len(sv) {
                                        DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
-                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+                                       go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
                                        next_server += 1
                                        active += 1
                                } else {
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
deleted file mode 100644 (file)
index 396e311..0000000
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
-  reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
-  reader.Close()
-
-When you're done with the stream:
-  stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
-  stream := AsyncStreamFromSlice(buf)
-
-  r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
-       "errors"
-       "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
-       buffer            []byte
-       requests          chan sliceRequest
-       add_reader        chan bool
-       subtract_reader   chan bool
-       wait_zero_readers chan bool
-       closed            bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
-       offset    int
-       stream    *AsyncStream
-       responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            make([]byte, buffersize),
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(source)
-       go t.readersMonitor()
-
-       return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-       t := &AsyncStream{
-               buffer:            buf,
-               requests:          make(chan sliceRequest),
-               add_reader:        make(chan bool),
-               subtract_reader:   make(chan bool),
-               wait_zero_readers: make(chan bool),
-       }
-
-       go t.transfer(nil)
-       go t.readersMonitor()
-
-       return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
-       this.add_reader <- true
-       return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
-       this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
-       rr, valid := <-this.responses
-       if valid {
-               this.offset += len(rr.slice)
-               return copy(p, rr.slice), rr.err
-       } else {
-               return 0, io.ErrUnexpectedEOF
-       }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
-       // Record starting offset in order to correctly report the number of bytes sent
-       starting_offset := this.offset
-       for {
-               this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
-               rr, valid := <-this.responses
-               if valid {
-                       this.offset += len(rr.slice)
-                       if rr.err != nil {
-                               if rr.err == io.EOF {
-                                       // EOF is not an error.
-                                       return int64(this.offset - starting_offset), nil
-                               } else {
-                                       return int64(this.offset - starting_offset), rr.err
-                               }
-                       } else {
-                               dest.Write(rr.slice)
-                       }
-               } else {
-                       return int64(this.offset), io.ErrUnexpectedEOF
-               }
-       }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
-       if this.stream == nil {
-               return ErrAlreadyClosed
-       }
-       this.stream.subtract_reader <- true
-       close(this.responses)
-       this.stream = nil
-       return nil
-}
-
-func (this *AsyncStream) Close() error {
-       if this.closed {
-               return ErrAlreadyClosed
-       }
-       this.closed = true
-       this.wait_zero_readers <- true
-       close(this.requests)
-       close(this.add_reader)
-       close(this.subtract_reader)
-       close(this.wait_zero_readers)
-       return nil
-}
diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go
deleted file mode 100644 (file)
index f8ddbf5..0000000
+++ /dev/null
@@ -1,381 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
-       . "gopkg.in/check.v1"
-       "io"
-       "testing"
-       "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-       ReadIntoBufferHelper(c, 225)
-       ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 128)
-       for i := 0; i < 128; i += 1 {
-               out[i] = byte(i)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 128)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 128; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-       writer.Write(out)
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 96)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 96; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 96) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
-       buffer := make([]byte, bufsize)
-
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-       HelperWrite96andCheck(c, buffer, writer, slices)
-
-       writer.Close()
-       s1 := <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
-       buffer := make([]byte, 223)
-       reader, writer := io.Pipe()
-       slices := make(chan nextSlice)
-
-       go readIntoBuffer(buffer, reader, slices)
-
-       HelperWrite128andCheck(c, buffer, writer, slices)
-
-       out := make([]byte, 96)
-       for i := 0; i < 96; i += 1 {
-               out[i] = byte(i / 2)
-       }
-
-       // Write will deadlock because it can't write all the data, so
-       // spin it off to a goroutine
-       go writer.Write(out)
-       s1 := <-slices
-
-       c.Check(len(s1.slice), Equals, 95)
-       c.Check(s1.reader_error, Equals, nil)
-       for i := 0; i < 95; i += 1 {
-               c.Check(s1.slice[i], Equals, byte(i/2))
-       }
-       for i := 0; i < len(buffer); i += 1 {
-               if i < 128 {
-                       c.Check(buffer[i], Equals, byte(i))
-               } else if i < (128 + 95) {
-                       c.Check(buffer[i], Equals, byte((i-128)/2))
-               } else {
-                       c.Check(buffer[i], Equals, byte(0))
-               }
-       }
-
-       writer.Close()
-       s1 = <-slices
-       c.Check(len(s1.slice), Equals, 0)
-       c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-
-       br1 := tr.MakeStreamReader()
-       out := make([]byte, 128)
-
-       {
-               // Write some data, and read into a buffer shorter than
-               // available data
-               for i := 0; i < 128; i += 1 {
-                       out[i] = byte(i)
-               }
-
-               writer.Write(out[:100])
-
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Write some more data, and read into buffer longer than
-               // available data
-               in := make([]byte, 64)
-               n, err := br1.Read(in)
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, out[64+i])
-               }
-
-       }
-
-       {
-               // Test read before write
-               type Rd struct {
-                       n   int
-                       err error
-               }
-               rd := make(chan Rd)
-               in := make([]byte, 64)
-
-               go func() {
-                       n, err := br1.Read(in)
-                       rd <- Rd{n, err}
-               }()
-
-               time.Sleep(100 * time.Millisecond)
-               writer.Write(out[100:])
-
-               got := <-rd
-
-               c.Check(got.n, Equals, 28)
-               c.Check(got.err, Equals, nil)
-
-               for i := 0; i < 28; i += 1 {
-                       c.Check(in[i], Equals, out[100+i])
-               }
-       }
-
-       br2 := tr.MakeStreamReader()
-       {
-               // Test 'catch up' reader
-               in := make([]byte, 256)
-               n, err := br2.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-       }
-
-       {
-               // Test closing the reader
-               writer.Close()
-
-               in := make([]byte, 256)
-               n1, err1 := br1.Read(in)
-               n2, err2 := br2.Read(in)
-               c.Check(n1, Equals, 0)
-               c.Check(err1, Equals, io.EOF)
-               c.Check(n2, Equals, 0)
-               c.Check(err2, Equals, io.EOF)
-       }
-
-       {
-               // Test 'catch up' reader after closing
-               br3 := tr.MakeStreamReader()
-               in := make([]byte, 256)
-               n, err := br3.Read(in)
-
-               c.Check(n, Equals, 128)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 128; i += 1 {
-                       c.Check(in[i], Equals, out[i])
-               }
-
-               n, err = br3.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(100, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       defer sr.Close()
-
-       out := make([]byte, 101)
-       go writer.Write(out)
-
-       n, err := sr.Read(out)
-       c.Check(n, Equals, 100)
-       c.Check(err, IsNil)
-
-       n, err = sr.Read(out)
-       c.Check(n, Equals, 0)
-       c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-
-       br1 := tr.MakeStreamReader()
-
-       in := make([]byte, 64)
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 64)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 64; i += 1 {
-                       c.Check(in[i], Equals, buffer[i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 36)
-               c.Check(err, Equals, nil)
-
-               for i := 0; i < 36; i += 1 {
-                       c.Check(in[i], Equals, buffer[64+i])
-               }
-       }
-       {
-               n, err := br1.Read(in)
-
-               c.Check(n, Equals, 0)
-               c.Check(err, Equals, io.EOF)
-       }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
-       // Buffer for reads from 'r'
-       buffer := make([]byte, 100)
-       for i := 0; i < 100; i += 1 {
-               buffer[i] = byte(i)
-       }
-
-       tr := AsyncStreamFromSlice(buffer)
-       defer tr.Close()
-
-       br1 := tr.MakeStreamReader()
-       defer br1.Close()
-
-       reader, writer := io.Pipe()
-
-       go func() {
-               p := make([]byte, 100)
-               n, err := reader.Read(p)
-               c.Check(n, Equals, 100)
-               c.Check(err, Equals, nil)
-               c.Check(p, DeepEquals, buffer)
-       }()
-
-       io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
-       reader, writer := io.Pipe()
-
-       tr := AsyncStreamFromReader(512, reader)
-       defer tr.Close()
-
-       sr := tr.MakeStreamReader()
-       go func() {
-               time.Sleep(100 * time.Millisecond)
-               sr.Close()
-       }()
-
-       for i := 0; i < 200; i += 1 {
-               go func() {
-                       br1 := tr.MakeStreamReader()
-                       defer br1.Close()
-
-                       p := make([]byte, 3)
-                       n, err := br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("foo"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("bar"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 3)
-                       c.Check(p[0:3], DeepEquals, []byte("baz"))
-
-                       n, err = br1.Read(p)
-                       c.Check(n, Equals, 0)
-                       c.Check(err, Equals, io.EOF)
-               }()
-       }
-
-       writer.Write([]byte("foo"))
-       writer.Write([]byte("bar"))
-       writer.Write([]byte("baz"))
-       writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
-       buffer := make([]byte, 100)
-       tr := AsyncStreamFromSlice(buffer)
-       sr := tr.MakeStreamReader()
-       c.Check(sr.Close(), IsNil)
-       c.Check(sr.Close(), Equals, ErrAlreadyClosed)
-       c.Check(tr.Close(), IsNil)
-       c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go
deleted file mode 100644 (file)
index bea27f8..0000000
+++ /dev/null
@@ -1,310 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine.  It manages concurrent reads and
-appends to the "body" slice.  "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer.  Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body".  Each time a Read completes, a slice representing the the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means the a new
-section of the buffer has data, or an error is signaled.  Since the data has
-been read directly into the source_buffer, it is able to simply increases the
-size of the body slice to encompass the newly filled in section.  Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer.  This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response.  If there was an error
-reported from the source reader, it is returned.  If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body).  If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF.  Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-The transfer() runs until the requests channel is closed by AsyncStream.Close()
-
-To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers.  When a new reader is created, it sends a message on the add_reader
-channel.  If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed.  When a reader is closed, it sends a
-message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
-       "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
-       slice        []byte
-       reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
-       offset  int
-       maxsize int
-       result  chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
-       slice []byte
-       err   error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
-       buf []byte
-       ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
-       n = copy(this.buf[this.ptr:], p)
-       this.ptr += n
-       return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'.  Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
-       defer close(slices)
-
-       if writeto, ok := r.(io.WriterTo); ok {
-               n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
-               if err != nil {
-                       slices <- nextSlice{nil, err}
-               } else {
-                       slices <- nextSlice{buffer[:n], nil}
-                       slices <- nextSlice{nil, io.EOF}
-               }
-               return
-       } else {
-               // Initially entire buffer is available
-               ptr := buffer[:]
-               for {
-                       var n int
-                       var err error
-                       if len(ptr) > 0 {
-                               const readblock = 64 * 1024
-                               // Read 64KiB into the next part of the buffer
-                               if len(ptr) > readblock {
-                                       n, err = r.Read(ptr[:readblock])
-                               } else {
-                                       n, err = r.Read(ptr)
-                               }
-                       } else {
-                               // Ran out of buffer space, try reading one more byte
-                               var b [1]byte
-                               n, err = r.Read(b[:])
-
-                               if n > 0 {
-                                       // Reader has more data but we have nowhere to
-                                       // put it, so we're stuffed
-                                       slices <- nextSlice{nil, io.ErrShortBuffer}
-                               } else {
-                                       // Return some other error (hopefully EOF)
-                                       slices <- nextSlice{nil, err}
-                               }
-                               return
-                       }
-
-                       // End on error (includes EOF)
-                       if err != nil {
-                               slices <- nextSlice{nil, err}
-                               return
-                       }
-
-                       if n > 0 {
-                               // Make a slice with the contents of the read
-                               slices <- nextSlice{ptr[:n], nil}
-
-                               // Adjust the scratch space slice
-                               ptr = ptr[n:]
-                       }
-               }
-       }
-}
-
-// Handle a read request.  Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
-       if (reader_status != nil) && (reader_status != io.EOF) {
-               req.result <- sliceResult{nil, reader_status}
-               return true
-       } else if req.offset < len(body) {
-               var end int
-               if req.offset+req.maxsize < len(body) {
-                       end = req.offset + req.maxsize
-               } else {
-                       end = len(body)
-               }
-               req.result <- sliceResult{body[req.offset:end], nil}
-               return true
-       } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
-               req.result <- sliceResult{nil, io.EOF}
-               return true
-       } else {
-               return false
-       }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel.  Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
-       source_buffer := this.buffer
-       requests := this.requests
-
-       // currently buffered data
-       var body []byte
-
-       // for receiving slices from readIntoBuffer
-       var slices chan nextSlice = nil
-
-       // indicates the status of the underlying reader
-       var reader_status error = nil
-
-       if source_reader != nil {
-               // 'body' is the buffer slice representing the body content read so far
-               body = source_buffer[:0]
-
-               // used to communicate slices of the buffer as they are
-               // readIntoBuffer will close 'slices' when it is done with it
-               slices = make(chan nextSlice)
-
-               // Spin it off
-               go readIntoBuffer(source_buffer, source_reader, slices)
-       } else {
-               // use the whole buffer
-               body = source_buffer[:]
-
-               // buffer is complete
-               reader_status = io.EOF
-       }
-
-       pending_requests := make([]sliceRequest, 0)
-
-       for {
-               select {
-               case req, valid := <-requests:
-                       // Handle a buffer read request
-                       if valid {
-                               if !handleReadRequest(req, body, reader_status) {
-                                       pending_requests = append(pending_requests, req)
-                               }
-                       } else {
-                               // closed 'requests' channel indicates we're done
-                               return
-                       }
-
-               case bk, valid := <-slices:
-                       // Got a new slice from the reader
-                       if valid {
-                               reader_status = bk.reader_error
-
-                               if bk.slice != nil {
-                                       // adjust body bounds now that another slice has been read
-                                       body = source_buffer[0 : len(body)+len(bk.slice)]
-                               }
-
-                               // handle pending reads
-                               n := 0
-                               for n < len(pending_requests) {
-                                       if handleReadRequest(pending_requests[n], body, reader_status) {
-                                               // move the element from the back of the slice to
-                                               // position 'n', then shorten the slice by one element
-                                               pending_requests[n] = pending_requests[len(pending_requests)-1]
-                                               pending_requests = pending_requests[0 : len(pending_requests)-1]
-                                       } else {
-
-                                               // Request wasn't handled, so keep it in the request slice
-                                               n += 1
-                                       }
-                               }
-                       } else {
-                               if reader_status == nil {
-                                       // slices channel closed without signaling EOF
-                                       reader_status = io.ErrUnexpectedEOF
-                               }
-                               slices = nil
-                       }
-               }
-       }
-}
-
-func (this *AsyncStream) readersMonitor() {
-       var readers int = 0
-
-       for {
-               if readers == 0 {
-                       select {
-                       case _, ok := <-this.wait_zero_readers:
-                               if ok {
-                                       // nothing, just implicitly unblock the sender
-                               } else {
-                                       return
-                               }
-                       case _, ok := <-this.add_reader:
-                               if ok {
-                                       readers += 1
-                               } else {
-                                       return
-                               }
-                       }
-               } else if readers > 0 && readers < MAX_READERS {
-                       select {
-                       case _, ok := <-this.add_reader:
-                               if ok {
-                                       readers += 1
-                               } else {
-                                       return
-                               }
-
-                       case _, ok := <-this.subtract_reader:
-                               if ok {
-                                       readers -= 1
-                               } else {
-                                       return
-                               }
-                       }
-               } else if readers == MAX_READERS {
-                       _, ok := <-this.subtract_reader
-                       if ok {
-                               readers -= 1
-                       } else {
-                               return
-                       }
-               }
-       }
-}