"encoding/json"
"fmt"
"io"
- "log"
"net/http"
"os"
"regexp"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
- "git.curoverse.com/arvados.git/sdk/go/health"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/health"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
// List blocks stored here whose hash has the given prefix.
// Privileged client only.
rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+ // Update timestamp on existing block. Privileged client only.
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
// Internals/debugging info (runtime.MemStats)
rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
}
}
+// handleTOUCH handles "TOUCH /{hash}" requests: it updates the
+// timestamp of an existing block. Privileged client only.
+//
+// Writable volumes are tried in order until one Touch() succeeds, in
+// which case nothing is written to resp (net/http then sends its
+// default 200 status with an empty body).
+//
+// If no volume succeeds, the response is 404 only when every volume
+// reported that the block does not exist. If any volume failed for
+// another reason, the response is 500 regardless of the order in
+// which the volumes were tried, so the client can tell "definitely
+// not here" apart from "could not check" and retry.
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+	hash := mux.Vars(req)["hash"]
+	vols := rtr.volmgr.AllWritable()
+	if len(vols) == 0 {
+		http.Error(resp, "no volumes", http.StatusNotFound)
+		return
+	}
+	// Track the two failure classes separately so that a trailing
+	// "not exist" result cannot mask a real failure seen on an
+	// earlier volume.
+	var notFoundErr, otherErr error
+	for _, mnt := range vols {
+		err := mnt.Touch(hash)
+		if err == nil {
+			// Success on any one volume is enough.
+			return
+		}
+		if os.IsNotExist(err) {
+			notFoundErr = err
+		} else {
+			otherErr = err
+		}
+	}
+	if otherErr != nil {
+		http.Error(resp, otherErr.Error(), http.StatusInternalServerError)
+		return
+	}
+	// vols is non-empty and every Touch() failed with a "not exist"
+	// error, so notFoundErr is non-nil here.
+	http.Error(resp, notFoundErr.Error(), http.StatusNotFound)
+}
+
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
for _, v := range vols {
if err := v.IndexTo(prefix, resp); err != nil {
- // We can't send an error message to the
- // client because we might have already sent
- // headers and index content. All we can do is
- // log the error in our own logs, and (in
- // cases where headers haven't been sent yet)
- // set a 500 status.
+ // We can't send an error status/message to
+ // the client because IndexTo() might have
+ // already written body content. All we can do
+ // is log the error in our own logs.
//
- // If headers have already been sent, the
- // client must notice the lack of trailing
+ // The client must notice the lack of trailing
// newline as an indication that the response
// is incomplete.
- log.Printf("index error from volume %s: %s", v, err)
- http.Error(resp, "", http.StatusInternalServerError)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
return
}
}
}
var ds debugStats
runtime.ReadMemStats(&ds.MemStats)
- err := json.NewEncoder(resp).Encode(&ds)
+ data, err := json.Marshal(&ds)
if err != nil {
- http.Error(resp, err.Error(), 500)
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// StatusHandler addresses /status.json requests.
func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
rtr.readNodeStatus(&st)
- jstat, err := json.Marshal(&st)
+ data, err := json.Marshal(&st)
stLock.Unlock()
- if err == nil {
- resp.Write(jstat)
- } else {
- log.Printf("json.Marshal: %s", err)
- log.Printf("NodeStatus = %v", &st)
- http.Error(resp, err.Error(), 500)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// populate the given NodeStatus struct with current values.
continue
} else {
result.Failed++
- log.Println("DeleteHandler:", err)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
}
}
-
- var st int
-
if result.Deleted == 0 && result.Failed == 0 {
- st = http.StatusNotFound
- } else {
- st = http.StatusOK
+ resp.WriteHeader(http.StatusNotFound)
+ return
}
-
- resp.WriteHeader(st)
-
- if st == http.StatusOK {
- if body, err := json.Marshal(result); err == nil {
- resp.Write(body)
- } else {
- log.Printf("json.Marshal: %s (result = %v)", err, result)
- http.Error(resp, err.Error(), 500)
- }
+ body, err := json.Marshal(result)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(body)
}
/* PullHandler processes "PUT /pull" requests for the data manager.
return
}
+ log := ctxlog.FromContext(req.Context())
hash := mux.Vars(req)["hash"]
if len(rtr.volmgr.AllWritable()) == 0 {
if os.IsNotExist(err) {
numNotFound++
} else if err != nil {
- log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
failedOn = append(failedOn, vol.String())
} else {
- log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ log.Infof("Untrashed %v on volume %v", hash, vol.String())
untrashedOn = append(untrashedOn, vol.String())
}
}
if numNotFound == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
- return
- }
-
- if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+ } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
} else {
- respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
if len(failedOn) > 0 {
- respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+ http.Error(resp, respBody, http.StatusInternalServerError)
+ } else {
+ fmt.Fprintln(resp, respBody)
}
- resp.Write([]byte(respBody))
}
}
// DiskHashError.
//
func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
// volumes. If all volumes report IsNotExist,
// we return a NotFoundError.
if !os.IsNotExist(err) {
- log.Printf("%s: Get(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
}
// If some volume returns a transient error, return it to the caller
// instead of "Not found" so it can retry.
continue
}
// Check the file checksum.
- //
filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
- log.Printf("%s: checksum mismatch for request %s (actual %s)",
- vol, hash, filehash)
+ log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
errorToCaller = DiskHashError
continue
}
if errorToCaller == DiskHashError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
- vol, hash)
+ log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
}
return size, nil
}
// provide as much detail as possible.
//
func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if mnt := volmgr.NextWritable(); mnt != nil {
- if err := mnt.Put(ctx, hash, block); err == nil {
+ if err := mnt.Put(ctx, hash, block); err != nil {
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ } else {
return mnt.Replication, nil // success!
}
- if ctx.Err() != nil {
- return 0, ErrClientDisconnect
- }
+ }
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
}
writables := volmgr.AllWritable()
if len(writables) == 0 {
- log.Print("No writable volumes.")
+ log.Error("no writable volumes")
return 0, FullError
}
if ctx.Err() != nil {
return 0, ErrClientDisconnect
}
- if err == nil {
+ switch err {
+ case nil:
return vol.Replication, nil // success!
- }
- if err != FullError {
+ case FullError:
+ continue
+ default:
// The volume is not full but the
// write did not succeed. Report the
// error and continue trying.
allFull = false
- log.Printf("%s: Write(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
}
}
if allFull {
- log.Print("All volumes are full.")
+ log.Error("all volumes are full")
return 0, FullError
}
// Already logged the non-full errors.
// premature garbage collection. Otherwise, it returns a non-nil
// error.
func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+ log := ctxlog.FromContext(ctx)
var bestErr error = NotFoundError
for _, mnt := range volmgr.AllWritable() {
err := mnt.Compare(ctx, hash, buf)
// to tell which one is wanted if we have
// both, so there's no point writing it even
// on a different volume.)
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// Couldn't open file, data is corrupt on
// disk, etc.: log this abnormal condition,
// and try the next volume.
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
continue
}
if err := mnt.Touch(hash); err != nil {
- log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+ log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
bestErr = err
continue
}
return ""
}
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
- ts, err := strconv.ParseInt(timestampHex, 16, 0)
- if err != nil {
- log.Printf("IsExpired: %s", err)
- return true
- }
- return time.Unix(ts, 0).Before(time.Now())
-}
-
// canDelete returns true if the user identified by apiToken is
// allowed to delete blocks.
func (rtr *router) canDelete(apiToken string) bool {