7179: Improve comments.
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "regexp"
9         "sort"
10         "strings"
11         "syscall"
12         "testing"
13         "time"
14 )
15
16 type TestableUnixVolume struct {
17         UnixVolume
18         t *testing.T
19 }
20
21 func NewTestableUnixVolume(t *testing.T, serialize bool, readonly bool) *TestableUnixVolume {
22         d, err := ioutil.TempDir("", "volume_test")
23         if err != nil {
24                 t.Fatal(err)
25         }
26         return &TestableUnixVolume{
27                 UnixVolume: UnixVolume{
28                         root:      d,
29                         serialize: serialize,
30                         readonly:  readonly,
31                 },
32                 t: t,
33         }
34 }
35
36 // PutRaw writes a Keep block directly into a UnixVolume, even if
37 // the volume is readonly.
38 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
39         defer func(orig bool) {
40                 v.readonly = orig
41         }(v.readonly)
42         v.readonly = false
43         err := v.Put(locator, data)
44         if err != nil {
45                 v.t.Fatal(err)
46         }
47 }
48
49 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
50         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
51         if err != nil {
52                 v.t.Fatal(err)
53         }
54 }
55
56 func (v *TestableUnixVolume) Teardown() {
57         if err := os.RemoveAll(v.root); err != nil {
58                 v.t.Fatal(err)
59         }
60 }
61
62 func TestUnixVolumeWithGenericTests(t *testing.T) {
63         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
64                 return NewTestableUnixVolume(t, false, false)
65         })
66 }
67
68 func TestGet(t *testing.T) {
69         v := NewTestableUnixVolume(t, false, false)
70         defer v.Teardown()
71         v.Put(TEST_HASH, TEST_BLOCK)
72
73         buf, err := v.Get(TEST_HASH)
74         if err != nil {
75                 t.Error(err)
76         }
77         if bytes.Compare(buf, TEST_BLOCK) != 0 {
78                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
79         }
80 }
81
82 func TestGetNotFound(t *testing.T) {
83         v := NewTestableUnixVolume(t, false, false)
84         defer v.Teardown()
85         v.Put(TEST_HASH, TEST_BLOCK)
86
87         buf, err := v.Get(TEST_HASH_2)
88         switch {
89         case os.IsNotExist(err):
90                 break
91         case err == nil:
92                 t.Errorf("Read should have failed, returned %s", string(buf))
93         default:
94                 t.Errorf("Read expected ErrNotExist, got: %s", err)
95         }
96 }
97
98 func TestIndexTo(t *testing.T) {
99         v := NewTestableUnixVolume(t, false, false)
100         defer v.Teardown()
101
102         v.Put(TEST_HASH, TEST_BLOCK)
103         v.Put(TEST_HASH_2, TEST_BLOCK_2)
104         v.Put(TEST_HASH_3, TEST_BLOCK_3)
105
106         buf := new(bytes.Buffer)
107         v.IndexTo("", buf)
108         index_rows := strings.Split(string(buf.Bytes()), "\n")
109         sort.Strings(index_rows)
110         sorted_index := strings.Join(index_rows, "\n")
111         m, err := regexp.MatchString(
112                 `^\n`+TEST_HASH+`\+\d+ \d+\n`+
113                         TEST_HASH_3+`\+\d+ \d+\n`+
114                         TEST_HASH_2+`\+\d+ \d+$`,
115                 sorted_index)
116         if err != nil {
117                 t.Error(err)
118         } else if !m {
119                 t.Errorf("Got index %q for empty prefix", sorted_index)
120         }
121
122         for _, prefix := range []string{"f", "f15", "f15ac"} {
123                 buf = new(bytes.Buffer)
124                 v.IndexTo(prefix, buf)
125                 m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
126                 if err != nil {
127                         t.Error(err)
128                 } else if !m {
129                         t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
130                 }
131         }
132 }
133
134 func TestPut(t *testing.T) {
135         v := NewTestableUnixVolume(t, false, false)
136         defer v.Teardown()
137
138         err := v.Put(TEST_HASH, TEST_BLOCK)
139         if err != nil {
140                 t.Error(err)
141         }
142         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
143         if buf, err := ioutil.ReadFile(p); err != nil {
144                 t.Error(err)
145         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
146                 t.Errorf("Write should have stored %s, did store %s",
147                         string(TEST_BLOCK), string(buf))
148         }
149 }
150
151 func TestPutBadVolume(t *testing.T) {
152         v := NewTestableUnixVolume(t, false, false)
153         defer v.Teardown()
154
155         os.Chmod(v.root, 000)
156         err := v.Put(TEST_HASH, TEST_BLOCK)
157         if err == nil {
158                 t.Error("Write should have failed")
159         }
160 }
161
162 func TestUnixVolumeReadonly(t *testing.T) {
163         v := NewTestableUnixVolume(t, false, true)
164         defer v.Teardown()
165
166         v.PutRaw(TEST_HASH, TEST_BLOCK)
167
168         _, err := v.Get(TEST_HASH)
169         if err != nil {
170                 t.Errorf("got err %v, expected nil", err)
171         }
172
173         err = v.Put(TEST_HASH, TEST_BLOCK)
174         if err != MethodDisabledError {
175                 t.Errorf("got err %v, expected MethodDisabledError", err)
176         }
177
178         err = v.Touch(TEST_HASH)
179         if err != MethodDisabledError {
180                 t.Errorf("got err %v, expected MethodDisabledError", err)
181         }
182
183         err = v.Delete(TEST_HASH)
184         if err != MethodDisabledError {
185                 t.Errorf("got err %v, expected MethodDisabledError", err)
186         }
187 }
188
189 // TestPutTouch
190 //     Test that when applying PUT to a block that already exists,
191 //     the block's modification time is updated.
192 func TestPutTouch(t *testing.T) {
193         v := NewTestableUnixVolume(t, false, false)
194         defer v.Teardown()
195
196         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
197                 t.Error(err)
198         }
199
200         // We'll verify { t0 < threshold < t1 }, where t0 is the
201         // existing block's timestamp on disk before Put() and t1 is
202         // its timestamp after Put().
203         threshold := time.Now().Add(-time.Second)
204
205         // Set the stored block's mtime far enough in the past that we
206         // can see the difference between "timestamp didn't change"
207         // and "timestamp granularity is too low".
208         v.TouchWithDate(TEST_HASH, time.Now().Add(-20*time.Second))
209
210         // Make sure v.Mtime() agrees the above Utime really worked.
211         if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
212                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
213         }
214
215         // Write the same block again.
216         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
217                 t.Error(err)
218         }
219
220         // Verify threshold < t1
221         if t1, err := v.Mtime(TEST_HASH); err != nil {
222                 t.Error(err)
223         } else if t1.Before(threshold) {
224                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
225         }
226 }
227
228 // Serialization tests: launch a bunch of concurrent
229 //
230 // TODO(twp): show that the underlying Read/Write operations executed
231 // serially and not concurrently. The easiest way to do this is
232 // probably to activate verbose or debug logging, capture log output
233 // and examine it to confirm that Reads and Writes did not overlap.
234 //
235 // TODO(twp): a proper test of I/O serialization requires that a
236 // second request start while the first one is still underway.
237 // Guaranteeing that the test behaves this way requires some tricky
238 // synchronization and mocking.  For now we'll just launch a bunch of
239 // requests simultaenously in goroutines and demonstrate that they
240 // return accurate results.
241 //
242 func TestGetSerialized(t *testing.T) {
243         // Create a volume with I/O serialization enabled.
244         v := NewTestableUnixVolume(t, true, false)
245         defer v.Teardown()
246
247         v.Put(TEST_HASH, TEST_BLOCK)
248         v.Put(TEST_HASH_2, TEST_BLOCK_2)
249         v.Put(TEST_HASH_3, TEST_BLOCK_3)
250
251         sem := make(chan int)
252         go func(sem chan int) {
253                 buf, err := v.Get(TEST_HASH)
254                 if err != nil {
255                         t.Errorf("err1: %v", err)
256                 }
257                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
258                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
259                 }
260                 sem <- 1
261         }(sem)
262
263         go func(sem chan int) {
264                 buf, err := v.Get(TEST_HASH_2)
265                 if err != nil {
266                         t.Errorf("err2: %v", err)
267                 }
268                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
269                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
270                 }
271                 sem <- 1
272         }(sem)
273
274         go func(sem chan int) {
275                 buf, err := v.Get(TEST_HASH_3)
276                 if err != nil {
277                         t.Errorf("err3: %v", err)
278                 }
279                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
280                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
281                 }
282                 sem <- 1
283         }(sem)
284
285         // Wait for all goroutines to finish
286         for done := 0; done < 3; {
287                 done += <-sem
288         }
289 }
290
291 func TestPutSerialized(t *testing.T) {
292         // Create a volume with I/O serialization enabled.
293         v := NewTestableUnixVolume(t, true, false)
294         defer v.Teardown()
295
296         sem := make(chan int)
297         go func(sem chan int) {
298                 err := v.Put(TEST_HASH, TEST_BLOCK)
299                 if err != nil {
300                         t.Errorf("err1: %v", err)
301                 }
302                 sem <- 1
303         }(sem)
304
305         go func(sem chan int) {
306                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
307                 if err != nil {
308                         t.Errorf("err2: %v", err)
309                 }
310                 sem <- 1
311         }(sem)
312
313         go func(sem chan int) {
314                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
315                 if err != nil {
316                         t.Errorf("err3: %v", err)
317                 }
318                 sem <- 1
319         }(sem)
320
321         // Wait for all goroutines to finish
322         for done := 0; done < 3; {
323                 done += <-sem
324         }
325
326         // Double check that we actually wrote the blocks we expected to write.
327         buf, err := v.Get(TEST_HASH)
328         if err != nil {
329                 t.Errorf("Get #1: %v", err)
330         }
331         if bytes.Compare(buf, TEST_BLOCK) != 0 {
332                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
333         }
334
335         buf, err = v.Get(TEST_HASH_2)
336         if err != nil {
337                 t.Errorf("Get #2: %v", err)
338         }
339         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
340                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
341         }
342
343         buf, err = v.Get(TEST_HASH_3)
344         if err != nil {
345                 t.Errorf("Get #3: %v", err)
346         }
347         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
348                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
349         }
350 }
351
352 func TestIsFull(t *testing.T) {
353         v := NewTestableUnixVolume(t, false, false)
354         defer v.Teardown()
355
356         full_path := v.root + "/full"
357         now := fmt.Sprintf("%d", time.Now().Unix())
358         os.Symlink(now, full_path)
359         if !v.IsFull() {
360                 t.Errorf("%s: claims not to be full", v)
361         }
362         os.Remove(full_path)
363
364         // Test with an expired /full link.
365         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
366         os.Symlink(expired, full_path)
367         if v.IsFull() {
368                 t.Errorf("%s: should no longer be full", v)
369         }
370 }
371
372 func TestNodeStatus(t *testing.T) {
373         v := NewTestableUnixVolume(t, false, false)
374         defer v.Teardown()
375
376         // Get node status and make a basic sanity check.
377         volinfo := v.Status()
378         if volinfo.MountPoint != v.root {
379                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
380         }
381         if volinfo.DeviceNum == 0 {
382                 t.Errorf("uninitialized device_num in %v", volinfo)
383         }
384         if volinfo.BytesFree == 0 {
385                 t.Errorf("uninitialized bytes_free in %v", volinfo)
386         }
387         if volinfo.BytesUsed == 0 {
388                 t.Errorf("uninitialized bytes_used in %v", volinfo)
389         }
390 }