10666: Merge branch 'master' into 10666-report-version
[arvados.git] / sdk / go / keepclient / support.go
index 49ef543d872f94d169c2e76b422cffee76ef86ed..37912506a2cb6ab7c014a0edac13e922c20526d6 100644 (file)
@@ -17,7 +17,6 @@ import (
        "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // Function used to emit debug messages. The easiest way to enable
@@ -57,7 +56,7 @@ type uploadStatus struct {
        response        string
 }
 
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
        upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
 
        var req *http.Request
@@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
                DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
                upload_status <- uploadStatus{err, url, 0, 0, ""}
-               body.Close()
                return
        }
 
        req.ContentLength = expectedLength
        if expectedLength > 0 {
-               // Do() will close the body ReadCloser when it is done
-               // with it.
-               req.Body = body
+               req.Body = ioutil.NopCloser(body)
        } else {
-               // "For client requests, a value of 0 means unknown if Body is
-               // not nil."  In this case we do want the body to be empty, so
-               // don't set req.Body.  However, we still need to close the
-               // body ReadCloser.
-               body.Close()
+               // "For client requests, a value of 0 means unknown if
+               // Body is not nil."  In this case we do want the body
+               // to be empty, so don't set req.Body.
        }
 
        req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
@@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
 
 func (this *KeepClient) putReplicas(
        hash string,
-       tr *streamer.AsyncStream,
+       getReader func() io.Reader,
        expectedLength int64) (locator string, replicas int, err error) {
 
        // Generate an arbitrary ID to identify this specific
@@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas(
                                // Start some upload requests
                                if next_server < len(sv) {
                                        DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
-                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+                                       go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
                                        next_server += 1
                                        active += 1
                                } else {