887247d3c3956e9475edf8437c913b3e1fc922c9
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "os"
10         "strings"
11         "sync"
12         "syscall"
13         "testing"
14         "time"
15 )
16
17 type TestableUnixVolume struct {
18         UnixVolume
19         t TB
20 }
21
22 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
23         d, err := ioutil.TempDir("", "volume_test")
24         if err != nil {
25                 t.Fatal(err)
26         }
27         var locker sync.Locker
28         if serialize {
29                 locker = &sync.Mutex{}
30         }
31         return &TestableUnixVolume{
32                 UnixVolume: UnixVolume{
33                         Root:     d,
34                         ReadOnly: readonly,
35                         locker:   locker,
36                 },
37                 t: t,
38         }
39 }
40
41 // PutRaw writes a Keep block directly into a UnixVolume, even if
42 // the volume is readonly.
43 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
44         defer func(orig bool) {
45                 v.ReadOnly = orig
46         }(v.ReadOnly)
47         v.ReadOnly = false
48         err := v.Put(locator, data)
49         if err != nil {
50                 v.t.Fatal(err)
51         }
52 }
53
54 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
55         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
56         if err != nil {
57                 v.t.Fatal(err)
58         }
59 }
60
61 func (v *TestableUnixVolume) Teardown() {
62         if err := os.RemoveAll(v.Root); err != nil {
63                 v.t.Fatal(err)
64         }
65 }
66
67 // serialize = false; readonly = false
68 func TestUnixVolumeWithGenericTests(t *testing.T) {
69         DoGenericVolumeTests(t, func(t TB) TestableVolume {
70                 return NewTestableUnixVolume(t, false, false)
71         })
72 }
73
74 // serialize = false; readonly = true
75 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
76         DoGenericVolumeTests(t, func(t TB) TestableVolume {
77                 return NewTestableUnixVolume(t, false, true)
78         })
79 }
80
81 // serialize = true; readonly = false
82 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
83         DoGenericVolumeTests(t, func(t TB) TestableVolume {
84                 return NewTestableUnixVolume(t, true, false)
85         })
86 }
87
88 // serialize = false; readonly = false
89 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
90         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
91                 vols := make([]Volume, 2)
92                 testableUnixVols := make([]TestableVolume, 2)
93
94                 for i := range vols {
95                         v := NewTestableUnixVolume(t, false, false)
96                         vols[i] = v
97                         testableUnixVols[i] = v
98                 }
99
100                 return MakeRRVolumeManager(vols), testableUnixVols
101         })
102 }
103
104 func TestReplicationDefault1(t *testing.T) {
105         v := &UnixVolume{
106                 Root:     "/",
107                 ReadOnly: true,
108         }
109         if err := v.Start(); err != nil {
110                 t.Error(err)
111         }
112         if got := v.Replication(); got != 1 {
113                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
114         }
115 }
116
117 func TestGetNotFound(t *testing.T) {
118         v := NewTestableUnixVolume(t, false, false)
119         defer v.Teardown()
120         v.Put(TestHash, TestBlock)
121
122         buf := make([]byte, BlockSize)
123         n, err := v.Get(TestHash2, buf)
124         switch {
125         case os.IsNotExist(err):
126                 break
127         case err == nil:
128                 t.Errorf("Read should have failed, returned %+q", buf[:n])
129         default:
130                 t.Errorf("Read expected ErrNotExist, got: %s", err)
131         }
132 }
133
134 func TestPut(t *testing.T) {
135         v := NewTestableUnixVolume(t, false, false)
136         defer v.Teardown()
137
138         err := v.Put(TestHash, TestBlock)
139         if err != nil {
140                 t.Error(err)
141         }
142         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
143         if buf, err := ioutil.ReadFile(p); err != nil {
144                 t.Error(err)
145         } else if bytes.Compare(buf, TestBlock) != 0 {
146                 t.Errorf("Write should have stored %s, did store %s",
147                         string(TestBlock), string(buf))
148         }
149 }
150
151 func TestPutBadVolume(t *testing.T) {
152         v := NewTestableUnixVolume(t, false, false)
153         defer v.Teardown()
154
155         os.Chmod(v.Root, 000)
156         err := v.Put(TestHash, TestBlock)
157         if err == nil {
158                 t.Error("Write should have failed")
159         }
160 }
161
162 func TestUnixVolumeReadonly(t *testing.T) {
163         v := NewTestableUnixVolume(t, false, true)
164         defer v.Teardown()
165
166         v.PutRaw(TestHash, TestBlock)
167
168         buf := make([]byte, BlockSize)
169         _, err := v.Get(TestHash, buf)
170         if err != nil {
171                 t.Errorf("got err %v, expected nil", err)
172         }
173
174         err = v.Put(TestHash, TestBlock)
175         if err != MethodDisabledError {
176                 t.Errorf("got err %v, expected MethodDisabledError", err)
177         }
178
179         err = v.Touch(TestHash)
180         if err != MethodDisabledError {
181                 t.Errorf("got err %v, expected MethodDisabledError", err)
182         }
183
184         err = v.Trash(TestHash)
185         if err != MethodDisabledError {
186                 t.Errorf("got err %v, expected MethodDisabledError", err)
187         }
188 }
189
190 func TestIsFull(t *testing.T) {
191         v := NewTestableUnixVolume(t, false, false)
192         defer v.Teardown()
193
194         fullPath := v.Root + "/full"
195         now := fmt.Sprintf("%d", time.Now().Unix())
196         os.Symlink(now, fullPath)
197         if !v.IsFull() {
198                 t.Errorf("%s: claims not to be full", v)
199         }
200         os.Remove(fullPath)
201
202         // Test with an expired /full link.
203         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
204         os.Symlink(expired, fullPath)
205         if v.IsFull() {
206                 t.Errorf("%s: should no longer be full", v)
207         }
208 }
209
210 func TestNodeStatus(t *testing.T) {
211         v := NewTestableUnixVolume(t, false, false)
212         defer v.Teardown()
213
214         // Get node status and make a basic sanity check.
215         volinfo := v.Status()
216         if volinfo.MountPoint != v.Root {
217                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
218         }
219         if volinfo.DeviceNum == 0 {
220                 t.Errorf("uninitialized device_num in %v", volinfo)
221         }
222         if volinfo.BytesFree == 0 {
223                 t.Errorf("uninitialized bytes_free in %v", volinfo)
224         }
225         if volinfo.BytesUsed == 0 {
226                 t.Errorf("uninitialized bytes_used in %v", volinfo)
227         }
228 }
229
230 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
231         v := NewTestableUnixVolume(t, false, false)
232         defer v.Teardown()
233
234         v.Put(TestHash, TestBlock)
235         mockErr := errors.New("Mock error")
236         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
237                 return mockErr
238         })
239         if err != mockErr {
240                 t.Errorf("Got %v, expected %v", err, mockErr)
241         }
242 }
243
244 func TestUnixVolumeGetFuncFileError(t *testing.T) {
245         v := NewTestableUnixVolume(t, false, false)
246         defer v.Teardown()
247
248         funcCalled := false
249         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
250                 funcCalled = true
251                 return nil
252         })
253         if err == nil {
254                 t.Errorf("Expected error opening non-existent file")
255         }
256         if funcCalled {
257                 t.Errorf("Worker func should not have been called")
258         }
259 }
260
261 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
262         v := NewTestableUnixVolume(t, false, false)
263         defer v.Teardown()
264
265         v.Put(TestHash, TestBlock)
266
267         mtx := NewMockMutex()
268         v.locker = mtx
269
270         funcCalled := make(chan struct{})
271         go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
272                 funcCalled <- struct{}{}
273                 return nil
274         })
275         select {
276         case mtx.AllowLock <- struct{}{}:
277         case <-funcCalled:
278                 t.Fatal("Function was called before mutex was acquired")
279         case <-time.After(5 * time.Second):
280                 t.Fatal("Timed out before mutex was acquired")
281         }
282         select {
283         case <-funcCalled:
284         case mtx.AllowUnlock <- struct{}{}:
285                 t.Fatal("Mutex was released before function was called")
286         case <-time.After(5 * time.Second):
287                 t.Fatal("Timed out waiting for funcCalled")
288         }
289         select {
290         case mtx.AllowUnlock <- struct{}{}:
291         case <-time.After(5 * time.Second):
292                 t.Fatal("Timed out waiting for getFunc() to release mutex")
293         }
294 }
295
296 func TestUnixVolumeCompare(t *testing.T) {
297         v := NewTestableUnixVolume(t, false, false)
298         defer v.Teardown()
299
300         v.Put(TestHash, TestBlock)
301         err := v.Compare(TestHash, TestBlock)
302         if err != nil {
303                 t.Errorf("Got err %q, expected nil", err)
304         }
305
306         err = v.Compare(TestHash, []byte("baddata"))
307         if err != CollisionError {
308                 t.Errorf("Got err %q, expected %q", err, CollisionError)
309         }
310
311         v.Put(TestHash, []byte("baddata"))
312         err = v.Compare(TestHash, TestBlock)
313         if err != DiskHashError {
314                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
315         }
316
317         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
318         os.Chmod(p, 000)
319         err = v.Compare(TestHash, TestBlock)
320         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
321                 t.Errorf("Got err %q, expected %q", err, "permission denied")
322         }
323 }
324
325 // TODO(twp): show that the underlying Read/Write operations executed
326 // serially and not concurrently. The easiest way to do this is
327 // probably to activate verbose or debug logging, capture log output
328 // and examine it to confirm that Reads and Writes did not overlap.
329 //
330 // TODO(twp): a proper test of I/O serialization requires that a
331 // second request start while the first one is still underway.
332 // Guaranteeing that the test behaves this way requires some tricky
333 // synchronization and mocking.  For now we'll just launch a bunch of
334 // requests simultaenously in goroutines and demonstrate that they
335 // return accurate results.