Merge branch 'master' into 4194-keep-logging
[arvados.git] / services / keepstore / handlers.go
index 9838694ac83b5d5b1db75e2778f8718fead226af..bd1ca67bfc26643190b2e94a0169bc58f2030c88 100644 (file)
@@ -10,6 +10,7 @@ package main
 import (
        "bufio"
        "bytes"
+       "container/list"
        "crypto/md5"
        "encoding/json"
        "fmt"
@@ -58,6 +59,19 @@ func MakeRESTRouter() *mux.Router {
                `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
        rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
+       // The PullHandler and TrashHandler process "PUT /pull" and "PUT
+       // /trash" requests from Data Manager.  These requests instruct
+       // Keep to replicate or delete blocks; see
+       // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
+       // for more details.
+       //
+       // Each handler parses the JSON list of block management requests
+       // in the message body, and replaces any existing pull queue or
+       // trash queue with their contentes.
+       //
+       rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+       rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -104,8 +118,6 @@ func FindKeepVolumes() []string {
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        hash := mux.Vars(req)["hash"]
 
-       log.Printf("%s %s", req.Method, hash)
-
        hints := mux.Vars(req)["hints"]
 
        // Parse the locator string and hints from the request.
@@ -148,7 +160,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       block, err := GetBlock(hash)
+       block, err := GetBlock(hash, false)
 
        // Garbage collect after each GET. Fixes #2865.
        // TODO(twp): review Keep memory usage and see if there's
@@ -159,9 +171,6 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        if err != nil {
                // This type assertion is safe because the only errors
                // GetBlock can return are DiskHashError or NotFoundError.
-               if err == NotFoundError {
-                       log.Printf("%s: not found, giving up\n", hash)
-               }
                http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
                return
        }
@@ -169,9 +178,6 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
 
        _, err = resp.Write(block)
-       if err != nil {
-               log.Printf("GetBlockHandler: writing response: %s", err)
-       }
 
        return
 }
@@ -183,8 +189,6 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 
        hash := mux.Vars(req)["hash"]
 
-       log.Printf("%s %s", req.Method, hash)
-
        // Read the block data to be stored.
        // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
        //
@@ -222,18 +226,14 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 //     A HandleFunc to address /index and /index/{prefix} requests.
 //
 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
-       prefix := mux.Vars(req)["prefix"]
-
-       // Only the data manager may issue /index requests,
-       // and only if enforce_permissions is enabled.
-       // All other requests return 403 Forbidden.
-       api_token := GetApiToken(req)
-       if !enforce_permissions ||
-               api_token == "" ||
-               data_manager_token != api_token {
-               http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
+
+       prefix := mux.Vars(req)["prefix"]
+
        var index string
        for _, vol := range KeepVM.Volumes() {
                index = index + vol.Index(prefix)
@@ -345,7 +345,6 @@ func GetVolumeStatus(volume string) *VolumeStatus {
 //
 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
        hash := mux.Vars(req)["hash"]
-       log.Printf("%s %s", req.Method, hash)
 
        // Confirm that this user is an admin and has a token with unlimited scope.
        var tok = GetApiToken(req)
@@ -397,7 +396,143 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
-func GetBlock(hash string) ([]byte, error) {
+/* PullHandler processes "PUT /pull" requests for the data manager.
+   The request body is a JSON message containing a list of pull
+   requests in the following format:
+
+   [
+      {
+         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
+         "servers":[
+                       "keep0.qr1hi.arvadosapi.com:25107",
+                       "keep1.qr1hi.arvadosapi.com:25108"
+                ]
+         },
+         {
+                "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
+                "servers":[
+                       "10.0.1.5:25107",
+                       "10.0.1.6:25107",
+                       "10.0.1.7:25108"
+                ]
+         },
+         ...
+   ]
+
+   Each pull request in the list consists of a block locator string
+   and an ordered list of servers.  Keepstore should try to fetch the
+   block from each server in turn.
+
+   If the request has not been sent by the Data Manager, return 401
+   Unauthorized.
+
+   If the JSON unmarshalling fails, return 400 Bad Request.
+*/
+
+type PullRequest struct {
+       Locator string   `json:"locator"`
+       Servers []string `json:"servers"`
+}
+
+func PullHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       // Parse the request body.
+       var pr []PullRequest
+       r := json.NewDecoder(req.Body)
+       if err := r.Decode(&pr); err != nil {
+               http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+               return
+       }
+
+       // We have a properly formatted pull list sent from the data
+       // manager.  Report success and send the list to the pull list
+       // manager for further handling.
+       resp.WriteHeader(http.StatusOK)
+       resp.Write([]byte(
+               fmt.Sprintf("Received %d pull requests\n", len(pr))))
+
+       plist := list.New()
+       for _, p := range pr {
+               plist.PushBack(p)
+       }
+
+       if pullq == nil {
+               pullq = NewWorkQueue()
+       }
+       pullq.ReplaceQueue(plist)
+}
+
+type TrashRequest struct {
+       Locator    string `json:"locator"`
+       BlockMtime int64  `json:"block_mtime"`
+}
+
+func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       // Parse the request body.
+       var trash []TrashRequest
+       r := json.NewDecoder(req.Body)
+       if err := r.Decode(&trash); err != nil {
+               http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+               return
+       }
+
+       // We have a properly formatted trash list sent from the data
+       // manager.  Report success and send the list to the trash work
+       // queue for further handling.
+       resp.WriteHeader(http.StatusOK)
+       resp.Write([]byte(
+               fmt.Sprintf("Received %d trash requests\n", len(trash))))
+
+       tlist := list.New()
+       for _, t := range trash {
+               tlist.PushBack(t)
+       }
+
+       if trashq == nil {
+               trashq = NewWorkQueue()
+       }
+       trashq.ReplaceQueue(tlist)
+}
+
+// ==============================
+// GetBlock and PutBlock implement lower-level code for handling
+// blocks by rooting through volumes connected to the local machine.
+// Once the handler has determined that system policy permits the
+// request, it calls these methods to perform the actual operation.
+//
+// TODO(twp): this code would probably be better located in the
+// VolumeManager interface. As an abstraction, the VolumeManager
+// should be the only part of the code that cares about which volume a
+// block is stored on, so it should be responsible for figuring out
+// which volume to check for fetching blocks, storing blocks, etc.
+
+// ==============================
+// GetBlock fetches and returns the block identified by "hash".  If
+// the update_timestamp argument is true, GetBlock also updates the
+// block's file modification time (for the sake of PutBlock, which
+// must update the file's timestamp when the block already exists).
+//
+// On success, GetBlock returns a byte slice with the block data, and
+// a nil error.
+//
+// If the block cannot be found on any volume, returns NotFoundError.
+//
+// If the block found does not have the correct MD5 hash, returns
+// DiskHashError.
+//
+
+func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
        error_to_caller := NotFoundError
 
@@ -432,6 +567,14 @@ func GetBlock(hash string) ([]byte, error) {
                                        log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
                                                vol, hash)
                                }
+                               // Update the timestamp if the caller requested.
+                               // If we could not update the timestamp, continue looking on
+                               // other volumes.
+                               if update_timestamp {
+                                       if vol.Touch(hash) != nil {
+                                               continue
+                                       }
+                               }
                                return buf, nil
                        }
                }
@@ -478,12 +621,15 @@ func PutBlock(block []byte, hash string) error {
        }
 
        // If we already have a block on disk under this identifier, return
-       // success (but check for MD5 collisions).
+       // success (but check for MD5 collisions).  While fetching the block,
+       // update its timestamp.
        // The only errors that GetBlock can return are DiskHashError and NotFoundError.
        // In either case, we want to write our new (good) block to disk,
        // so there is nothing special to do if err != nil.
-       if oldblock, err := GetBlock(hash); err == nil {
+       //
+       if oldblock, err := GetBlock(hash, true); err == nil {
                if bytes.Compare(block, oldblock) == 0 {
+                       // The block already exists; return success.
                        return nil
                } else {
                        return CollisionError
@@ -568,10 +714,7 @@ func CanDelete(api_token string) bool {
        }
        // Blocks may be deleted only when Keep has been configured with a
        // data manager.
-       if data_manager_token == "" {
-               return false
-       }
-       if api_token == data_manager_token {
+       if IsDataManagerToken(api_token) {
                return true
        }
        // TODO(twp): look up api_token with the API server
@@ -579,3 +722,9 @@ func CanDelete(api_token string) bool {
        // has unlimited scope
        return false
 }
+
+// IsDataManagerToken returns true if api_token represents the data
+// manager's token.
+func IsDataManagerToken(api_token string) bool {
+       return data_manager_token != "" && api_token == data_manager_token
+}