10281: Update comment.
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "os"
10         "strings"
11         "sync"
12         "syscall"
13         "testing"
14         "time"
15 )
16
17 type TestableUnixVolume struct {
18         UnixVolume
19         t TB
20 }
21
22 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
23         d, err := ioutil.TempDir("", "volume_test")
24         if err != nil {
25                 t.Fatal(err)
26         }
27         var locker sync.Locker
28         if serialize {
29                 locker = &sync.Mutex{}
30         }
31         return &TestableUnixVolume{
32                 UnixVolume: UnixVolume{
33                         Root:     d,
34                         ReadOnly: readonly,
35                         locker:   locker,
36                 },
37                 t: t,
38         }
39 }
40
41 // PutRaw writes a Keep block directly into a UnixVolume, even if
42 // the volume is readonly.
43 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
44         defer func(orig bool) {
45                 v.ReadOnly = orig
46         }(v.ReadOnly)
47         v.ReadOnly = false
48         err := v.Put(locator, data)
49         if err != nil {
50                 v.t.Fatal(err)
51         }
52 }
53
54 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
55         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
56         if err != nil {
57                 v.t.Fatal(err)
58         }
59 }
60
61 func (v *TestableUnixVolume) Teardown() {
62         if err := os.RemoveAll(v.Root); err != nil {
63                 v.t.Fatal(err)
64         }
65 }
66
67 // serialize = false; readonly = false
68 func TestUnixVolumeWithGenericTests(t *testing.T) {
69         DoGenericVolumeTests(t, func(t TB) TestableVolume {
70                 return NewTestableUnixVolume(t, false, false)
71         })
72 }
73
74 // serialize = false; readonly = true
75 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
76         DoGenericVolumeTests(t, func(t TB) TestableVolume {
77                 return NewTestableUnixVolume(t, false, true)
78         })
79 }
80
81 // serialize = true; readonly = false
82 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
83         DoGenericVolumeTests(t, func(t TB) TestableVolume {
84                 return NewTestableUnixVolume(t, true, false)
85         })
86 }
87
88 // serialize = false; readonly = false
89 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
90         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
91                 vols := make([]Volume, 2)
92                 testableUnixVols := make([]TestableVolume, 2)
93
94                 for i := range vols {
95                         v := NewTestableUnixVolume(t, false, false)
96                         vols[i] = v
97                         testableUnixVols[i] = v
98                 }
99
100                 return MakeRRVolumeManager(vols), testableUnixVols
101         })
102 }
103
104 func TestGetNotFound(t *testing.T) {
105         v := NewTestableUnixVolume(t, false, false)
106         defer v.Teardown()
107         v.Put(TestHash, TestBlock)
108
109         buf := make([]byte, BlockSize)
110         n, err := v.Get(TestHash2, buf)
111         switch {
112         case os.IsNotExist(err):
113                 break
114         case err == nil:
115                 t.Errorf("Read should have failed, returned %+q", buf[:n])
116         default:
117                 t.Errorf("Read expected ErrNotExist, got: %s", err)
118         }
119 }
120
121 func TestPut(t *testing.T) {
122         v := NewTestableUnixVolume(t, false, false)
123         defer v.Teardown()
124
125         err := v.Put(TestHash, TestBlock)
126         if err != nil {
127                 t.Error(err)
128         }
129         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
130         if buf, err := ioutil.ReadFile(p); err != nil {
131                 t.Error(err)
132         } else if bytes.Compare(buf, TestBlock) != 0 {
133                 t.Errorf("Write should have stored %s, did store %s",
134                         string(TestBlock), string(buf))
135         }
136 }
137
138 func TestPutBadVolume(t *testing.T) {
139         v := NewTestableUnixVolume(t, false, false)
140         defer v.Teardown()
141
142         os.Chmod(v.Root, 000)
143         err := v.Put(TestHash, TestBlock)
144         if err == nil {
145                 t.Error("Write should have failed")
146         }
147 }
148
149 func TestUnixVolumeReadonly(t *testing.T) {
150         v := NewTestableUnixVolume(t, false, true)
151         defer v.Teardown()
152
153         v.PutRaw(TestHash, TestBlock)
154
155         buf := make([]byte, BlockSize)
156         _, err := v.Get(TestHash, buf)
157         if err != nil {
158                 t.Errorf("got err %v, expected nil", err)
159         }
160
161         err = v.Put(TestHash, TestBlock)
162         if err != MethodDisabledError {
163                 t.Errorf("got err %v, expected MethodDisabledError", err)
164         }
165
166         err = v.Touch(TestHash)
167         if err != MethodDisabledError {
168                 t.Errorf("got err %v, expected MethodDisabledError", err)
169         }
170
171         err = v.Trash(TestHash)
172         if err != MethodDisabledError {
173                 t.Errorf("got err %v, expected MethodDisabledError", err)
174         }
175 }
176
177 func TestIsFull(t *testing.T) {
178         v := NewTestableUnixVolume(t, false, false)
179         defer v.Teardown()
180
181         fullPath := v.Root + "/full"
182         now := fmt.Sprintf("%d", time.Now().Unix())
183         os.Symlink(now, fullPath)
184         if !v.IsFull() {
185                 t.Errorf("%s: claims not to be full", v)
186         }
187         os.Remove(fullPath)
188
189         // Test with an expired /full link.
190         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
191         os.Symlink(expired, fullPath)
192         if v.IsFull() {
193                 t.Errorf("%s: should no longer be full", v)
194         }
195 }
196
197 func TestNodeStatus(t *testing.T) {
198         v := NewTestableUnixVolume(t, false, false)
199         defer v.Teardown()
200
201         // Get node status and make a basic sanity check.
202         volinfo := v.Status()
203         if volinfo.MountPoint != v.Root {
204                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
205         }
206         if volinfo.DeviceNum == 0 {
207                 t.Errorf("uninitialized device_num in %v", volinfo)
208         }
209         if volinfo.BytesFree == 0 {
210                 t.Errorf("uninitialized bytes_free in %v", volinfo)
211         }
212         if volinfo.BytesUsed == 0 {
213                 t.Errorf("uninitialized bytes_used in %v", volinfo)
214         }
215 }
216
217 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
218         v := NewTestableUnixVolume(t, false, false)
219         defer v.Teardown()
220
221         v.Put(TestHash, TestBlock)
222         mockErr := errors.New("Mock error")
223         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
224                 return mockErr
225         })
226         if err != mockErr {
227                 t.Errorf("Got %v, expected %v", err, mockErr)
228         }
229 }
230
231 func TestUnixVolumeGetFuncFileError(t *testing.T) {
232         v := NewTestableUnixVolume(t, false, false)
233         defer v.Teardown()
234
235         funcCalled := false
236         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
237                 funcCalled = true
238                 return nil
239         })
240         if err == nil {
241                 t.Errorf("Expected error opening non-existent file")
242         }
243         if funcCalled {
244                 t.Errorf("Worker func should not have been called")
245         }
246 }
247
248 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
249         v := NewTestableUnixVolume(t, false, false)
250         defer v.Teardown()
251
252         v.Put(TestHash, TestBlock)
253
254         mtx := NewMockMutex()
255         v.locker = mtx
256
257         funcCalled := make(chan struct{})
258         go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
259                 funcCalled <- struct{}{}
260                 return nil
261         })
262         select {
263         case mtx.AllowLock <- struct{}{}:
264         case <-funcCalled:
265                 t.Fatal("Function was called before mutex was acquired")
266         case <-time.After(5 * time.Second):
267                 t.Fatal("Timed out before mutex was acquired")
268         }
269         select {
270         case <-funcCalled:
271         case mtx.AllowUnlock <- struct{}{}:
272                 t.Fatal("Mutex was released before function was called")
273         case <-time.After(5 * time.Second):
274                 t.Fatal("Timed out waiting for funcCalled")
275         }
276         select {
277         case mtx.AllowUnlock <- struct{}{}:
278         case <-time.After(5 * time.Second):
279                 t.Fatal("Timed out waiting for getFunc() to release mutex")
280         }
281 }
282
283 func TestUnixVolumeCompare(t *testing.T) {
284         v := NewTestableUnixVolume(t, false, false)
285         defer v.Teardown()
286
287         v.Put(TestHash, TestBlock)
288         err := v.Compare(TestHash, TestBlock)
289         if err != nil {
290                 t.Errorf("Got err %q, expected nil", err)
291         }
292
293         err = v.Compare(TestHash, []byte("baddata"))
294         if err != CollisionError {
295                 t.Errorf("Got err %q, expected %q", err, CollisionError)
296         }
297
298         v.Put(TestHash, []byte("baddata"))
299         err = v.Compare(TestHash, TestBlock)
300         if err != DiskHashError {
301                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
302         }
303
304         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
305         os.Chmod(p, 000)
306         err = v.Compare(TestHash, TestBlock)
307         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
308                 t.Errorf("Got err %q, expected %q", err, "permission denied")
309         }
310 }
311
312 // TODO(twp): show that the underlying Read/Write operations executed
313 // serially and not concurrently. The easiest way to do this is
314 // probably to activate verbose or debug logging, capture log output
315 // and examine it to confirm that Reads and Writes did not overlap.
316 //
317 // TODO(twp): a proper test of I/O serialization requires that a
318 // second request start while the first one is still underway.
319 // Guaranteeing that the test behaves this way requires some tricky
320 // synchronization and mocking.  For now we'll just launch a bunch of
321 // requests simultaenously in goroutines and demonstrate that they
322 // return accurate results.