1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/arvadostest"
22 "github.com/sirupsen/logrus"
26 func (s *routerSuite) TestPullList_Execute(c *C) {
27 remotecluster := testCluster(c)
28 remotecluster.Volumes = map[string]arvados.Volume{
29 "zzzzz-nyw5e-rrrrrrrrrrrrrrr": {Replication: 1, Driver: "stub"},
31 remoterouter, cancel := testRouter(c, remotecluster, nil)
33 remoteserver := httptest.NewServer(remoterouter)
34 defer remoteserver.Close()
36 router, cancel := testRouter(c, s.cluster, nil)
39 executePullList := func(pullList []PullListItem) string {
40 var logbuf bytes.Buffer
41 logger := logrus.New()
43 router.keepstore.logger = logger
45 listjson, err := json.Marshal(pullList)
47 resp := call(router, "PUT", "http://example/pull", s.cluster.SystemRootToken, listjson, nil)
48 c.Check(resp.Code, Equals, http.StatusOK)
50 router.puller.cond.L.Lock()
51 todolen := len(router.puller.todo)
52 router.puller.cond.L.Unlock()
53 if todolen == 0 && router.puller.inprogress.Load() == 0 {
56 time.Sleep(time.Millisecond)
58 return logbuf.String()
61 newRemoteBlock := func(datastring string) string {
62 data := []byte(datastring)
63 hash := fmt.Sprintf("%x", md5.Sum(data))
64 locator := fmt.Sprintf("%s+%d", hash, len(data))
65 _, err := remoterouter.keepstore.BlockWrite(context.Background(), arvados.BlockWriteOptions{
73 mounts := append([]*mount(nil), router.keepstore.mountsR...)
74 sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID })
75 var vols []*stubVolume
76 for _, mount := range mounts {
77 vols = append(vols, mount.volume.(*stubVolume))
80 ctx := authContext(arvadostest.ActiveTokenV2)
82 locator := newRemoteBlock("pull available block to unspecified volume")
83 executePullList([]PullListItem{{
85 Servers: []string{remoteserver.URL}}})
86 _, err := router.keepstore.BlockRead(ctx, arvados.BlockReadOptions{
87 Locator: router.keepstore.signLocator(arvadostest.ActiveTokenV2, locator),
91 locator0 := newRemoteBlock("pull available block to specified volume 0")
92 locator1 := newRemoteBlock("pull available block to specified volume 1")
93 executePullList([]PullListItem{
96 Servers: []string{remoteserver.URL},
97 MountUUID: vols[0].params.UUID},
100 Servers: []string{remoteserver.URL},
101 MountUUID: vols[1].params.UUID}})
102 c.Check(vols[0].data[locator0[:32]].data, NotNil)
103 c.Check(vols[1].data[locator1[:32]].data, NotNil)
105 locator = fooHash + "+3"
106 logs := executePullList([]PullListItem{{
108 Servers: []string{remoteserver.URL}}})
109 c.Check(logs, Matches, ".*error pulling data from remote servers.*Block not found.*locator=acbd.*\n")
111 locator = fooHash + "+3"
112 logs = executePullList([]PullListItem{{
114 Servers: []string{"http://0.0.0.0:9/"}}})
115 c.Check(logs, Matches, ".*error pulling data from remote servers.*connection refused.*locator=acbd.*\n")
117 locator = newRemoteBlock("log error writing to local volume")
118 vols[0].blockWrite = func(context.Context, string, []byte) error { return errors.New("test error") }
119 vols[1].blockWrite = vols[0].blockWrite
120 logs = executePullList([]PullListItem{{
122 Servers: []string{remoteserver.URL}}})
123 c.Check(logs, Matches, ".*error writing data to zzzzz-nyw5e-.*error=\"test error\".*locator=.*\n")
124 vols[0].blockWrite = nil
125 vols[1].blockWrite = nil
127 locator = newRemoteBlock("log error when destination mount does not exist")
128 logs = executePullList([]PullListItem{{
130 Servers: []string{remoteserver.URL},
131 MountUUID: "bogus-mount-uuid"}})
132 c.Check(logs, Matches, ".*ignoring pull list entry for nonexistent mount bogus-mount-uuid.*locator=.*\n")
134 logs = executePullList([]PullListItem{})