13382: Report storage class(es) in headers after successful write.
[arvados.git] / services / keepstore / handlers.go
index 86504422d52f24f2659166e7cbfa975cb45772da..a0e7fd0e0132fadef7ca8276b26b10a87df52c30 100644 (file)
@@ -11,7 +11,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -21,10 +20,10 @@ import (
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
-       "git.curoverse.com/arvados.git/sdk/go/health"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/gorilla/mux"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
@@ -67,6 +66,8 @@ func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *promethe
        // List blocks stored here whose hash has the given prefix.
        // Privileged client only.
        rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+       // Update timestamp on existing block. Privileged client only.
+       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
 
        // Internals/debugging info (runtime.MemStats)
        rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
@@ -192,6 +193,34 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
        }
 }
 
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+       if !rtr.isSystemAuth(GetAPIToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+       hash := mux.Vars(req)["hash"]
+       vols := rtr.volmgr.AllWritable()
+       if len(vols) == 0 {
+               http.Error(resp, "no volumes", http.StatusNotFound)
+               return
+       }
+       var err error
+       for _, mnt := range vols {
+               err = mnt.Touch(hash)
+               if err == nil {
+                       break
+               }
+       }
+       switch {
+       case err == nil:
+               return
+       case os.IsNotExist(err):
+               http.Error(resp, err.Error(), http.StatusNotFound)
+       default:
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+       }
+}
+
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
        ctx, cancel := contextForResponse(context.TODO(), resp)
        defer cancel()
@@ -230,7 +259,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+       result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -250,7 +279,8 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
                returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
        }
-       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+       resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+       resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -282,19 +312,15 @@ func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
 
        for _, v := range vols {
                if err := v.IndexTo(prefix, resp); err != nil {
-                       // We can't send an error message to the
-                       // client because we might have already sent
-                       // headers and index content. All we can do is
-                       // log the error in our own logs, and (in
-                       // cases where headers haven't been sent yet)
-                       // set a 500 status.
+                       // We can't send an error status/message to
+                       // the client because IndexTo() might have
+                       // already written body content. All we can do
+                       // is log the error in our own logs.
                        //
-                       // If headers have already been sent, the
-                       // client must notice the lack of trailing
+                       // The client must notice the lack of trailing
                        // newline as an indication that the response
                        // is incomplete.
-                       log.Printf("index error from volume %s: %s", v, err)
-                       http.Error(resp, "", http.StatusInternalServerError)
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
                        return
                }
        }
@@ -346,25 +372,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
        }
        var ds debugStats
        runtime.ReadMemStats(&ds.MemStats)
-       err := json.NewEncoder(resp).Encode(&ds)
+       data, err := json.Marshal(&ds)
        if err != nil {
-               http.Error(resp, err.Error(), 500)
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // StatusHandler addresses /status.json requests.
 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
        rtr.readNodeStatus(&st)
-       jstat, err := json.Marshal(&st)
+       data, err := json.Marshal(&st)
        stLock.Unlock()
-       if err == nil {
-               resp.Write(jstat)
-       } else {
-               log.Printf("json.Marshal: %s", err)
-               log.Printf("NodeStatus = %v", &st)
-               http.Error(resp, err.Error(), 500)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // populate the given NodeStatus struct with current values.
@@ -461,28 +487,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
                        continue
                } else {
                        result.Failed++
-                       log.Println("DeleteHandler:", err)
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
                }
        }
-
-       var st int
-
        if result.Deleted == 0 && result.Failed == 0 {
-               st = http.StatusNotFound
-       } else {
-               st = http.StatusOK
+               resp.WriteHeader(http.StatusNotFound)
+               return
        }
-
-       resp.WriteHeader(st)
-
-       if st == http.StatusOK {
-               if body, err := json.Marshal(result); err == nil {
-                       resp.Write(body)
-               } else {
-                       log.Printf("json.Marshal: %s (result = %v)", err, result)
-                       http.Error(resp, err.Error(), 500)
-               }
+       body, err := json.Marshal(result)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(body)
 }
 
 /* PullHandler processes "PUT /pull" requests for the data manager.
@@ -604,6 +621,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
+       log := ctxlog.FromContext(req.Context())
        hash := mux.Vars(req)["hash"]
 
        if len(rtr.volmgr.AllWritable()) == 0 {
@@ -619,27 +637,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                if os.IsNotExist(err) {
                        numNotFound++
                } else if err != nil {
-                       log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+                       log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
                        failedOn = append(failedOn, vol.String())
                } else {
-                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       log.Infof("Untrashed %v on volume %v", hash, vol.String())
                        untrashedOn = append(untrashedOn, vol.String())
                }
        }
 
        if numNotFound == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
-               return
-       }
-
-       if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+       } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
        } else {
-               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
                if len(failedOn) > 0 {
-                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+                       http.Error(resp, respBody, http.StatusInternalServerError)
+               } else {
+                       fmt.Fprintln(resp, respBody)
                }
-               resp.Write([]byte(respBody))
        }
 }
 
@@ -663,6 +680,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
@@ -680,7 +699,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        // volumes. If all volumes report IsNotExist,
                        // we return a NotFoundError.
                        if !os.IsNotExist(err) {
-                               log.Printf("%s: Get(%s): %s", vol, hash, err)
+                               log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
                        }
                        // If some volume returns a transient error, return it to the caller
                        // instead of "Not found" so it can retry.
@@ -690,25 +709,58 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        continue
                }
                // Check the file checksum.
-               //
                filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Printf("%s: checksum mismatch for request %s (actual %s)",
-                               vol, hash, filehash)
+                       log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
                        errorToCaller = DiskHashError
                        continue
                }
                if errorToCaller == DiskHashError {
-                       log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
-                               vol, hash)
+                       log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
                }
                return size, nil
        }
        return 0, errorToCaller
 }
 
+type putResult struct {
+       totalReplication int
+       classReplication map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putResult) TotalReplication() string {
+       return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2; special=1".
+func (pr putResult) ClassReplication() string {
+       s := ""
+       for k, v := range pr.classReplication {
+               if len(s) > 0 {
+                       s += ", "
+               }
+               s += k + "=" + strconv.Itoa(v)
+       }
+       return s
+}
+
+func newPutResult(mnt *VolumeMount) putResult {
+       result := putResult{
+               totalReplication: mnt.Replication,
+               classReplication: map[string]int{},
+       }
+       for class := range mnt.StorageClasses {
+               result.classReplication[class] += mnt.Replication
+       }
+       return result
+}
+
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 // PutBlock(ctx, block, hash)
@@ -736,64 +788,70 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return 0, RequestHashError
+               return putResult{}, RequestHashError
        }
 
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
-               return n, err
+       if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+               return result, err
        } else if ctx.Err() != nil {
-               return 0, ErrClientDisconnect
+               return putResult{}, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if mnt := volmgr.NextWritable(); mnt != nil {
-               if err := mnt.Put(ctx, hash, block); err == nil {
-                       return mnt.Replication, nil // success!
-               }
-               if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
+               if err := mnt.Put(ctx, hash, block); err != nil {
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+               } else {
+                       return newPutResult(mnt), nil // success!
                }
        }
+       if ctx.Err() != nil {
+               return putResult{}, ErrClientDisconnect
+       }
 
        writables := volmgr.AllWritable()
        if len(writables) == 0 {
-               log.Print("No writable volumes.")
-               return 0, FullError
+               log.Error("no writable volumes")
+               return putResult{}, FullError
        }
 
        allFull := true
-       for _, vol := range writables {
-               err := vol.Put(ctx, hash, block)
+       for _, mnt := range writables {
+               err := mnt.Put(ctx, hash, block)
                if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
-               }
-               if err == nil {
-                       return vol.Replication, nil // success!
+                       return putResult{}, ErrClientDisconnect
                }
-               if err != FullError {
+               switch err {
+               case nil:
+                       return newPutResult(mnt), nil // success!
+               case FullError:
+                       continue
+               default:
                        // The volume is not full but the
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.Printf("%s: Write(%s): %s", vol, hash, err)
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
                }
        }
 
        if allFull {
-               log.Print("All volumes are full.")
-               return 0, FullError
+               log.Error("all volumes are full")
+               return putResult{}, FullError
        }
        // Already logged the non-full errors.
-       return 0, GenericError
+       return putResult{}, GenericError
 }
 
 // CompareAndTouch returns the current replication level if one of the
@@ -801,20 +859,21 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
+       log := ctxlog.FromContext(ctx)
        var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
                err := mnt.Compare(ctx, hash, buf)
                if ctx.Err() != nil {
-                       return 0, ctx.Err()
+                       return putResult{}, ctx.Err()
                } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
                        // to tell which one is wanted if we have
                        // both, so there's no point writing it even
                        // on a different volume.)
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
-                       return 0, err
+                       log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+                       return putResult{}, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -823,18 +882,18 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // Couldn't open file, data is corrupt on
                        // disk, etc.: log this abnormal condition,
                        // and try the next volume.
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+                       log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
                        continue
                }
                if err := mnt.Touch(hash); err != nil {
-                       log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+                       log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
                        bestErr = err
                        continue
                }
                // Compare and Touch both worked --> done.
-               return mnt.Replication, nil
+               return newPutResult(mnt), nil
        }
-       return 0, bestErr
+       return putResult{}, bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
@@ -861,18 +920,6 @@ func GetAPIToken(req *http.Request) string {
        return ""
 }
 
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
-       ts, err := strconv.ParseInt(timestampHex, 16, 0)
-       if err != nil {
-               log.Printf("IsExpired: %s", err)
-               return true
-       }
-       return time.Unix(ts, 0).Before(time.Now())
-}
-
 // canDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
 func (rtr *router) canDelete(apiToken string) bool {