2960: Buffer reads when serialize enabled on unix volume.
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "sync"
17         "syscall"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "github.com/prometheus/client_golang/prometheus"
22         check "gopkg.in/check.v1"
23 )
24
25 type testableUnixVolume struct {
26         unixVolume
27         t TB
28 }
29
30 func (v *testableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
31         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{Actime: lastPut.Unix(), Modtime: lastPut.Unix()})
32         if err != nil {
33                 v.t.Fatal(err)
34         }
35 }
36
37 func (v *testableUnixVolume) Teardown() {
38         if err := os.RemoveAll(v.Root); err != nil {
39                 v.t.Error(err)
40         }
41 }
42
43 func (v *testableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
44         return "open", "create"
45 }
46
47 var _ = check.Suite(&unixVolumeSuite{})
48
49 type unixVolumeSuite struct {
50         params  newVolumeParams
51         volumes []*testableUnixVolume
52 }
53
54 func (s *unixVolumeSuite) SetUpTest(c *check.C) {
55         logger := ctxlog.TestLogger(c)
56         reg := prometheus.NewRegistry()
57         s.params = newVolumeParams{
58                 UUID:        "zzzzz-nyw5e-999999999999999",
59                 Cluster:     testCluster(c),
60                 Logger:      logger,
61                 MetricsVecs: newVolumeMetricsVecs(reg),
62                 BufferPool:  newBufferPool(logger, 8, reg),
63         }
64 }
65
66 func (s *unixVolumeSuite) TearDownTest(c *check.C) {
67         for _, v := range s.volumes {
68                 v.Teardown()
69         }
70 }
71
72 func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumeParams, serialize bool) *testableUnixVolume {
73         d, err := ioutil.TempDir("", "volume_test")
74         c.Check(err, check.IsNil)
75         var locker sync.Locker
76         if serialize {
77                 locker = &sync.Mutex{}
78         }
79         v := &testableUnixVolume{
80                 unixVolume: unixVolume{
81                         Root:       d,
82                         locker:     locker,
83                         uuid:       params.UUID,
84                         cluster:    params.Cluster,
85                         logger:     params.Logger,
86                         volume:     params.ConfigVolume,
87                         metrics:    params.MetricsVecs,
88                         bufferPool: params.BufferPool,
89                 },
90                 t: c,
91         }
92         c.Check(v.check(), check.IsNil)
93         s.volumes = append(s.volumes, v)
94         return v
95 }
96
97 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
98         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
99                 return s.newTestableUnixVolume(c, params, false)
100         })
101 }
102
103 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_ReadOnly(c *check.C) {
104         DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
105                 return s.newTestableUnixVolume(c, params, false)
106         })
107 }
108
109 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Serialized(c *check.C) {
110         DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
111                 return s.newTestableUnixVolume(c, params, true)
112         })
113 }
114
115 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Readonly_Serialized(c *check.C) {
116         DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
117                 return s.newTestableUnixVolume(c, params, true)
118         })
119 }
120
121 func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
122         v := s.newTestableUnixVolume(c, s.params, true)
123         defer v.Teardown()
124         v.BlockWrite(context.Background(), TestHash, TestBlock)
125
126         buf := bytes.NewBuffer(nil)
127         _, err := v.BlockRead(context.Background(), TestHash2, buf)
128         switch {
129         case os.IsNotExist(err):
130                 break
131         case err == nil:
132                 c.Errorf("Read should have failed, returned %+q", buf.Bytes())
133         default:
134                 c.Errorf("Read expected ErrNotExist, got: %s", err)
135         }
136 }
137
138 func (s *unixVolumeSuite) TestPut(c *check.C) {
139         v := s.newTestableUnixVolume(c, s.params, false)
140         defer v.Teardown()
141
142         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
143         if err != nil {
144                 c.Error(err)
145         }
146         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
147         if buf, err := ioutil.ReadFile(p); err != nil {
148                 c.Error(err)
149         } else if bytes.Compare(buf, TestBlock) != 0 {
150                 c.Errorf("Write should have stored %s, did store %s",
151                         string(TestBlock), string(buf))
152         }
153 }
154
155 func (s *unixVolumeSuite) TestPutBadVolume(c *check.C) {
156         v := s.newTestableUnixVolume(c, s.params, false)
157         defer v.Teardown()
158
159         err := os.RemoveAll(v.Root)
160         c.Assert(err, check.IsNil)
161         err = v.BlockWrite(context.Background(), TestHash, TestBlock)
162         c.Check(err, check.IsNil)
163 }
164
165 func (s *unixVolumeSuite) TestIsFull(c *check.C) {
166         v := s.newTestableUnixVolume(c, s.params, false)
167         defer v.Teardown()
168
169         fullPath := v.Root + "/full"
170         now := fmt.Sprintf("%d", time.Now().Unix())
171         os.Symlink(now, fullPath)
172         if !v.isFull() {
173                 c.Error("volume claims not to be full")
174         }
175         os.Remove(fullPath)
176
177         // Test with an expired /full link.
178         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
179         os.Symlink(expired, fullPath)
180         if v.isFull() {
181                 c.Error("volume should no longer be full")
182         }
183 }
184
185 func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
186         v := s.newTestableUnixVolume(c, s.params, false)
187         defer v.Teardown()
188
189         v.BlockWrite(context.Background(), TestHash, TestBlock)
190         mockErr := errors.New("Mock error")
191         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
192                 return mockErr
193         })
194         if err != mockErr {
195                 c.Errorf("Got %v, expected %v", err, mockErr)
196         }
197 }
198
199 func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
200         v := s.newTestableUnixVolume(c, s.params, false)
201         defer v.Teardown()
202
203         funcCalled := false
204         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
205                 funcCalled = true
206                 return nil
207         })
208         if err == nil {
209                 c.Errorf("Expected error opening non-existent file")
210         }
211         if funcCalled {
212                 c.Errorf("Worker func should not have been called")
213         }
214 }
215
216 func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
217         v := s.newTestableUnixVolume(c, s.params, false)
218         defer v.Teardown()
219
220         v.BlockWrite(context.Background(), TestHash, TestBlock)
221
222         mtx := NewMockMutex()
223         v.locker = mtx
224
225         funcCalled := make(chan struct{})
226         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
227                 funcCalled <- struct{}{}
228                 return nil
229         })
230         select {
231         case mtx.AllowLock <- struct{}{}:
232         case <-funcCalled:
233                 c.Fatal("Function was called before mutex was acquired")
234         case <-time.After(5 * time.Second):
235                 c.Fatal("Timed out before mutex was acquired")
236         }
237         select {
238         case <-funcCalled:
239         case mtx.AllowUnlock <- struct{}{}:
240                 c.Fatal("Mutex was released before function was called")
241         case <-time.After(5 * time.Second):
242                 c.Fatal("Timed out waiting for funcCalled")
243         }
244         select {
245         case mtx.AllowUnlock <- struct{}{}:
246         case <-time.After(5 * time.Second):
247                 c.Fatal("Timed out waiting for getFunc() to release mutex")
248         }
249 }
250
251 type MockMutex struct {
252         AllowLock   chan struct{}
253         AllowUnlock chan struct{}
254 }
255
256 func NewMockMutex() *MockMutex {
257         return &MockMutex{
258                 AllowLock:   make(chan struct{}),
259                 AllowUnlock: make(chan struct{}),
260         }
261 }
262
263 // Lock waits for someone to send to AllowLock.
264 func (m *MockMutex) Lock() {
265         <-m.AllowLock
266 }
267
268 // Unlock waits for someone to send to AllowUnlock.
269 func (m *MockMutex) Unlock() {
270         <-m.AllowUnlock
271 }
272
273 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
274         v := s.newTestableUnixVolume(c, s.params, true)
275         defer v.Teardown()
276         v.locker.Lock()
277         defer v.locker.Unlock()
278         ctx, cancel := context.WithCancel(context.Background())
279         go func() {
280                 time.Sleep(50 * time.Millisecond)
281                 cancel()
282         }()
283         err := v.BlockWrite(ctx, TestHash, TestBlock)
284         if err != context.Canceled {
285                 c.Errorf("BlockWrite() returned %s -- expected short read / canceled", err)
286         }
287 }
288
289 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
290         v := s.newTestableUnixVolume(c, s.params, true)
291         defer v.Teardown()
292         err := v.BlockWrite(context.Background(), TestHash, TestBlock)
293         if err != nil {
294                 c.Fatal(err)
295         }
296         ctx, cancel := context.WithCancel(context.Background())
297         v.locker.Lock()
298         defer v.locker.Unlock()
299         go func() {
300                 time.Sleep(50 * time.Millisecond)
301                 cancel()
302         }()
303         n, err := v.BlockRead(ctx, TestHash, io.Discard)
304         if n > 0 || err != context.Canceled {
305                 c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err)
306         }
307 }
308
309 func (s *unixVolumeSuite) TestStats(c *check.C) {
310         vol := s.newTestableUnixVolume(c, s.params, false)
311         stats := func() string {
312                 buf, err := json.Marshal(vol.InternalStats())
313                 c.Check(err, check.IsNil)
314                 return string(buf)
315         }
316
317         c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
318         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
319
320         _, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
321         c.Check(err, check.NotNil)
322         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
323         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
324         c.Check(stats(), check.Matches, `.*"\*(fs|os)\.PathError":[^0].*`) // os.PathError changed to fs.PathError in Go 1.16
325         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
326         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
327         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
328
329         err = vol.BlockWrite(context.Background(), fooHash, []byte("foo"))
330         c.Check(err, check.IsNil)
331         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
332         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
333         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
334         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
335
336         err = vol.BlockTouch(fooHash)
337         c.Check(err, check.IsNil)
338         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
339         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
340         c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
341
342         buf := bytes.NewBuffer(nil)
343         _, err = vol.BlockRead(context.Background(), fooHash, buf)
344         c.Check(err, check.IsNil)
345         c.Check(buf.String(), check.Equals, "foo")
346         c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
347         c.Check(stats(), check.Matches, `.*"OpenOps":2,.*`)
348
349         err = vol.BlockTrash(fooHash)
350         c.Check(err, check.IsNil)
351         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
352 }
353
354 func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
355         vol := s.newTestableUnixVolume(c, s.params, false)
356
357         err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777)
358         c.Assert(err, check.IsNil)
359         err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
360         c.Assert(err, check.IsNil)
361         deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
362         err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
363         c.Assert(err, check.IsNil)
364         skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
365         err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
366         c.Assert(err, check.IsNil)
367         vol.EmptyTrash()
368
369         _, err = os.Stat(skipme)
370         c.Check(err, check.IsNil)
371
372         _, err = os.Stat(deleteme)
373         c.Check(err, check.NotNil)
374         c.Check(os.IsNotExist(err), check.Equals, true)
375 }