X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/16f704326f44fd1e5e5e60b936c9b5895d6a6ff8..2031e72047634a3f5944f36d175c1ae351a3bd3e:/sdk/go/keepclient/support.go diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index e589593fa8..3b1afe1e28 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -15,10 +15,10 @@ import ( "os" "strings" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" ) -// Function used to emit debug messages. The easiest way to enable +// DebugPrintf emits debug messages. The easiest way to enable // keepclient debug messages in your application is to assign // log.Printf to DebugPrintf. var DebugPrintf = func(string, ...interface{}) {} @@ -48,22 +48,22 @@ type svcList struct { } type uploadStatus struct { - err error - url string - statusCode int - replicas_stored int - response string + err error + url string + statusCode int + replicasStored int + response string } -func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, - upload_status chan<- uploadStatus, expectedLength int64, reqid string) { +func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, + uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) { var req *http.Request var err error var url = fmt.Sprintf("%s/%s", host, hash) if req, err = http.NewRequest("PUT", url, nil); err != nil { DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + uploadStatusChan <- uploadStatus{err, url, 0, 0, ""} return } @@ -77,22 +77,22 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea } req.Header.Add("X-Request-Id", reqid) - req.Header.Add("Authorization", "OAuth2 "+this.Arvados.ApiToken) + req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken) req.Header.Add("Content-Type", "application/octet-stream") - req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas)) - if len(this.StorageClasses) > 0 { - req.Header.Add("X-Keep-Storage-Classes", strings.Join(this.StorageClasses, ", ")) + req.Header.Add(XKeepDesiredReplicas, fmt.Sprint(kc.Want_replicas)) + if len(kc.StorageClasses) > 0 { + req.Header.Add("X-Keep-Storage-Classes", strings.Join(kc.StorageClasses, ", ")) } var resp *http.Response - if resp, err = this.httpClient().Do(req); err != nil { + if resp, err = kc.httpClient().Do(req); err != nil { DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, err.Error()} + uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()} return } rep := 1 - if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" { + if xr := resp.Header.Get(XKeepReplicasStored); xr != "" { fmt.Sscanf(xr, "%d", &rep) } @@ -103,75 +103,75 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea response := strings.TrimSpace(string(respbody)) if err2 != nil && err2 != io.EOF { DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response) - upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response} + uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response} } else if resp.StatusCode == http.StatusOK { DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url) - upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response} + uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response} } else { if resp.StatusCode >= 300 && response == "" { response = resp.Status } DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response) - upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} + uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} } } -func (this *KeepClient) putReplicas( +func (kc *KeepClient) putReplicas( hash string, getReader func() io.Reader, expectedLength int64) (locator string, replicas int, err error) { - reqid := this.getRequestID() + reqid := kc.getRequestID() // Calculate the ordering for uploading to servers - sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots() + sv := NewRootSorter(kc.WritableLocalRoots(), hash).GetSortedRoots() // The next server to try contacting - next_server := 0 + nextServer := 0 // The number of active writers active := 0 // Used to communicate status from the upload goroutines - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) defer func() { // Wait for any abandoned uploads (e.g., we started // two uploads and the first replied with replicas=2) // to finish before closing the status channel. go func() { for active > 0 { - <-upload_status + <-uploadStatusChan } - close(upload_status) + close(uploadStatusChan) }() }() replicasDone := 0 - replicasTodo := this.Want_replicas + replicasTodo := kc.Want_replicas - replicasPerThread := this.replicasPerService + replicasPerThread := kc.replicasPerService if replicasPerThread < 1 { // unlimited or unknown replicasPerThread = replicasTodo } - retriesRemaining := 1 + this.Retries + retriesRemaining := 1 + kc.Retries var retryServers []string lastError := make(map[string]string) for retriesRemaining > 0 { - retriesRemaining -= 1 - next_server = 0 + retriesRemaining-- + nextServer = 0 retryServers = []string{} for replicasTodo > 0 { for active*replicasPerThread < replicasTodo { // Start some upload requests - if next_server < len(sv) { - DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, reqid) - next_server += 1 - active += 1 + if nextServer < len(sv) { + DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer]) + go kc.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid) + nextServer++ + active++ } else { if active == 0 && retriesRemaining == 0 { msg := "Could not write sufficient replicas: " @@ -180,9 +180,8 @@ func (this *KeepClient) putReplicas( } msg = msg[:len(msg)-2] return locator, replicasDone, InsufficientReplicasError(errors.New(msg)) - } else { - break } + break } } DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", @@ -190,13 +189,13 @@ func (this *KeepClient) putReplicas( // Now wait for something to happen. if active > 0 { - status := <-upload_status - active -= 1 + status := <-uploadStatusChan + active-- if status.statusCode == 200 { // good news! - replicasDone += status.replicas_stored - replicasTodo -= status.replicas_stored + replicasDone += status.replicasStored + replicasTodo -= status.replicasStored locator = status.response delete(lastError, status.url) } else {