7253: update manigest.parseManifestStream to raise error when the given manifest...
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "os"
10         "strings"
11         "sync"
12         "syscall"
13         "testing"
14         "time"
15 )
16
17 type TestableUnixVolume struct {
18         UnixVolume
19         t *testing.T
20 }
21
22 func NewTestableUnixVolume(t *testing.T, serialize bool, readonly bool) *TestableUnixVolume {
23         d, err := ioutil.TempDir("", "volume_test")
24         if err != nil {
25                 t.Fatal(err)
26         }
27         var locker sync.Locker
28         if serialize {
29                 locker = &sync.Mutex{}
30         }
31         return &TestableUnixVolume{
32                 UnixVolume: UnixVolume{
33                         root:     d,
34                         locker:   locker,
35                         readonly: readonly,
36                 },
37                 t: t,
38         }
39 }
40
41 // PutRaw writes a Keep block directly into a UnixVolume, even if
42 // the volume is readonly.
43 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
44         defer func(orig bool) {
45                 v.readonly = orig
46         }(v.readonly)
47         v.readonly = false
48         err := v.Put(locator, data)
49         if err != nil {
50                 v.t.Fatal(err)
51         }
52 }
53
54 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
55         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
56         if err != nil {
57                 v.t.Fatal(err)
58         }
59 }
60
61 func (v *TestableUnixVolume) Teardown() {
62         if err := os.RemoveAll(v.root); err != nil {
63                 v.t.Fatal(err)
64         }
65 }
66
67 // serialize = false; readonly = false
68 func TestUnixVolumeWithGenericTests(t *testing.T) {
69         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
70                 return NewTestableUnixVolume(t, false, false)
71         })
72 }
73
74 // serialize = false; readonly = true
75 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
76         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
77                 return NewTestableUnixVolume(t, false, true)
78         })
79 }
80
81 // serialize = true; readonly = false
82 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
83         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
84                 return NewTestableUnixVolume(t, true, false)
85         })
86 }
87
88 // serialize = false; readonly = false
89 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
90         DoHandlersWithGenericVolumeTests(t, func(t *testing.T) (*RRVolumeManager, []TestableVolume) {
91                 vols := make([]Volume, 2)
92                 testableUnixVols := make([]TestableVolume, 2)
93
94                 for i := range vols {
95                         v := NewTestableUnixVolume(t, false, false)
96                         vols[i] = v
97                         testableUnixVols[i] = v
98                 }
99
100                 return MakeRRVolumeManager(vols), testableUnixVols
101         })
102 }
103
104 func TestGetNotFound(t *testing.T) {
105         v := NewTestableUnixVolume(t, false, false)
106         defer v.Teardown()
107         v.Put(TestHash, TestBlock)
108
109         buf, err := v.Get(TestHash2)
110         switch {
111         case os.IsNotExist(err):
112                 break
113         case err == nil:
114                 t.Errorf("Read should have failed, returned %s", string(buf))
115         default:
116                 t.Errorf("Read expected ErrNotExist, got: %s", err)
117         }
118 }
119
120 func TestPut(t *testing.T) {
121         v := NewTestableUnixVolume(t, false, false)
122         defer v.Teardown()
123
124         err := v.Put(TestHash, TestBlock)
125         if err != nil {
126                 t.Error(err)
127         }
128         p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
129         if buf, err := ioutil.ReadFile(p); err != nil {
130                 t.Error(err)
131         } else if bytes.Compare(buf, TestBlock) != 0 {
132                 t.Errorf("Write should have stored %s, did store %s",
133                         string(TestBlock), string(buf))
134         }
135 }
136
137 func TestPutBadVolume(t *testing.T) {
138         v := NewTestableUnixVolume(t, false, false)
139         defer v.Teardown()
140
141         os.Chmod(v.root, 000)
142         err := v.Put(TestHash, TestBlock)
143         if err == nil {
144                 t.Error("Write should have failed")
145         }
146 }
147
148 func TestUnixVolumeReadonly(t *testing.T) {
149         v := NewTestableUnixVolume(t, false, true)
150         defer v.Teardown()
151
152         v.PutRaw(TestHash, TestBlock)
153
154         _, err := v.Get(TestHash)
155         if err != nil {
156                 t.Errorf("got err %v, expected nil", err)
157         }
158
159         err = v.Put(TestHash, TestBlock)
160         if err != MethodDisabledError {
161                 t.Errorf("got err %v, expected MethodDisabledError", err)
162         }
163
164         err = v.Touch(TestHash)
165         if err != MethodDisabledError {
166                 t.Errorf("got err %v, expected MethodDisabledError", err)
167         }
168
169         err = v.Delete(TestHash)
170         if err != MethodDisabledError {
171                 t.Errorf("got err %v, expected MethodDisabledError", err)
172         }
173 }
174
175 func TestIsFull(t *testing.T) {
176         v := NewTestableUnixVolume(t, false, false)
177         defer v.Teardown()
178
179         fullPath := v.root + "/full"
180         now := fmt.Sprintf("%d", time.Now().Unix())
181         os.Symlink(now, fullPath)
182         if !v.IsFull() {
183                 t.Errorf("%s: claims not to be full", v)
184         }
185         os.Remove(fullPath)
186
187         // Test with an expired /full link.
188         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
189         os.Symlink(expired, fullPath)
190         if v.IsFull() {
191                 t.Errorf("%s: should no longer be full", v)
192         }
193 }
194
195 func TestNodeStatus(t *testing.T) {
196         v := NewTestableUnixVolume(t, false, false)
197         defer v.Teardown()
198
199         // Get node status and make a basic sanity check.
200         volinfo := v.Status()
201         if volinfo.MountPoint != v.root {
202                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
203         }
204         if volinfo.DeviceNum == 0 {
205                 t.Errorf("uninitialized device_num in %v", volinfo)
206         }
207         if volinfo.BytesFree == 0 {
208                 t.Errorf("uninitialized bytes_free in %v", volinfo)
209         }
210         if volinfo.BytesUsed == 0 {
211                 t.Errorf("uninitialized bytes_used in %v", volinfo)
212         }
213 }
214
215 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
216         v := NewTestableUnixVolume(t, false, false)
217         defer v.Teardown()
218
219         v.Put(TestHash, TestBlock)
220         mockErr := errors.New("Mock error")
221         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
222                 return mockErr
223         })
224         if err != mockErr {
225                 t.Errorf("Got %v, expected %v", err, mockErr)
226         }
227 }
228
229 func TestUnixVolumeGetFuncFileError(t *testing.T) {
230         v := NewTestableUnixVolume(t, false, false)
231         defer v.Teardown()
232
233         funcCalled := false
234         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
235                 funcCalled = true
236                 return nil
237         })
238         if err == nil {
239                 t.Errorf("Expected error opening non-existent file")
240         }
241         if funcCalled {
242                 t.Errorf("Worker func should not have been called")
243         }
244 }
245
246 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
247         v := NewTestableUnixVolume(t, false, false)
248         defer v.Teardown()
249
250         v.Put(TestHash, TestBlock)
251
252         mtx := NewMockMutex()
253         v.locker = mtx
254
255         funcCalled := make(chan struct{})
256         go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
257                 funcCalled <- struct{}{}
258                 return nil
259         })
260         select {
261         case mtx.AllowLock <- struct{}{}:
262         case <-funcCalled:
263                 t.Fatal("Function was called before mutex was acquired")
264         case <-time.After(5 * time.Second):
265                 t.Fatal("Timed out before mutex was acquired")
266         }
267         select {
268         case <-funcCalled:
269         case mtx.AllowUnlock <- struct{}{}:
270                 t.Fatal("Mutex was released before function was called")
271         case <-time.After(5 * time.Second):
272                 t.Fatal("Timed out waiting for funcCalled")
273         }
274         select {
275         case mtx.AllowUnlock <- struct{}{}:
276         case <-time.After(5 * time.Second):
277                 t.Fatal("Timed out waiting for getFunc() to release mutex")
278         }
279 }
280
281 func TestUnixVolumeCompare(t *testing.T) {
282         v := NewTestableUnixVolume(t, false, false)
283         defer v.Teardown()
284
285         v.Put(TestHash, TestBlock)
286         err := v.Compare(TestHash, TestBlock)
287         if err != nil {
288                 t.Errorf("Got err %q, expected nil", err)
289         }
290
291         err = v.Compare(TestHash, []byte("baddata"))
292         if err != CollisionError {
293                 t.Errorf("Got err %q, expected %q", err, CollisionError)
294         }
295
296         v.Put(TestHash, []byte("baddata"))
297         err = v.Compare(TestHash, TestBlock)
298         if err != DiskHashError {
299                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
300         }
301
302         p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
303         os.Chmod(p, 000)
304         err = v.Compare(TestHash, TestBlock)
305         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
306                 t.Errorf("Got err %q, expected %q", err, "permission denied")
307         }
308 }
309
310 // TODO(twp): show that the underlying Read/Write operations executed
311 // serially and not concurrently. The easiest way to do this is
312 // probably to activate verbose or debug logging, capture log output
313 // and examine it to confirm that Reads and Writes did not overlap.
314 //
315 // TODO(twp): a proper test of I/O serialization requires that a
316 // second request start while the first one is still underway.
317 // Guaranteeing that the test behaves this way requires some tricky
318 // synchronization and mocking.  For now we'll just launch a bunch of
319 // requests simultaenously in goroutines and demonstrate that they
320 // return accurate results.