10467: Abort S3 and release buffer if caller disconnects during S3 PUT request.
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "os"
11         "strings"
12         "sync"
13         "syscall"
14         "testing"
15         "time"
16 )
17
18 type TestableUnixVolume struct {
19         UnixVolume
20         t TB
21 }
22
23 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
24         d, err := ioutil.TempDir("", "volume_test")
25         if err != nil {
26                 t.Fatal(err)
27         }
28         var locker sync.Locker
29         if serialize {
30                 locker = &sync.Mutex{}
31         }
32         return &TestableUnixVolume{
33                 UnixVolume: UnixVolume{
34                         Root:     d,
35                         ReadOnly: readonly,
36                         locker:   locker,
37                 },
38                 t: t,
39         }
40 }
41
42 // PutRaw writes a Keep block directly into a UnixVolume, even if
43 // the volume is readonly.
44 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
45         defer func(orig bool) {
46                 v.ReadOnly = orig
47         }(v.ReadOnly)
48         v.ReadOnly = false
49         err := v.Put(context.TODO(), locator, data)
50         if err != nil {
51                 v.t.Fatal(err)
52         }
53 }
54
55 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
56         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
57         if err != nil {
58                 v.t.Fatal(err)
59         }
60 }
61
62 func (v *TestableUnixVolume) Teardown() {
63         if err := os.RemoveAll(v.Root); err != nil {
64                 v.t.Fatal(err)
65         }
66 }
67
68 // serialize = false; readonly = false
69 func TestUnixVolumeWithGenericTests(t *testing.T) {
70         DoGenericVolumeTests(t, func(t TB) TestableVolume {
71                 return NewTestableUnixVolume(t, false, false)
72         })
73 }
74
75 // serialize = false; readonly = true
76 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
77         DoGenericVolumeTests(t, func(t TB) TestableVolume {
78                 return NewTestableUnixVolume(t, false, true)
79         })
80 }
81
82 // serialize = true; readonly = false
83 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
84         DoGenericVolumeTests(t, func(t TB) TestableVolume {
85                 return NewTestableUnixVolume(t, true, false)
86         })
87 }
88
89 // serialize = false; readonly = false
90 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
91         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
92                 vols := make([]Volume, 2)
93                 testableUnixVols := make([]TestableVolume, 2)
94
95                 for i := range vols {
96                         v := NewTestableUnixVolume(t, false, false)
97                         vols[i] = v
98                         testableUnixVols[i] = v
99                 }
100
101                 return MakeRRVolumeManager(vols), testableUnixVols
102         })
103 }
104
105 func TestReplicationDefault1(t *testing.T) {
106         v := &UnixVolume{
107                 Root:     "/",
108                 ReadOnly: true,
109         }
110         if err := v.Start(); err != nil {
111                 t.Error(err)
112         }
113         if got := v.Replication(); got != 1 {
114                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
115         }
116 }
117
118 func TestGetNotFound(t *testing.T) {
119         v := NewTestableUnixVolume(t, false, false)
120         defer v.Teardown()
121         v.Put(context.TODO(), TestHash, TestBlock)
122
123         buf := make([]byte, BlockSize)
124         n, err := v.Get(context.TODO(), TestHash2, buf)
125         switch {
126         case os.IsNotExist(err):
127                 break
128         case err == nil:
129                 t.Errorf("Read should have failed, returned %+q", buf[:n])
130         default:
131                 t.Errorf("Read expected ErrNotExist, got: %s", err)
132         }
133 }
134
135 func TestPut(t *testing.T) {
136         v := NewTestableUnixVolume(t, false, false)
137         defer v.Teardown()
138
139         err := v.Put(context.TODO(), TestHash, TestBlock)
140         if err != nil {
141                 t.Error(err)
142         }
143         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
144         if buf, err := ioutil.ReadFile(p); err != nil {
145                 t.Error(err)
146         } else if bytes.Compare(buf, TestBlock) != 0 {
147                 t.Errorf("Write should have stored %s, did store %s",
148                         string(TestBlock), string(buf))
149         }
150 }
151
152 func TestPutBadVolume(t *testing.T) {
153         v := NewTestableUnixVolume(t, false, false)
154         defer v.Teardown()
155
156         os.Chmod(v.Root, 000)
157         err := v.Put(context.TODO(), TestHash, TestBlock)
158         if err == nil {
159                 t.Error("Write should have failed")
160         }
161 }
162
163 func TestUnixVolumeReadonly(t *testing.T) {
164         v := NewTestableUnixVolume(t, false, true)
165         defer v.Teardown()
166
167         v.PutRaw(TestHash, TestBlock)
168
169         buf := make([]byte, BlockSize)
170         _, err := v.Get(context.TODO(), TestHash, buf)
171         if err != nil {
172                 t.Errorf("got err %v, expected nil", err)
173         }
174
175         err = v.Put(context.TODO(), TestHash, TestBlock)
176         if err != MethodDisabledError {
177                 t.Errorf("got err %v, expected MethodDisabledError", err)
178         }
179
180         err = v.Touch(TestHash)
181         if err != MethodDisabledError {
182                 t.Errorf("got err %v, expected MethodDisabledError", err)
183         }
184
185         err = v.Trash(TestHash)
186         if err != MethodDisabledError {
187                 t.Errorf("got err %v, expected MethodDisabledError", err)
188         }
189 }
190
191 func TestIsFull(t *testing.T) {
192         v := NewTestableUnixVolume(t, false, false)
193         defer v.Teardown()
194
195         fullPath := v.Root + "/full"
196         now := fmt.Sprintf("%d", time.Now().Unix())
197         os.Symlink(now, fullPath)
198         if !v.IsFull() {
199                 t.Errorf("%s: claims not to be full", v)
200         }
201         os.Remove(fullPath)
202
203         // Test with an expired /full link.
204         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
205         os.Symlink(expired, fullPath)
206         if v.IsFull() {
207                 t.Errorf("%s: should no longer be full", v)
208         }
209 }
210
211 func TestNodeStatus(t *testing.T) {
212         v := NewTestableUnixVolume(t, false, false)
213         defer v.Teardown()
214
215         // Get node status and make a basic sanity check.
216         volinfo := v.Status()
217         if volinfo.MountPoint != v.Root {
218                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
219         }
220         if volinfo.DeviceNum == 0 {
221                 t.Errorf("uninitialized device_num in %v", volinfo)
222         }
223         if volinfo.BytesFree == 0 {
224                 t.Errorf("uninitialized bytes_free in %v", volinfo)
225         }
226         if volinfo.BytesUsed == 0 {
227                 t.Errorf("uninitialized bytes_used in %v", volinfo)
228         }
229 }
230
231 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
232         v := NewTestableUnixVolume(t, false, false)
233         defer v.Teardown()
234
235         v.Put(context.TODO(), TestHash, TestBlock)
236         mockErr := errors.New("Mock error")
237         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
238                 return mockErr
239         })
240         if err != mockErr {
241                 t.Errorf("Got %v, expected %v", err, mockErr)
242         }
243 }
244
245 func TestUnixVolumeGetFuncFileError(t *testing.T) {
246         v := NewTestableUnixVolume(t, false, false)
247         defer v.Teardown()
248
249         funcCalled := false
250         err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
251                 funcCalled = true
252                 return nil
253         })
254         if err == nil {
255                 t.Errorf("Expected error opening non-existent file")
256         }
257         if funcCalled {
258                 t.Errorf("Worker func should not have been called")
259         }
260 }
261
262 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
263         v := NewTestableUnixVolume(t, false, false)
264         defer v.Teardown()
265
266         v.Put(context.TODO(), TestHash, TestBlock)
267
268         mtx := NewMockMutex()
269         v.locker = mtx
270
271         funcCalled := make(chan struct{})
272         go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
273                 funcCalled <- struct{}{}
274                 return nil
275         })
276         select {
277         case mtx.AllowLock <- struct{}{}:
278         case <-funcCalled:
279                 t.Fatal("Function was called before mutex was acquired")
280         case <-time.After(5 * time.Second):
281                 t.Fatal("Timed out before mutex was acquired")
282         }
283         select {
284         case <-funcCalled:
285         case mtx.AllowUnlock <- struct{}{}:
286                 t.Fatal("Mutex was released before function was called")
287         case <-time.After(5 * time.Second):
288                 t.Fatal("Timed out waiting for funcCalled")
289         }
290         select {
291         case mtx.AllowUnlock <- struct{}{}:
292         case <-time.After(5 * time.Second):
293                 t.Fatal("Timed out waiting for getFunc() to release mutex")
294         }
295 }
296
297 func TestUnixVolumeCompare(t *testing.T) {
298         v := NewTestableUnixVolume(t, false, false)
299         defer v.Teardown()
300
301         v.Put(context.TODO(), TestHash, TestBlock)
302         err := v.Compare(TestHash, TestBlock)
303         if err != nil {
304                 t.Errorf("Got err %q, expected nil", err)
305         }
306
307         err = v.Compare(TestHash, []byte("baddata"))
308         if err != CollisionError {
309                 t.Errorf("Got err %q, expected %q", err, CollisionError)
310         }
311
312         v.Put(context.TODO(), TestHash, []byte("baddata"))
313         err = v.Compare(TestHash, TestBlock)
314         if err != DiskHashError {
315                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
316         }
317
318         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
319         os.Chmod(p, 000)
320         err = v.Compare(TestHash, TestBlock)
321         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
322                 t.Errorf("Got err %q, expected %q", err, "permission denied")
323         }
324 }
325
326 // TODO(twp): show that the underlying Read/Write operations executed
327 // serially and not concurrently. The easiest way to do this is
328 // probably to activate verbose or debug logging, capture log output
329 // and examine it to confirm that Reads and Writes did not overlap.
330 //
331 // TODO(twp): a proper test of I/O serialization requires that a
332 // second request start while the first one is still underway.
333 // Guaranteeing that the test behaves this way requires some tricky
334 // synchronization and mocking.  For now we'll just launch a bunch of
335 // requests simultaenously in goroutines and demonstrate that they
336 // return accurate results.