From b185fc94a543b5b1361497c8502e876d6fdc2838 Mon Sep 17 00:00:00 2001 From: Tim Pierce Date: Fri, 22 Aug 2014 11:12:53 -0400 Subject: [PATCH] 3414: adding PullHandler and Replicator Added PullHandler and a "replicator" package to handle "PUT /pull" requests. PUT /pull requests are routed to PullHandler, which authenticates the request and validates the JSON in the request body. Valid requests are sent to the replicator. The Keepstore replicator runs a goroutine which repeatedly listens on its input channel for a new pull list. TestPullHandler tests each combination of: request from superuser; request from ordinary user; properly formatted pull request; improperly formatted pull request. It checks the state of the replicator when done to make sure that it has the expected number of pull requests. 3414: add replicator. --- services/keepstore/handler_test.go | 105 +++++++++++++++++++++++ services/keepstore/handlers.go | 79 ++++++++++++++++- services/keepstore/keepstore.go | 23 ++++- services/keepstore/replicator/replica.go | 79 +++++++++++++++++ 4 files changed, 278 insertions(+), 8 deletions(-) create mode 100644 services/keepstore/replicator/replica.go diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 200e1b1793..2b1f7cd2e0 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -566,6 +566,111 @@ func TestDeleteHandler(t *testing.T) { } } +// TestPullHandler +// +// Test handling of the PUT /pull request. +// +// Cases tested: syntactically valid and invalid pull lists, from the +// data manager and from unprivileged users: +// +// 1. Valid pull list from an ordinary user +// (expected result: 401 Unauthorized) +// +// 2. Invalid pull request from an ordinary user +// (expected result: 401 Unauthorized) +// +// 3. Valid pull request from the data manager +// (expected result: 200 OK with response body "Received 3 pull +// requests") +// +// 4. 
Invalid pull request from the data manager +// (expected result: 400 Bad Request) +// +// Test that in the end, the replicator received a good pull list with +// the expected number of requests. +// +// TODO(twp): test concurrency: launch 100 goroutines to update the +// pull list simultaneously. Make sure that none of them return 400 +// Bad Request and that replica.Dump() returns a valid list. +// +func TestPullHandler(t *testing.T) { + defer teardown() + + // Set up a REST router for testing the handlers. + rest := MakeRESTRouter() + + var user_token = "USER TOKEN" + data_manager_token = "DATA MANAGER TOKEN" + + good_json := []byte(`[ + { + "locator":"locator_with_two_servers", + "servers":[ + "server1", + "server2" + ] + }, + { + "locator":"locator_with_no_servers", + "servers":[] + }, + { + "locator":"", + "servers":["empty_locator"] + } + ]`) + + bad_json := []byte(`{ "key":"I'm a little teapot" }`) + + type pullTest struct { + name string + req RequestTester + response_code int + response_body string + } + var testcases = []pullTest{ + { + "user token, good request", + RequestTester{"/pull", user_token, "PUT", good_json}, + http.StatusUnauthorized, + "Unauthorized\n", + }, + { + "user token, bad request", + RequestTester{"/pull", user_token, "PUT", bad_json}, + http.StatusUnauthorized, + "Unauthorized\n", + }, + { + "data manager token, good request", + RequestTester{"/pull", data_manager_token, "PUT", good_json}, + http.StatusOK, + "Received 3 pull requests\n", + }, + { + "data manager token, bad request", + RequestTester{"/pull", data_manager_token, "PUT", bad_json}, + http.StatusBadRequest, + "Bad Request\n", + }, + } + + for _, tst := range testcases { + response := IssueRequest(rest, &tst.req) + ExpectStatusCode(t, tst.name, tst.response_code, response) + ExpectBody(t, tst.name, tst.response_body, response) + } + + // The Keep replicator should have received one good list with 3 + // requests on it. 
+ var saved_pull_list = replica.Dump() + if len(saved_pull_list) != 3 { + t.Errorf( + "saved_pull_list: expected 3 elements, got %d\nsaved_pull_list = %v", + len(saved_pull_list), saved_pull_list) + } +} + // ==================== // Helper functions // ==================== diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 039b2ac9a3..9d4c617d88 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -13,6 +13,7 @@ import ( "crypto/md5" "encoding/json" "fmt" + "git.curoverse.com/arvados.git/services/keepstore/replicator" "github.com/gorilla/mux" "io" "log" @@ -58,6 +59,11 @@ func MakeRESTRouter() *mux.Router { `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD") rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD") + // The PullHandler processes "PUT /pull" commands from Data Manager. + // It parses the JSON list of pull requests in the request body, and + // delivers them to the replicator goroutine for replication. + rest.HandleFunc(`/pull`, PullHandler).Methods("PUT") + // Any request which does not match any of these routes gets // 400 Bad Request. rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler) @@ -397,6 +403,68 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) { } } +/* PullHandler processes "PUT /pull" requests for the data manager. + The request body is a JSON message containing a list of pull + requests in the following format: + + [ + { + "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985", + "servers":[ + "keep0.qr1hi.arvadosapi.com:25107", + "keep1.qr1hi.arvadosapi.com:25108" + ] + }, + { + "locator":"55ae4d45d2db0793d53f03e805f656e5+658395", + "servers":[ + "10.0.1.5:25107", + "10.0.1.6:25107", + "10.0.1.7:25108" + ] + }, + ... + ] + + Each pull request in the list consists of a block locator string + and an ordered list of servers. Keepstore should try to fetch the + block from each server in turn. 
+ + If the request has not been sent by the Data Manager, return 401 + Unauthorized. + + If the JSON unmarshalling fails, return 400 Bad Request. +*/ + +func PullHandler(resp http.ResponseWriter, req *http.Request) { + // Reject unauthorized requests. + api_token := GetApiToken(req) + if !IsDataManagerToken(api_token) { + http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) + return + } + + // Parse the request body. + var pull_list []replicator.PullRequest + r := json.NewDecoder(req.Body) + if err := r.Decode(&pull_list); err != nil { + http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode) + return + } + + // We have a properly formatted pull list sent from the data + // manager. Report success and send the list to the keep + // replicator for further handling. + resp.WriteHeader(http.StatusOK) + resp.Write([]byte( + fmt.Sprintf("Received %d pull requests\n", len(pull_list)))) + + if replica == nil { + replica = replicator.New() + } + replica.Pull(pull_list) +} + // ============================== // GetBlock and PutBlock implement lower-level code for handling // blocks by rooting through volumes connected to the local machine. @@ -606,10 +674,7 @@ func CanDelete(api_token string) bool { } // Blocks may be deleted only when Keep has been configured with a // data manager. - if data_manager_token == "" { - return false - } - if api_token == data_manager_token { + if IsDataManagerToken(api_token) { return true } // TODO(twp): look up api_token with the API server @@ -617,3 +682,9 @@ func CanDelete(api_token string) bool { // has unlimited scope return false } + +// IsDataManagerToken returns true if api_token represents the data +// manager's token. 
+func IsDataManagerToken(api_token string) bool { + return data_manager_token != "" && api_token == data_manager_token +} diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 9556185fc0..f1e7aa9d14 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -4,6 +4,7 @@ import ( "bytes" "flag" "fmt" + "git.curoverse.com/arvados.git/services/keepstore/replicator" "io/ioutil" "log" "net" @@ -34,10 +35,6 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 var PROC_MOUNTS = "/proc/mounts" -// The Keep VolumeManager maintains a list of available volumes. -// Initialized by the --volumes flag (or by FindKeepVolumes). -var KeepVM VolumeManager - // enforce_permissions controls whether permission signatures // should be enforced (affecting GET and DELETE requests). // Initialized by the --enforce-permissions flag. @@ -67,6 +64,7 @@ type KeepError struct { var ( BadRequestError = &KeepError{400, "Bad Request"} + UnauthorizedError = &KeepError{401, "Unauthorized"} CollisionError = &KeepError{500, "Collision"} RequestHashError = &KeepError{422, "Hash mismatch in request"} PermissionError = &KeepError{403, "Forbidden"} @@ -83,6 +81,23 @@ func (e *KeepError) Error() string { return e.ErrMsg } +// ======================== +// Internal data structures +// +// These global variables are used by multiple parts of the +// program. They are good candidates for moving into their own +// packages. + +// The Keep VolumeManager maintains a list of available volumes. +// Initialized by the --volumes flag (or by FindKeepVolumes). +var KeepVM VolumeManager + +// The Keep replicator (replica) is responsible for pulling blocks from other +// Keep servers to ensure replication. When Keep receives a new +// "pull list" from Data Manager, the replicator is responsible for +// fetching blocks on the list. +var replica *replicator.Replicator + // TODO(twp): continue moving as much code as possible out of main // so it can be effectively tested. Esp. 
handling and postprocessing // of command line flags (identifying Keep volumes and initializing diff --git a/services/keepstore/replicator/replica.go b/services/keepstore/replicator/replica.go new file mode 100644 index 0000000000..b9995c1e22 --- /dev/null +++ b/services/keepstore/replicator/replica.go @@ -0,0 +1,79 @@ +package replicator + +/* The Keep replicator package fulfills replication pull requests sent + by Data Manager. + + The interface is: + + replicator.New() launches a replication goroutine and returns the + new Replicator object. + + Replicator.Pull() assigns a new pull list to the goroutine. + + Replicator.Dump() reports the goroutine's current pull list. + + Replicator.Close() shuts down the replicator. +*/ + +type PullRequest struct { + Locator string + Servers []string +} + +type Replicator struct { + queue chan []PullRequest + dump chan []PullRequest +} + +// New returns a new Replicator object. It launches a goroutine that +// waits for pull requests. +// +func New() *Replicator { + r := Replicator{ + make(chan []PullRequest), + make(chan []PullRequest), + } + go r.listen() + return &r +} + +// Pull sends a new list of pull requests to the replicator goroutine. +// The replicator will discard any outstanding pull requests and begin +// working on the new list. +// +func (r *Replicator) Pull(pr []PullRequest) { + r.queue <- pr +} + +// Dump reports the contents of the current pull list. +func (r *Replicator) Dump() []PullRequest { + return <-r.dump +} + +// Close shuts down the replicator and terminates the goroutine, which +// completes any pull request in progress and abandons any pending +// requests. +// +func (r *Replicator) Close() { + close(r.queue) +} + +// listen is run in a goroutine. It reads new pull lists from its +// input queue until the queue is closed. 
+func (r *Replicator) listen() { + var current []PullRequest + for { + select { + case p, ok := <-r.queue: + if ok { + current = p + } else { + // The input channel is closed; time to shut down + close(r.dump) + return + } + case r.dump <- current: + // no-op + } + } +} -- 2.30.2