Merge branch 'master' into 3761-pull-list-worker
[arvados.git] / services / keepstore / pull_worker.go
1 package main
2
3 import (
4         "crypto/rand"
5         "errors"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "log"
11         "os"
12         "time"
13 )
14
15 var arv arvadosclient.ArvadosClient
16 var keepClient keepclient.KeepClient
17
18 /*
19         Keepstore initiates pull worker channel goroutine.
20         The channel will process pull list.
21                 For each (next) pull request:
22                         For each locator listed, execute Pull on the server(s) listed
23                         Skip the rest of the servers if no errors
24                 Repeat
25 */
26 func RunPullWorker(nextItem <-chan interface{}) {
27         var err error
28         arv, err = arvadosclient.MakeArvadosClient()
29         if err != nil {
30                 log.Fatalf("Error setting up arvados client %s", err.Error())
31         }
32         arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
33
34         keepClient, err = keepclient.MakeKeepClient(&arv)
35         if err != nil {
36                 log.Fatalf("Error setting up keep client %s", err.Error())
37         }
38
39         for item := range nextItem {
40                 pullReq := item.(PullRequest)
41                 for _, addr := range pullReq.Servers {
42                         err := Pull(addr, pullReq.Locator)
43                         if err == nil {
44                                 break
45                         }
46                 }
47         }
48 }
49
50 /*
51         For each Pull request:
52                 Generate a random API token.
53                 Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
54                 Using this token & signature, retrieve the given block.
55                 Write to storage
56 */
57 func Pull(addr string, locator string) (err error) {
58         log.Printf("Pull %s/%s starting", addr, locator)
59
60         defer func() {
61                 if err == nil {
62                         log.Printf("Pull %s/%s success", addr, locator)
63                 } else {
64                         log.Printf("Pull %s/%s error: %s", addr, locator, err)
65                 }
66         }()
67
68         service_roots := make(map[string]string)
69         service_roots[locator] = addr
70         keepClient.SetServiceRoots(service_roots)
71
72         read_content, err := GetContent(addr, locator)
73         log.Print(read_content, err)
74         if err != nil {
75                 return
76         }
77
78         err = PutContent(read_content, locator)
79         return
80 }
81
82 // Fetch the content for the given locator using keepclient.
83 var GetContent = func(addr string, locator string) ([]byte, error) {
84         // Generate signature with a random token
85         PermissionSecret = []byte(os.Getenv("ARVADOS_API_TOKEN"))
86         expires_at := time.Now().Add(60 * time.Second)
87         signedLocator := SignLocator(locator, GenerateRandomApiToken(), expires_at)
88         reader, blocklen, _, err := keepClient.Get(signedLocator)
89         defer reader.Close()
90         if err != nil {
91                 return nil, err
92         }
93
94         read_content, err := ioutil.ReadAll(reader)
95         log.Print(read_content, err)
96         if err != nil {
97                 return nil, err
98         }
99
100         if (read_content == nil) || (int64(len(read_content)) != blocklen) {
101                 return nil, errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
102         }
103
104         return read_content, nil
105 }
106
107 const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
108
109 func GenerateRandomApiToken() string {
110         var bytes = make([]byte, 36)
111         rand.Read(bytes)
112         for i, b := range bytes {
113                 bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
114         }
115         return (string(bytes))
116 }
117
118 // Put block
119 var PutContent = func(content []byte, locator string) (err error) {
120         err = PutBlock(content, locator)
121         return
122 }