7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
15 var arv arvadosclient.ArvadosClient
16 var keepClient keepclient.KeepClient
19 Keepstore initiates pull worker channel goroutine.
20 The channel will process pull list.
21 For each (next) pull request:
22 For each locator listed, execute Pull on the server(s) listed
23 Skip the rest of the servers if no errors
26 func RunPullWorker(nextItem <-chan interface{}) {
28 arv, err = arvadosclient.MakeArvadosClient()
30 log.Fatalf("Error setting up arvados client %s", err.Error())
32 arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
34 keepClient, err = keepclient.MakeKeepClient(&arv)
36 log.Fatalf("Error setting up keep client %s", err.Error())
39 for item := range nextItem {
40 pullReq := item.(PullRequest)
41 for _, addr := range pullReq.Servers {
42 err := Pull(addr, pullReq.Locator)
51 For each Pull request:
52 Generate a random API token.
53 Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
54 Using this token & signature, retrieve the given block.
57 func Pull(addr string, locator string) (err error) {
58 log.Printf("Pull %s/%s starting", addr, locator)
62 log.Printf("Pull %s/%s success", addr, locator)
64 log.Printf("Pull %s/%s error: %s", addr, locator, err)
68 service_roots := make(map[string]string)
69 service_roots[locator] = addr
70 keepClient.SetServiceRoots(service_roots)
72 read_content, err := GetContent(addr, locator)
73 log.Print(read_content, err)
78 err = PutContent(read_content, locator)
82 // Fetch the content for the given locator using keepclient.
83 var GetContent = func(addr string, locator string) ([]byte, error) {
84 // Generate signature with a random token
85 PermissionSecret = []byte(os.Getenv("ARVADOS_API_TOKEN"))
86 expires_at := time.Now().Add(60 * time.Second)
87 signedLocator := SignLocator(locator, GenerateRandomApiToken(), expires_at)
88 reader, blocklen, _, err := keepClient.Get(signedLocator)
94 read_content, err := ioutil.ReadAll(reader)
95 log.Print(read_content, err)
100 if (read_content == nil) || (int64(len(read_content)) != blocklen) {
101 return nil, errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
104 return read_content, nil
107 const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
109 func GenerateRandomApiToken() string {
110 var bytes = make([]byte, 36)
112 for i, b := range bytes {
113 bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
115 return (string(bytes))
119 var PutContent = func(content []byte, locator string) (err error) {
120 err = PutBlock(content, locator)