3413: added TrashHandler
authorTim Pierce <twp@curoverse.com>
Fri, 19 Sep 2014 19:50:14 +0000 (15:50 -0400)
committerTim Pierce <twp@curoverse.com>
Fri, 19 Sep 2014 20:20:17 +0000 (16:20 -0400)
* added a trashq WorkQueue to manage the trash list
* added TrashHandler to process "PUT /trash" requests
* added TestTrashHandler

services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go

index 0cfa1f30ddbd1e74ab7f5a25f931bac3905dadd3..55281764cdd595bf68434312f4cb9e9401936bd6 100644 (file)
@@ -630,25 +630,25 @@ func TestPullHandler(t *testing.T) {
        }
        var testcases = []pullTest{
                {
-                       "user token, good request",
+                       "pull: user token, good request",
                        RequestTester{"/pull", user_token, "PUT", good_json},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
-                       "user token, bad request",
+                       "pull: user token, bad request",
                        RequestTester{"/pull", user_token, "PUT", bad_json},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
-                       "data manager token, good request",
+                       "pull: data manager token, good request",
                        RequestTester{"/pull", data_manager_token, "PUT", good_json},
                        http.StatusOK,
                        "Received 3 pull requests\n",
                },
                {
-                       "data manager token, bad request",
+                       "pull: data manager token, bad request",
                        RequestTester{"/pull", data_manager_token, "PUT", bad_json},
                        http.StatusBadRequest,
                        "Bad Request\n",
@@ -674,6 +674,112 @@ func TestPullHandler(t *testing.T) {
        }
 }
 
+// TestTrashHandler
+//
+// Test cases:
+//
+// Cases tested: syntactically valid and invalid trash lists, from the
+// data manager and from unprivileged users:
+//
+//   1. Valid trash list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   2. Invalid trash list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   3. Valid trash list from the data manager
+//      (expected result: 200 OK with response body "Received 3 trash
+//      requests")
+//
+//   4. Invalid trash list from the data manager
+//      (expected result: 400 Bad Request)
+//
+// Test that in the end, the trash collector received a good
+// trash list with the expected number of requests.
+//
+// TODO(twp): test concurrency: launch 100 goroutines to update the
+// trash list simultaneously.  Make sure that none of them return 400
+// Bad Request and that the trash queue ends up with a valid list.
+//
+func TestTrashHandler(t *testing.T) {
+       defer teardown()
+
+       // Set up a REST router for testing the handlers.
+       rest := MakeRESTRouter()
+
+       var user_token = "USER TOKEN"
+       data_manager_token = "DATA MANAGER TOKEN"
+
+       good_json := []byte(`[
+               {
+                       "locator":"block1",
+                       "block_mtime":1409082153
+               },
+               {
+                       "locator":"block2",
+                       "block_mtime":1409082153
+               },
+               {
+                       "locator":"block3",
+                       "block_mtime":1409082153
+               }
+       ]`)
+
+       bad_json := []byte(`I am not a valid JSON string`)
+
+       type trashTest struct {
+               name          string
+               req           RequestTester
+               response_code int
+               response_body string
+       }
+
+       var testcases = []trashTest{
+               {
+                       "trash: user token, good request",
+                       RequestTester{"/trash", user_token, "PUT", good_json},
+                       http.StatusUnauthorized,
+                       "Unauthorized\n",
+               },
+               {
+                       "trash: user token, bad request",
+                       RequestTester{"/trash", user_token, "PUT", bad_json},
+                       http.StatusUnauthorized,
+                       "Unauthorized\n",
+               },
+               {
+                       "trash: data manager token, good request",
+                       RequestTester{"/trash", data_manager_token, "PUT", good_json},
+                       http.StatusOK,
+                       "Received 3 trash requests\n",
+               },
+               {
+                       "trash: data manager token, bad request",
+                       RequestTester{"/trash", data_manager_token, "PUT", bad_json},
+                       http.StatusBadRequest,
+                       "Bad Request\n",
+               },
+       }
+
+       for _, tst := range testcases {
+               response := IssueRequest(rest, &tst.req)
+               ExpectStatusCode(t, tst.name, tst.response_code, response)
+               ExpectBody(t, tst.name, tst.response_body, response)
+       }
+
+       // The trash collector should have received one good list with 3
+       // requests on it.
+       var output_list = make([]TrashRequest, 3)
+       for i := 0; i < 3; i++ {
+               item := <-trashq.NextItem
+               if tr, ok := item.(TrashRequest); ok {
+                       output_list[i] = tr
+               } else {
+                       t.Errorf("item %v could not be parsed as a TrashRequest", item)
+               }
+       }
+}
+
 // ====================
 // Helper functions
 // ====================
index fde60879a25659d1e953a32ee8677f8769c51044..038e812c4fd09f2556174bf05a53d098b38bf194 100644 (file)
@@ -59,10 +59,12 @@ func MakeRESTRouter() *mux.Router {
                `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
        rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
-       // The PullHandler processes "PUT /pull" commands from Data Manager.
-       // It parses the JSON list of pull requests in the request body, and
-       // delivers them to the pull list manager for replication.
+       // The PullHandler and TrashHandler process "PUT /pull" and "PUT
+       // /trash" requests from Data Manager.  Each handler parses the
+       // JSON list of block management requests in the message body, and
+       // delivers them to the pull queue or trash queue, respectively.
        rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+       rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
@@ -478,6 +480,48 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
        pullq.ReplaceQueue(plist)
 }
 
+type TrashRequest struct {
+       Locator    string `json:"locator"`
+       BlockMtime int64  `json:"block_mtime"`
+}
+
+func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       api_token := GetApiToken(req)
+       if !IsDataManagerToken(api_token) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
+               return
+       }
+
+       // Parse the request body.
+       var trash []TrashRequest
+       r := json.NewDecoder(req.Body)
+       if err := r.Decode(&trash); err != nil {
+               http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+               log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
+               return
+       }
+
+       // We have a properly formatted trash list sent from the data
+       // manager.  Report success and send the list to the trash work
+       // queue for further handling.
+       log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
+       resp.WriteHeader(http.StatusOK)
+       resp.Write([]byte(
+               fmt.Sprintf("Received %d trash requests\n", len(trash))))
+
+       tlist := list.New()
+       for _, t := range trash {
+               tlist.PushBack(t)
+       }
+
+       if trashq == nil {
+               trashq = NewWorkQueue()
+       }
+       trashq.ReplaceQueue(tlist)
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
index 2437638cff8aa2bc9491a8af4ed86e4605f07ff8..d6eb6b22036bf576c47279d6de417eb1d63f037d 100644 (file)
@@ -91,12 +91,11 @@ func (e *KeepError) Error() string {
 // Initialized by the --volumes flag (or by FindKeepVolumes).
 var KeepVM VolumeManager
 
-// The pull list queue is a singleton pull list (a list of blocks
-// that the current keepstore process should be pulling from remote
-// keepstore servers in order to increase data replication) with
-// atomic update methods that are safe to use from multiple
-// goroutines.
+// The pull queue and trash queue are threadsafe work queues which
+// support atomic update operations. The PullHandler and TrashHandler
+// store the lists parsed from Data Manager /pull and /trash requests here.
 var pullq *WorkQueue
+var trashq *WorkQueue
 
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing