17392: Ensure requested storage classes are satisfied on write.
author Tom Clegg <tom@curii.com>
Tue, 6 Apr 2021 15:28:14 +0000 (11:28 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 6 Apr 2021 15:28:14 +0000 (11:28 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go

index 21913ff967c79f2a56058f79e3688f42acc00e2d..4541812651336096506b9a89da1f69b28ec3bd2a 100644 (file)
@@ -83,8 +83,12 @@ var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is
 // ErrIncompleteIndex is returned when the Index response does not end with a new empty line
 var ErrIncompleteIndex = errors.New("Got incomplete index")
 
-const XKeepDesiredReplicas = "X-Keep-Desired-Replicas"
-const XKeepReplicasStored = "X-Keep-Replicas-Stored"
+const (
+       XKeepDesiredReplicas         = "X-Keep-Desired-Replicas"          // request: number of replicas wanted
+       XKeepReplicasStored          = "X-Keep-Replicas-Stored"           // response: number of replicas stored
+       XKeepStorageClasses          = "X-Keep-Storage-Classes"           // request: storage classes wanted, joined with ", "
+       XKeepStorageClassesConfirmed = "X-Keep-Storage-Classes-Confirmed" // response: "class=N, ..." list of classes stored
+)
 
 type HTTPClient interface {
        Do(*http.Request) (*http.Response, error)
index 57a89b50aa74362fcd4c5a229db2312b5870b249..94cfece8c5e3209a89c3c2b8d95ec797fc847b3b 100644 (file)
@@ -95,12 +95,13 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
 }
 
 type StubPutHandler struct {
-       c                  *C
-       expectPath         string
-       expectAPIToken     string
-       expectBody         string
-       expectStorageClass string
-       handled            chan string
+       c                    *C          // test context used for Check assertions
+       expectPath           string
+       expectAPIToken       string
+       expectBody           string      // request body the handler requires
+       expectStorageClass   string
+       returnStorageClasses string      // if non-empty, sent as X-Keep-Storage-Classes-Confirmed
+       handled              chan string // receives "http://<host>" for each request served
 }
 
 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -110,6 +111,10 @@ func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
        body, err := ioutil.ReadAll(req.Body)
        sph.c.Check(err, Equals, nil)
        sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
+       resp.Header().Set("X-Keep-Replicas-Stored", "1")
+       if sph.returnStorageClasses != "" {
+               resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
+       }
        resp.WriteHeader(200)
        sph.handled <- fmt.Sprintf("http://%s", req.Host)
 }
@@ -152,20 +157,19 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
-               "hot",
+               "", "default=1",
                make(chan string)}
 
        UploadToStubHelper(c, st,
                func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
-                       kc.StorageClasses = []string{"hot"}
-                       go kc.uploadToKeepServer(url, st.expectPath, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+                       go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
 
                        <-st.handled
                        status := <-uploadStatusChan
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
                })
 }
 
@@ -175,20 +179,53 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
-               "",
+               "", "default=1",
                make(chan string)}
 
        UploadToStubHelper(c, st,
                func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
-                       go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
+                       go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
 
                        <-st.handled
 
                        status := <-uploadStatusChan
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
                })
 }
 
+func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
+       for _, trial := range []struct {
+               respHeader string         // X-Keep-Storage-Classes-Confirmed value the stub server returns
+               expectMap  map[string]int // expected uploadStatus.classesStored (nil if header is absent or invalid)
+       }{
+               {"", nil},
+               {"foo=1", map[string]int{"foo": 1}},
+               {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
+               {" =foo=1 ", nil},
+               {"foo", nil},
+       } {
+               st := StubPutHandler{
+                       c:                    c,
+                       expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+                       expectAPIToken:       "abc123",
+                       expectBody:           "foo",
+                       returnStorageClasses: trial.respHeader,
+                       handled:              make(chan string)}
+
+               UploadToStubHelper(c, st,
+                       func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
+                               go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+
+                               writer.Write([]byte("foo"))
+                               writer.Close()
+
+                               <-st.handled
+                               status := <-uploadStatusChan
+                               c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""}, Commentf("response header %q", trial.respHeader))
+                       })
+       }
+}
+
 type FailHandler struct {
        handled chan string
 }
@@ -235,7 +272,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
                func(kc *KeepClient, url string, reader io.ReadCloser,
                        writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
 
-                       go kc.uploadToKeepServer(url, hash, reader, uploadStatusChan, 3, kc.getRequestID())
+                       go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
@@ -272,6 +309,7 @@ func (s *StandaloneSuite) TestPutB(c *C) {
                "abc123",
                "foo",
                "",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -314,6 +352,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
                "abc123",
                "foo",
                "",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -363,6 +402,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
                "abc123",
                "foo",
                "",
+               "",
                make(chan string, 4)}
 
        fh := FailHandler{
@@ -423,6 +463,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
                "abc123",
                "foo",
                "",
+               "",
                make(chan string, 1)}
 
        fh := FailHandler{
@@ -1031,6 +1072,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
                "abc123",
                "foo",
                "",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1070,6 +1112,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
                "abc123",
                "foo",
                "",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1246,6 +1289,7 @@ func (s *StandaloneSuite) TestPutBRetry(c *C) {
                        "abc123",
                        "foo",
                        "",
+                       "",
                        make(chan string, 5)}}
 
        arv, _ := arvadosclient.MakeArvadosClient()
index 3b1afe1e288cdec5746cc69f167d10d89b57361b..7b2e47ff8042e379c1ac01825f4060011c81b3f9 100644 (file)
@@ -13,6 +13,7 @@ import (
        "log"
        "net/http"
        "os"
+       "strconv"
        "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -52,10 +53,11 @@ type uploadStatus struct {
        url            string
        statusCode     int
        replicasStored int
+       classesStored  map[string]int
        response       string
 }
 
-func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
+func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
        uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
 
        var req *http.Request
@@ -63,7 +65,7 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
        var url = fmt.Sprintf("%s/%s", host, hash)
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
                DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error())
-               uploadStatusChan <- uploadStatus{err, url, 0, 0, ""}
+               uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, ""}
                return
        }
 
@@ -80,14 +82,14 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
        req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
        req.Header.Add("Content-Type", "application/octet-stream")
        req.Header.Add(XKeepDesiredReplicas, fmt.Sprint(kc.Want_replicas))
-       if len(kc.StorageClasses) > 0 {
-               req.Header.Add("X-Keep-Storage-Classes", strings.Join(kc.StorageClasses, ", "))
+       if len(classesTodo) > 0 {
+               req.Header.Add(XKeepStorageClasses, strings.Join(classesTodo, ", "))
        }
 
        var resp *http.Response
        if resp, err = kc.httpClient().Do(req); err != nil {
                DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error())
-               uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()}
+               uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, err.Error()}
                return
        }
 
@@ -95,6 +97,11 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
        if xr := resp.Header.Get(XKeepReplicasStored); xr != "" {
                fmt.Sscanf(xr, "%d", &rep)
        }
+       scc := resp.Header.Get(XKeepStorageClassesConfirmed)
+       classesStored, err := parseStorageClassesConfirmedHeader(scc)
+       if err != nil {
+               DebugPrintf("DEBUG: [%s] Ignoring invalid %s header %q: %s", reqid, XKeepStorageClassesConfirmed, scc, err)
+       }
 
        defer resp.Body.Close()
        defer io.Copy(ioutil.Discard, resp.Body)
@@ -103,16 +110,16 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
        response := strings.TrimSpace(string(respbody))
        if err2 != nil && err2 != io.EOF {
                DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response)
-               uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+               uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, classesStored, response}
        } else if resp.StatusCode == http.StatusOK {
                DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url)
-               uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response}
+               uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, classesStored, response}
        } else {
                if resp.StatusCode >= 300 && response == "" {
                        response = resp.Status
                }
                DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response)
-               uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
+               uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, classesStored, response}
        }
 }
 
@@ -146,30 +153,55 @@ func (kc *KeepClient) putReplicas(
                }()
        }()
 
+       replicasWanted := kc.Want_replicas
+       replicasTodo := map[string]int{}
+       for _, c := range kc.StorageClasses {
+               replicasTodo[c] = replicasWanted
+       }
        replicasDone := 0
-       replicasTodo := kc.Want_replicas
 
        replicasPerThread := kc.replicasPerService
        if replicasPerThread < 1 {
                // unlimited or unknown
-               replicasPerThread = replicasTodo
+               replicasPerThread = replicasWanted
        }
 
        retriesRemaining := 1 + kc.Retries
        var retryServers []string
 
        lastError := make(map[string]string)
+       trackingClasses := len(replicasTodo) > 0
 
        for retriesRemaining > 0 {
                retriesRemaining--
                nextServer = 0
                retryServers = []string{}
-               for replicasTodo > 0 {
-                       for active*replicasPerThread < replicasTodo {
+               for {
+                       var classesTodo []string
+                       var maxConcurrency int
+                       for sc, r := range replicasTodo {
+                               classesTodo = append(classesTodo, sc)
+                               if maxConcurrency == 0 || maxConcurrency > r {
+                                       // Having more than r
+                                       // writes in flight
+                                       // would overreplicate
+                                       // class sc.
+                                       maxConcurrency = r
+                               }
+                       }
+                       if !trackingClasses {
+                               maxConcurrency = replicasWanted - replicasDone
+                       }
+                       if maxConcurrency < 1 {
+                               // If there are no non-zero entries in
+                               // replicasTodo, we're done.
+                               break
+                       }
+                       for active*replicasPerThread < maxConcurrency {
                                // Start some upload requests
                                if nextServer < len(sv) {
                                        DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
-                                       go kc.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid)
+                                       go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
                                        nextServer++
                                        active++
                                } else {
@@ -184,36 +216,48 @@ func (kc *KeepClient) putReplicas(
                                        break
                                }
                        }
-                       DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v",
-                               reqid, replicasTodo, active)
-
-                       // Now wait for something to happen.
-                       if active > 0 {
-                               status := <-uploadStatusChan
-                               active--
-
-                               if status.statusCode == 200 {
-                                       // good news!
-                                       replicasDone += status.replicasStored
-                                       replicasTodo -= status.replicasStored
-                                       locator = status.response
-                                       delete(lastError, status.url)
-                               } else {
-                                       msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
-                                       if len(msg) > 100 {
-                                               msg = msg[:100]
-                                       }
-                                       lastError[status.url] = msg
-                               }
 
-                               if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
-                                       (status.statusCode >= 500 && status.statusCode != 503) {
-                                       // Timeout, too many requests, or other server side failure
-                                       // Do not retry when status code is 503, which means the keep server is full
-                                       retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
+                       DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+                       if active < 1 {
+                               break
+                       }
+
+                       // Wait for something to happen.
+                       status := <-uploadStatusChan
+                       active--
+
+                       if status.statusCode == http.StatusOK {
+                               delete(lastError, status.url)
+                               replicasDone += status.replicasStored
+                               if len(status.classesStored) == 0 {
+                                       // Server doesn't report
+                                       // storage classes. Give up
+                                       // trying to track which ones
+                                       // are satisfied; just rely on
+                                       // total # replicas.
+                                       trackingClasses = false
                                }
+                               for className, replicas := range status.classesStored {
+                                       if replicasTodo[className] > replicas {
+                                               replicasTodo[className] -= replicas
+                                       } else {
+                                               delete(replicasTodo, className)
+                                       }
+                               }
+                               locator = status.response
                        } else {
-                               break
+                               msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
+                               if len(msg) > 100 {
+                                       msg = msg[:100]
+                               }
+                               lastError[status.url] = msg
+                       }
+
+                       if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+                               (status.statusCode >= 500 && status.statusCode != 503) {
+                               // Timeout, too many requests, or other server side failure
+                               // Do not retry when status code is 503, which means the keep server is full
+                               retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
                        }
                }
 
@@ -222,3 +266,43 @@ func (kc *KeepClient) putReplicas(

        return locator, replicasDone, nil
 }
+
+// parseStorageClassesConfirmedHeader parses the value of an
+// X-Keep-Storage-Classes-Confirmed response header: a
+// comma-separated list of "class=replicas" entries, e.g.
+// "foo=1, bar=2".
+//
+// It returns (nil, nil) for an empty header. Blank entries are
+// skipped, and whitespace around names, counts, and separators
+// is ignored. Any malformed entry (no "=", empty class name, or a
+// replica count that is not a positive integer) rejects the whole
+// header with a non-nil error and a nil map.
+func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {
+       if hdr == "" {
+               return nil, nil
+       }
+       classesStored := map[string]int{}
+       for _, cr := range strings.Split(hdr, ",") {
+               cr = strings.TrimSpace(cr)
+               if cr == "" {
+                       continue
+               }
+               fields := strings.SplitN(cr, "=", 2)
+               if len(fields) != 2 {
+                       return nil, fmt.Errorf("missing '=' in entry %q", cr)
+               }
+               // Allow optional whitespace around "=" too, not
+               // just around ",".
+               className := strings.TrimSpace(fields[0])
+               if className == "" {
+                       return nil, fmt.Errorf("empty class name in entry %q", cr)
+               }
+               count := strings.TrimSpace(fields[1])
+               replicas, err := strconv.Atoi(count)
+               if err != nil || replicas < 1 {
+                       return nil, fmt.Errorf("invalid replica count %q", count)
+               }
+               classesStored[className] = replicas
+       }
+       return classesStored, nil
+}