package main
-// REST handlers for Keep are implemented here.
-//
-// GetBlockHandler (GET /locator)
-// PutBlockHandler (PUT /locator)
-// IndexHandler (GET /index, GET /index/prefix)
-// StatusHandler (GET /status.json)
-
import (
"container/list"
"context"
+ "crypto/subtle"
"encoding/json"
"fmt"
"io"
+ "log"
"net/http"
"os"
"regexp"
"sync"
"time"
- "github.com/gorilla/mux"
-
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/gorilla/mux"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
+// router dispatches Keep HTTP requests to its handler methods and
+// carries the dependencies those handlers share, so configuration,
+// volumes and work queues are passed in rather than read from
+// package-level variables.
type router struct {
*mux.Router
- limiter httpserver.RequestCounter
+
+ // Cluster configuration and the volumes that hold blocks.
+ cluster *arvados.Cluster
+ volmgr *RRVolumeManager
+
+ // Work queues whose contents PUT /pull and PUT /trash replace.
+ pullq *WorkQueue
+ trashq *WorkQueue
+
+ // Remote-block proxying, metrics, and logging.
+ remoteProxy remoteProxy
+ metrics *nodeMetrics
+ logger logrus.FieldLogger
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
+//
+// ctx supplies the logger (via ctxlog.FromContext). Handlers read
+// settings from cluster, read and write blocks through volmgr, and
+// replace the contents of pullq and trashq on PUT /pull and PUT
+// /trash. Buffer-pool and work-queue metrics are set up on a
+// nodeMetrics built from reg.
-func MakeRESTRouter() http.Handler {
- rtr := &router{Router: mux.NewRouter()}
+func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
+ rtr := &router{
+ Router: mux.NewRouter(),
+ cluster: cluster,
+ logger: ctxlog.FromContext(ctx),
+ metrics: &nodeMetrics{reg: reg},
+ volmgr: volmgr,
+ pullq: pullq,
+ trashq: trashq,
+ }
rtr.HandleFunc(
- `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+ `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
rtr.HandleFunc(
`/{hash:[0-9a-f]{32}}+{hints}`,
- GetBlockHandler).Methods("GET", "HEAD")
+ rtr.handleGET).Methods("GET", "HEAD")
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
// List all blocks stored here. Privileged client only.
- rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
// List blocks stored here whose hash has the given prefix.
// Privileged client only.
- rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
+ rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
// Internals/debugging info (runtime.MemStats)
rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
// List mounts: UUID, readonly, tier, device ID, ...
rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
- rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
- rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
+ rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
+ rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
// Replace the current pull queue.
- rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+ rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
// Replace the current trash queue.
- rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
// Untrash moves blocks from trash back into store
- rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+ rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
rtr.Handle("/_health/{check}", &health.Handler{
- Token: theConfig.ManagementToken,
+ Token: cluster.ManagementToken,
Prefix: "/_health/",
}).Methods("GET")
+ // Any request that matches none of the routes above is answered with
// 400 Bad Request.
rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
- theConfig.metrics.setup()
-
- rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+ rtr.metrics.setupBufferPoolMetrics(bufs)
+ rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
+ rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
- mux := http.NewServeMux()
- mux.Handle("/", theConfig.metrics.Instrument(
- httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
- mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
- mux.Handle("/metrics", theConfig.metrics.exportProm)
-
- return mux
+ // NOTE(review): the returned handler is now the bare router. The
+ // request limiter, request IDs, request logging, metrics
+ // instrumentation and the /metrics and /metrics.json endpoints that
+ // used to wrap it here are gone; confirm the caller now provides them.
+ return rtr
}
// BadRequestHandler is a HandleFunc to address bad requests.
+// It replies with BadRequestError's message and HTTP status code;
+// MakeRESTRouter installs it as the NotFoundHandler for unmatched routes.
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// GetBlockHandler is a HandleFunc to address Get block requests.
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+// handleGET serves GET and HEAD requests for a block locator, with or
+// without hints. A locator containing "+R" but not "+A" (presumably a
+// remote-cluster hint without a local permission signature) is handed
+// to rtr.remoteProxy. Otherwise, when Collections.BlobSigning is
+// enabled, VerifySignature checks the locator against the request's
+// API token before the block is read via rtr.volmgr.
+func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
- if theConfig.RequireSignatures {
+ locator := req.URL.Path[1:]
+ if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
+ rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+ return
+ }
+
+ if rtr.cluster.Collections.BlobSigning {
+ // NOTE(review): this redeclares locator with the same value assigned
+ // above, shadowing it; the outer variable could be reused instead.
locator := req.URL.Path[1:] // strip leading slash
- if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
+ if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
}
defer bufs.Put(buf)
- size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
go func(c <-chan bool) {
+ // Cancel ctx as soon as c yields a value or is closed; stop
+ // waiting once ctx is done for any other reason.
select {
case <-c:
- theConfig.debugLogf("cancel context")
cancel()
case <-ctx.Done():
}
}
}
-// PutBlockHandler is a HandleFunc to address Put block requests.
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+// handlePUT stores the uploaded data as the block named by the hash in
+// the URL. It responds with FullError when rtr.volmgr has no writable
+// volume. On success it writes the locator "hash+ContentLength", signed
+// when a blob signing key is configured and the request carried an API
+// token, and sets X-Keep-Replicas-Stored to the replication achieved.
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
return
}
- if len(KeepVM.AllWritable()) == 0 {
+ if len(rtr.volmgr.AllWritable()) == 0 {
http.Error(resp, FullError.Error(), FullError.HTTPCode)
return
}
return
}
- replication, err := PutBlock(ctx, buf, hash)
+ replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
bufs.Put(buf)
if err != nil {
// return it to the client.
returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
apiToken := GetAPIToken(req)
- if theConfig.blobSigningKey != nil && apiToken != "" {
- expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
- returnHash = SignLocator(returnHash, apiToken, expiry)
+ // NOTE(review): signing here depends on BlobSigningKey being set, while
+ // handleGET verifies signatures only when Collections.BlobSigning is
+ // true; confirm the two conditions are meant to differ.
+ if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
+ expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
+ returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
}
resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
resp.Write([]byte(returnHash + "\n"))
-// IndexHandler responds to "/index", "/index/{prefix}", and
+// handleIndex responds to "/index", "/index/{prefix}", and
// "/mounts/{uuid}/blocks" requests.
+//
+// Only the cluster's system root token is accepted (see isSystemAuth).
+// With no {uuid}, every readable volume is indexed; otherwise only the
+// named mount is, and an unknown uuid gets 404 "mount not found".
-func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
- if !IsSystemAuth(GetAPIToken(req)) {
+func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
uuid := mux.Vars(req)["uuid"]
- var vols []Volume
+ var vols []*VolumeMount
if uuid == "" {
- vols = KeepVM.AllReadable()
- } else if v := KeepVM.Lookup(uuid, false); v == nil {
+ vols = rtr.volmgr.AllReadable()
+ } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
http.Error(resp, "mount not found", http.StatusNotFound)
return
} else {
- vols = []Volume{v}
+ vols = []*VolumeMount{mnt}
}
for _, v := range vols {
if err := v.IndexTo(prefix, resp); err != nil {
- // The only errors returned by IndexTo are
- // write errors returned by resp.Write(),
- // which probably means the client has
- // disconnected and this error will never be
- // reported to the client -- but it will
- // appear in our own error log.
- http.Error(resp, err.Error(), http.StatusInternalServerError)
+ // We can't send an error message to the
+ // client because we might have already sent
+ // headers and index content. All we can do is
+ // log the error in our own logs, and (in
+ // cases where headers haven't been sent yet)
+ // set a 500 status.
+ //
+ // If headers have already been sent, the
+ // client must notice the lack of trailing
+ // newline as an indication that the response
+ // is incomplete.
+ //
+ // NOTE(review): this logs through the package-level log rather
+ // than rtr.logger (set in MakeRESTRouter); confirm which is intended.
+ log.Printf("index error from volume %s: %s", v, err)
+ http.Error(resp, "", http.StatusInternalServerError)
return
}
}
-// MountsHandler responds to "GET /mounts" requests.
+// MountsHandler responds to "GET /mounts" requests with the JSON
+// encoding of rtr.volmgr.Mounts(). If encoding or writing fails, the
+// error is reported with status 500 via httpserver.Error.
func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
- err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
+ err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
if err != nil {
- http.Error(resp, err.Error(), http.StatusInternalServerError)
+ httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
}
}
-// populate the given NodeStatus struct with current values.
+// readNodeStatus populates the given NodeStatus struct with current
+// values: a label, status and (when the volume implements
+// InternalStatser) internal stats for each readable volume, buffer
+// pool usage, and pull/trash queue status. st.Volumes' backing array
+// is reused when its capacity is large enough.
func (rtr *router) readNodeStatus(st *NodeStatus) {
st.Version = version
- vols := KeepVM.AllReadable()
+ vols := rtr.volmgr.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*volumeStatusEnt, len(vols))
}
st.Volumes = st.Volumes[:0]
for _, vol := range vols {
var internalStats interface{}
- if vol, ok := vol.(InternalStatser); ok {
+ if vol, ok := vol.Volume.(InternalStatser); ok {
internalStats = vol.InternalStats()
}
st.Volumes = append(st.Volumes, &volumeStatusEnt{
Label: vol.String(),
Status: vol.Status(),
InternalStats: internalStats,
- //VolumeStats: KeepVM.VolumeStats(vol),
+ //VolumeStats: rtr.volmgr.VolumeStats(vol),
})
}
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
- st.PullQueue = getWorkQueueStatus(pullq)
- st.TrashQueue = getWorkQueueStatus(trashq)
- if rtr.limiter != nil {
- st.RequestsCurrent = rtr.limiter.Current()
- st.RequestsMax = rtr.limiter.Max()
- }
+ st.PullQueue = getWorkQueueStatus(rtr.pullq)
+ st.TrashQueue = getWorkQueueStatus(rtr.trashq)
+ // NOTE(review): with the request limiter removed, RequestsCurrent and
+ // RequestsMax are no longer set here; they keep whatever value st
+ // already held (zero for a fresh NodeStatus).
}
// return a WorkQueueStatus for the given queue. If q is nil (which
return q.Status()
}
-// DeleteHandler processes DELETE requests.
+// handleDELETE processes DELETE requests.
//
-// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
+// DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
// from all connected volumes.
// a PermissionError.
//
// Upon receiving a valid request from an authorized user,
-// DeleteHandler deletes all copies of the specified block on local
+// handleDELETE deletes all copies of the specified block on local
// writable volumes.
//
// Response format:
// where d and f are integers representing the number of blocks that
// were successfully and unsuccessfully deleted.
//
+// A token that fails rtr.canDelete gets PermissionError. When the
+// cluster's Collections.BlobTrash setting is false the request gets
+// MethodDisabledError. Otherwise each writable volume in rtr.volmgr is
+// asked to Trash the block; "deleted" means Trash succeeded on that volume.
-func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// Confirm that this user is an admin and has a token with unlimited scope.
var tok = GetAPIToken(req)
- if tok == "" || !CanDelete(tok) {
+ if tok == "" || !rtr.canDelete(tok) {
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
}
- if !theConfig.EnableDelete {
+ if !rtr.cluster.Collections.BlobTrash {
http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
return
}
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- for _, vol := range KeepVM.AllWritable() {
+ for _, vol := range rtr.volmgr.AllWritable() {
if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
}
-// PullHandler processes "PUT /pull" requests for the data manager.
+// handlePull processes "PUT /pull" requests for the data manager.
+// Only the cluster's system root token is accepted. The pull requests
+// (pr) are copied into a new list that replaces the contents of rtr.pullq.
-func PullHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsSystemAuth(GetAPIToken(req)) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
for _, p := range pr {
plist.PushBack(p)
}
- pullq.ReplaceQueue(plist)
+ rtr.pullq.ReplaceQueue(plist)
}
-// TrashRequest consists of a block locator and it's Mtime
+// TrashRequest consists of a block locator and its Mtime
type TrashRequest struct {
+ // Locator identifies the block this request refers to.
Locator string `json:"locator"`
+ // BlockMtime is the block's modification time as an int64
+ // (units, e.g. Unix seconds vs. nanoseconds, not visible here — confirm).
BlockMtime int64 `json:"block_mtime"`
}
-// TrashHandler processes /trash requests.
+// handleTrash processes "PUT /trash" requests. Only the cluster's
+// system root token is accepted. The trash requests are copied into a
+// new list that replaces the contents of rtr.trashq.
-func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsSystemAuth(GetAPIToken(req)) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
for _, t := range trash {
tlist.PushBack(t)
}
- trashq.ReplaceQueue(tlist)
+ rtr.trashq.ReplaceQueue(tlist)
}
-// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+// handleUntrash processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+// Every writable volume is asked to Untrash the block. The response is
+// 404 when there are no writable volumes or none of them has the block,
+// 500 when untrashing failed on every writable volume, and otherwise
+// lists the volumes where untrashing succeeded.
-func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsSystemAuth(GetAPIToken(req)) {
+ if !rtr.isSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
hash := mux.Vars(req)["hash"]
- if len(KeepVM.AllWritable()) == 0 {
+ if len(rtr.volmgr.AllWritable()) == 0 {
http.Error(resp, "No writable volumes", http.StatusNotFound)
return
}
var untrashedOn, failedOn []string
var numNotFound int
- for _, vol := range KeepVM.AllWritable() {
+ for _, vol := range rtr.volmgr.AllWritable() {
err := vol.Untrash(hash)
if os.IsNotExist(err) {
}
}
+ // NOTE(review): AllWritable() is re-evaluated for each comparison
+ // below; capturing it once in a local would keep these counts
+ // consistent if the writable set can change during the request.
- if numNotFound == len(KeepVM.AllWritable()) {
+ if numNotFound == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
return
}
- if len(failedOn) == len(KeepVM.AllWritable()) {
+ if len(failedOn) == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
} else {
respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
+// Readable volumes are taken from volmgr.AllReadable() and tried in
+// the order returned.
-func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
- for _, vol := range KeepVM.AllReadable() {
+ for _, vol := range volmgr.AllReadable() {
size, err := vol.Get(ctx, hash, buf)
select {
case <-ctx.Done():
if !os.IsNotExist(err) {
log.Printf("%s: Get(%s): %s", vol, hash, err)
}
+ // If some volume returns a transient error, return it to the caller
+ // instead of "Not found" so it can retry.
+ if err == VolumeBusyError {
+ errorToCaller = err.(*KeepError)
+ }
continue
}
// Check the file checksum.
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
+// Writable volumes come from volmgr: the mount returned by
+// volmgr.NextWritable() is tried first, then volmgr.AllWritable().
-func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
+ if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
return n, err
} else if ctx.Err() != nil {
return 0, ErrClientDisconnect
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
- if vol := KeepVM.NextWritable(); vol != nil {
- if err := vol.Put(ctx, hash, block); err == nil {
- return vol.Replication(), nil // success!
+ if mnt := volmgr.NextWritable(); mnt != nil {
+ if err := mnt.Put(ctx, hash, block); err == nil {
+ return mnt.Replication, nil // success!
}
if ctx.Err() != nil {
return 0, ErrClientDisconnect
}
}
- writables := KeepVM.AllWritable()
+ writables := volmgr.AllWritable()
if len(writables) == 0 {
log.Print("No writable volumes.")
return 0, FullError
return 0, ErrClientDisconnect
}
if err == nil {
- return vol.Replication(), nil // success!
+ return vol.Replication, nil // success!
}
if err != FullError {
// The volume is not full but the
// the relevant block's modification time in order to protect it from
// premature garbage collection. Otherwise, it returns a non-nil
// error.
+//
+// Only the writable volumes returned by volmgr.AllWritable() are
+// checked, in that order.
-func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
- for _, vol := range KeepVM.AllWritable() {
- err := vol.Compare(ctx, hash, buf)
+ for _, mnt := range volmgr.AllWritable() {
+ err := mnt.Compare(ctx, hash, buf)
if ctx.Err() != nil {
return 0, ctx.Err()
} else if err == CollisionError {
// to tell which one is wanted if we have
// both, so there's no point writing it even
// on a different volume.)
- log.Printf("%s: Compare(%s): %s", vol, hash, err)
+ log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// Couldn't open file, data is corrupt on
// disk, etc.: log this abnormal condition,
// and try the next volume.
- log.Printf("%s: Compare(%s): %s", vol, hash, err)
+ log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
continue
}
- if err := vol.Touch(hash); err != nil {
- log.Printf("%s: Touch %s failed: %s", vol, hash, err)
+ if err := mnt.Touch(hash); err != nil {
+ log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
bestErr = err
continue
}
// Compare and Touch both worked --> done.
- return vol.Replication(), nil
+ return mnt.Replication, nil
}
return 0, bestErr
}
return validLocatorRe.MatchString(loc)
}
-var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
+// authRe matches an Authorization header value of the form
+// "OAuth2 <token>" or "Bearer <token>": submatch 1 is the scheme and
+// submatch 2 is the token. The scheme is matched case-insensitively
+// because HTTP auth-scheme names are case-insensitive (RFC 7235 §2.1).
+var authRe = regexp.MustCompile(`(?i)^(OAuth2|Bearer)\s+(.*)`)
-// GetAPIToken returns the OAuth2 token from the Authorization
-// header of a HTTP request, or an empty string if no matching
+// GetAPIToken returns the OAuth2 or Bearer token from the Authorization
+// header of an HTTP request, or an empty string if no matching
func GetAPIToken(req *http.Request) string {
if auth, ok := req.Header["Authorization"]; ok {
+ // Only the first Authorization header value is examined. In a
+ // match, match[1] is the scheme and match[2] is the token.
if match := authRe.FindStringSubmatch(auth[0]); match != nil {
- return match[1]
+ return match[2]
}
}
return ""
return time.Unix(ts, 0).Before(time.Now())
}
-// CanDelete returns true if the user identified by apiToken is
+// canDelete returns true if the user identified by apiToken is
// allowed to delete blocks.
+//
+// Currently that means apiToken must be the cluster's system root
+// token (see isSystemAuth); an empty token is always refused.
-func CanDelete(apiToken string) bool {
+func (rtr *router) canDelete(apiToken string) bool {
if apiToken == "" {
return false
}
// Blocks may be deleted only when Keep has been configured with a
// data manager.
- if IsSystemAuth(apiToken) {
+ if rtr.isSystemAuth(apiToken) {
return true
}
// TODO(twp): look up apiToken with the API server
return false
}
-// IsSystemAuth returns true if the given token is allowed to perform
+// isSystemAuth returns true if the given token is allowed to perform
// system level actions like deleting data.
+//
+// Only a non-empty token equal to the cluster's SystemRootToken
+// qualifies. The token comes from a request header, so it is compared
+// with subtle.ConstantTimeCompare: response timing must not reveal how
+// much of a guessed token is correct (only a length mismatch
+// short-circuits).
-func IsSystemAuth(token string) bool {
- return token != "" && token == theConfig.systemAuthToken
+func (rtr *router) isSystemAuth(token string) bool {
+ return token != "" && subtle.ConstantTimeCompare([]byte(token), []byte(rtr.cluster.SystemRootToken)) == 1
}