Merge branch 'master' into 3761-pull-list-worker
[arvados.git] / services / keepstore / pull_worker.go
1 package main
2
3 import (
4         "crypto/rand"
5         "errors"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "time"
14 )
15
16 var arv arvadosclient.ArvadosClient
17 var keepClient keepclient.KeepClient
18
19 /*
20         Keepstore initiates pull worker channel goroutine.
21         The channel will process pull list.
22                 For each (next) pull request:
23                         For each locator listed, execute Pull on the server(s) listed
24                         Skip the rest of the servers if no errors
25                 Repeat
26 */
27 func RunPullWorker(nextItem <-chan interface{}) {
28         var err error
29         arv, err = arvadosclient.MakeArvadosClient()
30         if err != nil {
31                 log.Fatalf("Error setting up arvados client %s", err.Error())
32         }
33
34         keepClient, err = keepclient.MakeKeepClient(&arv)
35         if err != nil {
36                 log.Fatalf("Error setting up keep client %s", err.Error())
37         }
38
39         for item := range nextItem {
40                 Pull(item.(PullRequest))
41         }
42 }
43
44 /*
45         For each Pull request:
46                 Generate a random API token.
47                 Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
48                 Using this token & signature, retrieve the given block.
49                 Write to storage
50 */
51 func Pull(pullRequest PullRequest) (err error) {
52         defer func() {
53                 if err == nil {
54                         log.Printf("Pull %s success", pullRequest)
55                 } else {
56                         log.Printf("Pull %s error: %s", pullRequest, err)
57                 }
58         }()
59
60         service_roots := make(map[string]string)
61         for _, addr := range pullRequest.Servers {
62                 service_roots[addr] = addr
63         }
64         keepClient.SetServiceRoots(service_roots)
65
66         // Generate signature with a random token
67         PermissionSecret = []byte(os.Getenv("ARVADOS_API_TOKEN"))
68         expires_at := time.Now().Add(60 * time.Second)
69         signedLocator := SignLocator(pullRequest.Locator, GenerateRandomApiToken(), expires_at)
70
71         reader, contentLen, _, err := GetContent(pullRequest.Locator, signedLocator)
72         if err != nil {
73                 return
74         }
75         if reader == nil {
76                 return errors.New(fmt.Sprintf("No reader found for : %s", signedLocator))
77         }
78         defer reader.Close()
79
80         read_content, err := ioutil.ReadAll(reader)
81         if err != nil {
82                 return err
83         }
84
85         if (read_content == nil) || (int64(len(read_content)) != contentLen) {
86                 return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
87         }
88
89         err = PutContent(read_content, pullRequest.Locator)
90         return
91 }
92
93 // Fetch the content for the given locator using keepclient.
94 var GetContent = func(locator string, signedLocator string) (reader io.ReadCloser, contentLength int64, url string, err error) {
95         reader, blocklen, url, err := keepClient.Get(signedLocator)
96         return reader, blocklen, url, err
97 }
98
99 const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
100
101 func GenerateRandomApiToken() string {
102         var bytes = make([]byte, 36)
103         rand.Read(bytes)
104         for i, b := range bytes {
105                 bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
106         }
107         return (string(bytes))
108 }
109
110 // Put block
111 var PutContent = func(content []byte, locator string) (err error) {
112         err = PutBlock(content, locator)
113         return
114 }