3761: additional tests
[arvados.git] / services / keepstore / pull_worker.go
1 package main
2
3 import (
4         "crypto/rand"
5         "errors"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "log"
11         "os"
12         "strconv"
13         "time"
14 )
15
16 var arv arvadosclient.ArvadosClient
17 var keepClient keepclient.KeepClient
18
19 /*
20         Keepstore initiates pull worker channel goroutine.
21         The channel will process pull list.
22                 For each (next) pull request:
23                         For each locator listed, execute Pull on the server(s) listed
24                         Skip the rest of the servers if no errors
25                 Repeat
26 */
27 func RunPullWorker(nextItem <-chan interface{}) {
28         var err error
29         arv, err = arvadosclient.MakeArvadosClient()
30         if err != nil {
31                 log.Fatalf("Error setting up arvados client %s", err.Error())
32         }
33         arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
34
35         keepClient, err = keepclient.MakeKeepClient(&arv)
36         if err != nil {
37                 log.Fatalf("Error setting up keep client %s", err.Error())
38         }
39
40         for item := range nextItem {
41                 pullReq := item.(PullRequest)
42                 for _, addr := range pullReq.Servers {
43                         err := Pull(addr, pullReq.Locator)
44                         if err == nil {
45                                 break
46                         }
47                 }
48         }
49 }
50
51 /*
52         For each Pull request:
53                 Generate a random API token.
54                 Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
55                 Using this token & signature, retrieve the given block.
56                 Write to storage
57 */
58 func Pull(addr string, locator string) (err error) {
59         log.Printf("Pull %s/%s starting", addr, locator)
60
61         defer func() {
62                 if err == nil {
63                         log.Printf("Pull %s/%s success", addr, locator)
64                 } else {
65                         log.Printf("Pull %s/%s error: %s", addr, locator, err)
66                 }
67         }()
68
69         service_roots := make(map[string]string)
70         service_roots[locator] = addr
71         keepClient.SetServiceRoots(service_roots)
72
73         read_content, err := GetContent(addr, locator)
74         log.Print(read_content, err)
75         if err != nil {
76                 return
77         }
78
79         err = PutContent(read_content, locator)
80         return
81 }
82
83 // Fetch the content for the given locator using keepclient.
84 var GetContent = func(addr string, locator string) ([]byte, error) {
85         // Generate signature with a random token
86         expires_at := time.Now().Unix() + 60 // now + 1 min in seconds
87         hints := "+A" + GenerateRandomApiToken() + "@" + strconv.FormatInt(expires_at, 16)
88         signature := keepclient.MakeLocator2(locator, hints)
89
90         reader, blocklen, _, err := keepClient.AuthorizedGet(locator, signature.Signature, signature.Timestamp)
91         defer reader.Close()
92         if err != nil {
93                 return nil, err
94         }
95
96         read_content, err := ioutil.ReadAll(reader)
97         log.Print(read_content, err)
98         if err != nil {
99                 return nil, err
100         }
101
102         if (read_content == nil) || (int64(len(read_content)) != blocklen) {
103                 return nil, errors.New(fmt.Sprintf("Content not found for: %s/%s", addr, locator))
104         }
105
106         return read_content, nil
107 }
108
109 const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
110
111 func GenerateRandomApiToken() string {
112         var bytes = make([]byte, 36)
113         rand.Read(bytes)
114         for i, b := range bytes {
115                 bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
116         }
117         return (string(bytes))
118 }
119
120 // Put block
121 var PutContent = func(content []byte, locator string) (err error) {
122         err = PutBlock(content, locator)
123         return
124 }