Merge branch '3705-keep-blockworklist'
[arvados.git] / services / keepstore / handlers.go
index 9838694ac83b5d5b1db75e2778f8718fead226af..809520769754506f8c3bae8f7b49db6df3bf4bb0 100644 (file)
@@ -10,6 +10,7 @@ package main
 import (
        "bufio"
        "bytes"
+       "container/list"
        "crypto/md5"
        "encoding/json"
        "fmt"
@@ -58,6 +59,11 @@ func MakeRESTRouter() *mux.Router {
                `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
        rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
+       // The PullHandler processes "PUT /pull" commands from Data Manager.
+       // It parses the JSON list of pull requests in the request body, and
+       // delivers them to the pull list manager for replication.
+       rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -148,7 +154,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       block, err := GetBlock(hash)
+       block, err := GetBlock(hash, false)
 
        // Garbage collect after each GET. Fixes #2865.
        // TODO(twp): review Keep memory usage and see if there's
@@ -397,7 +403,109 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
-func GetBlock(hash string) ([]byte, error) {
+/* PullHandler processes "PUT /pull" requests for the data manager.
+   The request body is a JSON message containing a list of pull
+   requests in the following format:
+
+   [
+      {
+         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
+         "servers":[
+                       "keep0.qr1hi.arvadosapi.com:25107",
+                       "keep1.qr1hi.arvadosapi.com:25108"
+                ]
+         },
+         {
+                "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
+                "servers":[
+                       "10.0.1.5:25107",
+                       "10.0.1.6:25107",
+                       "10.0.1.7:25108"
+                ]
+         },
+         ...
+   ]
+
+   Each pull request in the list consists of a block locator string
+   and an ordered list of servers.  Keepstore should try to fetch the
+   block from each server in turn.
+
+   If the request has not been sent by the Data Manager, return 401
+   Unauthorized.
+
+   If the JSON unmarshalling fails, return 400 Bad Request.
+*/
+
+type PullRequest struct {
+       Locator string   `json:"locator"`
+       Servers []string `json:"servers"`
+}
+
+func PullHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       api_token := GetApiToken(req)
+       if !IsDataManagerToken(api_token) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
+               return
+       }
+
+       // Parse the request body.
+       var pr []PullRequest
+       r := json.NewDecoder(req.Body)
+       if err := r.Decode(&pr); err != nil {
+               http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+               log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
+               return
+       }
+
+       // We have a properly formatted pull list sent from the data
+       // manager.  Report success and send the list to the pull list
+       // manager for further handling.
+       log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
+       resp.WriteHeader(http.StatusOK)
+       resp.Write([]byte(
+               fmt.Sprintf("Received %d pull requests\n", len(pr))))
+
+       plist := list.New()
+       for _, p := range pr {
+               plist.PushBack(p)
+       }
+
+       if pullmgr == nil {
+               pullmgr = NewBlockWorkList()
+       }
+       pullmgr.ReplaceList(plist)
+}
+
+// ==============================
+// GetBlock and PutBlock implement lower-level code for handling
+// blocks by rooting through volumes connected to the local machine.
+// Once the handler has determined that system policy permits the
+// request, it calls these methods to perform the actual operation.
+//
+// TODO(twp): this code would probably be better located in the
+// VolumeManager interface. As an abstraction, the VolumeManager
+// should be the only part of the code that cares about which volume a
+// block is stored on, so it should be responsible for figuring out
+// which volume to check for fetching blocks, storing blocks, etc.
+
+// ==============================
+// GetBlock fetches and returns the block identified by "hash".  If
+// the update_timestamp argument is true, GetBlock also updates the
+// block's file modification time (for the sake of PutBlock, which
+// must update the file's timestamp when the block already exists).
+//
+// On success, GetBlock returns a byte slice with the block data, and
+// a nil error.
+//
+// If the block cannot be found on any volume, returns NotFoundError.
+//
+// If the block found does not have the correct MD5 hash, returns
+// DiskHashError.
+//
+
+func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
        error_to_caller := NotFoundError
 
@@ -432,6 +540,14 @@ func GetBlock(hash string) ([]byte, error) {
                                        log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
                                                vol, hash)
                                }
+                               // Update the timestamp if the caller requested.
+                               // If we could not update the timestamp, continue looking on
+                               // other volumes.
+                               if update_timestamp {
+                                       if vol.Touch(hash) != nil {
+                                               continue
+                                       }
+                               }
                                return buf, nil
                        }
                }
@@ -478,12 +594,15 @@ func PutBlock(block []byte, hash string) error {
        }
 
        // If we already have a block on disk under this identifier, return
-       // success (but check for MD5 collisions).
+       // success (but check for MD5 collisions).  While fetching the block,
+       // update its timestamp.
        // The only errors that GetBlock can return are DiskHashError and NotFoundError.
        // In either case, we want to write our new (good) block to disk,
        // so there is nothing special to do if err != nil.
-       if oldblock, err := GetBlock(hash); err == nil {
+       //
+       if oldblock, err := GetBlock(hash, true); err == nil {
                if bytes.Compare(block, oldblock) == 0 {
+                       // The block already exists; return success.
                        return nil
                } else {
                        return CollisionError
@@ -568,10 +687,7 @@ func CanDelete(api_token string) bool {
        }
        // Blocks may be deleted only when Keep has been configured with a
        // data manager.
-       if data_manager_token == "" {
-               return false
-       }
-       if api_token == data_manager_token {
+       if IsDataManagerToken(api_token) {
                return true
        }
        // TODO(twp): look up api_token with the API server
@@ -579,3 +695,9 @@ func CanDelete(api_token string) bool {
        // has unlimited scope
        return false
 }
+
+// IsDataManagerToken returns true if api_token represents the data
+// manager's token.
+func IsDataManagerToken(api_token string) bool {
+       return data_manager_token != "" && api_token == data_manager_token
+}