X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0d06a2984420d9d48e16ccb6d85982b3dce05644..729b2762630b343b50aa1cb74733635ebcc52eb4:/sdk/go/keepclient/support.go diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 37912506a2..f4e99bdbe3 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -11,15 +11,14 @@ import ( "io" "io/ioutil" "log" - "math/rand" "net/http" "os" "strings" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" ) -// Function used to emit debug messages. The easiest way to enable +// DebugPrintf emits debug messages. The easiest way to enable // keepclient debug messages in your application is to assign // log.Printf to DebugPrintf. var DebugPrintf = func(string, ...interface{}) {} @@ -49,22 +48,22 @@ type svcList struct { } type uploadStatus struct { - err error - url string - statusCode int - replicas_stored int - response string + err error + url string + statusCode int + replicasStored int + response string } func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, - upload_status chan<- uploadStatus, expectedLength int64, requestID int32) { + uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) { var req *http.Request var err error var url = fmt.Sprintf("%s/%s", host, hash) if req, err = http.NewRequest("PUT", url, nil); err != nil { - DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error()) + uploadStatusChan <- uploadStatus{err, url, 0, 0, ""} return } @@ -77,14 +76,18 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea // to be empty, so don't set req.Body. } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) + req.Header.Add("X-Request-Id", reqid) + req.Header.Add("Authorization", "OAuth2 "+this.Arvados.ApiToken) req.Header.Add("Content-Type", "application/octet-stream") req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas)) + if len(this.StorageClasses) > 0 { + req.Header.Add("X-Keep-Storage-Classes", strings.Join(this.StorageClasses, ", ")) + } var resp *http.Response if resp, err = this.httpClient().Do(req); err != nil { - DebugPrintf("DEBUG: [%08x] Upload failed %v error: %v", requestID, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error()) + uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()} return } @@ -99,17 +102,17 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096}) response := strings.TrimSpace(string(respbody)) if err2 != nil && err2 != io.EOF { - DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, err2.Error(), response) - upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response) + uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response} } else if resp.StatusCode == http.StatusOK { - DebugPrintf("DEBUG: [%08x] Upload %v success", requestID, url) - upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url) + uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response} } else { if resp.StatusCode >= 300 && response == "" { response = resp.Status } - DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, resp.StatusCode, response) - upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response) + uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} } } @@ -118,30 +121,28 @@ func (this *KeepClient) putReplicas( getReader func() io.Reader, expectedLength int64) (locator string, replicas int, err error) { - // Generate an arbitrary ID to identify this specific - // transaction in debug logs. - requestID := rand.Int31() + reqid := this.getRequestID() // Calculate the ordering for uploading to servers sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots() // The next server to try contacting - next_server := 0 + nextServer := 0 // The number of active writers active := 0 // Used to communicate status from the upload goroutines - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) defer func() { // Wait for any abandoned uploads (e.g., we started // two uploads and the first replied with replicas=2) // to finish before closing the status channel. go func() { for active > 0 { - <-upload_status + <-uploadStatusChan } - close(upload_status) + close(uploadStatusChan) }() }() @@ -161,15 +162,15 @@ func (this *KeepClient) putReplicas( for retriesRemaining > 0 { retriesRemaining -= 1 - next_server = 0 + nextServer = 0 retryServers = []string{} for replicasTodo > 0 { for active*replicasPerThread < replicasTodo { // Start some upload requests - if next_server < len(sv) { - DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID) - next_server += 1 + if nextServer < len(sv) { + DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer]) + go this.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid) + nextServer += 1 active += 1 } else { if active == 0 && retriesRemaining == 0 { @@ -179,23 +180,22 @@ func (this *KeepClient) putReplicas( } msg = msg[:len(msg)-2] return locator, replicasDone, InsufficientReplicasError(errors.New(msg)) - } else { - break } + break } } - DebugPrintf("DEBUG: [%08x] Replicas remaining to write: %v active uploads: %v", - requestID, replicasTodo, active) + DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", + reqid, replicasTodo, active) // Now wait for something to happen. if active > 0 { - status := <-upload_status + status := <-uploadStatusChan active -= 1 if status.statusCode == 200 { // good news! - replicasDone += status.replicas_stored - replicasTodo -= status.replicas_stored + replicasDone += status.replicasStored + replicasTodo -= status.replicasStored locator = status.response delete(lastError, status.url) } else {