1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 "github.com/prometheus/client_golang/prometheus"
22 check "gopkg.in/check.v1"
// testableUnixVolume is the unix volume flavor used by these tests. It is
// built around a unixVolume (see the composite literal in
// newTestableUnixVolume) and carries test-only helpers such as TouchWithDate
// and Teardown.
25 type testableUnixVolume struct {
// TouchWithDate sets both the access time and the modification time of the
// file at v.blockPath(locator) to lastPut. The timestamps are passed as
// lastPut.Unix(), so anything below one-second resolution is dropped.
30 func (v *testableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
31 err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{Actime: lastPut.Unix(), Modtime: lastPut.Unix()})
// Teardown removes the volume's root directory, v.Root, together with
// everything beneath it.
37 func (v *testableUnixVolume) Teardown() {
38 if err := os.RemoveAll(v.Root); err != nil {
// ReadWriteOperationLabelValues returns the label values "open" (r) and
// "create" (w).
// NOTE(review): presumably these are the operation labels the generic volume
// tests look for in read/write metrics -- confirm against the caller.
43 func (v *testableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
44 return "open", "create"
// Register unixVolumeSuite with gocheck so that its test methods are run.
47 var _ = check.Suite(&unixVolumeSuite{})
// unixVolumeSuite is the gocheck suite holding the unix volume tests.
49 type unixVolumeSuite struct {
// params holds the volume parameters assigned in SetUpTest.
50 params newVolumeParams
// volumes collects every volume created by newTestableUnixVolume;
// TearDownTest iterates over it.
51 volumes []*testableUnixVolume
// SetUpTest is the gocheck per-test fixture hook. It assigns a new
// newVolumeParams value to s.params, built from a test logger, a test
// cluster config and a newly created prometheus registry.
54 func (s *unixVolumeSuite) SetUpTest(c *check.C) {
55 logger := ctxlog.TestLogger(c)
// A new registry is created on every call; the metrics vecs and the buffer
// pool below are both constructed with it.
56 reg := prometheus.NewRegistry()
57 s.params = newVolumeParams{
58 UUID: "zzzzz-nyw5e-999999999999999",
59 Cluster: testCluster(c),
61 MetricsVecs: newVolumeMetricsVecs(reg),
// NOTE(review): 8 is presumably the number of buffers in the pool -- confirm
// against newBufferPool.
62 BufferPool: newBufferPool(logger, 8, reg),
// TearDownTest is the gocheck per-test cleanup hook. It walks over every
// volume recorded in s.volumes.
// NOTE(review): the loop body is not visible in this excerpt; presumably each
// volume is torn down -- confirm.
66 func (s *unixVolumeSuite) TearDownTest(c *check.C) {
67 for _, v := range s.volumes {
// newTestableUnixVolume creates a testableUnixVolume rooted in a new
// temporary directory, wires in the cluster, logger, volume config, metrics
// and buffer pool from params, and records the volume in s.volumes.
//
// serialize selects whether the volume gets a locker.
// NOTE(review): the statement that tests serialize is not visible here;
// presumably the mutex below is only installed when serialize is true --
// confirm.
72 func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumeParams, serialize bool) *testableUnixVolume {
// The directory name gets the "volume_test" prefix and lives in the default
// temp location.
73 d, err := ioutil.TempDir("", "volume_test")
// c.Check (rather than c.Assert) records a failure without stopping here.
74 c.Check(err, check.IsNil)
75 var locker sync.Locker
77 locker = &sync.Mutex{}
79 v := &testableUnixVolume{
80 unixVolume: unixVolume{
84 cluster: params.Cluster,
85 logger: params.Logger,
86 volume: params.ConfigVolume,
87 metrics: params.MetricsVecs,
88 bufferPool: params.BufferPool,
// The new volume must pass its own check() before it is handed out.
92 c.Check(v.check(), check.IsNil)
// Remember the volume so TearDownTest can iterate over it later.
93 s.volumes = append(s.volumes, v)
// TestUnixVolumeWithGenericTests runs DoGenericVolumeTests with its bool
// argument set to false, against volumes created with serialize=false.
// NOTE(review): the bool passed to DoGenericVolumeTests is presumably a
// read-only flag (compare the _ReadOnly variant) -- confirm.
97 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
98 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
// The factory ignores t and uses the enclosing test's c instead.
99 return s.newTestableUnixVolume(c, params, false)
// TestUnixVolumeWithGenericTests_ReadOnly runs DoGenericVolumeTests with its
// bool argument set to true, against volumes created with serialize=false.
// NOTE(review): the bool passed to DoGenericVolumeTests is presumably a
// read-only flag, as the test name suggests -- confirm.
103 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_ReadOnly(c *check.C) {
104 DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
// The factory ignores t and uses the enclosing test's c instead.
105 return s.newTestableUnixVolume(c, params, false)
// TestUnixVolumeWithGenericTests_Serialized runs DoGenericVolumeTests with
// its bool argument set to false, against volumes created with
// serialize=true.
109 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Serialized(c *check.C) {
110 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
// The factory ignores t and uses the enclosing test's c instead.
111 return s.newTestableUnixVolume(c, params, true)
// TestUnixVolumeWithGenericTests_Readonly_Serialized runs
// DoGenericVolumeTests with its bool argument set to true, against volumes
// created with serialize=true.
115 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Readonly_Serialized(c *check.C) {
116 DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
// The factory ignores t and uses the enclosing test's c instead.
117 return s.newTestableUnixVolume(c, params, true)
// TestGetNotFound writes one block (TestHash) and then reads a different
// hash (TestHash2). The read is expected to fail with an error for which
// os.IsNotExist reports true.
121 func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
122 v := s.newTestableUnixVolume(c, s.params, true)
// The result of this write is not checked on this line.
124 v.BlockWrite(context.Background(), TestHash, TestBlock)
126 buf := bytes.NewBuffer(nil)
127 _, err := v.BlockRead(context.Background(), TestHash2, buf)
// Expected outcome: a not-exist error.
129 case os.IsNotExist(err):
// Reported when the read did not fail at all.
132 c.Errorf("Read should have failed, returned %+q", buf.Bytes())
// Reported when the read failed with some other error.
134 c.Errorf("Read expected ErrNotExist, got: %s", err)
// TestPut writes TestBlock under TestHash and then reads the backing file
// directly from disk to confirm that its content equals TestBlock.
138 func (s *unixVolumeSuite) TestPut(c *check.C) {
139 v := s.newTestableUnixVolume(c, s.params, false)
142 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
// The file is looked up at <Root>/<first three characters of hash>/<hash>.
146 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
147 if buf, err := ioutil.ReadFile(p); err != nil {
149 } else if bytes.Compare(buf, TestBlock) != 0 {
150 c.Errorf("Write should have stored %s, did store %s",
151 string(TestBlock), string(buf))
// TestPutBadVolume deletes the volume's root directory and then writes a
// block. The test expects the write to succeed (nil error) even though the
// root was removed beforehand.
155 func (s *unixVolumeSuite) TestPutBadVolume(c *check.C) {
156 v := s.newTestableUnixVolume(c, s.params, false)
159 err := os.RemoveAll(v.Root)
// Abort if the directory could not be removed; the rest would be meaningless.
160 c.Assert(err, check.IsNil)
161 err = v.BlockWrite(context.Background(), TestHash, TestBlock)
162 c.Check(err, check.IsNil)
// TestIsFull drives the volume's "full" marker, a symlink at <Root>/full
// whose target is a Unix timestamp written as a decimal string. A marker
// with the current time should make the volume report full; a marker dated
// 3605 seconds in the past should not.
// NOTE(review): the calls that query fullness and any removal of the first
// symlink are not visible in this excerpt.
165 func (s *unixVolumeSuite) TestIsFull(c *check.C) {
166 v := s.newTestableUnixVolume(c, s.params, false)
169 fullPath := v.Root + "/full"
170 now := fmt.Sprintf("%d", time.Now().Unix())
// The error returned by os.Symlink is not checked.
171 os.Symlink(now, fullPath)
173 c.Error("volume claims not to be full")
177 // Test with an expired /full link.
// NOTE(review): 3605 seconds is just over an hour; presumably the marker
// expires after one hour -- confirm against the volume implementation.
178 expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
179 os.Symlink(expired, fullPath)
181 c.Error("volume should no longer be full")
// TestUnixVolumeGetFuncWorkerError writes a block, then calls getFunc on that
// block's path with a worker callback. Going by the failure message, getFunc
// is expected to return mockErr.
// NOTE(review): the worker body is not visible here; presumably it returns
// mockErr -- confirm.
185 func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
186 v := s.newTestableUnixVolume(c, s.params, false)
189 v.BlockWrite(context.Background(), TestHash, TestBlock)
190 mockErr := errors.New("Mock error")
191 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
195 c.Errorf("Got %v, expected %v", err, mockErr)
// TestUnixVolumeGetFuncFileError calls getFunc on the path for TestHash with
// no visible prior write of that block. Going by the failure messages,
// getFunc must return an error for the non-existent file and must not invoke
// the worker callback.
199 func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
200 v := s.newTestableUnixVolume(c, s.params, false)
204 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
209 c.Errorf("Expected error opening non-existent file")
212 c.Errorf("Worker func should not have been called")
// TestUnixVolumeGetFuncWorkerWaitsOnMutex checks the ordering of getFunc's
// steps using a MockMutex. Going by the failure messages, the expected order
// is: the mutex is acquired, then the worker is called, then the mutex is
// released. Each phase gives up after a 5 second timeout.
// NOTE(review): the line that installs mtx as the volume's locker is not
// visible in this excerpt -- confirm.
216 func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
217 v := s.newTestableUnixVolume(c, s.params, false)
220 v.BlockWrite(context.Background(), TestHash, TestBlock)
222 mtx := NewMockMutex()
// funcCalled is unbuffered, so the worker blocks on its send until the test
// receives from the channel.
225 funcCalled := make(chan struct{})
// getFunc runs in its own goroutine so the test can step it through each
// phase from here.
226 go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
227 funcCalled <- struct{}{}
// Phase 1: let the Lock() call go through; the worker must not have run yet.
231 case mtx.AllowLock <- struct{}{}:
233 c.Fatal("Function was called before mutex was acquired")
234 case <-time.After(5 * time.Second):
235 c.Fatal("Timed out before mutex was acquired")
// Phase 2: the worker must be called before Unlock() is reached.
239 case mtx.AllowUnlock <- struct{}{}:
240 c.Fatal("Mutex was released before function was called")
241 case <-time.After(5 * time.Second):
242 c.Fatal("Timed out waiting for funcCalled")
// Phase 3: after the worker has run, getFunc must release the mutex.
245 case mtx.AllowUnlock <- struct{}{}:
246 case <-time.After(5 * time.Second):
247 c.Fatal("Timed out waiting for getFunc() to release mutex")
// MockMutex is a lock for tests whose Lock and Unlock calls are gated by
// channels: per the comments on those methods, each waits for someone to
// send on the corresponding channel.
251 type MockMutex struct {
// AllowLock gates Lock().
252 AllowLock chan struct{}
// AllowUnlock gates Unlock().
253 AllowUnlock chan struct{}
// NewMockMutex returns a MockMutex whose AllowLock and AllowUnlock channels
// are both unbuffered, so a send on either blocks until a receiver is ready.
256 func NewMockMutex() *MockMutex {
258 AllowLock: make(chan struct{}),
259 AllowUnlock: make(chan struct{}),
263 // Lock waits for someone to send to AllowLock.
// This lets a test decide exactly when a pending Lock() call may proceed.
264 func (m *MockMutex) Lock() {
268 // Unlock waits for someone to send to AllowUnlock.
// This lets a test decide exactly when a pending Unlock() call may proceed.
269 func (m *MockMutex) Unlock() {
// TestUnixVolumeContextCancelBlockWrite expects BlockWrite to return
// context.Canceled on a volume created with serialize=true.
// NOTE(review): the lines that take v.locker and that cancel ctx are not
// visible here; presumably the locker is held by the test so BlockWrite
// blocks, and cancel is called from a goroutine after the 50ms sleep --
// confirm.
273 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
274 v := s.newTestableUnixVolume(c, s.params, true)
// The locker is released when the test function returns.
277 defer v.locker.Unlock()
278 ctx, cancel := context.WithCancel(context.Background())
280 time.Sleep(50 * time.Millisecond)
283 err := v.BlockWrite(ctx, TestHash, TestBlock)
284 if err != context.Canceled {
285 c.Errorf("BlockWrite() returned %s -- expected short read / canceled", err)
// TestUnixVolumeContextCancelBlockRead first stores a block, then expects a
// BlockRead under ctx to return zero bytes and context.Canceled, on a volume
// created with serialize=true.
// NOTE(review): the lines that take v.locker and that cancel ctx are not
// visible here; presumably the locker is held by the test so BlockRead
// blocks, and cancel is called from a goroutine after the 50ms sleep --
// confirm.
289 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
290 v := s.newTestableUnixVolume(c, s.params, true)
// This write uses a background context, before ctx is created.
292 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
296 ctx, cancel := context.WithCancel(context.Background())
// The locker is released when the test function returns.
298 defer v.locker.Unlock()
300 time.Sleep(50 * time.Millisecond)
// Data read is discarded; only the byte count and the error matter.
303 n, err := v.BlockRead(ctx, TestHash, io.Discard)
304 if n > 0 || err != context.Canceled {
305 c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err)
// TestStats checks the counters exposed by InternalStats() after each of a
// sequence of operations: a failed read, a write, a touch, a successful read
// and a trash. The stats are rendered as JSON and matched against regular
// expressions, one counter at a time.
309 func (s *unixVolumeSuite) TestStats(c *check.C) {
310 vol := s.newTestableUnixVolume(c, s.params, false)
// stats marshals the volume's current internal stats to JSON.
311 stats := func() string {
312 buf, err := json.Marshal(vol.InternalStats())
313 c.Check(err, check.IsNil)
// Baseline, right after the volume was created.
317 c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
318 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
// Reading a block that was never written must fail and be counted as an
// error, while the byte, open and create counters stay at zero.
320 _, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
321 c.Check(err, check.NotNil)
// [^0] matches any single character other than '0' in that position.
322 c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
323 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
324 c.Check(stats(), check.Matches, `.*"\*(fs|os)\.PathError":[^0].*`) // os.PathError changed to fs.PathError in Go 1.16
325 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
326 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
327 c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
// Writing the 3-byte block "foo": expect 3 bytes out, one create, one
// utimes, and still no opens.
329 err = vol.BlockWrite(context.Background(), fooHash, []byte("foo"))
330 c.Check(err, check.IsNil)
331 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
332 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
333 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
334 c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
// Touching the block: expect one flock, the first open, and a second utimes.
336 err = vol.BlockTouch(fooHash)
337 c.Check(err, check.IsNil)
338 c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
339 c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
340 c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
// Reading the block back: expect the stored content, 3 bytes in, and a
// second open.
342 buf := bytes.NewBuffer(nil)
343 _, err = vol.BlockRead(context.Background(), fooHash, buf)
344 c.Check(err, check.IsNil)
345 c.Check(buf.String(), check.Equals, "foo")
346 c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
347 c.Check(stats(), check.Matches, `.*"OpenOps":2,.*`)
// Trashing the block: expect a second flock.
349 err = vol.BlockTrash(fooHash)
350 c.Check(err, check.IsNil)
351 c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
// TestSkipUnusedDirs creates two trash-style files, one under <Root>/aaa and
// one under the dot-prefixed <Root>/.aaa, and then expects the first to be
// gone while the second is still present.
// NOTE(review): the call that triggers the cleanup is not visible in this
// excerpt; the inline comment below indicates it is EmptyTrash -- confirm.
354 func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
355 vol := s.newTestableUnixVolume(c, s.params, false)
357 err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777)
358 c.Assert(err, check.IsNil)
359 err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
360 c.Assert(err, check.IsNil)
// Both files share the same name (32 'a' characters plus ".trash.1"); only
// the parent directory differs.
361 deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
362 err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
363 c.Assert(err, check.IsNil)
364 skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
365 err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
366 c.Assert(err, check.IsNil)
// The file in the dot-directory must still exist.
369 _, err = os.Stat(skipme)
370 c.Check(err, check.IsNil)
// The file in the regular directory must be gone, and specifically with a
// not-exist error.
372 _, err = os.Stat(deleteme)
373 c.Check(err, check.NotNil)
374 c.Check(os.IsNotExist(err), check.Equals, true)