X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5b2ebfe3b248790efbce3206b693dd93f369ea4e..2031e72047634a3f5944f36d175c1ae351a3bd3e:/sdk/go/keepclient/support.go diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 37912506a2..3b1afe1e28 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -11,15 +11,14 @@ import ( "io" "io/ioutil" "log" - "math/rand" "net/http" "os" "strings" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" ) -// Function used to emit debug messages. The easiest way to enable +// DebugPrintf emits debug messages. The easiest way to enable // keepclient debug messages in your application is to assign // log.Printf to DebugPrintf. var DebugPrintf = func(string, ...interface{}) {} @@ -49,22 +48,22 @@ type svcList struct { } type uploadStatus struct { - err error - url string - statusCode int - replicas_stored int - response string + err error + url string + statusCode int + replicasStored int + response string } -func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, - upload_status chan<- uploadStatus, expectedLength int64, requestID int32) { +func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, + uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) { var req *http.Request var err error var url = fmt.Sprintf("%s/%s", host, hash) if req, err = http.NewRequest("PUT", url, nil); err != nil { - DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error()) + uploadStatusChan <- uploadStatus{err, url, 0, 0, ""} return } @@ -77,19 +76,23 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea // to be empty, so don't set req.Body. } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) + req.Header.Add("X-Request-Id", reqid) + req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken) req.Header.Add("Content-Type", "application/octet-stream") - req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas)) + req.Header.Add(XKeepDesiredReplicas, fmt.Sprint(kc.Want_replicas)) + if len(kc.StorageClasses) > 0 { + req.Header.Add("X-Keep-Storage-Classes", strings.Join(kc.StorageClasses, ", ")) + } var resp *http.Response - if resp, err = this.httpClient().Do(req); err != nil { - DebugPrintf("DEBUG: [%08x] Upload failed %v error: %v", requestID, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + if resp, err = kc.httpClient().Do(req); err != nil { + DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error()) + uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()} return } rep := 1 - if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" { + if xr := resp.Header.Get(XKeepReplicasStored); xr != "" { fmt.Sscanf(xr, "%d", &rep) } @@ -99,78 +102,76 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096}) response := strings.TrimSpace(string(respbody)) if err2 != nil && err2 != io.EOF { - DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, err2.Error(), response) - upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response) + uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response} } else if resp.StatusCode == http.StatusOK { - DebugPrintf("DEBUG: [%08x] Upload %v success", requestID, url) - upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url) + uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response} } else { if resp.StatusCode >= 300 && response == "" { response = resp.Status } - DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, resp.StatusCode, response) - upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response) + uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} } } -func (this *KeepClient) putReplicas( +func (kc *KeepClient) putReplicas( hash string, getReader func() io.Reader, expectedLength int64) (locator string, replicas int, err error) { - // Generate an arbitrary ID to identify this specific - // transaction in debug logs. - requestID := rand.Int31() + reqid := kc.getRequestID() // Calculate the ordering for uploading to servers - sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots() + sv := NewRootSorter(kc.WritableLocalRoots(), hash).GetSortedRoots() // The next server to try contacting - next_server := 0 + nextServer := 0 // The number of active writers active := 0 // Used to communicate status from the upload goroutines - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) defer func() { // Wait for any abandoned uploads (e.g., we started // two uploads and the first replied with replicas=2) // to finish before closing the status channel. go func() { for active > 0 { - <-upload_status + <-uploadStatusChan } - close(upload_status) + close(uploadStatusChan) }() }() replicasDone := 0 - replicasTodo := this.Want_replicas + replicasTodo := kc.Want_replicas - replicasPerThread := this.replicasPerService + replicasPerThread := kc.replicasPerService if replicasPerThread < 1 { // unlimited or unknown replicasPerThread = replicasTodo } - retriesRemaining := 1 + this.Retries + retriesRemaining := 1 + kc.Retries var retryServers []string lastError := make(map[string]string) for retriesRemaining > 0 { - retriesRemaining -= 1 - next_server = 0 + retriesRemaining-- + nextServer = 0 retryServers = []string{} for replicasTodo > 0 { for active*replicasPerThread < replicasTodo { // Start some upload requests - if next_server < len(sv) { - DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID) - next_server += 1 - active += 1 + if nextServer < len(sv) { + DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer]) + go kc.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid) + nextServer++ + active++ } else { if active == 0 && retriesRemaining == 0 { msg := "Could not write sufficient replicas: " @@ -179,23 +180,22 @@ func (this *KeepClient) putReplicas( } msg = msg[:len(msg)-2] return locator, replicasDone, InsufficientReplicasError(errors.New(msg)) - } else { - break } + break } } - DebugPrintf("DEBUG: [%08x] Replicas remaining to write: %v active uploads: %v", - requestID, replicasTodo, active) + DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", + reqid, replicasTodo, active) // Now wait for something to happen. if active > 0 { - status := <-upload_status - active -= 1 + status := <-uploadStatusChan + active-- if status.statusCode == 200 { // good news! - replicasDone += status.replicas_stored - replicasTodo -= status.replicas_stored + replicasDone += status.replicasStored + replicasTodo -= status.replicasStored locator = status.response delete(lastError, status.url) } else {