X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fdd48c8ba17f4cc040ea7e7b9553cefc33717f4e..da83807d6bcef1c1f0bb78479c5ec17f150f5eda:/services/keepstore/handlers.go

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a0e7fd0e01..abeb20fe86 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0

-package main
+package keepstore

 import (
 	"container/list"
@@ -18,8 +18,10 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

+	"git.arvados.org/arvados.git/lib/cmd"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/health"
@@ -111,12 +113,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }

 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-	ctx, cancel := contextForResponse(context.TODO(), resp)
-	defer cancel()
-
 	locator := req.URL.Path[1:]
 	if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-		rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+		rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
 		return
 	}

@@ -135,14 +134,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
 	// isn't here, we can return 404 now instead of waiting for a
 	// buffer.

-	buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+	buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
 	}
 	defer bufs.Put(buf)

-	size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+	size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
 	if err != nil {
 		code := http.StatusInternalServerError
 		if err, ok := err.(*KeepError); ok {
@@ -157,21 +156,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
 	resp.Write(buf[:size])
 }

-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-	ctx, cancel := context.WithCancel(parent)
-	if cn, ok := resp.(http.CloseNotifier); ok {
-		go func(c <-chan bool) {
-			select {
-			case <-c:
-				cancel()
-			case <-ctx.Done():
-			}
-		}(cn.CloseNotify())
-	}
-	return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -222,9 +206,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
 }

 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-	ctx, cancel := contextForResponse(context.TODO(), resp)
-	defer cancel()
-
 	hash := mux.Vars(req)["hash"]

 	// Detect as many error conditions as possible before reading
@@ -246,7 +227,22 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}

-	buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+	var wantStorageClasses []string
+	if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+		wantStorageClasses = strings.Split(hdr, ",")
+		for i, sc := range wantStorageClasses {
+			wantStorageClasses[i] = strings.TrimSpace(sc)
+		}
+	} else {
+		// none specified -- use configured default
+		for class, cfg := range rtr.cluster.StorageClasses {
+			if cfg.Default {
+				wantStorageClasses = append(wantStorageClasses, class)
+			}
+		}
+	}
+
+	buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
@@ -259,7 +255,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}

-	result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+	result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
 	bufs.Put(buf)

 	if err != nil {
@@ -395,7 +391,7 @@ func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {

 // populate the given NodeStatus struct with current values.
 func (rtr *router) readNodeStatus(st *NodeStatus) {
-	st.Version = version
+	st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
 	vols := rtr.volmgr.AllReadable()
 	if cap(st.Volumes) < len(vols) {
 		st.Volumes = make([]*volumeStatusEnt, len(vols))
@@ -453,11 +449,10 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
 // Otherwise, the response code is 200 OK, with a response body
 // consisting of the JSON message
 //
-//	{"copies_deleted":d,"copies_failed":f}
+//	{"copies_deleted":d,"copies_failed":f}
 //
 // where d and f are integers representing the number of blocks that
 // were successfully and unsuccessfully deleted.
-//
 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
 	hash := mux.Vars(req)["hash"]

@@ -480,8 +475,10 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
 		Deleted int `json:"copies_deleted"`
 		Failed  int `json:"copies_failed"`
 	}
-	for _, vol := range rtr.volmgr.AllWritable() {
-		if err := vol.Trash(hash); err == nil {
+	for _, vol := range rtr.volmgr.Mounts() {
+		if !vol.KeepMount.AllowTrash {
+			continue
+		} else if err := vol.Trash(hash); err == nil {
 			result.Deleted++
 		} else if os.IsNotExist(err) {
 			continue
@@ -678,7 +675,6 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 //
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
-//
 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
 	log := ctxlog.FromContext(ctx)

@@ -713,7 +709,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 		if filehash != hash {
 			// TODO: Try harder to tell a sysadmin about
 			// this.
- log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol) + log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol) errorToCaller = DiskHashError continue } @@ -725,23 +721,26 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b return 0, errorToCaller } -type putResult struct { +type putProgress struct { + classNeeded map[string]bool + classTodo map[string]bool + mountUsed map[*VolumeMount]bool totalReplication int - classReplication map[string]int + classDone map[string]int } // Number of distinct replicas stored. "2" can mean the block was // stored on 2 different volumes with replication 1, or on 1 volume // with replication 2. -func (pr putResult) TotalReplication() string { +func (pr putProgress) TotalReplication() string { return strconv.Itoa(pr.totalReplication) } // Number of replicas satisfying each storage class, formatted like // "default=2; special=1". -func (pr putResult) ClassReplication() string { +func (pr putProgress) ClassReplication() string { s := "" - for k, v := range pr.classReplication { + for k, v := range pr.classDone { if len(s) > 0 { s += ", " } @@ -750,130 +749,242 @@ func (pr putResult) ClassReplication() string { return s } -func newPutResult(mnt *VolumeMount) putResult { - result := putResult{ - totalReplication: mnt.Replication, - classReplication: map[string]int{}, +func (pr *putProgress) Add(mnt *VolumeMount) { + if pr.mountUsed[mnt] { + logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID) + return } + pr.mountUsed[mnt] = true + pr.totalReplication += mnt.Replication for class := range mnt.StorageClasses { - result.classReplication[class] += mnt.Replication + pr.classDone[class] += mnt.Replication + delete(pr.classTodo, class) } - return result } -// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep. +func (pr *putProgress) Sub(mnt *VolumeMount) { + if !pr.mountUsed[mnt] { + logrus.Warnf("BUG? 
+		return
+	}
+	pr.mountUsed[mnt] = false
+	pr.totalReplication -= mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] -= mnt.Replication
+		if pr.classNeeded[class] {
+			pr.classTodo[class] = true
+		}
+	}
+}
+
+func (pr *putProgress) Done() bool {
+	return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
+func (pr *putProgress) Want(mnt *VolumeMount) bool {
+	if pr.Done() || pr.mountUsed[mnt] {
+		return false
+	}
+	if len(pr.classTodo) == 0 {
+		// none specified == "any"
+		return true
+	}
+	for class := range mnt.StorageClasses {
+		if pr.classTodo[class] {
+			return true
+		}
+	}
+	return false
+}
+
+func (pr *putProgress) Copy() *putProgress {
+	cp := putProgress{
+		classNeeded:      pr.classNeeded,
+		classTodo:        make(map[string]bool, len(pr.classTodo)),
+		classDone:        make(map[string]int, len(pr.classDone)),
+		mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
+		totalReplication: pr.totalReplication,
+	}
+	for k, v := range pr.classTodo {
+		cp.classTodo[k] = v
+	}
+	for k, v := range pr.classDone {
+		cp.classDone[k] = v
+	}
+	for k, v := range pr.mountUsed {
+		cp.mountUsed[k] = v
+	}
+	return &cp
+}
+
+func newPutProgress(classes []string) putProgress {
+	pr := putProgress{
+		classNeeded: make(map[string]bool, len(classes)),
+		classTodo:   make(map[string]bool, len(classes)),
+		classDone:   map[string]int{},
+		mountUsed:   map[*VolumeMount]bool{},
+	}
+	for _, c := range classes {
+		if c != "" {
+			pr.classNeeded[c] = true
+			pr.classTodo[c] = true
+		}
+	}
+	return pr
+}
+
+// PutBlock stores the given block on one or more volumes.
+//
+// The MD5 checksum of the block must match the given hash.
+//
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
+//
+// The returned error, if any, is a KeepError with one of the
+// following codes:
+//
+// 500 Collision
 //
-// PutBlock(ctx, block, hash)
-// Stores the BLOCK (identified by the content id HASH) in Keep.
+// A different block with the same hash already exists on this
+// Keep server.
 //
-// The MD5 checksum of the block must be identical to the content id HASH.
-// If not, an error is returned.
+// 422 MD5Fail
 //
-// PutBlock stores the BLOCK on the first Keep volume with free space.
-// A failure code is returned to the user only if all volumes fail.
+// The MD5 hash of the BLOCK does not match the argument HASH.
 //
-// On success, PutBlock returns nil.
-// On failure, it returns a KeepError with one of the following codes:
+// 503 Full
 //
-// 500 Collision
-// A different block with the same hash already exists on this
-// Keep server.
-// 422 MD5Fail
-// The MD5 hash of the BLOCK does not match the argument HASH.
-// 503 Full
-// There was not enough space left in any Keep volume to store
-// the object.
-// 500 Fail
-// The object could not be stored for some other reason (e.g.
-// all writes failed). The text of the error message should
-// provide as much detail as possible.
+// There was not enough space left in any Keep volume to store
+// the object.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
+// 500 Fail
+//
+// The object could not be stored for some other reason (e.g.
+// all writes failed). The text of the error message should
+// provide as much detail as possible.
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
 	log := ctxlog.FromContext(ctx)

 	// Check that BLOCK's checksum matches HASH.
 	blockhash := fmt.Sprintf("%x", md5.Sum(block))
 	if blockhash != hash {
 		log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-		return putResult{}, RequestHashError
+		return putProgress{}, RequestHashError
 	}

+	result := newPutProgress(wantStorageClasses)
+
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
 		return result, err
-	} else if ctx.Err() != nil {
-		return putResult{}, ErrClientDisconnect
-	}
-
-	// Choose a Keep volume to write to.
-	// If this volume fails, try all of the volumes in order.
-	if mnt := volmgr.NextWritable(); mnt != nil {
-		if err := mnt.Put(ctx, hash, block); err != nil {
-			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-		} else {
-			return newPutResult(mnt), nil // success!
-		}
 	}

 	if ctx.Err() != nil {
-		return putResult{}, ErrClientDisconnect
+		return result, ErrClientDisconnect
 	}

-	writables := volmgr.AllWritable()
+	writables := volmgr.NextWritable()
 	if len(writables) == 0 {
 		log.Error("no writable volumes")
-		return putResult{}, FullError
+		return result, FullError
 	}

-	allFull := true
+	var wg sync.WaitGroup
+	var mtx sync.Mutex
+	cond := sync.Cond{L: &mtx}
+	// pending predicts what result will be if all pending writes
+	// succeed.
+	pending := result.Copy()
+	var allFull atomic.Value
+	allFull.Store(true)
+
+	// We hold the lock for the duration of the "each volume" loop
+	// below, except when it is released during cond.Wait().
+	mtx.Lock()
+
 	for _, mnt := range writables {
-		err := mnt.Put(ctx, hash, block)
-		if ctx.Err() != nil {
-			return putResult{}, ErrClientDisconnect
+		// Wait until our decision to use this mount does not
+		// depend on the outcome of pending writes.
+		for result.Want(mnt) && !pending.Want(mnt) {
+			cond.Wait()
 		}
-		switch err {
-		case nil:
-			return newPutResult(mnt), nil // success!
-		case FullError:
+		if !result.Want(mnt) {
 			continue
-		default:
-			// The volume is not full but the
-			// write did not succeed. Report the
-			// error and continue trying.
-			allFull = false
-			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
 		}
+		mnt := mnt
+		pending.Add(mnt)
+		wg.Add(1)
+		go func() {
+			log.Debugf("PutBlock: start write to %s", mnt.UUID)
+			defer wg.Done()
+			err := mnt.Put(ctx, hash, block)
+
+			mtx.Lock()
+			if err != nil {
+				log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+				pending.Sub(mnt)
+			} else {
+				log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+				result.Add(mnt)
+			}
+			cond.Broadcast()
+			mtx.Unlock()
+
+			if err != nil && err != FullError && ctx.Err() == nil {
+				// The volume is not full but the
+				// write did not succeed. Report the
+				// error and continue trying.
+				allFull.Store(false)
+				log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+			}
+		}()
+	}
+	mtx.Unlock()
+	wg.Wait()
+	if ctx.Err() != nil {
+		return result, ErrClientDisconnect
+	}
+	if result.Done() {
+		return result, nil
 	}
-	if allFull {
-		log.Error("all volumes are full")
-		return putResult{}, FullError
+	if result.totalReplication > 0 {
+		// Some, but not all, of the storage classes were
+		// satisfied. This qualifies as success.
+		return result, nil
+	} else if allFull.Load().(bool) {
+		log.Error("all volumes with qualifying storage classes are full")
+		return putProgress{}, FullError
+	} else {
+		// Already logged the non-full errors.
+		return putProgress{}, GenericError
 	}
-	// Already logged the non-full errors.
-	return putResult{}, GenericError
 }

-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
 	log := ctxlog.FromContext(ctx)
-	var bestErr error = NotFoundError
 	for _, mnt := range volmgr.AllWritable() {
+		if !result.Want(mnt) {
+			continue
+		}
 		err := mnt.Compare(ctx, hash, buf)
 		if ctx.Err() != nil {
-			return putResult{}, ctx.Err()
+			return nil
 		} else if err == CollisionError {
 			// Stop if we have a block with same hash but
 			// different content. (It will be impossible
 			// to tell which one is wanted if we have
 			// both, so there's no point writing it even
 			// on a different volume.)
-			log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-			return putResult{}, err
+			log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+			return CollisionError
 		} else if os.IsNotExist(err) {
 			// Block does not exist. This is the only
 			// "normal" error: we don't log anything.
@@ -887,21 +998,22 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 		}
 		if err := mnt.Touch(hash); err != nil {
 			log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
-			bestErr = err
 			continue
 		}
 		// Compare and Touch both worked --> done.
-		return newPutResult(mnt), nil
+		result.Add(mnt)
+		if result.Done() {
+			return nil
+		}
 	}
-	return putResult{}, bestErr
+	return nil
 }

 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

-// IsValidLocator returns true if the specified string is a valid Keep locator.
-// When Keep is extended to support hash types other than MD5,
-// this should be updated to cover those as well.
-//
+// IsValidLocator returns true if the specified string is a valid Keep
+// locator. When Keep is extended to support hash types other than
+// MD5, this should be updated to cover those as well.
 func IsValidLocator(loc string) bool {
 	return validLocatorRe.MatchString(loc)
 }
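
Note on the X-Keep-Storage-Classes request header handled in handlePUT above: a client asks for particular storage classes by sending them as a comma-separated list with its PUT; if the header is absent, keepstore falls back to the cluster's configured default classes. The following client-side sketch is illustrative only and is not part of this change; it assumes keepstoreURL points at a keepstore service and omits the Authorization header a real cluster would require.

// Hypothetical client-side sketch: store one block on keepstore,
// asking for the "archive" and "default" storage classes (the class
// names here are examples, not part of the diff).
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
	"net/http"
)

func putBlock(keepstoreURL string, block []byte) error {
	hash := fmt.Sprintf("%x", md5.Sum(block))
	req, err := http.NewRequest(http.MethodPut, keepstoreURL+"/"+hash, bytes.NewReader(block))
	if err != nil {
		return err
	}
	// Comma-separated list; keepstore trims whitespace around each
	// class name, as shown in handlePUT above.
	req.Header.Set("X-Keep-Storage-Classes", "archive, default")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("PUT %s: unexpected status %s", hash, resp.Status)
	}
	return nil
}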
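The rewritten PutBlock above starts one goroutine per candidate mount and coordinates them with a mutex and a sync.Cond: result tracks confirmed replicas, pending predicts the outcome if every in-flight write succeeds, and the scheduling loop only blocks in cond.Wait() when its decision about the next mount depends on writes that have not finished yet. The stand-alone sketch below shows the same coordination pattern with plain replica counts instead of storage classes; it is a simplified illustration under that assumption, and the function names and write callback are invented for the example.

// Simplified sketch of the PutBlock scheduling pattern: "done" counts
// confirmed replicas, "pending" counts confirmed plus in-flight writes.
// A new backend is only used when the already-pending writes would not
// be enough even if they all succeed.
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

func storeWithReplication(backends []string, want int, write func(string) error) int {
	var wg sync.WaitGroup
	var mtx sync.Mutex
	cond := sync.Cond{L: &mtx}
	done, pending := 0, 0

	mtx.Lock()
	for _, b := range backends {
		// Wait until the outcome of in-flight writes no longer
		// decides whether this backend is needed.
		for done < want && pending >= want {
			cond.Wait()
		}
		if done >= want {
			break
		}
		b := b
		pending++
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := write(b)
			mtx.Lock()
			if err == nil {
				done++ // confirmed; already counted in pending
			} else {
				pending-- // prediction was wrong, retract it
			}
			cond.Broadcast()
			mtx.Unlock()
		}()
	}
	mtx.Unlock()
	wg.Wait()
	return done
}

func main() {
	write := func(b string) error {
		if rand.Intn(3) == 0 {
			return fmt.Errorf("%s: write failed", b)
		}
		return nil
	}
	n := storeWithReplication([]string{"vol0", "vol1", "vol2", "vol3"}, 2, write)
	fmt.Println("replicas stored:", n)
}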