2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / pull_worker_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "errors"
13         "fmt"
14         "io"
15         "net/http"
16         "net/http/httptest"
17         "sort"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/arvadostest"
22         "github.com/sirupsen/logrus"
23         . "gopkg.in/check.v1"
24 )
25
26 func (s *routerSuite) TestPullList_Execute(c *C) {
27         remotecluster := testCluster(c)
28         remotecluster.Volumes = map[string]arvados.Volume{
29                 "zzzzz-nyw5e-rrrrrrrrrrrrrrr": {Replication: 1, Driver: "stub"},
30         }
31         remoterouter, cancel := testRouter(c, remotecluster, nil)
32         defer cancel()
33         remoteserver := httptest.NewServer(remoterouter)
34         defer remoteserver.Close()
35
36         router, cancel := testRouter(c, s.cluster, nil)
37         defer cancel()
38
39         executePullList := func(pullList []PullListItem) string {
40                 var logbuf bytes.Buffer
41                 logger := logrus.New()
42                 logger.Out = &logbuf
43                 router.keepstore.logger = logger
44
45                 listjson, err := json.Marshal(pullList)
46                 c.Assert(err, IsNil)
47                 resp := call(router, "PUT", "http://example/pull", s.cluster.SystemRootToken, listjson, nil)
48                 c.Check(resp.Code, Equals, http.StatusOK)
49                 for {
50                         router.puller.cond.L.Lock()
51                         todolen := len(router.puller.todo)
52                         router.puller.cond.L.Unlock()
53                         if todolen == 0 && router.puller.inprogress.Load() == 0 {
54                                 break
55                         }
56                         time.Sleep(time.Millisecond)
57                 }
58                 return logbuf.String()
59         }
60
61         newRemoteBlock := func(datastring string) string {
62                 data := []byte(datastring)
63                 hash := fmt.Sprintf("%x", md5.Sum(data))
64                 locator := fmt.Sprintf("%s+%d", hash, len(data))
65                 _, err := remoterouter.keepstore.BlockWrite(context.Background(), arvados.BlockWriteOptions{
66                         Hash: hash,
67                         Data: data,
68                 })
69                 c.Assert(err, IsNil)
70                 return locator
71         }
72
73         mounts := append([]*mount(nil), router.keepstore.mountsR...)
74         sort.Slice(mounts, func(i, j int) bool { return mounts[i].UUID < mounts[j].UUID })
75         var vols []*stubVolume
76         for _, mount := range mounts {
77                 vols = append(vols, mount.volume.(*stubVolume))
78         }
79
80         ctx := authContext(arvadostest.ActiveTokenV2)
81
82         locator := newRemoteBlock("pull available block to unspecified volume")
83         executePullList([]PullListItem{{
84                 Locator: locator,
85                 Servers: []string{remoteserver.URL}}})
86         _, err := router.keepstore.BlockRead(ctx, arvados.BlockReadOptions{
87                 Locator: router.keepstore.signLocator(arvadostest.ActiveTokenV2, locator),
88                 WriteTo: io.Discard})
89         c.Check(err, IsNil)
90
91         locator0 := newRemoteBlock("pull available block to specified volume 0")
92         locator1 := newRemoteBlock("pull available block to specified volume 1")
93         executePullList([]PullListItem{
94                 {
95                         Locator:   locator0,
96                         Servers:   []string{remoteserver.URL},
97                         MountUUID: vols[0].params.UUID},
98                 {
99                         Locator:   locator1,
100                         Servers:   []string{remoteserver.URL},
101                         MountUUID: vols[1].params.UUID}})
102         c.Check(vols[0].data[locator0[:32]].data, NotNil)
103         c.Check(vols[1].data[locator1[:32]].data, NotNil)
104
105         locator = fooHash + "+3"
106         logs := executePullList([]PullListItem{{
107                 Locator: locator,
108                 Servers: []string{remoteserver.URL}}})
109         c.Check(logs, Matches, ".*error pulling data from remote servers.*Block not found.*locator=acbd.*\n")
110
111         locator = fooHash + "+3"
112         logs = executePullList([]PullListItem{{
113                 Locator: locator,
114                 Servers: []string{"http://0.0.0.0:9/"}}})
115         c.Check(logs, Matches, ".*error pulling data from remote servers.*connection refused.*locator=acbd.*\n")
116
117         locator = newRemoteBlock("log error writing to local volume")
118         vols[0].blockWrite = func(context.Context, string, []byte) error { return errors.New("test error") }
119         vols[1].blockWrite = vols[0].blockWrite
120         logs = executePullList([]PullListItem{{
121                 Locator: locator,
122                 Servers: []string{remoteserver.URL}}})
123         c.Check(logs, Matches, ".*error writing data to zzzzz-nyw5e-.*error=\"test error\".*locator=.*\n")
124         vols[0].blockWrite = nil
125         vols[1].blockWrite = nil
126
127         locator = newRemoteBlock("log error when destination mount does not exist")
128         logs = executePullList([]PullListItem{{
129                 Locator:   locator,
130                 Servers:   []string{remoteserver.URL},
131                 MountUUID: "bogus-mount-uuid"}})
132         c.Check(logs, Matches, ".*ignoring pull list entry for nonexistent mount bogus-mount-uuid.*locator=.*\n")
133
134         logs = executePullList([]PullListItem{})
135         c.Logf("%s", logs)
136 }