2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / pipe_adapters.go
diff --git a/services/keepstore/pipe_adapters.go b/services/keepstore/pipe_adapters.go
deleted file mode 100644 (file)
index 6b55505..0000000
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package keepstore
-
-import (
-       "bytes"
-       "context"
-       "io"
-       "io/ioutil"
-)
-
-// getWithPipe invokes getter and copies the resulting data into
-// buf. If ctx is done before all data is copied, getWithPipe closes
-// the pipe with an error, and returns early with an error.
-func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (int, error) {
-       piper, pipew := io.Pipe()
-       go func() {
-               pipew.CloseWithError(br.ReadBlock(ctx, loc, pipew))
-       }()
-       done := make(chan struct{})
-       var size int
-       var err error
-       go func() {
-               size, err = io.ReadFull(piper, buf)
-               if err == io.EOF || err == io.ErrUnexpectedEOF {
-                       err = nil
-               }
-               close(done)
-       }()
-       select {
-       case <-ctx.Done():
-               piper.CloseWithError(ctx.Err())
-               return 0, ctx.Err()
-       case <-done:
-               piper.Close()
-               return size, err
-       }
-}
-
-// putWithPipe invokes putter with a new pipe, and copies data
-// from buf into the pipe. If ctx is done before all data is copied,
-// putWithPipe closes the pipe with an error, and returns early with
-// an error.
-func putWithPipe(ctx context.Context, loc string, buf []byte, bw BlockWriter) error {
-       piper, pipew := io.Pipe()
-       copyErr := make(chan error)
-       go func() {
-               _, err := io.Copy(pipew, bytes.NewReader(buf))
-               copyErr <- err
-               close(copyErr)
-       }()
-
-       putErr := make(chan error, 1)
-       go func() {
-               putErr <- bw.WriteBlock(ctx, loc, piper)
-               close(putErr)
-       }()
-
-       var err error
-       select {
-       case err = <-copyErr:
-       case err = <-putErr:
-       case <-ctx.Done():
-               err = ctx.Err()
-       }
-
-       // Ensure io.Copy goroutine isn't blocked writing to pipew
-       // (otherwise, io.Copy is still using buf so it isn't safe to
-       // return). This can cause pipew to receive corrupt data if
-       // err came from copyErr or ctx.Done() before the copy
-       // finished. That's OK, though: in that case err != nil, and
-       // CloseWithErr(err) ensures putter() will get an error from
-       // piper.Read() before seeing EOF.
-       go pipew.CloseWithError(err)
-       go io.Copy(ioutil.Discard, piper)
-       <-copyErr
-
-       // Note: io.Copy() is finished now, but putter() might still
-       // be running. If we encounter an error before putter()
-       // returns, we return right away without waiting for putter().
-
-       if err != nil {
-               return err
-       }
-       select {
-       case <-ctx.Done():
-               return ctx.Err()
-       case err = <-putErr:
-               return err
-       }
-}