X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/62d28600cbfc31f8e72c61e4519ff198cb66a02a..96c664032e219cab6113acadca843f94009d39e4:/services/keepstore/handlers.go
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 0fcc121441..07c2946668 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -20,10 +20,10 @@ import (
 	"sync"
 	"time"
 
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
-	"git.curoverse.com/arvados.git/sdk/go/health"
-	"git.curoverse.com/arvados.git/sdk/go/httpserver"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"git.arvados.org/arvados.git/sdk/go/health"
+	"git.arvados.org/arvados.git/sdk/go/httpserver"
 	"github.com/gorilla/mux"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
@@ -66,6 +66,8 @@ func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *promethe
 	// List blocks stored here whose hash has the given prefix.
 	// Privileged client only.
 	rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+	// Update timestamp on existing block. Privileged client only.
+	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
 
 	// Internals/debugging info (runtime.MemStats)
 	rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
@@ -191,6 +193,34 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
 	}
 }
 
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+	hash := mux.Vars(req)["hash"]
+	vols := rtr.volmgr.AllWritable()
+	if len(vols) == 0 {
+		http.Error(resp, "no volumes", http.StatusNotFound)
+		return
+	}
+	var err error
+	for _, mnt := range vols {
+		err = mnt.Touch(hash)
+		if err == nil {
+			break
+		}
+	}
+	switch {
+	case err == nil:
+		return
+	case os.IsNotExist(err):
+		http.Error(resp, err.Error(), http.StatusNotFound)
+	default:
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+	}
+}
+
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 	ctx, cancel := contextForResponse(context.TODO(), resp)
 	defer cancel()
@@ -216,6 +246,14 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
+	var wantStorageClasses []string
+	if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+		wantStorageClasses = strings.Split(hdr, ",")
+		for i, sc := range wantStorageClasses {
+			wantStorageClasses[i] = strings.TrimSpace(sc)
+		}
+	}
+
 	buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
@@ -229,7 +267,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+	result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
 	bufs.Put(buf)
 
 	if err != nil {
@@ -249,7 +287,8 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
 		returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
 	}
-	resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+	resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+	resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
 	resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -694,6 +733,80 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 	return 0, errorToCaller
 }
 
+type putResult struct {
+	classTodo        map[string]bool
+	mountUsed        map[*VolumeMount]bool
+	totalReplication int
+	classDone        map[string]int
+}
+
+// TotalReplication returns the number of replicas stored. "2" can
+// mean the block was stored on 2 different volumes with replication
+// 1, or on 1 volume with replication 2.
+func (pr putResult) TotalReplication() string {
+	return strconv.Itoa(pr.totalReplication)
+}
+
+// ClassReplication returns the replicas stored per storage class,
+// formatted like "default=2, special=1" (classes in no set order).
+func (pr putResult) ClassReplication() string {
+	s := ""
+	for k, v := range pr.classDone {
+		if len(s) > 0 {
+			s += ", "
+		}
+		s += k + "=" + strconv.Itoa(v)
+	}
+	return s
+}
+
+func (pr *putResult) Add(mnt *VolumeMount) {
+	if pr.mountUsed[mnt] {
+		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+		return
+	}
+	pr.mountUsed[mnt] = true
+	pr.totalReplication += mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] += mnt.Replication
+		delete(pr.classTodo, class)
+	}
+}
+
+func (pr *putResult) Done() bool {
+	return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
+func (pr *putResult) Want(mnt *VolumeMount) bool {
+	if pr.Done() || pr.mountUsed[mnt] {
+		return false
+	}
+	if len(pr.classTodo) == 0 {
+		// none specified == "any"
+		return true
+	}
+	for class := range mnt.StorageClasses {
+		if pr.classTodo[class] {
+			return true
+		}
+	}
+	return false
+}
+
+func newPutResult(classes []string) putResult {
+	pr := putResult{
+		classTodo: make(map[string]bool, len(classes)),
+		classDone: map[string]int{},
+		mountUsed: map[*VolumeMount]bool{},
+	}
+	for _, c := range classes {
+		if c != "" {
+			pr.classTodo[c] = true
+		}
+	}
+	return pr
+}
+
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 // PutBlock(ctx, block, hash)
@@ -721,53 +834,66 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 // all writes failed). The text of the error message should
 // provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putResult, error) {
 	log := ctxlog.FromContext(ctx)
 
 	// Check that BLOCK's checksum matches HASH.
 	blockhash := fmt.Sprintf("%x", md5.Sum(block))
 	if blockhash != hash {
 		log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-		return 0, RequestHashError
+		return putResult{}, RequestHashError
 	}
 
+	result := newPutResult(wantStorageClasses)
+
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
-		return n, err
-	} else if ctx.Err() != nil {
-		return 0, ErrClientDisconnect
+	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
+		return result, err
+	}
+	if ctx.Err() != nil {
+		return result, ErrClientDisconnect
 	}
 
 	// Choose a Keep volume to write to.
 	// If this volume fails, try all of the volumes in order.
-	if mnt := volmgr.NextWritable(); mnt != nil {
-		if err := mnt.Put(ctx, hash, block); err != nil {
-			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-		} else {
-			return mnt.Replication, nil // success!
+	if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
+		// fall through to "try all volumes" below
+	} else if err := mnt.Put(ctx, hash, block); err != nil {
+		log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+	} else {
+		result.Add(mnt)
+		if result.Done() {
+			return result, nil
 		}
 	}
 	if ctx.Err() != nil {
-		return 0, ErrClientDisconnect
+		return putResult{}, ErrClientDisconnect
 	}
 
 	writables := volmgr.AllWritable()
 	if len(writables) == 0 {
 		log.Error("no writable volumes")
-		return 0, FullError
+		return putResult{}, FullError
 	}
 
 	allFull := true
-	for _, vol := range writables {
-		err := vol.Put(ctx, hash, block)
+	for _, mnt := range writables {
+		if !result.Want(mnt) {
+			continue
+		}
+		err := mnt.Put(ctx, hash, block)
 		if ctx.Err() != nil {
-			return 0, ErrClientDisconnect
+			return result, ErrClientDisconnect
 		}
 		switch err {
 		case nil:
-			return vol.Replication, nil // success!
+			result.Add(mnt)
+			if result.Done() {
+				return result, nil
+			}
+			continue
 		case FullError:
 			continue
 		default:
@@ -775,30 +901,37 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 			// write did not succeed. Report the
 			// error and continue trying.
 			allFull = false
-			log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
+			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
 		}
 	}
 
-	if allFull {
-		log.Error("all volumes are full")
-		return 0, FullError
+	if result.totalReplication > 0 {
+		// Some, but not all, of the storage classes were
+		// satisfied. This qualifies as success.
+		return result, nil
+	} else if allFull {
+		log.Error("all volumes with qualifying storage classes are full")
+		return putResult{}, FullError
+	} else {
+		// Already logged the non-full errors.
+		return putResult{}, GenericError
 	}
-	// Already logged the non-full errors.
-	return 0, GenericError
 }
 
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+// CompareAndTouch adds to result each wanted volume that already has
+// the given content and whose modification time could be updated
+// (protecting the block from garbage collection). It returns
+// CollisionError if a volume has different data with the same hash,
+// otherwise nil; callers must check ctx.Err() to detect cancellation.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putResult) error {
 	log := ctxlog.FromContext(ctx)
-	var bestErr error = NotFoundError
 	for _, mnt := range volmgr.AllWritable() {
+		if !result.Want(mnt) {
+			continue
+		}
 		err := mnt.Compare(ctx, hash, buf)
 		if ctx.Err() != nil {
-			return 0, ctx.Err()
+			return nil
 		} else if err == CollisionError {
 			// Stop if we have a block with same hash but
 			// different content. (It will be impossible
@@ -806,7 +939,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 			// both, so there's no point writing it even
 			// on a different volume.)
-			log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-			return 0, err
+			log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+			return CollisionError
 		} else if os.IsNotExist(err) {
 			// Block does not exist. This is the only
 			// "normal" error: we don't log anything.
@@ -820,13 +953,15 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 		}
 		if err := mnt.Touch(hash); err != nil {
 			log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
-			bestErr = err
 			continue
 		}
 		// Compare and Touch both worked --> done.
-		return mnt.Replication, nil
+		result.Add(mnt)
+		if result.Done() {
+			return nil
+		}
 	}
-	return 0, bestErr
+	return nil
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)