From cbd11b4bfd5bcc637abe0e7678239dd1e7a2fbd2 Mon Sep 17 00:00:00 2001 From: Tim Pierce Date: Fri, 19 Sep 2014 15:50:14 -0400 Subject: [PATCH] 3413: added TrashHandler * added a trashq WorkQueue to manage the trash list * added TrashHandler to process "PUT /trash" requests * added TestTrashHandler --- services/keepstore/handler_test.go | 114 ++++++++++++++++++++++++++++- services/keepstore/handlers.go | 50 ++++++++++++- services/keepstore/keepstore.go | 9 +-- 3 files changed, 161 insertions(+), 12 deletions(-) diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 0cfa1f30dd..55281764cd 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -630,25 +630,25 @@ func TestPullHandler(t *testing.T) { } var testcases = []pullTest{ { - "user token, good request", + "pull: user token, good request", RequestTester{"/pull", user_token, "PUT", good_json}, http.StatusUnauthorized, "Unauthorized\n", }, { - "user token, bad request", + "pull: user token, bad request", RequestTester{"/pull", user_token, "PUT", bad_json}, http.StatusUnauthorized, "Unauthorized\n", }, { - "data manager token, good request", + "pull: data manager token, good request", RequestTester{"/pull", data_manager_token, "PUT", good_json}, http.StatusOK, "Received 3 pull requests\n", }, { - "data manager token, bad request", + "pull: data manager token, bad request", RequestTester{"/pull", data_manager_token, "PUT", bad_json}, http.StatusBadRequest, "Bad Request\n", @@ -674,6 +674,112 @@ func TestPullHandler(t *testing.T) { } } +// TestTrashHandler +// +// Test cases: +// +// Cases tested: syntactically valid and invalid trash lists, from the +// data manager and from unprivileged users: +// +// 1. Valid trash list from an ordinary user +// (expected result: 401 Unauthorized) +// +// 2. Invalid trash list from an ordinary user +// (expected result: 401 Unauthorized) +// +// 3. Valid trash list from the data manager +// (expected result: 200 OK with request body "Received 3 trash +// requests" +// +// 4. Invalid trash list from the data manager +// (expected result: 400 Bad Request) +// +// Test that in the end, the trash collector received a good list +// trash list with the expected number of requests. +// +// TODO(twp): test concurrency: launch 100 goroutines to update the +// pull list simultaneously. Make sure that none of them return 400 +// Bad Request and that replica.Dump() returns a valid list. +// +func TestTrashHandler(t *testing.T) { + defer teardown() + + // Set up a REST router for testing the handlers. + rest := MakeRESTRouter() + + var user_token = "USER TOKEN" + data_manager_token = "DATA MANAGER TOKEN" + + good_json := []byte(`[ + { + "locator":"block1", + "block_mtime":1409082153 + }, + { + "locator":"block2", + "block_mtime":1409082153 + }, + { + "locator":"block3", + "block_mtime":1409082153 + } + ]`) + + bad_json := []byte(`I am not a valid JSON string`) + + type trashTest struct { + name string + req RequestTester + response_code int + response_body string + } + + var testcases = []trashTest{ + { + "trash: user token, good request", + RequestTester{"/trash", user_token, "PUT", good_json}, + http.StatusUnauthorized, + "Unauthorized\n", + }, + { + "trash: user token, bad request", + RequestTester{"/trash", user_token, "PUT", bad_json}, + http.StatusUnauthorized, + "Unauthorized\n", + }, + { + "trash: data manager token, good request", + RequestTester{"/trash", data_manager_token, "PUT", good_json}, + http.StatusOK, + "Received 3 trash requests\n", + }, + { + "trash: data manager token, bad request", + RequestTester{"/trash", data_manager_token, "PUT", bad_json}, + http.StatusBadRequest, + "Bad Request\n", + }, + } + + for _, tst := range testcases { + response := IssueRequest(rest, &tst.req) + ExpectStatusCode(t, tst.name, tst.response_code, response) + ExpectBody(t, tst.name, tst.response_body, response) + } + + // The trash collector should have received one good list with 3 + // requests on it. + var output_list = make([]TrashRequest, 3) + for i := 0; i < 3; i++ { + item := <-trashq.NextItem + if tr, ok := item.(TrashRequest); ok { + output_list[i] = tr + } else { + t.Errorf("item %v could not be parsed as a TrashRequest", item) + } + } +} + // ==================== // Helper functions // ==================== diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index fde60879a2..038e812c4f 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -59,10 +59,12 @@ func MakeRESTRouter() *mux.Router { `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD") - // The PullHandler processes "PUT /pull" commands from Data Manager. - // It parses the JSON list of pull requests in the request body, and - // delivers them to the pull list manager for replication. + // The PullHandler and TrashHandler process "PUT /pull" and "PUT + // /trash" requests from Data Manager. Each handler parses the + // JSON list of block management requests in the message body, and + // delivers them to the pull queue or trash queue, respectively. rest.HandleFunc(`/pull`, PullHandler).Methods("PUT") + rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT") // Any request which does not match any of these routes gets // 400 Bad Request. @@ -478,6 +480,48 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) { pullq.ReplaceQueue(plist) } +type TrashRequest struct { + Locator string `json:"locator"` + BlockMtime int64 `json:"block_mtime"` +} + +func TrashHandler(resp http.ResponseWriter, req *http.Request) { + // Reject unauthorized requests. + api_token := GetApiToken(req) + if !IsDataManagerToken(api_token) { + http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) + log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error()) + return + } + + // Parse the request body. + var trash []TrashRequest + r := json.NewDecoder(req.Body) + if err := r.Decode(&trash); err != nil { + http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode) + log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error()) + return + } + + // We have a properly formatted trash list sent from the data + // manager. Report success and send the list to the trash work + // queue for further handling. + log.Printf("%s %s: received %v\n", req.Method, req.URL, trash) + resp.WriteHeader(http.StatusOK) + resp.Write([]byte( + fmt.Sprintf("Received %d trash requests\n", len(trash)))) + + tlist := list.New() + for _, t := range trash { + tlist.PushBack(t) + } + + if trashq == nil { + trashq = NewWorkQueue() + } + trashq.ReplaceQueue(tlist) +} + // ============================== // GetBlock and PutBlock implement lower-level code for handling // blocks by rooting through volumes connected to the local machine. diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 2437638cff..d6eb6b2203 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -91,12 +91,11 @@ func (e *KeepError) Error() string { // Initialized by the --volumes flag (or by FindKeepVolumes). var KeepVM VolumeManager -// The pull list queue is a singleton pull list (a list of blocks -// that the current keepstore process should be pulling from remote -// keepstore servers in order to increase data replication) with -// atomic update methods that are safe to use from multiple -// goroutines. +// The pull list manager and trash queue are threadsafe queues which +// support atomic update operations. The PullHandler and TrashHandler +// store results from Data Manager /pull and /trash requests here. var pullq *WorkQueue +var trashq *WorkQueue // TODO(twp): continue moving as much code as possible out of main // so it can be effectively tested. Esp. handling and postprocessing -- 2.30.2