X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f04693da1811e670d4cbb981debeecf14d79137c..4411f0b4e2a81f09d0ff6ff3f5e23cac5414236a:/services/keepstore/handlers.go?ds=sidebyside diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 86504422d5..3d0f893d82 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -11,7 +11,6 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "os" "regexp" @@ -21,10 +20,10 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/ctxlog" - "git.curoverse.com/arvados.git/sdk/go/health" - "git.curoverse.com/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/health" + "git.arvados.org/arvados.git/sdk/go/httpserver" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" @@ -282,19 +281,15 @@ func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) { for _, v := range vols { if err := v.IndexTo(prefix, resp); err != nil { - // We can't send an error message to the - // client because we might have already sent - // headers and index content. All we can do is - // log the error in our own logs, and (in - // cases where headers haven't been sent yet) - // set a 500 status. + // We can't send an error status/message to + // the client because IndexTo() might have + // already written body content. All we can do + // is log the error in our own logs. // - // If headers have already been sent, the - // client must notice the lack of trailing + // The client must notice the lack of trailing // newline as an indication that the response // is incomplete. - log.Printf("index error from volume %s: %s", v, err) - http.Error(resp, "", http.StatusInternalServerError) + ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v) return } } @@ -346,25 +341,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) { } var ds debugStats runtime.ReadMemStats(&ds.MemStats) - err := json.NewEncoder(resp).Encode(&ds) + data, err := json.Marshal(&ds) if err != nil { - http.Error(resp, err.Error(), 500) + http.Error(resp, err.Error(), http.StatusInternalServerError) + return } + resp.Write(data) } // StatusHandler addresses /status.json requests. func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) { stLock.Lock() rtr.readNodeStatus(&st) - jstat, err := json.Marshal(&st) + data, err := json.Marshal(&st) stLock.Unlock() - if err == nil { - resp.Write(jstat) - } else { - log.Printf("json.Marshal: %s", err) - log.Printf("NodeStatus = %v", &st) - http.Error(resp, err.Error(), 500) + if err != nil { + http.Error(resp, err.Error(), http.StatusInternalServerError) + return } + resp.Write(data) } // populate the given NodeStatus struct with current values. @@ -461,28 +456,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) { continue } else { result.Failed++ - log.Println("DeleteHandler:", err) + ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol) } } - - var st int - if result.Deleted == 0 && result.Failed == 0 { - st = http.StatusNotFound - } else { - st = http.StatusOK + resp.WriteHeader(http.StatusNotFound) + return } - - resp.WriteHeader(st) - - if st == http.StatusOK { - if body, err := json.Marshal(result); err == nil { - resp.Write(body) - } else { - log.Printf("json.Marshal: %s (result = %v)", err, result) - http.Error(resp, err.Error(), 500) - } + body, err := json.Marshal(result) + if err != nil { + http.Error(resp, err.Error(), http.StatusInternalServerError) + return } + resp.Write(body) } /* PullHandler processes "PUT /pull" requests for the data manager. @@ -604,6 +590,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { return } + log := ctxlog.FromContext(req.Context()) hash := mux.Vars(req)["hash"] if len(rtr.volmgr.AllWritable()) == 0 { @@ -619,27 +606,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { if os.IsNotExist(err) { numNotFound++ } else if err != nil { - log.Printf("Error untrashing %v on volume %v", hash, vol.String()) + log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol) failedOn = append(failedOn, vol.String()) } else { - log.Printf("Untrashed %v on volume %v", hash, vol.String()) + log.Infof("Untrashed %v on volume %v", hash, vol.String()) untrashedOn = append(untrashedOn, vol.String()) } } if numNotFound == len(rtr.volmgr.AllWritable()) { http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound) - return - } - - if len(failedOn) == len(rtr.volmgr.AllWritable()) { + } else if len(failedOn) == len(rtr.volmgr.AllWritable()) { http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError) } else { - respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",") + respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ") if len(failedOn) > 0 { - respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",") + respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ") + http.Error(resp, respBody, http.StatusInternalServerError) + } else { + fmt.Fprintln(resp, respBody) } - resp.Write([]byte(respBody)) } } @@ -663,6 +649,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { // DiskHashError. // func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) { + log := ctxlog.FromContext(ctx) + // Attempt to read the requested hash from a keep volume. errorToCaller := NotFoundError @@ -680,7 +668,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b // volumes. If all volumes report IsNotExist, // we return a NotFoundError. if !os.IsNotExist(err) { - log.Printf("%s: Get(%s): %s", vol, hash, err) + log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol) } // If some volume returns a transient error, return it to the caller // instead of "Not found" so it can retry. @@ -690,19 +678,16 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b continue } // Check the file checksum. - // filehash := fmt.Sprintf("%x", md5.Sum(buf[:size])) if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. - log.Printf("%s: checksum mismatch for request %s (actual %s)", - vol, hash, filehash) + log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol) errorToCaller = DiskHashError continue } if errorToCaller == DiskHashError { - log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned", - vol, hash) + log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol) } return size, nil } @@ -737,6 +722,8 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b // provide as much detail as possible. // func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) { + log := ctxlog.FromContext(ctx) + // Check that BLOCK's checksum matches HASH. blockhash := fmt.Sprintf("%x", md5.Sum(block)) if blockhash != hash { @@ -756,17 +743,19 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s // Choose a Keep volume to write to. // If this volume fails, try all of the volumes in order. if mnt := volmgr.NextWritable(); mnt != nil { - if err := mnt.Put(ctx, hash, block); err == nil { + if err := mnt.Put(ctx, hash, block); err != nil { + log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash) + } else { return mnt.Replication, nil // success! } - if ctx.Err() != nil { - return 0, ErrClientDisconnect - } + } + if ctx.Err() != nil { + return 0, ErrClientDisconnect } writables := volmgr.AllWritable() if len(writables) == 0 { - log.Print("No writable volumes.") + log.Error("no writable volumes") return 0, FullError } @@ -776,20 +765,22 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s if ctx.Err() != nil { return 0, ErrClientDisconnect } - if err == nil { + switch err { + case nil: return vol.Replication, nil // success! - } - if err != FullError { + case FullError: + continue + default: // The volume is not full but the // write did not succeed. Report the // error and continue trying. allFull = false - log.Printf("%s: Write(%s): %s", vol, hash, err) + log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash) } } if allFull { - log.Print("All volumes are full.") + log.Error("all volumes are full") return 0, FullError } // Already logged the non-full errors. @@ -802,6 +793,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s // premature garbage collection. Otherwise, it returns a non-nil // error. func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) { + log := ctxlog.FromContext(ctx) var bestErr error = NotFoundError for _, mnt := range volmgr.AllWritable() { err := mnt.Compare(ctx, hash, buf) @@ -813,7 +805,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, // to tell which one is wanted if we have // both, so there's no point writing it even // on a different volume.) - log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err) + log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume) return 0, err } else if os.IsNotExist(err) { // Block does not exist. This is the only @@ -823,11 +815,11 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, // Couldn't open file, data is corrupt on // disk, etc.: log this abnormal condition, // and try the next volume. - log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err) + log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume) continue } if err := mnt.Touch(hash); err != nil { - log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err) + log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume) bestErr = err continue } @@ -861,18 +853,6 @@ func GetAPIToken(req *http.Request) string { return "" } -// IsExpired returns true if the given Unix timestamp (expressed as a -// hexadecimal string) is in the past, or if timestampHex cannot be -// parsed as a hexadecimal string. -func IsExpired(timestampHex string) bool { - ts, err := strconv.ParseInt(timestampHex, 16, 0) - if err != nil { - log.Printf("IsExpired: %s", err) - return true - } - return time.Unix(ts, 0).Before(time.Now()) -} - // canDelete returns true if the user identified by apiToken is // allowed to delete blocks. func (rtr *router) canDelete(apiToken string) bool {