"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
// List blocks stored here whose hash has the given prefix.
// Privileged client only.
rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+ // Update timestamp on existing block. Privileged client only.
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
// Internals/debugging info (runtime.MemStats)
rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
}
}
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+ hash := mux.Vars(req)["hash"]
+ vols := rtr.volmgr.AllWritable()
+ if len(vols) == 0 {
+ http.Error(resp, "no volumes", http.StatusNotFound)
+ return
+ }
+ var err error
+ for _, mnt := range vols {
+ err = mnt.Touch(hash)
+ if err == nil {
+ break
+ }
+ }
+ switch {
+ case err == nil:
+ return
+ case os.IsNotExist(err):
+ http.Error(resp, err.Error(), http.StatusNotFound)
+ default:
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ }
+}
+
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
return
}
+ var wantStorageClasses []string
+ if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+ wantStorageClasses = strings.Split(hdr, ",")
+ for i, sc := range wantStorageClasses {
+ wantStorageClasses[i] = strings.TrimSpace(sc)
+ }
+ } else {
+ // none specified -- use configured default
+ for class, cfg := range rtr.cluster.StorageClasses {
+ if cfg.Default {
+ wantStorageClasses = append(wantStorageClasses, class)
+ }
+ }
+ }
+
buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
- replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+ result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {
expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
}
- resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+ resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+ resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
resp.Write([]byte(returnHash + "\n"))
}
return 0, errorToCaller
}
+type putProgress struct {
+ classNeeded map[string]bool
+ classTodo map[string]bool
+ mountUsed map[*VolumeMount]bool
+ totalReplication int
+ classDone map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putProgress) TotalReplication() string {
+ return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2; special=1".
+func (pr putProgress) ClassReplication() string {
+ s := ""
+ for k, v := range pr.classDone {
+ if len(s) > 0 {
+ s += ", "
+ }
+ s += k + "=" + strconv.Itoa(v)
+ }
+ return s
+}
+
+func (pr *putProgress) Add(mnt *VolumeMount) {
+ if pr.mountUsed[mnt] {
+ logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
+ return
+ }
+ pr.mountUsed[mnt] = true
+ pr.totalReplication += mnt.Replication
+ for class := range mnt.StorageClasses {
+ pr.classDone[class] += mnt.Replication
+ delete(pr.classTodo, class)
+ }
+}
+
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+ if !pr.mountUsed[mnt] {
+ logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+ return
+ }
+ pr.mountUsed[mnt] = false
+ pr.totalReplication -= mnt.Replication
+ for class := range mnt.StorageClasses {
+ pr.classDone[class] -= mnt.Replication
+ if pr.classNeeded[class] {
+ pr.classTodo[class] = true
+ }
+ }
+}
+
+func (pr *putProgress) Done() bool {
+ return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
+func (pr *putProgress) Want(mnt *VolumeMount) bool {
+ if pr.Done() || pr.mountUsed[mnt] {
+ return false
+ }
+ if len(pr.classTodo) == 0 {
+ // none specified == "any"
+ return true
+ }
+ for class := range mnt.StorageClasses {
+ if pr.classTodo[class] {
+ return true
+ }
+ }
+ return false
+}
+
+func (pr *putProgress) Copy() *putProgress {
+ cp := putProgress{
+ classNeeded: pr.classNeeded,
+ classTodo: make(map[string]bool, len(pr.classTodo)),
+ classDone: make(map[string]int, len(pr.classDone)),
+ mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
+ totalReplication: pr.totalReplication,
+ }
+ for k, v := range pr.classTodo {
+ cp.classTodo[k] = v
+ }
+ for k, v := range pr.classDone {
+ cp.classDone[k] = v
+ }
+ for k, v := range pr.mountUsed {
+ cp.mountUsed[k] = v
+ }
+ return &cp
+}
+
+func newPutResult(classes []string) putProgress {
+ pr := putProgress{
+ classNeeded: make(map[string]bool, len(classes)),
+ classTodo: make(map[string]bool, len(classes)),
+ classDone: map[string]int{},
+ mountUsed: map[*VolumeMount]bool{},
+ }
+ for _, c := range classes {
+ if c != "" {
+ pr.classNeeded[c] = true
+ pr.classTodo[c] = true
+ }
+ }
+ return pr
+}
+
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
//
// PutBlock(ctx, block, hash)
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
log := ctxlog.FromContext(ctx)
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return 0, RequestHashError
+ return putProgress{}, RequestHashError
}
+ result := newPutResult(wantStorageClasses)
+
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
- return n, err
- } else if ctx.Err() != nil {
- return 0, ErrClientDisconnect
- }
-
- // Choose a Keep volume to write to.
- // If this volume fails, try all of the volumes in order.
- if mnt := volmgr.NextWritable(); mnt != nil {
- if err := mnt.Put(ctx, hash, block); err != nil {
- log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
- } else {
- return mnt.Replication, nil // success!
- }
+ if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
+ return result, err
}
if ctx.Err() != nil {
- return 0, ErrClientDisconnect
+ return result, ErrClientDisconnect
}
- writables := volmgr.AllWritable()
+ writables := volmgr.NextWritable()
if len(writables) == 0 {
log.Error("no writable volumes")
- return 0, FullError
+ return result, FullError
}
- allFull := true
- for _, vol := range writables {
- err := vol.Put(ctx, hash, block)
- if ctx.Err() != nil {
- return 0, ErrClientDisconnect
+ var wg sync.WaitGroup
+ var mtx sync.Mutex
+ cond := sync.Cond{L: &mtx}
+ // pending predicts what result will be if all pending writes
+ // succeed.
+ pending := result.Copy()
+ var allFull atomic.Value
+ allFull.Store(true)
+ mtx.Lock()
+ for _, mnt := range writables {
+ // Wait until our decision to use this mount does not
+ // depend on the outcome of pending writes.
+ for result.Want(mnt) && !pending.Want(mnt) {
+ cond.Wait()
}
- switch err {
- case nil:
- return vol.Replication, nil // success!
- case FullError:
+ if !result.Want(mnt) {
continue
- default:
- // The volume is not full but the
- // write did not succeed. Report the
- // error and continue trying.
- allFull = false
- log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
}
+ mnt := mnt
+ pending.Add(mnt)
+ wg.Add(1)
+ go func() {
+ log.Debugf("PutBlock: start write to %s", mnt.UUID)
+ defer wg.Done()
+ err := mnt.Put(ctx, hash, block)
+
+ mtx.Lock()
+ if err != nil {
+ log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+ pending.Sub(mnt)
+ } else {
+ log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+ result.Add(mnt)
+ }
+ cond.Broadcast()
+ mtx.Unlock()
+
+ if err != nil && err != FullError && ctx.Err() == nil {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull.Store(false)
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ }
+ }()
+ }
+ mtx.Unlock()
+ wg.Wait()
+ if ctx.Err() != nil {
+ return result, ErrClientDisconnect
+ }
+ if result.Done() {
+ return result, nil
}
- if allFull {
- log.Error("all volumes are full")
- return 0, FullError
+ if result.totalReplication > 0 {
+ // Some, but not all, of the storage classes were
+ // satisfied. This qualifies as success.
+ return result, nil
+ } else if allFull.Load().(bool) {
+ log.Error("all volumes with qualifying storage classes are full")
+ return putProgress{}, FullError
+ } else {
+ // Already logged the non-full errors.
+ return putProgress{}, GenericError
}
- // Already logged the non-full errors.
- return 0, GenericError
}
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
log := ctxlog.FromContext(ctx)
- var bestErr error = NotFoundError
for _, mnt := range volmgr.AllWritable() {
+ if !result.Want(mnt) {
+ continue
+ }
err := mnt.Compare(ctx, hash, buf)
if ctx.Err() != nil {
- return 0, ctx.Err()
+ return nil
} else if err == CollisionError {
// Stop if we have a block with same hash but
// different content. (It will be impossible
// both, so there's no point writing it even
// on a different volume.)
log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
- return 0, err
+ return CollisionError
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
}
if err := mnt.Touch(hash); err != nil {
log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
- bestErr = err
continue
}
// Compare and Touch both worked --> done.
- return mnt.Replication, nil
+ result.Add(mnt)
+ if result.Done() {
+ return nil
+ }
}
- return 0, bestErr
+ return nil
}
// validLocatorRe matches a bare block hash: exactly 32 lowercase
// hexadecimal digits and nothing else.
var validLocatorRe = regexp.MustCompile("^[0-9a-f]{32}$")