13382: Report storage class(es) in headers after successful write.
[arvados.git] / services / keepstore / handlers.go
index eb0ea5ad2f133f3b8a569fa354255521f08c5965..a0e7fd0e0132fadef7ca8276b26b10a87df52c30 100644 (file)
@@ -259,7 +259,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+       result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -279,7 +279,8 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
                returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
        }
-       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+       resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+       resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -724,6 +725,42 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
        return 0, errorToCaller
 }
 
+type putResult struct {
+       totalReplication int
+       classReplication map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putResult) TotalReplication() string {
+       return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2; special=1".
+func (pr putResult) ClassReplication() string {
+       s := ""
+       for k, v := range pr.classReplication {
+               if len(s) > 0 {
+                       s += ", "
+               }
+               s += k + "=" + strconv.Itoa(v)
+       }
+       return s
+}
+
+func newPutResult(mnt *VolumeMount) putResult {
+       result := putResult{
+               totalReplication: mnt.Replication,
+               classReplication: map[string]int{},
+       }
+       for class := range mnt.StorageClasses {
+               result.classReplication[class] += mnt.Replication
+       }
+       return result
+}
+
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 // PutBlock(ctx, block, hash)
@@ -751,23 +788,23 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
        log := ctxlog.FromContext(ctx)
 
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return 0, RequestHashError
+               return putResult{}, RequestHashError
        }
 
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
-               return n, err
+       if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+               return result, err
        } else if ctx.Err() != nil {
-               return 0, ErrClientDisconnect
+               return putResult{}, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
@@ -776,28 +813,28 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                if err := mnt.Put(ctx, hash, block); err != nil {
                        log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
                } else {
-                       return mnt.Replication, nil // success!
+                       return newPutResult(mnt), nil // success!
                }
        }
        if ctx.Err() != nil {
-               return 0, ErrClientDisconnect
+               return putResult{}, ErrClientDisconnect
        }
 
        writables := volmgr.AllWritable()
        if len(writables) == 0 {
                log.Error("no writable volumes")
-               return 0, FullError
+               return putResult{}, FullError
        }
 
        allFull := true
-       for _, vol := range writables {
-               err := vol.Put(ctx, hash, block)
+       for _, mnt := range writables {
+               err := mnt.Put(ctx, hash, block)
                if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
+                       return putResult{}, ErrClientDisconnect
                }
                switch err {
                case nil:
-                       return vol.Replication, nil // success!
+                       return newPutResult(mnt), nil // success!
                case FullError:
                        continue
                default:
@@ -805,16 +842,16 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
                }
        }
 
        if allFull {
                log.Error("all volumes are full")
-               return 0, FullError
+               return putResult{}, FullError
        }
        // Already logged the non-full errors.
-       return 0, GenericError
+       return putResult{}, GenericError
 }
 
 // CompareAndTouch returns the current replication level if one of the
@@ -822,13 +859,13 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
        log := ctxlog.FromContext(ctx)
        var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
                err := mnt.Compare(ctx, hash, buf)
                if ctx.Err() != nil {
-                       return 0, ctx.Err()
+                       return putResult{}, ctx.Err()
                } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
@@ -836,7 +873,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // both, so there's no point writing it even
                        // on a different volume.)
                        log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-                       return 0, err
+                       return putResult{}, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -854,9 +891,9 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        continue
                }
                // Compare and Touch both worked --> done.
-               return mnt.Replication, nil
+               return newPutResult(mnt), nil
        }
-       return 0, bestErr
+       return putResult{}, bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)