From e73f90ddd2252f2b21d573ad2179137dca5dcd97 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 31 Mar 2021 16:56:14 -0400 Subject: [PATCH] 13382: Use caller-specified storage classes when writing. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/keepstore/handler_test.go | 69 +++++++++--- services/keepstore/handlers.go | 140 ++++++++++++++++++------- services/keepstore/proxy_remote.go | 2 +- services/keepstore/pull_worker.go | 2 +- services/keepstore/pull_worker_test.go | 14 +-- services/keepstore/status_test.go | 2 +- services/keepstore/volume_test.go | 2 +- 7 files changed, 172 insertions(+), 59 deletions(-) diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 76cacc5695..a9d7330c0b 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -22,6 +22,7 @@ import ( "net/http/httptest" "os" "regexp" + "sort" "strings" "time" @@ -71,10 +72,11 @@ func (s *HandlerSuite) SetUpTest(c *check.C) { // A RequestTester represents the parameters for an HTTP request to // be issued on behalf of a unit test. type RequestTester struct { - uri string - apiToken string - method string - requestBody []byte + uri string + apiToken string + method string + requestBody []byte + storageClasses string } // Test GetBlockHandler on the following situations: @@ -754,25 +756,25 @@ func (s *HandlerSuite) TestPullHandler(c *check.C) { var testcases = []pullTest{ { "Valid pull list from an ordinary user", - RequestTester{"/pull", userToken, "PUT", goodJSON}, + RequestTester{"/pull", userToken, "PUT", goodJSON, ""}, http.StatusUnauthorized, "Unauthorized\n", }, { "Invalid pull request from an ordinary user", - RequestTester{"/pull", userToken, "PUT", badJSON}, + RequestTester{"/pull", userToken, "PUT", badJSON, ""}, http.StatusUnauthorized, "Unauthorized\n", }, { "Valid pull request from the data manager", - RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON}, + RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON, ""}, http.StatusOK, "Received 3 pull requests\n", }, { "Invalid pull request from the data manager", - RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON}, + RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON, ""}, http.StatusBadRequest, "", }, @@ -866,25 +868,25 @@ func (s *HandlerSuite) TestTrashHandler(c *check.C) { var testcases = []trashTest{ { "Valid trash list from an ordinary user", - RequestTester{"/trash", userToken, "PUT", goodJSON}, + RequestTester{"/trash", userToken, "PUT", goodJSON, ""}, http.StatusUnauthorized, "Unauthorized\n", }, { "Invalid trash list from an ordinary user", - RequestTester{"/trash", userToken, "PUT", badJSON}, + RequestTester{"/trash", userToken, "PUT", badJSON, ""}, http.StatusUnauthorized, "Unauthorized\n", }, { "Valid trash list from the data manager", - RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON}, + RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON, ""}, http.StatusOK, "Received 3 trash requests\n", }, { "Invalid trash list from the data manager", - RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON}, + RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON, ""}, http.StatusBadRequest, "", }, @@ -921,6 +923,9 @@ func IssueRequest(handler http.Handler, rt *RequestTester) *httptest.ResponseRec if rt.apiToken != "" { req.Header.Set("Authorization", "OAuth2 "+rt.apiToken) } + if rt.storageClasses != "" { + req.Header.Set("X-Keep-Storage-Classes", rt.storageClasses) + } handler.ServeHTTP(response, req) return response } @@ -1113,6 +1118,46 @@ func (s *HandlerSuite) TestGetHandlerNoBufferLeak(c *check.C) { } } +func (s *HandlerSuite) TestPutStorageClasses(c *check.C) { + s.cluster.Volumes = map[string]arvados.Volume{ + "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "mock"}, // "default" is implicit + "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"special": true, "extra": true}}, + "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true}, + } + c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) + rt := RequestTester{ + method: "PUT", + uri: "/" + TestHash, + requestBody: TestBlock, + } + for _, trial := range []struct { + ask string + expect string + }{ + {"", ""}, + {"default", "default=1"}, + {"special", "extra=1, special=1"}, + {"extra, special", "extra=1, special=1"}, + {"default, special", "default=1, extra=1, special=1"}, + } { + c.Logf("%#v", trial) + rt.storageClasses = trial.ask + resp := IssueRequest(s.handler, &rt) + if trial.expect == "" { + // any non-empty value is correct + c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), check.Not(check.Equals), "") + } else { + c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), check.Equals, trial.expect) + } + } +} + +func sortCommaSeparated(s string) string { + slice := strings.Split(s, ", ") + sort.Strings(slice) + return strings.Join(slice, ", ") +} + func (s *HandlerSuite) TestPutResponseHeader(c *check.C) { c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil) diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index a0e7fd0e01..07c2946668 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -246,6 +246,14 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { return } + var wantStorageClasses []string + if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" { + wantStorageClasses = strings.Split(hdr, ",") + for i, sc := range wantStorageClasses { + wantStorageClasses[i] = strings.TrimSpace(sc) + } + } + buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength)) if err != nil { http.Error(resp, err.Error(), http.StatusServiceUnavailable) @@ -259,7 +267,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { return } - result, err := PutBlock(ctx, rtr.volmgr, buf, hash) + result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses) bufs.Put(buf) if err != nil { @@ -726,8 +734,10 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b } type putResult struct { + classTodo map[string]bool + mountUsed map[*VolumeMount]bool totalReplication int - classReplication map[string]int + classDone map[string]int } // Number of distinct replicas stored. "2" can mean the block was @@ -741,7 +751,7 @@ func (pr putResult) TotalReplication() string { // "default=2; special=1". func (pr putResult) ClassReplication() string { s := "" - for k, v := range pr.classReplication { + for k, v := range pr.classDone { if len(s) > 0 { s += ", " } @@ -750,15 +760,51 @@ func (pr putResult) ClassReplication() string { return s } -func newPutResult(mnt *VolumeMount) putResult { - result := putResult{ - totalReplication: mnt.Replication, - classReplication: map[string]int{}, +func (pr *putResult) Add(mnt *VolumeMount) { + if pr.mountUsed[mnt] { + logrus.Warnf("BUG? superfluous extra write to mount %s", mnt) + return + } + pr.mountUsed[mnt] = true + pr.totalReplication += mnt.Replication + for class := range mnt.StorageClasses { + pr.classDone[class] += mnt.Replication + delete(pr.classTodo, class) + } +} + +func (pr *putResult) Done() bool { + return len(pr.classTodo) == 0 && pr.totalReplication > 0 +} + +func (pr *putResult) Want(mnt *VolumeMount) bool { + if pr.Done() || pr.mountUsed[mnt] { + return false + } + if len(pr.classTodo) == 0 { + // none specified == "any" + return true } for class := range mnt.StorageClasses { - result.classReplication[class] += mnt.Replication + if pr.classTodo[class] { + return true + } + } + return false +} + +func newPutResult(classes []string) putResult { + pr := putResult{ + classTodo: make(map[string]bool, len(classes)), + classDone: map[string]int{}, + mountUsed: map[*VolumeMount]bool{}, + } + for _, c := range classes { + if c != "" { + pr.classTodo[c] = true + } } - return result + return pr } // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep. @@ -788,7 +834,7 @@ func newPutResult(mnt *VolumeMount) putResult { // all writes failed). The text of the error message should // provide as much detail as possible. // -func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) { +func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putResult, error) { log := ctxlog.FromContext(ctx) // Check that BLOCK's checksum matches HASH. @@ -798,22 +844,28 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s return putResult{}, RequestHashError } + result := newPutResult(wantStorageClasses) + // If we already have this data, it's intact on disk, and we // can update its timestamp, return success. If we have // different data with the same hash, return failure. - if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError { + if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil { return result, err - } else if ctx.Err() != nil { - return putResult{}, ErrClientDisconnect + } + if ctx.Err() != nil { + return result, ErrClientDisconnect } // Choose a Keep volume to write to. // If this volume fails, try all of the volumes in order. - if mnt := volmgr.NextWritable(); mnt != nil { - if err := mnt.Put(ctx, hash, block); err != nil { - log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash) - } else { - return newPutResult(mnt), nil // success! + if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) { + // fall through to "try all volumes" below + } else if err := mnt.Put(ctx, hash, block); err != nil { + log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash) + } else { + result.Add(mnt) + if result.Done() { + return result, nil } } if ctx.Err() != nil { @@ -828,13 +880,20 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s allFull := true for _, mnt := range writables { + if !result.Want(mnt) { + continue + } err := mnt.Put(ctx, hash, block) if ctx.Err() != nil { - return putResult{}, ErrClientDisconnect + return result, ErrClientDisconnect } switch err { case nil: - return newPutResult(mnt), nil // success! + result.Add(mnt) + if result.Done() { + return result, nil + } + continue case FullError: continue default: @@ -846,26 +905,33 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s } } - if allFull { - log.Error("all volumes are full") + if result.totalReplication > 0 { + // Some, but not all, of the storage classes were + // satisfied. This qualifies as success. + return result, nil + } else if allFull { + log.Error("all volumes with qualifying storage classes are full") return putResult{}, FullError + } else { + // Already logged the non-full errors. + return putResult{}, GenericError } - // Already logged the non-full errors. - return putResult{}, GenericError } -// CompareAndTouch returns the current replication level if one of the -// volumes already has the given content and it successfully updates -// the relevant block's modification time in order to protect it from -// premature garbage collection. Otherwise, it returns a non-nil -// error. -func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) { +// CompareAndTouch looks for volumes where the given content already +// exists and its modification time can be updated (i.e., it is +// protected from garbage collection), and updates result accordingly. +// It returns when the result is Done() or all volumes have been +// checked. +func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putResult) error { log := ctxlog.FromContext(ctx) - var bestErr error = NotFoundError for _, mnt := range volmgr.AllWritable() { + if !result.Want(mnt) { + continue + } err := mnt.Compare(ctx, hash, buf) if ctx.Err() != nil { - return putResult{}, ctx.Err() + return nil } else if err == CollisionError { // Stop if we have a block with same hash but // different content. (It will be impossible @@ -873,7 +939,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, // both, so there's no point writing it even // on a different volume.) log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume) - return putResult{}, err + return CollisionError } else if os.IsNotExist(err) { // Block does not exist. This is the only // "normal" error: we don't log anything. @@ -887,13 +953,15 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, } if err := mnt.Touch(hash); err != nil { log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume) - bestErr = err continue } // Compare and Touch both worked --> done. - return newPutResult(mnt), nil + result.Add(mnt) + if result.Done() { + return nil + } } - return putResult{}, bestErr + return nil } var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`) diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go index 171dee3c4b..8c88a406f4 100644 --- a/services/keepstore/proxy_remote.go +++ b/services/keepstore/proxy_remote.go @@ -177,7 +177,7 @@ func (rrc *remoteResponseCacher) Close() error { rrc.ResponseWriter.Write(rrc.Buffer) return nil } - _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32]) + _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil) if rrc.Context.Err() != nil { // If caller hung up, log that instead of subsequent/misleading errors. http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout) diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go index 670fa1a414..57b9469244 100644 --- a/services/keepstore/pull_worker.go +++ b/services/keepstore/pull_worker.go @@ -89,6 +89,6 @@ var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte, if volume != nil { return volume.Put(context.Background(), locator, data) } - _, err := PutBlock(context.Background(), volmgr, data, locator) + _, err := PutBlock(context.Background(), volmgr, data, locator, nil) return err } diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index dfc94b6760..2013c8c04d 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -136,7 +136,7 @@ func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) { func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) { testData := PullWorkerTestData{ name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 2 pull requests\n", readContent: "hello", @@ -150,7 +150,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) { func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) { testData := PullWorkerTestData{ name: "TestPullWorkerPullList_with_one_locator", - req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 1 pull requests\n", readContent: "hola", @@ -164,7 +164,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_get_one_locator", - req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 1 pull requests\n", readContent: "unused", @@ -178,7 +178,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_get_two_locators", - req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 2 pull requests\n", readContent: "unused", @@ -192,7 +192,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_put_one_locator", - req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 1 pull requests\n", readContent: "hello hello", @@ -206,7 +206,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) { testData := PullWorkerTestData{ name: "TestPullWorker_error_on_put_two_locators", - req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList}, + req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""}, responseCode: http.StatusOK, responseBody: "Received 2 pull requests\n", readContent: "hello again", @@ -221,7 +221,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) { func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) { testData := PullWorkerTestData{ name: "TestPullWorkerPullList_with_two_locators", - req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList}, + req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList, ""}, responseCode: http.StatusUnauthorized, responseBody: "Unauthorized\n", readContent: "hello", diff --git a/services/keepstore/status_test.go b/services/keepstore/status_test.go index 7bff2584e5..cafe9f72ff 100644 --- a/services/keepstore/status_test.go +++ b/services/keepstore/status_test.go @@ -15,7 +15,7 @@ import ( // getStatusItem("foo","bar","baz") retrieves /status.json, decodes // the response body into resp, and returns resp["foo"]["bar"]["baz"]. func getStatusItem(h *handler, keys ...string) interface{} { - resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil}) + resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil, ""}) var s interface{} json.NewDecoder(resp.Body).Decode(&s) for _, k := range keys { diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go index 2de21edde6..cc2d21e5a9 100644 --- a/services/keepstore/volume_test.go +++ b/services/keepstore/volume_test.go @@ -148,7 +148,7 @@ func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error } return nil } else { - return NotFoundError + return os.ErrNotExist } } -- 2.30.2