13382: Report storage class(es) in headers after successful write.
authorTom Clegg <tom@curii.com>
Thu, 25 Mar 2021 14:57:52 +0000 (10:57 -0400)
committerTom Clegg <tom@curii.com>
Thu, 1 Apr 2021 15:07:36 +0000 (11:07 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keepstore/handler_test.go
services/keepstore/handlers.go

index 17ed6402ce0d79ab4dc1bddf30e6b0315df7fa16..76cacc569532972664996a7d4a5eede9efc33323 100644 (file)
@@ -1113,7 +1113,7 @@ func (s *HandlerSuite) TestGetHandlerNoBufferLeak(c *check.C) {
        }
 }
 
-func (s *HandlerSuite) TestPutReplicationHeader(c *check.C) {
+func (s *HandlerSuite) TestPutResponseHeader(c *check.C) {
        c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
 
        resp := IssueRequest(s.handler, &RequestTester{
@@ -1121,10 +1121,9 @@ func (s *HandlerSuite) TestPutReplicationHeader(c *check.C) {
                uri:         "/" + TestHash,
                requestBody: TestBlock,
        })
-       if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
-               c.Logf("%#v", resp)
-               c.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
-       }
+       c.Logf("%#v", resp)
+       c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), check.Equals, "1")
+       c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), check.Equals, "default=1")
 }
 
 func (s *HandlerSuite) TestUntrashHandler(c *check.C) {
index eb0ea5ad2f133f3b8a569fa354255521f08c5965..a0e7fd0e0132fadef7ca8276b26b10a87df52c30 100644 (file)
@@ -259,7 +259,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+       result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -279,7 +279,8 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
                returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
        }
-       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+       resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+       resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -724,6 +725,42 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
        return 0, errorToCaller
 }
 
+type putResult struct {
+       totalReplication int
+       classReplication map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putResult) TotalReplication() string {
+       return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2; special=1".
+func (pr putResult) ClassReplication() string {
+       s := ""
+       for k, v := range pr.classReplication {
+               if len(s) > 0 {
+                       s += ", "
+               }
+               s += k + "=" + strconv.Itoa(v)
+       }
+       return s
+}
+
+func newPutResult(mnt *VolumeMount) putResult {
+       result := putResult{
+               totalReplication: mnt.Replication,
+               classReplication: map[string]int{},
+       }
+       for class := range mnt.StorageClasses {
+               result.classReplication[class] += mnt.Replication
+       }
+       return result
+}
+
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 // PutBlock(ctx, block, hash)
@@ -751,23 +788,23 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
        log := ctxlog.FromContext(ctx)
 
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return 0, RequestHashError
+               return putResult{}, RequestHashError
        }
 
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
-               return n, err
+       if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+               return result, err
        } else if ctx.Err() != nil {
-               return 0, ErrClientDisconnect
+               return putResult{}, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
@@ -776,28 +813,28 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                if err := mnt.Put(ctx, hash, block); err != nil {
                        log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
                } else {
-                       return mnt.Replication, nil // success!
+                       return newPutResult(mnt), nil // success!
                }
        }
        if ctx.Err() != nil {
-               return 0, ErrClientDisconnect
+               return putResult{}, ErrClientDisconnect
        }
 
        writables := volmgr.AllWritable()
        if len(writables) == 0 {
                log.Error("no writable volumes")
-               return 0, FullError
+               return putResult{}, FullError
        }
 
        allFull := true
-       for _, vol := range writables {
-               err := vol.Put(ctx, hash, block)
+       for _, mnt := range writables {
+               err := mnt.Put(ctx, hash, block)
                if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
+                       return putResult{}, ErrClientDisconnect
                }
                switch err {
                case nil:
-                       return vol.Replication, nil // success!
+                       return newPutResult(mnt), nil // success!
                case FullError:
                        continue
                default:
@@ -805,16 +842,16 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
                }
        }
 
        if allFull {
                log.Error("all volumes are full")
-               return 0, FullError
+               return putResult{}, FullError
        }
        // Already logged the non-full errors.
-       return 0, GenericError
+       return putResult{}, GenericError
 }
 
 // CompareAndTouch returns the current replication level if one of the
@@ -822,13 +859,13 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
        log := ctxlog.FromContext(ctx)
        var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
                err := mnt.Compare(ctx, hash, buf)
                if ctx.Err() != nil {
-                       return 0, ctx.Err()
+                       return putResult{}, ctx.Err()
                } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
@@ -836,7 +873,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // both, so there's no point writing it even
                        // on a different volume.)
                        log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-                       return 0, err
+                       return putResult{}, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -854,9 +891,9 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        continue
                }
                // Compare and Touch both worked --> done.
-               return mnt.Replication, nil
+               return newPutResult(mnt), nil
        }
-       return 0, bestErr
+       return putResult{}, bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)