d6b1c807f87cf52ca21fe9c043b8ad8c8f1ac430
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "os"
10         "strings"
11         "sync"
12         "syscall"
13         "testing"
14         "time"
15 )
16
17 type TestableUnixVolume struct {
18         UnixVolume
19         t *testing.T
20 }
21
22 func NewTestableUnixVolume(t *testing.T, serialize bool, readonly bool) *TestableUnixVolume {
23         d, err := ioutil.TempDir("", "volume_test")
24         if err != nil {
25                 t.Fatal(err)
26         }
27         var locker sync.Locker
28         if serialize {
29                 locker = &sync.Mutex{}
30         }
31         return &TestableUnixVolume{
32                 UnixVolume: UnixVolume{
33                         root:     d,
34                         locker:   locker,
35                         readonly: readonly,
36                 },
37                 t: t,
38         }
39 }
40
41 // PutRaw writes a Keep block directly into a UnixVolume, even if
42 // the volume is readonly.
43 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
44         defer func(orig bool) {
45                 v.readonly = orig
46         }(v.readonly)
47         v.readonly = false
48         err := v.Put(locator, data)
49         if err != nil {
50                 v.t.Fatal(err)
51         }
52 }
53
54 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
55         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
56         if err != nil {
57                 v.t.Fatal(err)
58         }
59 }
60
61 func (v *TestableUnixVolume) Teardown() {
62         if err := os.RemoveAll(v.root); err != nil {
63                 v.t.Fatal(err)
64         }
65 }
66
67 // serialize = false; readonly = false
68 func TestUnixVolumeWithGenericTests(t *testing.T) {
69         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
70                 return NewTestableUnixVolume(t, false, false)
71         })
72 }
73
74 // serialize = false; readonly = true
75 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
76         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
77                 return NewTestableUnixVolume(t, false, true)
78         })
79 }
80
81 // serialize = true; readonly = false
82 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
83         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
84                 return NewTestableUnixVolume(t, true, false)
85         })
86 }
87
88 func TestGetNotFound(t *testing.T) {
89         v := NewTestableUnixVolume(t, false, false)
90         defer v.Teardown()
91         v.Put(TEST_HASH, TEST_BLOCK)
92
93         buf, err := v.Get(TEST_HASH_2)
94         switch {
95         case os.IsNotExist(err):
96                 break
97         case err == nil:
98                 t.Errorf("Read should have failed, returned %s", string(buf))
99         default:
100                 t.Errorf("Read expected ErrNotExist, got: %s", err)
101         }
102 }
103
104 func TestPut(t *testing.T) {
105         v := NewTestableUnixVolume(t, false, false)
106         defer v.Teardown()
107
108         err := v.Put(TEST_HASH, TEST_BLOCK)
109         if err != nil {
110                 t.Error(err)
111         }
112         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
113         if buf, err := ioutil.ReadFile(p); err != nil {
114                 t.Error(err)
115         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
116                 t.Errorf("Write should have stored %s, did store %s",
117                         string(TEST_BLOCK), string(buf))
118         }
119 }
120
121 func TestPutBadVolume(t *testing.T) {
122         v := NewTestableUnixVolume(t, false, false)
123         defer v.Teardown()
124
125         os.Chmod(v.root, 000)
126         err := v.Put(TEST_HASH, TEST_BLOCK)
127         if err == nil {
128                 t.Error("Write should have failed")
129         }
130 }
131
132 func TestUnixVolumeReadonly(t *testing.T) {
133         v := NewTestableUnixVolume(t, false, true)
134         defer v.Teardown()
135
136         v.PutRaw(TEST_HASH, TEST_BLOCK)
137
138         _, err := v.Get(TEST_HASH)
139         if err != nil {
140                 t.Errorf("got err %v, expected nil", err)
141         }
142
143         err = v.Put(TEST_HASH, TEST_BLOCK)
144         if err != MethodDisabledError {
145                 t.Errorf("got err %v, expected MethodDisabledError", err)
146         }
147
148         err = v.Touch(TEST_HASH)
149         if err != MethodDisabledError {
150                 t.Errorf("got err %v, expected MethodDisabledError", err)
151         }
152
153         err = v.Delete(TEST_HASH)
154         if err != MethodDisabledError {
155                 t.Errorf("got err %v, expected MethodDisabledError", err)
156         }
157 }
158
159 func TestIsFull(t *testing.T) {
160         v := NewTestableUnixVolume(t, false, false)
161         defer v.Teardown()
162
163         fullPath := v.root + "/full"
164         now := fmt.Sprintf("%d", time.Now().Unix())
165         os.Symlink(now, fullPath)
166         if !v.IsFull() {
167                 t.Errorf("%s: claims not to be full", v)
168         }
169         os.Remove(fullPath)
170
171         // Test with an expired /full link.
172         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
173         os.Symlink(expired, fullPath)
174         if v.IsFull() {
175                 t.Errorf("%s: should no longer be full", v)
176         }
177 }
178
179 func TestNodeStatus(t *testing.T) {
180         v := NewTestableUnixVolume(t, false, false)
181         defer v.Teardown()
182
183         // Get node status and make a basic sanity check.
184         volinfo := v.Status()
185         if volinfo.MountPoint != v.root {
186                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
187         }
188         if volinfo.DeviceNum == 0 {
189                 t.Errorf("uninitialized device_num in %v", volinfo)
190         }
191         if volinfo.BytesFree == 0 {
192                 t.Errorf("uninitialized bytes_free in %v", volinfo)
193         }
194         if volinfo.BytesUsed == 0 {
195                 t.Errorf("uninitialized bytes_used in %v", volinfo)
196         }
197 }
198
199 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
200         v := NewTestableUnixVolume(t, false, false)
201         defer v.Teardown()
202
203         v.Put(TEST_HASH, TEST_BLOCK)
204         mockErr := errors.New("Mock error")
205         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
206                 return mockErr
207         })
208         if err != mockErr {
209                 t.Errorf("Got %v, expected %v", err, mockErr)
210         }
211 }
212
213 func TestUnixVolumeGetFuncFileError(t *testing.T) {
214         v := NewTestableUnixVolume(t, false, false)
215         defer v.Teardown()
216
217         funcCalled := false
218         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
219                 funcCalled = true
220                 return nil
221         })
222         if err == nil {
223                 t.Errorf("Expected error opening non-existent file")
224         }
225         if funcCalled {
226                 t.Errorf("Worker func should not have been called")
227         }
228 }
229
230 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
231         v := NewTestableUnixVolume(t, false, false)
232         defer v.Teardown()
233
234         v.Put(TEST_HASH, TEST_BLOCK)
235
236         mtx := NewMockMutex()
237         v.locker = mtx
238
239         funcCalled := make(chan struct{})
240         go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
241                 funcCalled <- struct{}{}
242                 return nil
243         })
244         select {
245         case mtx.AllowLock <- struct{}{}:
246         case <-funcCalled:
247                 t.Fatal("Function was called before mutex was acquired")
248         case <-time.After(5 * time.Second):
249                 t.Fatal("Timed out before mutex was acquired")
250         }
251         select {
252         case <-funcCalled:
253         case mtx.AllowUnlock <- struct{}{}:
254                 t.Fatal("Mutex was released before function was called")
255         case <-time.After(5 * time.Second):
256                 t.Fatal("Timed out waiting for funcCalled")
257         }
258         select {
259         case mtx.AllowUnlock <- struct{}{}:
260         case <-time.After(5 * time.Second):
261                 t.Fatal("Timed out waiting for getFunc() to release mutex")
262         }
263 }
264
265 func TestUnixVolumeCompare(t *testing.T) {
266         v := NewTestableUnixVolume(t, false, false)
267         defer v.Teardown()
268
269         v.Put(TEST_HASH, TEST_BLOCK)
270         err := v.Compare(TEST_HASH, TEST_BLOCK)
271         if err != nil {
272                 t.Errorf("Got err %q, expected nil", err)
273         }
274
275         err = v.Compare(TEST_HASH, []byte("baddata"))
276         if err != CollisionError {
277                 t.Errorf("Got err %q, expected %q", err, CollisionError)
278         }
279
280         v.Put(TEST_HASH, []byte("baddata"))
281         err = v.Compare(TEST_HASH, TEST_BLOCK)
282         if err != DiskHashError {
283                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
284         }
285
286         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
287         os.Chmod(p, 000)
288         err = v.Compare(TEST_HASH, TEST_BLOCK)
289         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
290                 t.Errorf("Got err %q, expected %q", err, "permission denied")
291         }
292 }
293
294 // TODO(twp): show that the underlying Read/Write operations executed
295 // serially and not concurrently. The easiest way to do this is
296 // probably to activate verbose or debug logging, capture log output
297 // and examine it to confirm that Reads and Writes did not overlap.
298 //
299 // TODO(twp): a proper test of I/O serialization requires that a
300 // second request start while the first one is still underway.
301 // Guaranteeing that the test behaves this way requires some tricky
302 // synchronization and mocking.  For now we'll just launch a bunch of
303 // requests simultaenously in goroutines and demonstrate that they
304 // return accurate results.