"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
for i, sc := range wantStorageClasses {
wantStorageClasses[i] = strings.TrimSpace(sc)
}
+ } else {
+ // none specified -- use configured default
+ for class, cfg := range rtr.cluster.StorageClasses {
+ if cfg.Default {
+ wantStorageClasses = append(wantStorageClasses, class)
+ }
+ }
}
buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
}
type putProgress struct {
+ classNeeded map[string]bool
classTodo map[string]bool
mountUsed map[*VolumeMount]bool
totalReplication int
func (pr *putProgress) Add(mnt *VolumeMount) {
if pr.mountUsed[mnt] {
- logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+ logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
return
}
pr.mountUsed[mnt] = true
}
}
+// Sub backs out the accounting for mnt, e.g. when a write that was
+// counted in advance turns out to have failed.
+//
+// If mnt is not currently marked as used, a warning is logged and pr
+// is left unchanged. Otherwise mnt is marked unused, and its
+// replication is subtracted from totalReplication and from classDone
+// for each of mnt's storage classes. A needed class is put back in
+// classTodo only if no replicas remain accounted for in that class.
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+	if !pr.mountUsed[mnt] {
+		logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+		return
+	}
+	pr.mountUsed[mnt] = false
+	pr.totalReplication -= mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] -= mnt.Replication
+		// A count that is still positive after the subtraction
+		// means replicas from some other mount are still
+		// accounted for in this class. Reopening the class in
+		// that case would make the progress look less complete
+		// than it is and invite a superfluous write.
+		if pr.classNeeded[class] && pr.classDone[class] <= 0 {
+			pr.classTodo[class] = true
+		}
+	}
+}
+
// Done reports whether the put has met its goal: no storage class
// remains in classTodo and totalReplication is greater than zero.
func (pr *putProgress) Done() bool {
return len(pr.classTodo) == 0 && pr.totalReplication > 0
}
return false
}
-func newPutResult(classes []string) putProgress {
+// Copy returns a pointer to a new putProgress with the same
+// totalReplication and with freshly allocated classTodo, classDone,
+// and mountUsed maps holding the same entries as pr's, so changes
+// made to those maps through the copy do not affect pr. The
+// classNeeded map is shared with pr rather than duplicated.
+func (pr *putProgress) Copy() *putProgress {
+	dup := &putProgress{
+		totalReplication: pr.totalReplication,
+		classNeeded:      pr.classNeeded,
+		mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
+		classDone:        make(map[string]int, len(pr.classDone)),
+		classTodo:        make(map[string]bool, len(pr.classTodo)),
+	}
+	for mnt, used := range pr.mountUsed {
+		dup.mountUsed[mnt] = used
+	}
+	for class, replicas := range pr.classDone {
+		dup.classDone[class] = replicas
+	}
+	for class, todo := range pr.classTodo {
+		dup.classTodo[class] = todo
+	}
+	return dup
+}
+
// newPutProgress returns a putProgress with empty classDone and
// mountUsed maps, in which every non-empty name in classes is marked
// true in both classNeeded and classTodo.
+func newPutProgress(classes []string) putProgress {
pr := putProgress{
- classTodo: make(map[string]bool, len(classes)),
- classDone: map[string]int{},
- mountUsed: map[*VolumeMount]bool{},
+ classNeeded: make(map[string]bool, len(classes)),
+ classTodo: make(map[string]bool, len(classes)),
+ classDone: map[string]int{},
+ mountUsed: map[*VolumeMount]bool{},
}
for _, c := range classes {
// Empty class names are skipped.
if c != "" {
+ pr.classNeeded[c] = true
pr.classTodo[c] = true
}
}
return pr
}
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// PutBlock(ctx, block, hash)
-// Stores the BLOCK (identified by the content id HASH) in Keep.
+// PutBlock stores the given block on one or more volumes.
//
-// The MD5 checksum of the block must be identical to the content id HASH.
-// If not, an error is returned.
+// The MD5 checksum of the block must match the given hash.
//
-// PutBlock stores the BLOCK on the first Keep volume with free space.
-// A failure code is returned to the user only if all volumes fail.
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
//
-// On success, PutBlock returns nil.
-// On failure, it returns a KeepError with one of the following codes:
-//
-// 500 Collision
-// A different block with the same hash already exists on this
-// Keep server.
-// 422 MD5Fail
-// The MD5 hash of the BLOCK does not match the argument HASH.
-// 503 Full
-// There was not enough space left in any Keep volume to store
-// the object.
-// 500 Fail
-// The object could not be stored for some other reason (e.g.
-// all writes failed). The text of the error message should
-// provide as much detail as possible.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
//
+// 500 Collision
+// A different block with the same hash already exists on this
+// Keep server.
+// 422 MD5Fail
+// The MD5 hash of the BLOCK does not match the argument HASH.
+// 503 Full
+// There was not enough space left in any Keep volume to store
+// the object.
+// 500 Fail
+// The object could not be stored for some other reason (e.g.
+// all writes failed). The text of the error message should
+// provide as much detail as possible.
func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
log := ctxlog.FromContext(ctx)
return putProgress{}, RequestHashError
}
- result := newPutResult(wantStorageClasses)
+ result := newPutProgress(wantStorageClasses)
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
+ if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
return result, err
}
if ctx.Err() != nil {
return result, ErrClientDisconnect
}
- // Choose a Keep volume to write to.
- // If this volume fails, try all of the volumes in order.
- if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
- // fall through to "try all volumes" below
- } else if err := mnt.Put(ctx, hash, block); err != nil {
- log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
- } else {
- result.Add(mnt)
- if result.Done() {
- return result, nil
- }
- }
- if ctx.Err() != nil {
- return putProgress{}, ErrClientDisconnect
- }
-
- writables := volmgr.AllWritable()
+ writables := volmgr.NextWritable()
if len(writables) == 0 {
log.Error("no writable volumes")
- return putProgress{}, FullError
+ return result, FullError
}
- allFull := true
+ var wg sync.WaitGroup
+ var mtx sync.Mutex
+ cond := sync.Cond{L: &mtx}
+ // pending predicts what result will be if all pending writes
+ // succeed.
+ pending := result.Copy()
+ var allFull atomic.Value
+ allFull.Store(true)
+
+ // We hold the lock for the duration of the "each volume" loop
+ // below, except when it is released during cond.Wait().
+ mtx.Lock()
+
for _, mnt := range writables {
+ // Wait until our decision to use this mount does not
+ // depend on the outcome of pending writes.
+ for result.Want(mnt) && !pending.Want(mnt) {
+ cond.Wait()
+ }
if !result.Want(mnt) {
continue
}
- err := mnt.Put(ctx, hash, block)
- if ctx.Err() != nil {
- return result, ErrClientDisconnect
- }
- switch err {
- case nil:
- result.Add(mnt)
- if result.Done() {
- return result, nil
+ mnt := mnt
+ pending.Add(mnt)
+ wg.Add(1)
+ go func() {
+ log.Debugf("PutBlock: start write to %s", mnt.UUID)
+ defer wg.Done()
+ err := mnt.Put(ctx, hash, block)
+
+ mtx.Lock()
+ if err != nil {
+ log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+ pending.Sub(mnt)
+ } else {
+ log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+ result.Add(mnt)
}
- continue
- case FullError:
- continue
- default:
- // The volume is not full but the
- // write did not succeed. Report the
- // error and continue trying.
- allFull = false
- log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
- }
+ cond.Broadcast()
+ mtx.Unlock()
+
+ if err != nil && err != FullError && ctx.Err() == nil {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull.Store(false)
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ }
+ }()
+ }
+ mtx.Unlock()
+ wg.Wait()
+ if ctx.Err() != nil {
+ return result, ErrClientDisconnect
+ }
+ if result.Done() {
+ return result, nil
}
if result.totalReplication > 0 {
// Some, but not all, of the storage classes were
// satisfied. This qualifies as success.
return result, nil
- } else if allFull {
+ } else if allFull.Load().(bool) {
log.Error("all volumes with qualifying storage classes are full")
return putProgress{}, FullError
} else {