15521: Convert remaining log uses to logrus.
[arvados.git] / services / keepstore / handlers.go
index 30ea695f0c96e125598fc8042ae1501f7a2fe70b..0fcc12144136ddb72ecfab6911dc1542ab0dcdf8 100644 (file)
@@ -11,7 +11,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -342,25 +341,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
        }
        var ds debugStats
        runtime.ReadMemStats(&ds.MemStats)
-       err := json.NewEncoder(resp).Encode(&ds)
+       data, err := json.Marshal(&ds)
        if err != nil {
-               http.Error(resp, err.Error(), 500)
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // StatusHandler addresses /status.json requests.
 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
        rtr.readNodeStatus(&st)
-       jstat, err := json.Marshal(&st)
+       data, err := json.Marshal(&st)
        stLock.Unlock()
-       if err == nil {
-               resp.Write(jstat)
-       } else {
-               log.Printf("json.Marshal: %s", err)
-               log.Printf("NodeStatus = %v", &st)
-               http.Error(resp, err.Error(), 500)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // populate the given NodeStatus struct with current values.
@@ -457,28 +456,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
                        continue
                } else {
                        result.Failed++
-                       log.Println("DeleteHandler:", err)
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
                }
        }
-
-       var st int
-
        if result.Deleted == 0 && result.Failed == 0 {
-               st = http.StatusNotFound
-       } else {
-               st = http.StatusOK
+               resp.WriteHeader(http.StatusNotFound)
+               return
        }
-
-       resp.WriteHeader(st)
-
-       if st == http.StatusOK {
-               if body, err := json.Marshal(result); err == nil {
-                       resp.Write(body)
-               } else {
-                       log.Printf("json.Marshal: %s (result = %v)", err, result)
-                       http.Error(resp, err.Error(), 500)
-               }
+       body, err := json.Marshal(result)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(body)
 }
 
 /* PullHandler processes "PUT /pull" requests for the data manager.
@@ -600,6 +590,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
+       log := ctxlog.FromContext(req.Context())
        hash := mux.Vars(req)["hash"]
 
        if len(rtr.volmgr.AllWritable()) == 0 {
@@ -615,27 +606,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                if os.IsNotExist(err) {
                        numNotFound++
                } else if err != nil {
-                       log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+                       log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
                        failedOn = append(failedOn, vol.String())
                } else {
-                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       log.Infof("Untrashed %v on volume %v", hash, vol.String())
                        untrashedOn = append(untrashedOn, vol.String())
                }
        }
 
        if numNotFound == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
-               return
-       }
-
-       if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+       } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
        } else {
-               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
                if len(failedOn) > 0 {
-                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+                       http.Error(resp, respBody, http.StatusInternalServerError)
+               } else {
+                       fmt.Fprintln(resp, respBody)
                }
-               resp.Write([]byte(respBody))
        }
 }
 
@@ -659,6 +649,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
@@ -676,7 +668,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        // volumes. If all volumes report IsNotExist,
                        // we return a NotFoundError.
                        if !os.IsNotExist(err) {
-                               log.Printf("%s: Get(%s): %s", vol, hash, err)
+                               log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
                        }
                        // If some volume returns a transient error, return it to the caller
                        // instead of "Not found" so it can retry.
@@ -686,19 +678,16 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        continue
                }
                // Check the file checksum.
-               //
                filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Printf("%s: checksum mismatch for request %s (actual %s)",
-                               vol, hash, filehash)
+                       log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
                        errorToCaller = DiskHashError
                        continue
                }
                if errorToCaller == DiskHashError {
-                       log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
-                               vol, hash)
+                       log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
                }
                return size, nil
        }
@@ -754,12 +743,14 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if mnt := volmgr.NextWritable(); mnt != nil {
-               if err := mnt.Put(ctx, hash, block); err == nil {
+               if err := mnt.Put(ctx, hash, block); err != nil {
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+               } else {
                        return mnt.Replication, nil // success!
                }
-               if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
-               }
+       }
+       if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
        }
 
        writables := volmgr.AllWritable()
@@ -774,15 +765,17 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                if ctx.Err() != nil {
                        return 0, ErrClientDisconnect
                }
-               if err == nil {
+               switch err {
+               case nil:
                        return vol.Replication, nil // success!
-               }
-               if err != FullError {
+               case FullError:
+                       continue
+               default:
                        // The volume is not full but the
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.Errorf("%s: Write(%s): %s", vol, hash, err)
+                       log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
                }
        }
 
@@ -800,6 +793,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+       log := ctxlog.FromContext(ctx)
        var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
                err := mnt.Compare(ctx, hash, buf)
@@ -811,7 +805,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // to tell which one is wanted if we have
                        // both, so there's no point writing it even
                        // on a different volume.)
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+                       log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
                        return 0, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
@@ -821,11 +815,11 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // Couldn't open file, data is corrupt on
                        // disk, etc.: log this abnormal condition,
                        // and try the next volume.
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+                       log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
                        continue
                }
                if err := mnt.Touch(hash); err != nil {
-                       log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+                       log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
                        bestErr = err
                        continue
                }
@@ -859,18 +853,6 @@ func GetAPIToken(req *http.Request) string {
        return ""
 }
 
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
-       ts, err := strconv.ParseInt(timestampHex, 16, 0)
-       if err != nil {
-               log.Printf("IsExpired: %s", err)
-               return true
-       }
-       return time.Unix(ts, 0).Before(time.Now())
-}
-
 // canDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
 func (rtr *router) canDelete(apiToken string) bool {