20850: Fix test selector
[arvados.git] / services / keepstore / pull_worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/keepclient"
15 )
16
17 // RunPullWorker receives PullRequests from pullq, invokes
18 // PullItemAndProcess on each one. After each PR, it logs a message
19 // indicating whether the pull was successful.
20 func (h *handler) runPullWorker(pullq *WorkQueue) {
21         for item := range pullq.NextItem {
22                 pr := item.(PullRequest)
23                 err := h.pullItemAndProcess(pr)
24                 pullq.DoneItem <- struct{}{}
25                 if err == nil {
26                         h.Logger.Printf("Pull %s success", pr)
27                 } else {
28                         h.Logger.Printf("Pull %s error: %s", pr, err)
29                 }
30         }
31 }
32
33 // PullItemAndProcess executes a pull request by retrieving the
34 // specified block from one of the specified servers, and storing it
35 // on a local volume.
36 //
37 // If the PR specifies a non-blank mount UUID, PullItemAndProcess will
38 // only attempt to write the data to the corresponding
39 // volume. Otherwise it writes to any local volume, as a PUT request
40 // would.
41 func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
42         var vol *VolumeMount
43         if uuid := pullRequest.MountUUID; uuid != "" {
44                 vol = h.volmgr.Lookup(pullRequest.MountUUID, true)
45                 if vol == nil {
46                         return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
47                 }
48         }
49
50         // Make a private copy of keepClient so we can set
51         // ServiceRoots to the source servers specified in the pull
52         // request.
53         keepClient := *h.keepClient
54         serviceRoots := make(map[string]string)
55         for _, addr := range pullRequest.Servers {
56                 serviceRoots[addr] = addr
57         }
58         keepClient.SetServiceRoots(serviceRoots, nil, nil)
59
60         signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
61
62         reader, contentLen, _, err := GetContent(signedLocator, &keepClient)
63         if err != nil {
64                 return err
65         }
66         if reader == nil {
67                 return fmt.Errorf("No reader found for : %s", signedLocator)
68         }
69         defer reader.Close()
70
71         readContent, err := ioutil.ReadAll(reader)
72         if err != nil {
73                 return err
74         }
75
76         if (readContent == nil) || (int64(len(readContent)) != contentLen) {
77                 return fmt.Errorf("Content not found for: %s", signedLocator)
78         }
79
80         return writePulledBlock(h.volmgr, vol, readContent, pullRequest.Locator)
81 }
82
83 // GetContent fetches the content for the given locator using keepclient.
84 var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
85         return keepClient.Get(signedLocator)
86 }
87
88 var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte, locator string) error {
89         if volume != nil {
90                 return volume.Put(context.Background(), locator, data)
91         }
92         _, err := PutBlock(context.Background(), volmgr, data, locator, nil)
93         return err
94 }