X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fd3a65529ae255a8c57552760707690437623b0a..729b2762630b343b50aa1cb74733635ebcc52eb4:/sdk/go/keepclient/support.go diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index bfe8d5b77a..f4e99bdbe3 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -15,10 +15,10 @@ import ( "os" "strings" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" ) -// Function used to emit debug messages. The easiest way to enable +// DebugPrintf emits debug messages. The easiest way to enable // keepclient debug messages in your application is to assign // log.Printf to DebugPrintf. var DebugPrintf = func(string, ...interface{}) {} @@ -48,22 +48,22 @@ type svcList struct { } type uploadStatus struct { - err error - url string - statusCode int - replicas_stored int - response string + err error + url string + statusCode int + replicasStored int + response string } func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, - upload_status chan<- uploadStatus, expectedLength int64, reqid string) { + uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) { var req *http.Request var err error var url = fmt.Sprintf("%s/%s", host, hash) if req, err = http.NewRequest("PUT", url, nil); err != nil { DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + uploadStatusChan <- uploadStatus{err, url, 0, 0, ""} return } @@ -80,11 +80,14 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea req.Header.Add("Authorization", "OAuth2 "+this.Arvados.ApiToken) req.Header.Add("Content-Type", "application/octet-stream") req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas)) + if len(this.StorageClasses) > 0 { + req.Header.Add("X-Keep-Storage-Classes", strings.Join(this.StorageClasses, ", ")) + } var resp *http.Response if resp, err = this.httpClient().Do(req); err != nil { DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()} return } @@ -100,16 +103,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea response := strings.TrimSpace(string(respbody)) if err2 != nil && err2 != io.EOF { DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response) - upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response} + uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response} } else if resp.StatusCode == http.StatusOK { DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url) - upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response} + uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response} } else { if resp.StatusCode >= 300 && response == "" { response = resp.Status } DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response) - upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} + uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} } } @@ -124,22 +127,22 @@ func (this *KeepClient) putReplicas( sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots() // The next server to try contacting - next_server := 0 + nextServer := 0 // The number of active writers active := 0 // Used to communicate status from the upload goroutines - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) defer func() { // Wait for any abandoned uploads (e.g., we started // two uploads and the first replied with replicas=2) // to finish before closing the status channel. go func() { for active > 0 { - <-upload_status + <-uploadStatusChan } - close(upload_status) + close(uploadStatusChan) }() }() @@ -159,15 +162,15 @@ func (this *KeepClient) putReplicas( for retriesRemaining > 0 { retriesRemaining -= 1 - next_server = 0 + nextServer = 0 retryServers = []string{} for replicasTodo > 0 { for active*replicasPerThread < replicasTodo { // Start some upload requests - if next_server < len(sv) { - DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, reqid) - next_server += 1 + if nextServer < len(sv) { + DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer]) + go this.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid) + nextServer += 1 active += 1 } else { if active == 0 && retriesRemaining == 0 { @@ -177,9 +180,8 @@ func (this *KeepClient) putReplicas( } msg = msg[:len(msg)-2] return locator, replicasDone, InsufficientReplicasError(errors.New(msg)) - } else { - break } + break } } DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", @@ -187,13 +189,13 @@ func (this *KeepClient) putReplicas( // Now wait for something to happen. if active > 0 { - status := <-upload_status + status := <-uploadStatusChan active -= 1 if status.statusCode == 200 { // good news! - replicasDone += status.replicas_stored - replicasTodo -= status.replicas_stored + replicasDone += status.replicasStored + replicasTodo -= status.replicasStored locator = status.response delete(lastError, status.url) } else {