7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
16 var arv arvadosclient.ArvadosClient
17 var keepClient keepclient.KeepClient
20 Keepstore initiates pull worker channel goroutine.
21 The channel will process pull list.
22 For each (next) pull request:
23 For each locator listed, execute Pull on the server(s) listed
24 Skip the rest of the servers if no errors
27 func RunPullWorker(nextItem <-chan interface{}) {
29 arv, err = arvadosclient.MakeArvadosClient()
31 log.Fatalf("Error setting up arvados client %s", err.Error())
33 arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
35 keepClient, err = keepclient.MakeKeepClient(&arv)
37 log.Fatalf("Error setting up keep client %s", err.Error())
40 for item := range nextItem {
41 pullReq := item.(PullRequest)
42 for _, addr := range pullReq.Servers {
43 err := Pull(addr, pullReq.Locator)
52 For each Pull request:
53 Generate a random API token.
54 Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
55 Using this token & signature, retrieve the given block.
58 func Pull(addr string, locator string) (err error) {
59 log.Printf("Pull %s/%s starting", addr, locator)
63 log.Printf("Pull %s/%s success", addr, locator)
65 log.Printf("Pull %s/%s error: %s", addr, locator, err)
69 service_roots := make(map[string]string)
70 service_roots[locator] = addr
71 keepClient.SetServiceRoots(service_roots)
73 read_content, err := GetContent(addr, locator)
74 log.Print(read_content, err)
79 err = PutContent(read_content, locator)
83 // Fetch the content for the given locator using keepclient.
84 var GetContent = func(addr string, locator string) ([]byte, error) {
85 // Generate signature with a random token
86 expires_at := time.Now().Unix() + 60 // now + 1 min in seconds
87 hints := "+A" + GenerateRandomApiToken() + "@" + strconv.FormatInt(expires_at, 16)
88 signature := keepclient.MakeLocator2(locator, hints)
90 reader, blocklen, _, err := keepClient.AuthorizedGet(locator, signature.Signature, signature.Timestamp)
96 read_content, err := ioutil.ReadAll(reader)
97 log.Print(read_content, err)
102 if (read_content == nil) || (int64(len(read_content)) != blocklen) {
103 return nil, errors.New(fmt.Sprintf("Content not found for: %s/%s", addr, locator))
106 return read_content, nil
109 const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
111 func GenerateRandomApiToken() string {
112 var bytes = make([]byte, 36)
114 for i, b := range bytes {
115 bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
117 return (string(bytes))
121 var PutContent = func(content []byte, locator string) (err error) {
122 err = PutBlock(content, locator)