17074: Add type to data explorer meta to satisfy SET_ITEMS
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "fmt"
12         "io/ioutil"
13         "os"
14         "sync"
15         "syscall"
16         "time"
17
18         "git.arvados.org/arvados.git/sdk/go/ctxlog"
19         "github.com/prometheus/client_golang/prometheus"
20         check "gopkg.in/check.v1"
21 )
22
23 type testableUnixVolume struct {
24         unixVolume
25         t TB
26 }
27
28 func (v *testableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
29         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{Actime: lastPut.Unix(), Modtime: lastPut.Unix()})
30         if err != nil {
31                 v.t.Fatal(err)
32         }
33 }
34
35 func (v *testableUnixVolume) Teardown() {
36         if err := os.RemoveAll(v.Root); err != nil {
37                 v.t.Error(err)
38         }
39 }
40
41 func (v *testableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
42         return "open", "create"
43 }
44
45 var _ = check.Suite(&unixVolumeSuite{})
46
47 type unixVolumeSuite struct {
48         params  newVolumeParams
49         volumes []*testableUnixVolume
50 }
51
52 func (s *unixVolumeSuite) SetUpTest(c *check.C) {
53         logger := ctxlog.TestLogger(c)
54         reg := prometheus.NewRegistry()
55         s.params = newVolumeParams{
56                 UUID:        "zzzzz-nyw5e-999999999999999",
57                 Cluster:     testCluster(c),
58                 Logger:      logger,
59                 MetricsVecs: newVolumeMetricsVecs(reg),
60                 BufferPool:  newBufferPool(logger, 8, reg),
61         }
62 }
63
64 func (s *unixVolumeSuite) TearDownTest(c *check.C) {
65         for _, v := range s.volumes {
66                 v.Teardown()
67         }
68 }
69
70 func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumeParams, serialize bool) *testableUnixVolume {
71         d, err := ioutil.TempDir("", "volume_test")
72         c.Check(err, check.IsNil)
73         var locker sync.Locker
74         if serialize {
75                 locker = &sync.Mutex{}
76         }
77         v := &testableUnixVolume{
78                 unixVolume: unixVolume{
79                         Root:       d,
80                         locker:     locker,
81                         uuid:       params.UUID,
82                         cluster:    params.Cluster,
83                         logger:     params.Logger,
84                         volume:     params.ConfigVolume,
85                         metrics:    params.MetricsVecs,
86                         bufferPool: params.BufferPool,
87                 },
88                 t: c,
89         }
90         c.Check(v.check(), check.IsNil)
91         s.volumes = append(s.volumes, v)
92         return v
93 }
94
95 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
96         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
97                 return s.newTestableUnixVolume(c, params, false)
98         })
99 }
100
101 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_ReadOnly(c *check.C) {
102         DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
103                 return s.newTestableUnixVolume(c, params, false)
104         })
105 }
106
107 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Serialized(c *check.C) {
108         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
109                 return s.newTestableUnixVolume(c, params, true)
110         })
111 }
112
113 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Readonly_Serialized(c *check.C) {
114         DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
115                 return s.newTestableUnixVolume(c, params, true)
116         })
117 }
118
119 func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
120         v := s.newTestableUnixVolume(c, s.params, true)
121         defer v.Teardown()
122         v.BlockWrite(context.Background(), TestHash, TestBlock)
123
124         buf := &brbuffer{}
125         err := v.BlockRead(context.Background(), TestHash2, buf)
126         c.Check(err, check.FitsTypeOf, os.ErrNotExist)
127 }
128
129 func (s *unixVolumeSuite) TestPut(c *check.C) {
130         v := s.newTestableUnixVolume(c, s.params, false)
131         defer v.Teardown()
132
133         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
134         if err != nil {
135                 c.Error(err)
136         }
137         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
138         if buf, err := ioutil.ReadFile(p); err != nil {
139                 c.Error(err)
140         } else if bytes.Compare(buf, TestBlock) != 0 {
141                 c.Errorf("Write should have stored %s, did store %s",
142                         string(TestBlock), string(buf))
143         }
144 }
145
146 func (s *unixVolumeSuite) TestPutBadVolume(c *check.C) {
147         v := s.newTestableUnixVolume(c, s.params, false)
148         defer v.Teardown()
149
150         err := os.RemoveAll(v.Root)
151         c.Assert(err, check.IsNil)
152         err = v.BlockWrite(context.Background(), TestHash, TestBlock)
153         c.Check(err, check.IsNil)
154 }
155
156 func (s *unixVolumeSuite) TestIsFull(c *check.C) {
157         v := s.newTestableUnixVolume(c, s.params, false)
158         defer v.Teardown()
159
160         fullPath := v.Root + "/full"
161         now := fmt.Sprintf("%d", time.Now().Unix())
162         os.Symlink(now, fullPath)
163         if !v.isFull() {
164                 c.Error("volume claims not to be full")
165         }
166         os.Remove(fullPath)
167
168         // Test with an expired /full link.
169         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
170         os.Symlink(expired, fullPath)
171         if v.isFull() {
172                 c.Error("volume should no longer be full")
173         }
174 }
175
176 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
177         v := s.newTestableUnixVolume(c, s.params, true)
178         defer v.Teardown()
179         v.locker.Lock()
180         defer v.locker.Unlock()
181         ctx, cancel := context.WithCancel(context.Background())
182         go func() {
183                 time.Sleep(50 * time.Millisecond)
184                 cancel()
185         }()
186         err := v.BlockWrite(ctx, TestHash, TestBlock)
187         if err != context.Canceled {
188                 c.Errorf("BlockWrite() returned %s -- expected short read / canceled", err)
189         }
190 }
191
192 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
193         v := s.newTestableUnixVolume(c, s.params, true)
194         defer v.Teardown()
195         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
196         if err != nil {
197                 c.Fatal(err)
198         }
199         ctx, cancel := context.WithCancel(context.Background())
200         v.locker.Lock()
201         defer v.locker.Unlock()
202         go func() {
203                 time.Sleep(50 * time.Millisecond)
204                 cancel()
205         }()
206         buf := &brbuffer{}
207         err = v.BlockRead(ctx, TestHash, buf)
208         if buf.Len() != 0 || err != context.Canceled {
209                 c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err)
210         }
211 }
212
213 func (s *unixVolumeSuite) TestStats(c *check.C) {
214         vol := s.newTestableUnixVolume(c, s.params, false)
215         stats := func() string {
216                 buf, err := json.Marshal(vol.InternalStats())
217                 c.Check(err, check.IsNil)
218                 return string(buf)
219         }
220
221         c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
222         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
223
224         err := vol.BlockRead(context.Background(), fooHash, brdiscard)
225         c.Check(err, check.NotNil)
226         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
227         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
228         c.Check(stats(), check.Matches, `.*"\*(fs|os)\.PathError":[^0].*`) // os.PathError changed to fs.PathError in Go 1.16
229         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
230         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
231         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
232
233         err = vol.BlockWrite(context.Background(), fooHash, []byte("foo"))
234         c.Check(err, check.IsNil)
235         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
236         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
237         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
238         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
239
240         err = vol.BlockTouch(fooHash)
241         c.Check(err, check.IsNil)
242         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
243         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
244         c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
245
246         buf := &brbuffer{}
247         err = vol.BlockRead(context.Background(), fooHash, buf)
248         c.Check(err, check.IsNil)
249         c.Check(buf.String(), check.Equals, "foo")
250         c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
251         c.Check(stats(), check.Matches, `.*"OpenOps":2,.*`)
252
253         err = vol.BlockTrash(fooHash)
254         c.Check(err, check.IsNil)
255         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
256 }
257
258 func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
259         vol := s.newTestableUnixVolume(c, s.params, false)
260
261         err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777)
262         c.Assert(err, check.IsNil)
263         err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
264         c.Assert(err, check.IsNil)
265         deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
266         err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
267         c.Assert(err, check.IsNil)
268         skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
269         err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
270         c.Assert(err, check.IsNil)
271         vol.EmptyTrash()
272
273         _, err = os.Stat(skipme)
274         c.Check(err, check.IsNil)
275
276         _, err = os.Stat(deleteme)
277         c.Check(err, check.NotNil)
278         c.Check(os.IsNotExist(err), check.Equals, true)
279 }