X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/864c3b0afd16c77e046f0072d8517d34c5a44792..72d7d41944006d1f48f570784dafe56b9812b0c8:/services/keepstore/handlers.go

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a60d17d576..910033ebb1 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -18,6 +18,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -252,6 +253,13 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		for i, sc := range wantStorageClasses {
 			wantStorageClasses[i] = strings.TrimSpace(sc)
 		}
+	} else {
+		// none specified -- use configured default
+		for class, cfg := range rtr.cluster.StorageClasses {
+			if cfg.Default {
+				wantStorageClasses = append(wantStorageClasses, class)
+			}
+		}
 	}
 
 	buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
@@ -734,6 +742,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 }
 
 type putProgress struct {
+	classNeeded      map[string]bool
 	classTodo        map[string]bool
 	mountUsed        map[*VolumeMount]bool
 	totalReplication int
@@ -762,7 +771,7 @@ func (pr putProgress) ClassReplication() string {
 
 func (pr *putProgress) Add(mnt *VolumeMount) {
 	if pr.mountUsed[mnt] {
-		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
 		return
 	}
 	pr.mountUsed[mnt] = true
@@ -773,6 +782,21 @@ func (pr *putProgress) Add(mnt *VolumeMount) {
 	}
 }
 
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+	if !pr.mountUsed[mnt] {
+		logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+		return
+	}
+	pr.mountUsed[mnt] = false
+	pr.totalReplication -= mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] -= mnt.Replication
+		if pr.classNeeded[class] {
+			pr.classTodo[class] = true
+		}
+	}
+}
+
 func (pr *putProgress) Done() bool {
 	return len(pr.classTodo) == 0 && pr.totalReplication > 0
 }
@@ -793,47 +817,65 @@ func (pr *putProgress) Want(mnt *VolumeMount) bool {
 	return false
 }
 
-func newPutResult(classes []string) putProgress {
+func (pr *putProgress) Copy() *putProgress {
+	cp := putProgress{
+		classNeeded:      pr.classNeeded,
+		classTodo:        make(map[string]bool, len(pr.classTodo)),
+		classDone:        make(map[string]int, len(pr.classDone)),
+		mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
+		totalReplication: pr.totalReplication,
+	}
+	for k, v := range pr.classTodo {
+		cp.classTodo[k] = v
+	}
+	for k, v := range pr.classDone {
+		cp.classDone[k] = v
+	}
+	for k, v := range pr.mountUsed {
+		cp.mountUsed[k] = v
+	}
+	return &cp
+}
+
+func newPutProgress(classes []string) putProgress {
 	pr := putProgress{
-		classTodo: make(map[string]bool, len(classes)),
-		classDone: map[string]int{},
-		mountUsed: map[*VolumeMount]bool{},
+		classNeeded: make(map[string]bool, len(classes)),
+		classTodo:   make(map[string]bool, len(classes)),
+		classDone:   map[string]int{},
+		mountUsed:   map[*VolumeMount]bool{},
 	}
 	for _, c := range classes {
 		if c != "" {
+			pr.classNeeded[c] = true
 			pr.classTodo[c] = true
 		}
 	}
 	return pr
 }
 
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// PutBlock(ctx, block, hash)
-//   Stores the BLOCK (identified by the content id HASH) in Keep.
+// PutBlock stores the given block on one or more volumes.
 //
-//   The MD5 checksum of the block must be identical to the content id HASH.
-//   If not, an error is returned.
+// The MD5 checksum of the block must match the given hash.
 //
-//   PutBlock stores the BLOCK on the first Keep volume with free space.
-//   A failure code is returned to the user only if all volumes fail.
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
 //
-//   On success, PutBlock returns nil.
-//   On failure, it returns a KeepError with one of the following codes:
-//
-//   500 Collision
-//        A different block with the same hash already exists on this
-//        Keep server.
-//   422 MD5Fail
-//        The MD5 hash of the BLOCK does not match the argument HASH.
-//   503 Full
-//        There was not enough space left in any Keep volume to store
-//        the object.
-//   500 Fail
-//        The object could not be stored for some other reason (e.g.
-//        all writes failed). The text of the error message should
-//        provide as much detail as possible.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
 //
+// 500 Collision
+//	A different block with the same hash already exists on this
+//	Keep server.
+// 422 MD5Fail
+//	The MD5 hash of the BLOCK does not match the argument HASH.
+// 503 Full
+//	There was not enough space left in any Keep volume to store
+//	the object.
+// 500 Fail
+//	The object could not be stored for some other reason (e.g.
+//	all writes failed). The text of the error message should
+//	provide as much detail as possible.
 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
 	log := ctxlog.FromContext(ctx)
 
@@ -844,72 +886,88 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 		return putProgress{}, RequestHashError
 	}
 
-	result := newPutResult(wantStorageClasses)
+	result := newPutProgress(wantStorageClasses)
 
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
+	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
 		return result, err
 	}
 	if ctx.Err() != nil {
 		return result, ErrClientDisconnect
 	}
 
-	// Choose a Keep volume to write to.
-	// If this volume fails, try all of the volumes in order.
-	if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
-		// fall through to "try all volumes" below
-	} else if err := mnt.Put(ctx, hash, block); err != nil {
-		log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-	} else {
-		result.Add(mnt)
-		if result.Done() {
-			return result, nil
-		}
-	}
-	if ctx.Err() != nil {
-		return putProgress{}, ErrClientDisconnect
-	}
-
-	writables := volmgr.AllWritable()
+	writables := volmgr.NextWritable()
 	if len(writables) == 0 {
 		log.Error("no writable volumes")
-		return putProgress{}, FullError
+		return result, FullError
 	}
 
-	allFull := true
+	var wg sync.WaitGroup
+	var mtx sync.Mutex
+	cond := sync.Cond{L: &mtx}
+	// pending predicts what result will be if all pending writes
+	// succeed.
+	pending := result.Copy()
+	var allFull atomic.Value
+	allFull.Store(true)
+
+	// We hold the lock for the duration of the "each volume" loop
+	// below, except when it is released during cond.Wait().
+	mtx.Lock()
+
 	for _, mnt := range writables {
+		// Wait until our decision to use this mount does not
+		// depend on the outcome of pending writes.
+		for result.Want(mnt) && !pending.Want(mnt) {
+			cond.Wait()
+		}
 		if !result.Want(mnt) {
 			continue
 		}
-		err := mnt.Put(ctx, hash, block)
-		if ctx.Err() != nil {
-			return result, ErrClientDisconnect
-		}
-		switch err {
-		case nil:
-			result.Add(mnt)
-			if result.Done() {
-				return result, nil
+		mnt := mnt
+		pending.Add(mnt)
+		wg.Add(1)
+		go func() {
+			log.Debugf("PutBlock: start write to %s", mnt.UUID)
+			defer wg.Done()
+			err := mnt.Put(ctx, hash, block)
+
+			mtx.Lock()
+			if err != nil {
+				log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+				pending.Sub(mnt)
+			} else {
+				log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+				result.Add(mnt)
 			}
-			continue
-		case FullError:
-			continue
-		default:
-			// The volume is not full but the
-			// write did not succeed. Report the
-			// error and continue trying.
-			allFull = false
-			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-		}
+			cond.Broadcast()
+			mtx.Unlock()
+
+			if err != nil && err != FullError && ctx.Err() == nil {
+				// The volume is not full but the
				// write did not succeed. Report the
+				// error and continue trying.
+				allFull.Store(false)
+				log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+			}
+		}()
+	}
+	mtx.Unlock()
+	wg.Wait()
+	if ctx.Err() != nil {
+		return result, ErrClientDisconnect
+	}
+	if result.Done() {
+		return result, nil
 	}
 
 	if result.totalReplication > 0 {
 		// Some, but not all, of the storage classes were
 		// satisfied. This qualifies as success.
 		return result, nil
-	} else if allFull {
+	} else if allFull.Load().(bool) {
		log.Error("all volumes with qualifying storage classes are full")
 		return putProgress{}, FullError
 	} else {
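
A note on the first hunk: handlePUT previously used only the storage classes named by the client; with this change, a request that names no classes falls back to every class marked Default in the cluster configuration. The sketch below is a simplified standalone illustration of that selection rule, not keepstore code: header parsing is reduced to a comma-separated string and a plain map stands in for the arvados cluster config (the real handler reads the X-Keep-Storage-Classes request header).

package main

import (
	"fmt"
	"strings"
)

// classConfig is a stand-in for the per-class cluster configuration;
// only the Default flag matters for this illustration.
type classConfig struct {
	Default bool
}

// wantClasses returns the trimmed class names from the header value,
// falling back to the configured default classes when the client
// named none -- the same rule as the handlePUT hunk above.
func wantClasses(header string, configured map[string]classConfig) []string {
	var want []string
	if header != "" {
		for _, sc := range strings.Split(header, ",") {
			want = append(want, strings.TrimSpace(sc))
		}
		return want
	}
	// none specified -- use configured defaults
	for class, cfg := range configured {
		if cfg.Default {
			want = append(want, class)
		}
	}
	return want
}

func main() {
	cfg := map[string]classConfig{"default": {Default: true}, "archive": {}}
	fmt.Println(wantClasses("archive, default", cfg)) // [archive default]
	fmt.Println(wantClasses("", cfg))                 // [default]
}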
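
The putProgress changes split the class bookkeeping into three maps: classNeeded (the classes originally requested, never mutated after construction, which is why Copy can share it between copies), classTodo (requested classes not yet satisfied), and classDone (replica counts per class, including classes that merely come along with a chosen mount). The new Sub method is the undo for Add: it runs when a write that was optimistically counted in the pending copy fails, and it re-opens a class only if that class was actually requested. A reduced sketch of this bookkeeping, with a simplified mount type in place of VolumeMount and the mountUsed/totalReplication fields omitted:

package main

import "fmt"

// mount is a simplified stand-in for keepstore's VolumeMount.
type mount struct {
	replication int
	classes     []string
}

// progress is a reduced putProgress: only the class bookkeeping.
type progress struct {
	classNeeded map[string]bool // classes requested; fixed after construction
	classTodo   map[string]bool // requested classes not yet satisfied
	classDone   map[string]int  // replicas stored (or predicted) per class
}

func newProgress(classes []string) *progress {
	pr := &progress{
		classNeeded: map[string]bool{},
		classTodo:   map[string]bool{},
		classDone:   map[string]int{},
	}
	for _, c := range classes {
		pr.classNeeded[c] = true
		pr.classTodo[c] = true
	}
	return pr
}

// add records a successful (or predicted) write to mnt.
func (pr *progress) add(mnt *mount) {
	for _, class := range mnt.classes {
		pr.classDone[class] += mnt.replication
		delete(pr.classTodo, class)
	}
}

// sub reverses add after a predicted write fails; a class goes back
// on the todo list only if it was requested in the first place.
func (pr *progress) sub(mnt *mount) {
	for _, class := range mnt.classes {
		pr.classDone[class] -= mnt.replication
		if pr.classNeeded[class] {
			pr.classTodo[class] = true
		}
	}
}

func main() {
	pr := newProgress([]string{"default"})
	m := &mount{replication: 1, classes: []string{"default", "archive"}}
	pr.add(m)
	fmt.Println(pr.classTodo) // map[]: the requested class is satisfied
	pr.sub(m)
	fmt.Println(pr.classTodo) // map[default:true]: archive stays off the list
}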
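
The PutBlock rewrite is the heart of the commit: instead of trying volumes one at a time, it starts one goroutine per chosen mount and uses a sync.Cond so the dispatch loop blocks exactly when its decision about the next mount depends on writes still in flight. The pending copy predicts the result assuming every outstanding write succeeds; a failed write retracts its contribution via Sub and wakes the loop. The following standalone sketch reproduces that coordination shape with plain replica counts in place of putProgress and a hypothetical write function in place of mnt.Put; it illustrates the pattern, not the keepstore implementation.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// mount is a hypothetical stand-in for keepstore's VolumeMount.
type mount struct {
	id   string
	full bool // simulate a mount that rejects writes as full
}

// write stands in for mnt.Put.
func write(m *mount, hash string) error {
	if m.full {
		return errors.New("full")
	}
	return nil
}

// putBlock starts one write per mount, but never more writes than
// could still be useful: it waits whenever the decision to use the
// next mount depends on writes that are still in flight.
func putBlock(mounts []*mount, hash string, want int) (int, error) {
	var (
		mtx     sync.Mutex
		cond    = sync.Cond{L: &mtx}
		wg      sync.WaitGroup
		done    int // replicas confirmed written
		pending int // replicas written if every in-flight write succeeds
	)
	// Hold the lock for the whole dispatch loop, except while in
	// cond.Wait() -- the same locking shape as the diff above.
	mtx.Lock()
	for _, m := range mounts {
		for done < want && pending >= want {
			cond.Wait() // outcome of in-flight writes decides whether m is needed
		}
		if done >= want {
			break // enough replicas already stored
		}
		m := m
		pending++
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := write(m, hash)
			mtx.Lock()
			if err != nil {
				pending-- // retract the optimistic prediction
			} else {
				done++
			}
			cond.Broadcast()
			mtx.Unlock()
		}()
	}
	mtx.Unlock()
	wg.Wait()
	if done < want {
		return done, fmt.Errorf("only %d of %d replicas written", done, want)
	}
	return done, nil
}

func main() {
	mounts := []*mount{{id: "a"}, {id: "b", full: true}, {id: "c"}}
	fmt.Println(putBlock(mounts, "d41d8cd98f00b204e9800998ecf8427e", 2))
}

Note the invariant this preserves, as in the diff: worker goroutines can only update the counters (and broadcast) while the dispatcher is either inside cond.Wait() or finished dispatching, so the dispatcher never reads a half-updated prediction.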