1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/sdk/go/ctxlog"
19 "github.com/prometheus/client_golang/prometheus"
20 check "gopkg.in/check.v1"
// testableUnixVolume is the volume type constructed and exercised by the
// tests in this file.
// NOTE(review): the field list is not visible in this excerpt; elsewhere in
// the file the type is built with a unixVolume field and its Root is reached
// both as v.Root and v.unixVolume.Root.
23 type testableUnixVolume struct {
// TouchWithDate sets both the access time and the modification time of the
// file at v.blockPath(locator) to lastPut, truncated to whole seconds
// (lastPut.Unix()), using syscall.Utime.
// NOTE(review): how the returned err is handled is not visible in this excerpt.
28 func (v *testableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
29 err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{Actime: lastPut.Unix(), Modtime: lastPut.Unix()})
// Teardown removes the volume's root directory (v.Root) and everything
// beneath it with os.RemoveAll.
// NOTE(review): the body of the error branch is not visible in this excerpt.
35 func (v *testableUnixVolume) Teardown() {
36 if err := os.RemoveAll(v.Root); err != nil {
// ReadWriteOperationLabelValues returns the fixed strings "open" (r) and
// "create" (w).
// NOTE(review): presumably these are the metric label values the generic
// volume tests expect for read and write operations -- confirm against the
// caller.
41 func (v *testableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
42 return "open", "create"
// Register unixVolumeSuite with gocheck so its Test* methods are run.
45 var _ = check.Suite(&unixVolumeSuite{})
// unixVolumeSuite holds per-test state: the volume parameters built in
// SetUpTest and the volumes created through newTestableUnixVolume.
47 type unixVolumeSuite struct {
// params is reassigned in SetUpTest.
48 params newVolumeParams
// volumes accumulates every volume created by newTestableUnixVolume.
49 volumes []*testableUnixVolume
// SetUpTest builds a fresh newVolumeParams value for the test: a test
// logger, a new Prometheus registry, a test cluster config, and the metrics
// vectors and buffer pool derived from them.
// NOTE(review): at least one field assignment (original line 58) is not
// visible in this excerpt.
52 func (s *unixVolumeSuite) SetUpTest(c *check.C) {
53 logger := ctxlog.TestLogger(c)
// A new registry is created here so the metrics below are registered on it.
54 reg := prometheus.NewRegistry()
55 s.params = newVolumeParams{
56 UUID: "zzzzz-nyw5e-999999999999999",
57 Cluster: testCluster(c),
59 MetricsVecs: newVolumeMetricsVecs(reg),
// NOTE(review): the meaning of the literal 8 depends on newBufferPool's
// signature, which is not shown here -- presumably the pool size.
60 BufferPool: newBufferPool(logger, 8, reg),
// TearDownTest iterates over every volume recorded in s.volumes.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// tears each volume down -- confirm.
64 func (s *unixVolumeSuite) TearDownTest(c *check.C) {
65 for _, v := range s.volumes {
// newTestableUnixVolume creates a testableUnixVolume configured from params,
// verifies it with v.check(), and records it in s.volumes.
//
// A temporary directory with the name prefix "volume_test" is created in the
// default temp location.
// NOTE(review): the condition guarding the mutex assignment and several
// struct fields are not visible in this excerpt; presumably the mutex is only
// installed when serialize is true, and the temp dir becomes the volume root.
70 func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumeParams, serialize bool) *testableUnixVolume {
71 d, err := ioutil.TempDir("", "volume_test")
// NOTE(review): c.Check is non-fatal, so the function continues even if the
// temp dir could not be created.
72 c.Check(err, check.IsNil)
73 var locker sync.Locker
75 locker = &sync.Mutex{}
77 v := &testableUnixVolume{
78 unixVolume: unixVolume{
82 cluster: params.Cluster,
83 logger: params.Logger,
84 volume: params.ConfigVolume,
85 metrics: params.MetricsVecs,
86 bufferPool: params.BufferPool,
// Check the new volume with its own check() method before handing it out.
90 c.Check(v.check(), check.IsNil)
// Record the volume on the suite.
91 s.volumes = append(s.volumes, v)
// TestUnixVolumeWithGenericTests runs DoGenericVolumeTests with its second
// argument false, using a factory that builds a volume with serialize=false.
// NOTE(review): per the sibling "_ReadOnly" test name, the second argument
// presumably selects read-only mode -- confirm against DoGenericVolumeTests.
95 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
96 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
97 return s.newTestableUnixVolume(c, params, false)
// TestUnixVolumeWithGenericTests_ReadOnly runs DoGenericVolumeTests with its
// second argument true, using a factory that builds a volume with
// serialize=false.
// NOTE(review): the second argument presumably selects read-only mode, as the
// test name suggests -- confirm against DoGenericVolumeTests.
101 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_ReadOnly(c *check.C) {
102 DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
103 return s.newTestableUnixVolume(c, params, false)
// TestUnixVolumeWithGenericTests_Serialized runs DoGenericVolumeTests with
// its second argument false, using a factory that builds a volume with
// serialize=true.
107 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Serialized(c *check.C) {
108 DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
109 return s.newTestableUnixVolume(c, params, true)
// TestUnixVolumeWithGenericTests_Readonly_Serialized runs
// DoGenericVolumeTests with its second argument true, using a factory that
// builds a volume with serialize=true.
113 func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Readonly_Serialized(c *check.C) {
114 DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
115 return s.newTestableUnixVolume(c, params, true)
// TestGetNotFound writes TestBlock under TestHash, then reads a different
// locator (TestHash2) and checks the resulting error.
// NOTE(review): the declaration of buf is not visible in this excerpt.
119 func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
120 v := s.newTestableUnixVolume(c, s.params, true)
// NOTE(review): the result of this BlockWrite is discarded.
122 v.BlockWrite(context.Background(), TestHash, TestBlock)
125 err := v.BlockRead(context.Background(), TestHash2, buf)
// NOTE(review): FitsTypeOf only compares the error's type with the type of
// os.ErrNotExist; it does not check that err is (or wraps) os.ErrNotExist.
126 c.Check(err, check.FitsTypeOf, os.ErrNotExist)
// TestPut writes TestBlock under TestHash and then reads the backing file
// directly from disk to verify that its contents equal TestBlock.
// NOTE(review): the handling of the BlockWrite and ReadFile errors is not
// visible in this excerpt.
129 func (s *unixVolumeSuite) TestPut(c *check.C) {
130 v := s.newTestableUnixVolume(c, s.params, false)
133 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
// Expected on-disk path: <Root>/<first three characters of the hash>/<hash>.
137 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
138 if buf, err := ioutil.ReadFile(p); err != nil {
// NOTE(review): bytes.Compare(...) != 0 is equivalent to !bytes.Equal(...).
140 } else if bytes.Compare(buf, TestBlock) != 0 {
141 c.Errorf("Write should have stored %s, did store %s",
142 string(TestBlock), string(buf))
// TestPutBadVolume deletes the volume's root directory and then asserts that
// BlockWrite still returns a nil error.
// NOTE(review): this implies BlockWrite is expected to recreate the missing
// directories -- confirm against the unixVolume implementation.
146 func (s *unixVolumeSuite) TestPutBadVolume(c *check.C) {
147 v := s.newTestableUnixVolume(c, s.params, false)
150 err := os.RemoveAll(v.Root)
// Assert (fatal) here: the rest of the test is meaningless if removal failed.
151 c.Assert(err, check.IsNil)
152 err = v.BlockWrite(context.Background(), TestHash, TestBlock)
153 c.Check(err, check.IsNil)
// TestIsFull exercises the volume's "full" marker: a symlink at <Root>/full
// whose target is a Unix timestamp in decimal. With a current timestamp the
// test expects the volume to report full; with a timestamp 3605 seconds in
// the past it expects the volume to no longer report full.
// NOTE(review): the conditions that call the fullness check, and any removal
// of the first symlink before the second is created, are not visible in this
// excerpt.
156 func (s *unixVolumeSuite) TestIsFull(c *check.C) {
157 v := s.newTestableUnixVolume(c, s.params, false)
160 fullPath := v.Root + "/full"
161 now := fmt.Sprintf("%d", time.Now().Unix())
// NOTE(review): the error returned by os.Symlink is ignored.
162 os.Symlink(now, fullPath)
164 c.Error("volume claims not to be full")
168 // Test with an expired /full link.
169 expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
// NOTE(review): the error returned by os.Symlink is ignored; the call fails
// if fullPath still exists at this point.
170 os.Symlink(expired, fullPath)
172 c.Error("volume should no longer be full")
// TestUnixVolumeContextCancelBlockWrite calls BlockWrite on a serialized
// volume with a cancellable context and expects it to return
// context.Canceled.
// NOTE(review): the lock acquisition matching the deferred Unlock, and the
// code that invokes cancel (presumably a goroutine that sleeps 50ms first),
// are not visible in this excerpt.
176 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
177 v := s.newTestableUnixVolume(c, s.params, true)
180 defer v.locker.Unlock()
181 ctx, cancel := context.WithCancel(context.Background())
183 time.Sleep(50 * time.Millisecond)
186 err := v.BlockWrite(ctx, TestHash, TestBlock)
// Any result other than exactly context.Canceled is reported as a failure.
187 if err != context.Canceled {
188 c.Errorf("BlockWrite() returned %s -- expected short read / canceled", err)
// TestUnixVolumeContextCancelBlockRead stores TestBlock, then calls BlockRead
// with a cancellable context and expects it to return context.Canceled
// without having written any data to buf.
// NOTE(review): the lock acquisition matching the deferred Unlock, the code
// that invokes cancel (presumably a goroutine that sleeps 50ms first), and
// the declaration of buf are not visible in this excerpt.
192 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
193 v := s.newTestableUnixVolume(c, s.params, true)
// Store the block first, using a context that is never cancelled.
195 err := v.BlockWrite(context.Background(), TestHash, TestBlock)
199 ctx, cancel := context.WithCancel(context.Background())
201 defer v.locker.Unlock()
203 time.Sleep(50 * time.Millisecond)
207 err = v.BlockRead(ctx, TestHash, buf)
// Fail if any bytes were delivered or the error is not context.Canceled.
208 if buf.Len() != 0 || err != context.Canceled {
209 c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err)
// TestStats checks the counters reported by vol.InternalStats() after each of
// a sequence of operations: a failed read, a write, a touch, a successful
// read, and a trash. The stats are marshalled to JSON and matched with
// regular expressions.
// NOTE(review): the definitions of fooHash, brdiscard and buf, and the return
// statement of the stats closure, are not visible in this excerpt.
213 func (s *unixVolumeSuite) TestStats(c *check.C) {
214 vol := s.newTestableUnixVolume(c, s.params, false)
// stats takes a snapshot of the volume's internal stats as JSON.
215 stats := func() string {
216 buf, err := json.Marshal(vol.InternalStats())
217 c.Check(err, check.IsNil)
// Baseline before any block operation.
221 c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
222 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
// A read that is expected to fail: it must count as an error, and must not
// count any bytes read, opens or creates.
224 err := vol.BlockRead(context.Background(), fooHash, brdiscard)
225 c.Check(err, check.NotNil)
226 c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
227 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
228 c.Check(stats(), check.Matches, `.*"\*(fs|os)\.PathError":[^0].*`) // os.PathError changed to fs.PathError in Go 1.16
229 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
230 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
231 c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
// Writing the 3-byte block "foo": expect 3 bytes out, one create, one utimes
// and still no opens.
233 err = vol.BlockWrite(context.Background(), fooHash, []byte("foo"))
234 c.Check(err, check.IsNil)
235 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
236 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
237 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
238 c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
// Touching the block: expect one flock, one open and a second utimes.
240 err = vol.BlockTouch(fooHash)
241 c.Check(err, check.IsNil)
242 c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
243 c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
244 c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
// Reading the block back: expect the data "foo", 3 bytes in and a second open.
247 err = vol.BlockRead(context.Background(), fooHash, buf)
248 c.Check(err, check.IsNil)
249 c.Check(buf.String(), check.Equals, "foo")
250 c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
251 c.Check(stats(), check.Matches, `.*"OpenOps":2,.*`)
// Trashing the block: expect a second flock.
253 err = vol.BlockTrash(fooHash)
254 c.Check(err, check.IsNil)
255 c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
// TestSkipUnusedDirs places one ".trash.1" file in <Root>/aaa and one in the
// dot-prefixed directory <Root>/.aaa, then verifies that afterwards the file
// in .aaa still exists while the file in aaa is gone.
// NOTE(review): the operation performed between setup and the checks
// (presumably an EmptyTrash call, per the existing inline comment) is not
// visible in this excerpt.
258 func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
259 vol := s.newTestableUnixVolume(c, s.params, false)
261 err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777)
262 c.Assert(err, check.IsNil)
263 err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
264 c.Assert(err, check.IsNil)
// File that is expected to be removed.
265 deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
266 err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
267 c.Assert(err, check.IsNil)
// File with the same name in the dot-directory; expected to be left alone.
268 skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
269 err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
270 c.Assert(err, check.IsNil)
// skipme must still be present.
273 _, err = os.Stat(skipme)
274 c.Check(err, check.IsNil)
// deleteme must be gone, and the Stat failure must be a not-exist error.
276 _, err = os.Stat(deleteme)
277 c.Check(err, check.NotNil)
278 c.Check(os.IsNotExist(err), check.Equals, true)