X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1586823b65c7ec7656626e491a31f3f9516a4a56..6bf9e1a4b5640f3cdd057810f0c9b8a945bb88bd:/sdk/go/keepclient/support.go diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 33ba8720bc..91117f2d32 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( @@ -7,25 +11,20 @@ import ( "io" "io/ioutil" "log" - "math/rand" - "net" "net/http" "os" - "regexp" "strings" - "time" - "git.curoverse.com/arvados.git/sdk/go/streamer" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" ) -// Function used to emit debug messages. The easiest way to enable +// DebugPrintf emits debug messages. The easiest way to enable // keepclient debug messages in your application is to assign // log.Printf to DebugPrintf. var DebugPrintf = func(string, ...interface{}) {} func init() { - var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$") - if matchTrue.MatchString(os.Getenv("ARVADOS_DEBUG")) { + if arvadosclient.StringBool(os.Getenv("ARVADOS_DEBUG")) { DebugPrintf = log.Printf } } @@ -44,101 +43,56 @@ func Md5String(s string) string { return fmt.Sprintf("%x", md5.Sum([]byte(s))) } -// Set timeouts applicable when connecting to non-disk services -// (assumed to be over the Internet). -func (*KeepClient) setClientSettingsNonDisk(client *http.Client) { - // Maximum time to wait for a complete response - client.Timeout = 300 * time.Second - - // TCP and TLS connection settings - client.Transport = &http.Transport{ - Dial: (&net.Dialer{ - // The maximum time to wait to set up - // the initial TCP connection. - Timeout: 30 * time.Second, - - // The TCP keep alive heartbeat - // interval. - KeepAlive: 120 * time.Second, - }).Dial, - - TLSHandshakeTimeout: 10 * time.Second, - } -} - -// Set timeouts applicable when connecting to keepstore services directly -// (assumed to be on the local network). -func (*KeepClient) setClientSettingsDisk(client *http.Client) { - // Maximum time to wait for a complete response - client.Timeout = 20 * time.Second - - // TCP and TLS connection timeouts - client.Transport = &http.Transport{ - Dial: (&net.Dialer{ - // The maximum time to wait to set up - // the initial TCP connection. - Timeout: 2 * time.Second, - - // The TCP keep alive heartbeat - // interval. - KeepAlive: 180 * time.Second, - }).Dial, - - TLSHandshakeTimeout: 4 * time.Second, - } -} - type svcList struct { Items []keepService `json:"items"` } type uploadStatus struct { - err error - url string - statusCode int - replicas_stored int - response string + err error + url string + statusCode int + replicasStored int + response string } -func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, - upload_status chan<- uploadStatus, expectedLength int64, requestID int32) { +func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, + uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) { var req *http.Request var err error var url = fmt.Sprintf("%s/%s", host, hash) if req, err = http.NewRequest("PUT", url, nil); err != nil { - DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} - body.Close() + DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error()) + uploadStatusChan <- uploadStatus{err, url, 0, 0, ""} return } req.ContentLength = expectedLength if expectedLength > 0 { - // http.Client.Do will close the body ReadCloser when it is - // done with it. - req.Body = body + req.Body = ioutil.NopCloser(body) } else { - // "For client requests, a value of 0 means unknown if Body is - // not nil." In this case we do want the body to be empty, so - // don't set req.Body. However, we still need to close the - // body ReadCloser. - body.Close() + // "For client requests, a value of 0 means unknown if + // Body is not nil." In this case we do want the body + // to be empty, so don't set req.Body. } - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) + req.Header.Add("X-Request-Id", reqid) + req.Header.Add("Authorization", "OAuth2 "+this.Arvados.ApiToken) req.Header.Add("Content-Type", "application/octet-stream") - req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas)) + req.Header.Add(XKeepDesiredReplicas, fmt.Sprint(this.Want_replicas)) + if len(this.StorageClasses) > 0 { + req.Header.Add("X-Keep-Storage-Classes", strings.Join(this.StorageClasses, ", ")) + } var resp *http.Response - if resp, err = this.Client.Do(req); err != nil { - DebugPrintf("DEBUG: [%08x] Upload failed %v error: %v", requestID, url, err.Error()) - upload_status <- uploadStatus{err, url, 0, 0, ""} + if resp, err = this.httpClient().Do(req); err != nil { + DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error()) + uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()} return } rep := 1 - if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" { + if xr := resp.Header.Get(XKeepReplicasStored); xr != "" { fmt.Sscanf(xr, "%d", &rep) } @@ -148,49 +102,47 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096}) response := strings.TrimSpace(string(respbody)) if err2 != nil && err2 != io.EOF { - DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, err2.Error(), response) - upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response) + uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response} } else if resp.StatusCode == http.StatusOK { - DebugPrintf("DEBUG: [%08x] Upload %v success", requestID, url) - upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url) + uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response} } else { if resp.StatusCode >= 300 && response == "" { response = resp.Status } - DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, resp.StatusCode, response) - upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} + DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response) + uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} } } func (this *KeepClient) putReplicas( hash string, - tr *streamer.AsyncStream, + getReader func() io.Reader, expectedLength int64) (locator string, replicas int, err error) { - // Generate an arbitrary ID to identify this specific - // transaction in debug logs. - requestID := rand.Int31() + reqid := this.getRequestID() // Calculate the ordering for uploading to servers sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots() // The next server to try contacting - next_server := 0 + nextServer := 0 // The number of active writers active := 0 // Used to communicate status from the upload goroutines - upload_status := make(chan uploadStatus) + uploadStatusChan := make(chan uploadStatus) defer func() { // Wait for any abandoned uploads (e.g., we started // two uploads and the first replied with replicas=2) // to finish before closing the status channel. go func() { for active > 0 { - <-upload_status + <-uploadStatusChan } - close(upload_status) + close(uploadStatusChan) }() }() @@ -209,17 +161,17 @@ func (this *KeepClient) putReplicas( lastError := make(map[string]string) for retriesRemaining > 0 { - retriesRemaining -= 1 - next_server = 0 + retriesRemaining-- + nextServer = 0 retryServers = []string{} for replicasTodo > 0 { for active*replicasPerThread < replicasTodo { // Start some upload requests - if next_server < len(sv) { - DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID) - next_server += 1 - active += 1 + if nextServer < len(sv) { + DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer]) + go this.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid) + nextServer++ + active++ } else { if active == 0 && retriesRemaining == 0 { msg := "Could not write sufficient replicas: " @@ -228,23 +180,22 @@ func (this *KeepClient) putReplicas( } msg = msg[:len(msg)-2] return locator, replicasDone, InsufficientReplicasError(errors.New(msg)) - } else { - break } + break } } - DebugPrintf("DEBUG: [%08x] Replicas remaining to write: %v active uploads: %v", - requestID, replicasTodo, active) + DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", + reqid, replicasTodo, active) // Now wait for something to happen. if active > 0 { - status := <-upload_status - active -= 1 + status := <-uploadStatusChan + active-- if status.statusCode == 200 { // good news! - replicasDone += status.replicas_stored - replicasTodo -= status.replicas_stored + replicasDone += status.replicasStored + replicasTodo -= status.replicasStored locator = status.response delete(lastError, status.url) } else {