return
}
- replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+ result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
bufs.Put(buf)
if err != nil {
expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
}
- resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+ resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+ resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
resp.Write([]byte(returnHash + "\n"))
}
return 0, errorToCaller
}
+// putResult is the success value returned by PutBlock and
+// CompareAndTouch. It carries the replication counts that the
+// TotalReplication and ClassReplication methods format as strings.
+type putResult struct {
+ // totalReplication is the replica count formatted by
+ // TotalReplication.
+ totalReplication int
+ // classReplication maps a storage class name to the number of
+ // replicas counted for that class; formatted by ClassReplication.
+ classReplication map[string]int
+}
+
+// TotalReplication returns the number of distinct replicas stored,
+// formatted as a decimal string. "2" can mean the block was stored
+// on 2 different volumes with replication 1, or on 1 volume with
+// replication 2.
+func (pr putResult) TotalReplication() string {
+ return strconv.Itoa(pr.totalReplication)
+}
+
+// ClassReplication returns the number of replicas satisfying each
+// storage class, formatted like "default=2, special=1".
+//
+// Entries are emitted in ascending order of class name so that the
+// same result always produces the same string; Go does not specify
+// map iteration order. A nil or empty class map yields "".
+func (pr putResult) ClassReplication() string {
+	classes := make([]string, 0, len(pr.classReplication))
+	for class := range pr.classReplication {
+		classes = append(classes, class)
+	}
+	// Insertion sort: the list of storage classes is tiny, and this
+	// keeps the function free of imports beyond strconv.
+	for i := 1; i < len(classes); i++ {
+		for j := i; j > 0 && classes[j] < classes[j-1]; j-- {
+			classes[j], classes[j-1] = classes[j-1], classes[j]
+		}
+	}
+	s := ""
+	for i, class := range classes {
+		if i > 0 {
+			s += ", "
+		}
+		s += class + "=" + strconv.Itoa(pr.classReplication[class])
+	}
+	return s
+}
+
+// newPutResult builds a putResult from a single mount: the total
+// replication is mnt.Replication, and every storage class listed in
+// mnt.StorageClasses is credited with that same replication.
+func newPutResult(mnt *VolumeMount) putResult {
+	perClass := make(map[string]int)
+	for name := range mnt.StorageClasses {
+		perClass[name] += mnt.Replication
+	}
+	return putResult{
+		totalReplication: mnt.Replication,
+		classReplication: perClass,
+	}
+}
+
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
//
// PutBlock(ctx, block, hash)
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
log := ctxlog.FromContext(ctx)
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return 0, RequestHashError
+ return putResult{}, RequestHashError
}
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
- return n, err
+ if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+ return result, err
} else if ctx.Err() != nil {
- return 0, ErrClientDisconnect
+ return putResult{}, ErrClientDisconnect
}
// Choose a Keep volume to write to.
if err := mnt.Put(ctx, hash, block); err != nil {
log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
} else {
- return mnt.Replication, nil // success!
+ return newPutResult(mnt), nil // success!
}
}
if ctx.Err() != nil {
- return 0, ErrClientDisconnect
+ return putResult{}, ErrClientDisconnect
}
writables := volmgr.AllWritable()
if len(writables) == 0 {
log.Error("no writable volumes")
- return 0, FullError
+ return putResult{}, FullError
}
allFull := true
- for _, vol := range writables {
- err := vol.Put(ctx, hash, block)
+ for _, mnt := range writables {
+ err := mnt.Put(ctx, hash, block)
if ctx.Err() != nil {
- return 0, ErrClientDisconnect
+ return putResult{}, ErrClientDisconnect
}
switch err {
case nil:
- return vol.Replication, nil // success!
+ return newPutResult(mnt), nil // success!
case FullError:
continue
default:
// write did not succeed. Report the
// error and continue trying.
allFull = false
- log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
}
}
if allFull {
log.Error("all volumes are full")
- return 0, FullError
+ return putResult{}, FullError
}
// Already logged the non-full errors.
- return 0, GenericError
+ return putResult{}, GenericError
}
// CompareAndTouch returns the current replication level if one of the
// the relevant block's modification time in order to protect it from
// premature garbage collection. Otherwise, it returns a non-nil
// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
log := ctxlog.FromContext(ctx)
var bestErr error = NotFoundError
for _, mnt := range volmgr.AllWritable() {
err := mnt.Compare(ctx, hash, buf)
if ctx.Err() != nil {
- return 0, ctx.Err()
+ return putResult{}, ctx.Err()
} else if err == CollisionError {
// Stop if we have a block with same hash but
// different content. (It will be impossible
// both, so there's no point writing it even
// on a different volume.)
log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
- return 0, err
+ return putResult{}, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
continue
}
// Compare and Touch both worked --> done.
- return mnt.Replication, nil
+ return newPutResult(mnt), nil
}
- return 0, bestErr
+ return putResult{}, bestErr
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)