13382: Use caller-specified storage classes when writing.
author Tom Clegg <tom@curii.com>
Wed, 31 Mar 2021 20:56:14 +0000 (16:56 -0400)
committer Tom Clegg <tom@curii.com>
Thu, 1 Apr 2021 15:07:36 +0000 (11:07 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/proxy_remote.go
services/keepstore/pull_worker.go
services/keepstore/pull_worker_test.go
services/keepstore/status_test.go
services/keepstore/volume_test.go

index 76cacc569532972664996a7d4a5eede9efc33323..a9d7330c0be4fdbe781fef654a91b7c53d93d194 100644 (file)
@@ -22,6 +22,7 @@ import (
        "net/http/httptest"
        "os"
        "regexp"
+       "sort"
        "strings"
        "time"
 
@@ -71,10 +72,11 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
 // A RequestTester represents the parameters for an HTTP request to
 // be issued on behalf of a unit test.
 type RequestTester struct {
-       uri         string
-       apiToken    string
-       method      string
-       requestBody []byte
+       uri            string
+       apiToken       string
+       method         string
+       requestBody    []byte
+       storageClasses string
 }
 
 // Test GetBlockHandler on the following situations:
@@ -754,25 +756,25 @@ func (s *HandlerSuite) TestPullHandler(c *check.C) {
        var testcases = []pullTest{
                {
                        "Valid pull list from an ordinary user",
-                       RequestTester{"/pull", userToken, "PUT", goodJSON},
+                       RequestTester{"/pull", userToken, "PUT", goodJSON, ""},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Invalid pull request from an ordinary user",
-                       RequestTester{"/pull", userToken, "PUT", badJSON},
+                       RequestTester{"/pull", userToken, "PUT", badJSON, ""},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Valid pull request from the data manager",
-                       RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON},
+                       RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
                        http.StatusOK,
                        "Received 3 pull requests\n",
                },
                {
                        "Invalid pull request from the data manager",
-                       RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON},
+                       RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON, ""},
                        http.StatusBadRequest,
                        "",
                },
@@ -866,25 +868,25 @@ func (s *HandlerSuite) TestTrashHandler(c *check.C) {
        var testcases = []trashTest{
                {
                        "Valid trash list from an ordinary user",
-                       RequestTester{"/trash", userToken, "PUT", goodJSON},
+                       RequestTester{"/trash", userToken, "PUT", goodJSON, ""},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Invalid trash list from an ordinary user",
-                       RequestTester{"/trash", userToken, "PUT", badJSON},
+                       RequestTester{"/trash", userToken, "PUT", badJSON, ""},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Valid trash list from the data manager",
-                       RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON},
+                       RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
                        http.StatusOK,
                        "Received 3 trash requests\n",
                },
                {
                        "Invalid trash list from the data manager",
-                       RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON},
+                       RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON, ""},
                        http.StatusBadRequest,
                        "",
                },
@@ -921,6 +923,9 @@ func IssueRequest(handler http.Handler, rt *RequestTester) *httptest.ResponseRec
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
+       if rt.storageClasses != "" {
+               req.Header.Set("X-Keep-Storage-Classes", rt.storageClasses)
+       }
        handler.ServeHTTP(response, req)
        return response
 }
@@ -1113,6 +1118,46 @@ func (s *HandlerSuite) TestGetHandlerNoBufferLeak(c *check.C) {
        }
 }
 
+func (s *HandlerSuite) TestPutStorageClasses(c *check.C) {
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "mock"}, // "default" is implicit
+               "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"special": true, "extra": true}},
+               "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
+       }
+       c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+       rt := RequestTester{
+               method:      "PUT",
+               uri:         "/" + TestHash,
+               requestBody: TestBlock,
+       }
+       for _, trial := range []struct {
+               ask    string
+               expect string
+       }{
+               {"", ""},
+               {"default", "default=1"},
+               {"special", "extra=1, special=1"},
+               {"extra, special", "extra=1, special=1"},
+               {"default, special", "default=1, extra=1, special=1"},
+       } {
+               c.Logf("%#v", trial)
+               rt.storageClasses = trial.ask
+               resp := IssueRequest(s.handler, &rt)
+               if trial.expect == "" {
+                       // any non-empty value is correct
+                       c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), check.Not(check.Equals), "")
+               } else {
+                       c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), check.Equals, trial.expect)
+               }
+       }
+}
+
+func sortCommaSeparated(s string) string {
+       slice := strings.Split(s, ", ")
+       sort.Strings(slice)
+       return strings.Join(slice, ", ")
+}
+
 func (s *HandlerSuite) TestPutResponseHeader(c *check.C) {
        c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
 
index a0e7fd0e0132fadef7ca8276b26b10a87df52c30..07c2946668836dd2c03fb05d14a37da477740ce6 100644 (file)
@@ -246,6 +246,14 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
+       var wantStorageClasses []string
+       if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+               wantStorageClasses = strings.Split(hdr, ",")
+               for i, sc := range wantStorageClasses {
+                       wantStorageClasses[i] = strings.TrimSpace(sc)
+               }
+       }
+
        buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
@@ -259,7 +267,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+       result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
        bufs.Put(buf)
 
        if err != nil {
@@ -726,8 +734,10 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 }
 
 type putResult struct {
+       classTodo        map[string]bool
+       mountUsed        map[*VolumeMount]bool
        totalReplication int
-       classReplication map[string]int
+       classDone        map[string]int
 }
 
 // Number of distinct replicas stored. "2" can mean the block was
@@ -741,7 +751,7 @@ func (pr putResult) TotalReplication() string {
 // "default=2; special=1".
 func (pr putResult) ClassReplication() string {
        s := ""
-       for k, v := range pr.classReplication {
+       for k, v := range pr.classDone {
                if len(s) > 0 {
                        s += ", "
                }
@@ -750,15 +760,51 @@ func (pr putResult) ClassReplication() string {
        return s
 }
 
-func newPutResult(mnt *VolumeMount) putResult {
-       result := putResult{
-               totalReplication: mnt.Replication,
-               classReplication: map[string]int{},
+func (pr *putResult) Add(mnt *VolumeMount) {
+       if pr.mountUsed[mnt] {
+               logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+               return
+       }
+       pr.mountUsed[mnt] = true
+       pr.totalReplication += mnt.Replication
+       for class := range mnt.StorageClasses {
+               pr.classDone[class] += mnt.Replication
+               delete(pr.classTodo, class)
+       }
+}
+
+func (pr *putResult) Done() bool {
+       return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
+func (pr *putResult) Want(mnt *VolumeMount) bool {
+       if pr.Done() || pr.mountUsed[mnt] {
+               return false
+       }
+       if len(pr.classTodo) == 0 {
+               // none specified == "any"
+               return true
        }
        for class := range mnt.StorageClasses {
-               result.classReplication[class] += mnt.Replication
+               if pr.classTodo[class] {
+                       return true
+               }
+       }
+       return false
+}
+
+func newPutResult(classes []string) putResult {
+       pr := putResult{
+               classTodo: make(map[string]bool, len(classes)),
+               classDone: map[string]int{},
+               mountUsed: map[*VolumeMount]bool{},
+       }
+       for _, c := range classes {
+               if c != "" {
+                       pr.classTodo[c] = true
+               }
        }
-       return result
+       return pr
 }
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
@@ -788,7 +834,7 @@ func newPutResult(mnt *VolumeMount) putResult {
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putResult, error) {
        log := ctxlog.FromContext(ctx)
 
        // Check that BLOCK's checksum matches HASH.
@@ -798,22 +844,28 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                return putResult{}, RequestHashError
        }
 
+       result := newPutResult(wantStorageClasses)
+
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+       if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
                return result, err
-       } else if ctx.Err() != nil {
-               return putResult{}, ErrClientDisconnect
+       }
+       if ctx.Err() != nil {
+               return result, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
-       if mnt := volmgr.NextWritable(); mnt != nil {
-               if err := mnt.Put(ctx, hash, block); err != nil {
-                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-               } else {
-                       return newPutResult(mnt), nil // success!
+       if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
+               // fall through to "try all volumes" below
+       } else if err := mnt.Put(ctx, hash, block); err != nil {
+               log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+       } else {
+               result.Add(mnt)
+               if result.Done() {
+                       return result, nil
                }
        }
        if ctx.Err() != nil {
@@ -828,13 +880,20 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 
        allFull := true
        for _, mnt := range writables {
+               if !result.Want(mnt) {
+                       continue
+               }
                err := mnt.Put(ctx, hash, block)
                if ctx.Err() != nil {
-                       return putResult{}, ErrClientDisconnect
+                       return result, ErrClientDisconnect
                }
                switch err {
                case nil:
-                       return newPutResult(mnt), nil // success!
+                       result.Add(mnt)
+                       if result.Done() {
+                               return result, nil
+                       }
+                       continue
                case FullError:
                        continue
                default:
@@ -846,26 +905,33 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                }
        }
 
-       if allFull {
-               log.Error("all volumes are full")
+       if result.totalReplication > 0 {
+               // Some, but not all, of the storage classes were
+               // satisfied. This qualifies as success.
+               return result, nil
+       } else if allFull {
+               log.Error("all volumes with qualifying storage classes are full")
                return putResult{}, FullError
+       } else {
+               // Already logged the non-full errors.
+               return putResult{}, GenericError
        }
-       // Already logged the non-full errors.
-       return putResult{}, GenericError
 }
 
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putResult) error {
        log := ctxlog.FromContext(ctx)
-       var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
+               if !result.Want(mnt) {
+                       continue
+               }
                err := mnt.Compare(ctx, hash, buf)
                if ctx.Err() != nil {
-                       return putResult{}, ctx.Err()
+                       return nil
                } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
@@ -873,7 +939,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // both, so there's no point writing it even
                        // on a different volume.)
                        log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-                       return putResult{}, err
+                       return CollisionError
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -887,13 +953,15 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                }
                if err := mnt.Touch(hash); err != nil {
                        log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
-                       bestErr = err
                        continue
                }
                // Compare and Touch both worked --> done.
-               return newPutResult(mnt), nil
+               result.Add(mnt)
+               if result.Done() {
+                       return nil
+               }
        }
-       return putResult{}, bestErr
+       return nil
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
index 171dee3c4bb1c5ed15c21885599f0452251cbabc..8c88a406f4f39fb998be7edcab8cf16d598caa3f 100644 (file)
@@ -177,7 +177,7 @@ func (rrc *remoteResponseCacher) Close() error {
                rrc.ResponseWriter.Write(rrc.Buffer)
                return nil
        }
-       _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32])
+       _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
        if rrc.Context.Err() != nil {
                // If caller hung up, log that instead of subsequent/misleading errors.
                http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout)
index 670fa1a4140fc14229279d1ff920d76959679afd..57b9469244d8e101193185bd190d6db9ce37f853 100644 (file)
@@ -89,6 +89,6 @@ var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte,
        if volume != nil {
                return volume.Put(context.Background(), locator, data)
        }
-       _, err := PutBlock(context.Background(), volmgr, data, locator)
+       _, err := PutBlock(context.Background(), volmgr, data, locator, nil)
        return err
 }
index dfc94b67608235cb77bf3191e82f80016f5c8ae9..2013c8c04da76ffbf8e22f85d501ecf4710cbfdc 100644 (file)
@@ -136,7 +136,7 @@ func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello",
@@ -150,7 +150,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_one_locator",
-               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hola",
@@ -164,7 +164,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_one_locator",
-               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "unused",
@@ -178,7 +178,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_two_locators",
-               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "unused",
@@ -192,7 +192,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_one_locator",
-               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hello hello",
@@ -206,7 +206,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_two_locators",
-               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello again",
@@ -221,7 +221,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
+               req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList, ""},
                responseCode: http.StatusUnauthorized,
                responseBody: "Unauthorized\n",
                readContent:  "hello",
index 7bff2584e5ae30529c7fb79dcccc6c95156521e2..cafe9f72ff0e0eaf3ec7b1da842c5ac083ca4028 100644 (file)
@@ -15,7 +15,7 @@ import (
 // getStatusItem("foo","bar","baz") retrieves /status.json, decodes
 // the response body into resp, and returns resp["foo"]["bar"]["baz"].
 func getStatusItem(h *handler, keys ...string) interface{} {
-       resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil})
+       resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil, ""})
        var s interface{}
        json.NewDecoder(resp.Body).Decode(&s)
        for _, k := range keys {
index 2de21edde6708faa3aff96ef32f2b61f191a9775..cc2d21e5a94fc30669dc808e6e6bade48613672a 100644 (file)
@@ -148,7 +148,7 @@ func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error
                }
                return nil
        } else {
-               return NotFoundError
+               return os.ErrNotExist
        }
 }