From cb9fbffa7f480dae5f17eb44f27d0b3523da0f0a Mon Sep 17 00:00:00 2001 From: Radhika Chippada Date: Fri, 27 Feb 2015 11:12:29 -0500 Subject: [PATCH] 3761: Run pull list worker, which processes pull reqests from the list. --- services/keepstore/keepstore.go | 4 + services/keepstore/pull_worker.go | 118 +++++++++++++++++++++++++ services/keepstore/pull_worker_test.go | 99 +++++++++++++++++++++ 3 files changed, 221 insertions(+) create mode 100644 services/keepstore/pull_worker.go create mode 100644 services/keepstore/pull_worker_test.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 75b6c4014d..b6ab8faca1 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -274,6 +274,10 @@ func main() { log.Fatal(err) } + // Initialize Pull queue and worker + pullq = NewWorkQueue() + go RunPullWorker(pullq.NextItem) + // Shut down the server gracefully (by closing the listener) // if SIGTERM is received. term := make(chan os.Signal, 1) diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go new file mode 100644 index 0000000000..2342fd2256 --- /dev/null +++ b/services/keepstore/pull_worker.go @@ -0,0 +1,118 @@ +package main + +import ( + "crypto/rand" + "errors" + "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io/ioutil" + "log" + "os" + "strconv" + "time" +) + +var arv arvadosclient.ArvadosClient +var keepClient keepclient.KeepClient + +/* + Keepstore initiates pull worker channel goroutine. + The channel will process pull list. + For each (next) pull request: + For each locator listed, execute Pull on the server(s) listed + Skip the rest of the servers if no errors + Repeat +*/ +func RunPullWorker(nextItem <-chan interface{}) { + var err error + arv, err = arvadosclient.MakeArvadosClient() + if err != nil { + log.Fatalf("Error setting up arvados client %s", err.Error()) + } + arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN") + + keepClient, err = keepclient.MakeKeepClient(&arv) + if err != nil { + log.Fatalf("Error setting up keep client %s", err.Error()) + } + + for item := range nextItem { + pullReq := item.(PullRequest) + for _, addr := range pullReq.Servers { + err := Pull(addr, pullReq.Locator) + if err == nil { + break + } + } + } +} + +/* + For each Pull request: + Generate a random API token. + Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash. + Using this token & signature, retrieve the given block. + Write to storage +*/ +func Pull(addr string, locator string) (err error) { + log.Printf("Pull %s/%s starting", addr, locator) + + defer func() { + if err == nil { + log.Printf("Pull %s/%s success", addr, locator) + } else { + log.Printf("Pull %s/%s error: %s", addr, locator, err) + } + }() + + service_roots := make(map[string]string) + service_roots[locator] = addr + keepClient.SetServiceRoots(service_roots) + + read_content, err := GetContent(addr, locator) + log.Print(read_content, err) + if err != nil { + return + } + + err = PutBlock(read_content, locator) + return +} + +// Fetch the content for the given locator using keepclient. +var GetContent = func(addr string, locator string) ([]byte, error) { + // Generate signature with a random token + expires_at := time.Now().Unix() + 60 // now + 1 min in seconds + hints := "+A" + GenerateRandomApiToken() + "@" + strconv.FormatInt(expires_at, 16) + signature := keepclient.MakeLocator2(locator, hints) + + reader, blocklen, _, err := keepClient.AuthorizedGet(locator, signature.Signature, signature.Timestamp) + defer reader.Close() + if err != nil { + return nil, err + } + + read_content, err := ioutil.ReadAll(reader) + log.Print(read_content, err) + if err != nil { + return nil, err + } + + if (read_content == nil) || (int64(len(read_content)) != blocklen) { + return nil, errors.New(fmt.Sprintf("Content not found for: %s/%s", addr, locator)) + } + + return read_content, nil +} + +const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz" + +func GenerateRandomApiToken() string { + var bytes = make([]byte, 36) + rand.Read(bytes) + for i, b := range bytes { + bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))] + } + return (string(bytes)) +} diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go new file mode 100644 index 0000000000..907bfac49b --- /dev/null +++ b/services/keepstore/pull_worker_test.go @@ -0,0 +1,99 @@ +package main + +import ( + "errors" + "net/http" + "testing" + "time" +) + +func TestPullWorker(t *testing.T) { + defer teardown() + + // Since keepstore does not come into picture in tests, + // we need to explicitly start the goroutine in tests. + go RunPullWorker(pullq.NextItem) + + data_manager_token = "DATA MANAGER TOKEN" + + first_pull_list := []byte(`[ + { + "locator":"locator1_to_verify_first_pull_list", + "servers":[ + "server_1", + "server_2" + ] + }, + { + "locator":"locator2_to_verify_first_pull_list", + "servers":[ + "server_1" + ] + } + ]`) + + second_pull_list := []byte(`[ + { + "locator":"locator_to_verify_second_pull_list", + "servers":[ + "server_1", + "server_2" + ] + } + ]`) + + type PullWorkerTestData struct { + name string + req RequestTester + response_code int + response_body string + read_content string + read_error bool + } + var testcases = []PullWorkerTestData{ + { + "Pull request 1 from the data manager in worker", + RequestTester{"/pull", data_manager_token, "PUT", first_pull_list}, + http.StatusOK, + "Received 2 pull requests\n", + "hello", + false, + }, + { + "Pull request 2 from the data manager in worker", + RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, + http.StatusOK, + "Received 1 pull requests\n", + "hola", + false, + }, + { + "Pull request with error on get", + RequestTester{"/pull", data_manager_token, "PUT", second_pull_list}, + http.StatusOK, + "Received 1 pull requests\n", + "unused", + true, + }, + } + + for _, testData := range testcases { + // Override GetContent to mock keepclient functionality + GetContent = func(addr string, locator string) ([]byte, error) { + if testData.read_error { + return nil, errors.New("Error getting data") + } else { + return []byte(testData.read_content), nil + } + } + + response := IssueRequest(&testData.req) + ExpectStatusCode(t, testData.name, testData.response_code, response) + ExpectBody(t, testData.name, testData.response_body, response) + + // give the channel a second to read and process all pull list entries + time.Sleep(1000 * time.Millisecond) + + expectChannelEmpty(t, pullq.NextItem) + } +} -- 2.30.2