3761: code refactoring
[arvados.git] / services / keepstore / pull_worker.go
1 package main
2
3 import (
4         "crypto/rand"
5         "errors"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "time"
14 )
15
16 var arv arvadosclient.ArvadosClient
17 var keepClient keepclient.KeepClient
18
19 /*
20         Keepstore initiates pull worker channel goroutine.
21         The channel will process pull list.
22                 For each (next) pull request:
23                         For each locator listed, execute Pull on the server(s) listed
24                         Skip the rest of the servers if no errors
25                 Repeat
26 */
27 func RunPullWorker(nextItem <-chan interface{}) {
28         var err error
29         arv, err = arvadosclient.MakeArvadosClient()
30         if err != nil {
31                 log.Fatalf("Error setting up arvados client %s", err.Error())
32         }
33
34         keepClient, err = keepclient.MakeKeepClient(&arv)
35         if err != nil {
36                 log.Fatalf("Error setting up keep client %s", err.Error())
37         }
38
39         for item := range nextItem {
40                 Pull(item.(PullRequest))
41         }
42 }
43
44 /*
45         For each Pull request:
46                 Generate a random API token.
47                 Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
48                 Using this token & signature, retrieve the given block.
49                 Write to storage
50 */
51 func Pull(pullRequest PullRequest) (err error) {
52         defer func() {
53                 if err == nil {
54                         log.Printf("Pull %s success", pullRequest)
55                 } else {
56                         log.Printf("Pull %s error: %s", pullRequest, err)
57                 }
58         }()
59
60         service_roots := make(map[string]string)
61         for _, addr := range pullRequest.Servers {
62                 service_roots[addr] = addr
63         }
64         keepClient.SetServiceRoots(service_roots)
65
66         // Generate signature with a random token
67         PermissionSecret = []byte(os.Getenv("ARVADOS_API_TOKEN"))
68         expires_at := time.Now().Add(60 * time.Second)
69         signedLocator := SignLocator(pullRequest.Locator, GenerateRandomApiToken(), expires_at)
70
71         reader, contentLen, _, err := GetContent(signedLocator)
72
73         if err != nil {
74                 return
75         }
76         if reader == nil {
77                 return errors.New(fmt.Sprintf("No reader found for : %s", signedLocator))
78         }
79         defer reader.Close()
80
81         read_content, err := ioutil.ReadAll(reader)
82         if err != nil {
83                 return err
84         }
85
86         if (read_content == nil) || (int64(len(read_content)) != contentLen) {
87                 return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
88         }
89
90         err = PutContent(read_content, pullRequest.Locator)
91         return
92 }
93
94 // Fetch the content for the given locator using keepclient.
95 var GetContent = func(signedLocator string) (reader io.ReadCloser, contentLength int64, url string, err error) {
96         reader, blocklen, url, err := keepClient.Get(signedLocator)
97         return reader, blocklen, url, err
98 }
99
100 const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
101
102 func GenerateRandomApiToken() string {
103         var bytes = make([]byte, 36)
104         rand.Read(bytes)
105         for i, b := range bytes {
106                 bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
107         }
108         return (string(bytes))
109 }
110
111 // Put block
112 var PutContent = func(content []byte, locator string) (err error) {
113         err = PutBlock(content, locator)
114         return
115 }