X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0f644e242ef37c911ad3dc25aca8135c339de349..0c888bcc93b559339c8abbce784bdcc44746bca2:/sdk/go/keepclient/support.go diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 49ef543d87..37912506a2 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -17,7 +17,6 @@ import ( "strings" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" ) // Function used to emit debug messages. The easiest way to enable @@ -57,7 +56,7 @@ type uploadStatus struct { response string } -func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, +func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, upload_status chan<- uploadStatus, expectedLength int64, requestID int32) { var req *http.Request @@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea if req, err = http.NewRequest("PUT", url, nil); err != nil { DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error()) upload_status <- uploadStatus{err, url, 0, 0, ""} - body.Close() return } req.ContentLength = expectedLength if expectedLength > 0 { - // Do() will close the body ReadCloser when it is done - // with it. - req.Body = body + req.Body = ioutil.NopCloser(body) } else { - // "For client requests, a value of 0 means unknown if Body is - // not nil." In this case we do want the body to be empty, so - // don't set req.Body. However, we still need to close the - // body ReadCloser. - body.Close() + // "For client requests, a value of 0 means unknown if + // Body is not nil." In this case we do want the body + // to be empty, so don't set req.Body. } req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) @@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea func (this *KeepClient) putReplicas( hash string, - tr *streamer.AsyncStream, + getReader func() io.Reader, expectedLength int64) (locator string, replicas int, err error) { // Generate an arbitrary ID to identify this specific @@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas( // Start some upload requests if next_server < len(sv) { DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID) + go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID) next_server += 1 active += 1 } else {