Merge branch '17528-install-warnings'
[arvados.git] / sdk / go / keepclient / support.go
index 9adbb4878f40541eb13c0feed39bf22241f4c3f5..7b2e47ff8042e379c1ac01825f4060011c81b3f9 100644 (file)
@@ -1,30 +1,31 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package keepclient
 
 import (
        "crypto/md5"
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
        "io"
        "io/ioutil"
        "log"
-       "math/rand"
-       "net"
        "net/http"
        "os"
-       "regexp"
+       "strconv"
        "strings"
-       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
 )
 
-// Function used to emit debug messages. The easiest way to enable
+// DebugPrintf emits debug messages. The easiest way to enable
 // keepclient debug messages in your application is to assign
 // log.Printf to DebugPrintf.
 var DebugPrintf = func(string, ...interface{}) {}
 
 func init() {
-       var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
-       if matchTrue.MatchString(os.Getenv("ARVADOS_DEBUG")) {
+       if arvadosclient.StringBool(os.Getenv("ARVADOS_DEBUG")) { // NOTE(review): confirm StringBool accepts the same values as the removed ^(?i:1|yes|true)$ regexp
                DebugPrintf = log.Printf
        }
 }
@@ -43,107 +44,64 @@ func Md5String(s string) string {
        return fmt.Sprintf("%x", md5.Sum([]byte(s)))
 }
 
-// Set timeouts applicable when connecting to non-disk services
-// (assumed to be over the Internet).
-func (this *KeepClient) setClientSettingsNonDisk() {
-       if this.Client.Timeout == 0 {
-               // Maximum time to wait for a complete response
-               this.Client.Timeout = 300 * time.Second
-
-               // TCP and TLS connection settings
-               this.Client.Transport = &http.Transport{
-                       Dial: (&net.Dialer{
-                               // The maximum time to wait to set up
-                               // the initial TCP connection.
-                               Timeout: 30 * time.Second,
-
-                               // The TCP keep alive heartbeat
-                               // interval.
-                               KeepAlive: 120 * time.Second,
-                       }).Dial,
-
-                       TLSHandshakeTimeout: 10 * time.Second,
-               }
-       }
-}
-
-// Set timeouts applicable when connecting to keepstore services directly
-// (assumed to be on the local network).
-func (this *KeepClient) setClientSettingsDisk() {
-       if this.Client.Timeout == 0 {
-               // Maximum time to wait for a complete response
-               this.Client.Timeout = 20 * time.Second
-
-               // TCP and TLS connection timeouts
-               this.Client.Transport = &http.Transport{
-                       Dial: (&net.Dialer{
-                               // The maximum time to wait to set up
-                               // the initial TCP connection.
-                               Timeout: 2 * time.Second,
-
-                               // The TCP keep alive heartbeat
-                               // interval.
-                               KeepAlive: 180 * time.Second,
-                       }).Dial,
-
-                       TLSHandshakeTimeout: 4 * time.Second,
-               }
-       }
-}
-
 type svcList struct {
        Items []keepService `json:"items"`
 }
 
 type uploadStatus struct {
-       err             error
-       url             string
-       statusCode      int
-       replicas_stored int
-       response        string
+       err            error          // nil only for a 200 OK whose body was read without error
+       url            string         // host + "/" + hash that was PUT to
+       statusCode     int            // HTTP status code; 0 if no response was received
+       replicasStored int            // XKeepReplicasStored header value (default 1); 0 if no response
+       classesStored  map[string]int // parsed XKeepStorageClassesConfirmed header; nil if absent/invalid
+       response       string         // trimmed body (first 4096 bytes; the locator on 200 OK) or error text
 }
 
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
-       upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
+func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
+       uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
 
        var req *http.Request
        var err error
        var url = fmt.Sprintf("%s/%s", host, hash)
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
-               DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
-               upload_status <- uploadStatus{err, url, 0, 0, ""}
-               body.Close()
+               DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error())
+               uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, ""}
                return
        }
 
        req.ContentLength = expectedLength
        if expectedLength > 0 {
-               // http.Client.Do will close the body ReadCloser when it is
-               // done with it.
-               req.Body = body
+               req.Body = ioutil.NopCloser(body)
        } else {
-               // "For client requests, a value of 0 means unknown if Body is
-               // not nil."  In this case we do want the body to be empty, so
-               // don't set req.Body.  However, we still need to close the
-               // body ReadCloser.
-               body.Close()
+               // "For client requests, a value of 0 means unknown if
+               // Body is not nil."  In this case we do want the body
+               // to be empty, so don't set req.Body.
        }
 
-       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
+       req.Header.Add("X-Request-Id", reqid)
+       req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
        req.Header.Add("Content-Type", "application/octet-stream")
-       req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
+       req.Header.Add(XKeepDesiredReplicas, fmt.Sprint(kc.Want_replicas))
+       if len(classesTodo) > 0 {
+               req.Header.Add(XKeepStorageClasses, strings.Join(classesTodo, ", "))
+       }
 
        var resp *http.Response
-       if resp, err = this.Client.Do(req); err != nil {
-               DebugPrintf("DEBUG: [%08x] Upload failed %v error: %v", requestID, url, err.Error())
-               upload_status <- uploadStatus{err, url, 0, 0, ""}
+       if resp, err = kc.httpClient().Do(req); err != nil {
+               DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error())
+               uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, err.Error()}
                return
        }
 
        rep := 1
-       if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
+       if xr := resp.Header.Get(XKeepReplicasStored); xr != "" {
                fmt.Sscanf(xr, "%d", &rep)
        }
+       scc := resp.Header.Get(XKeepStorageClassesConfirmed)
+       classesStored, err := parseStorageClassesConfirmedHeader(scc)
+       if err != nil {
+               DebugPrintf("DEBUG: [%s] Ignoring invalid %s header %q: %s", reqid, XKeepStorageClassesConfirmed, scc, err)
+       }
 
        defer resp.Body.Close()
        defer io.Copy(ioutil.Discard, resp.Body)
@@ -151,102 +109,155 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
        respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
        response := strings.TrimSpace(string(respbody))
        if err2 != nil && err2 != io.EOF {
-               DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, err2.Error(), response)
-               upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+               DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response)
+               uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, classesStored, response}
        } else if resp.StatusCode == http.StatusOK {
-               DebugPrintf("DEBUG: [%08x] Upload %v success", requestID, url)
-               upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
+               DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url)
+               uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, classesStored, response}
        } else {
-               DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, resp.StatusCode, response)
-               upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
+               if resp.StatusCode >= 300 && response == "" {
+                       response = resp.Status
+               }
+               DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response)
+               uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, classesStored, response}
        }
 }
 
-func (this *KeepClient) putReplicas(
+func (kc *KeepClient) putReplicas(
        hash string,
-       tr *streamer.AsyncStream,
+       getReader func() io.Reader,
        expectedLength int64) (locator string, replicas int, err error) {
 
-       // Generate an arbitrary ID to identify this specific
-       // transaction in debug logs.
-       requestID := rand.Int31()
+       reqid := kc.getRequestID()
 
        // Calculate the ordering for uploading to servers
-       sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots()
+       sv := NewRootSorter(kc.WritableLocalRoots(), hash).GetSortedRoots()
 
        // The next server to try contacting
-       next_server := 0
+       nextServer := 0
 
        // The number of active writers
        active := 0
 
        // Used to communicate status from the upload goroutines
-       upload_status := make(chan uploadStatus)
+       uploadStatusChan := make(chan uploadStatus)
        defer func() {
                // Wait for any abandoned uploads (e.g., we started
                // two uploads and the first replied with replicas=2)
                // to finish before closing the status channel.
                go func() {
                        for active > 0 {
-                               <-upload_status
+                               <-uploadStatusChan
                        }
-                       close(upload_status)
+                       close(uploadStatusChan)
                }()
        }()
 
+       replicasWanted := kc.Want_replicas
+       replicasTodo := map[string]int{}
+       for _, c := range kc.StorageClasses {
+               replicasTodo[c] = replicasWanted
+       }
        replicasDone := 0
-       replicasTodo := this.Want_replicas
 
-       replicasPerThread := this.replicasPerService
+       replicasPerThread := kc.replicasPerService
        if replicasPerThread < 1 {
                // unlimited or unknown
-               replicasPerThread = replicasTodo
+               replicasPerThread = replicasWanted
        }
 
-       retriesRemaining := 1 + this.Retries
+       retriesRemaining := 1 + kc.Retries
        var retryServers []string
 
+       lastError := make(map[string]string)
+       trackingClasses := len(replicasTodo) > 0
+
        for retriesRemaining > 0 {
-               retriesRemaining -= 1
-               next_server = 0
+               retriesRemaining--
+               nextServer = 0
                retryServers = []string{}
-               for replicasTodo > 0 {
-                       for active*replicasPerThread < replicasTodo {
+               for {
+                       var classesTodo []string
+                       var maxConcurrency int
+                       for sc, r := range replicasTodo {
+                               classesTodo = append(classesTodo, sc)
+                               if maxConcurrency == 0 || maxConcurrency > r {
+                                       // Having more than r
+                                       // writes in flight
+                                       // would overreplicate
+                                       // class sc.
+                                       maxConcurrency = r
+                               }
+                       }
+                       if !trackingClasses {
+                               maxConcurrency = replicasWanted - replicasDone
+                       }
+                       if maxConcurrency < 1 {
+                               // If there are no non-zero entries in
+                               // replicasTodo, we're done.
+                               break
+                       }
+                       for active*replicasPerThread < maxConcurrency {
                                // Start some upload requests
-                               if next_server < len(sv) {
-                                       DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
-                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
-                                       next_server += 1
-                                       active += 1
+                               if nextServer < len(sv) {
+                                       DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
+                                       go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
+                                       nextServer++
+                                       active++
                                } else {
                                        if active == 0 && retriesRemaining == 0 {
-                                               return locator, replicasDone, InsufficientReplicasError
-                                       } else {
-                                               break
+                                               msg := "Could not write sufficient replicas: "
+                                               for _, resp := range lastError {
+                                                       msg += resp + "; "
+                                               }
+                                               msg = msg[:len(msg)-2]
+                                               return locator, replicasDone, InsufficientReplicasError(errors.New(msg))
                                        }
+                                       break
                                }
                        }
-                       DebugPrintf("DEBUG: [%08x] Replicas remaining to write: %v active uploads: %v",
-                               requestID, replicasTodo, active)
-
-                       // Now wait for something to happen.
-                       if active > 0 {
-                               status := <-upload_status
-                               active -= 1
-
-                               if status.statusCode == 200 {
-                                       // good news!
-                                       replicasDone += status.replicas_stored
-                                       replicasTodo -= status.replicas_stored
-                                       locator = status.response
-                               } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
-                                       (status.statusCode >= 500 && status.statusCode != 503) {
-                                       // Timeout, too many requests, or other server side failure
-                                       // Do not retry when status code is 503, which means the keep server is full
-                                       retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
+
+                       DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+                       if active < 1 {
+                               break
+                       }
+
+                       // Wait for something to happen.
+                       status := <-uploadStatusChan
+                       active--
+
+                       if status.statusCode == http.StatusOK {
+                               delete(lastError, status.url)
+                               replicasDone += status.replicasStored
+                               if len(status.classesStored) == 0 {
+                                       // Server doesn't report
+                                       // storage classes. Give up
+                                       // trying to track which ones
+                                       // are satisfied; just rely on
+                                       // total # replicas.
+                                       trackingClasses = false
                                }
+                               for className, replicas := range status.classesStored {
+                                       if replicasTodo[className] > replicas {
+                                               replicasTodo[className] -= replicas
+                                       } else {
+                                               delete(replicasTodo, className)
+                                       }
+                               }
+                               locator = status.response
                        } else {
-                               break
+                               msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
+                               if len(msg) > 100 {
+                                       msg = msg[:100]
+                               }
+                               lastError[status.url] = msg
+                       }
+
+                       if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+                               (status.statusCode >= 500 && status.statusCode != 503) {
+                               // Timeout, too many requests, or other server side failure
+                               // Do not retry when status code is 503, which means the keep server is full
+                               retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
                        }
                }
 
@@ -255,3 +266,43 @@ func (this *KeepClient) putReplicas(
 
        return locator, replicasDone, nil
 }
+
+// parseStorageClassesConfirmedHeader parses the value of the
+// XKeepStorageClassesConfirmed response header, a comma-separated
+// list of "class=count" entries, into a map of storage class name
+// to replica count.
+//
+// An empty header yields (nil, nil). Blank entries are skipped. Any
+// entry without '=', with an empty class name, or with a count that
+// is not a positive integer makes the whole header invalid, and an
+// error is returned with a nil map.
+func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {
+       if len(hdr) == 0 {
+               return nil, nil
+       }
+       confirmed := make(map[string]int)
+       for _, entry := range strings.Split(hdr, ",") {
+               entry = strings.TrimSpace(entry)
+               if entry == "" {
+                       // Tolerate empty entries such as "a=1,,b=2".
+                       continue
+               }
+               eq := strings.IndexByte(entry, '=')
+               if eq < 0 {
+                       return nil, fmt.Errorf("expected exactly one '=' char in entry %q", entry)
+               }
+               // Split at the first '='; a second '=' stays in
+               // count and is rejected by strconv.Atoi below.
+               name, count := entry[:eq], entry[eq+1:]
+               if name == "" {
+                       return nil, fmt.Errorf("empty class name in entry %q", entry)
+               }
+               n, err := strconv.Atoi(count)
+               if err != nil || n < 1 {
+                       return nil, fmt.Errorf("invalid replica count %q", count)
+               }
+               // A repeated class name overwrites its earlier count.
+               confirmed[name] = n
+       }
+       return confirmed, nil
+}