Bug fix: migration requires CurrentApiClient
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "testing"
9         "time"
10 )
11
12 func TempUnixVolume(t *testing.T, serialize bool) UnixVolume {
13         d, err := ioutil.TempDir("", "volume_test")
14         if err != nil {
15                 t.Fatal(err)
16         }
17         return MakeUnixVolume(d, serialize)
18 }
19
20 func _teardown(v UnixVolume) {
21         if v.queue != nil {
22                 close(v.queue)
23         }
24         os.RemoveAll(v.root)
25 }
26
27 // store writes a Keep block directly into a UnixVolume, for testing
28 // UnixVolume methods.
29 //
30 func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
31         blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
32         if err := os.MkdirAll(blockdir, 0755); err != nil {
33                 t.Fatal(err)
34         }
35
36         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
37         if f, err := os.Create(blockpath); err == nil {
38                 f.Write(block)
39                 f.Close()
40         } else {
41                 t.Fatal(err)
42         }
43 }
44
45 func TestGet(t *testing.T) {
46         v := TempUnixVolume(t, false)
47         defer _teardown(v)
48         _store(t, v, TEST_HASH, TEST_BLOCK)
49
50         buf, err := v.Get(TEST_HASH)
51         if err != nil {
52                 t.Error(err)
53         }
54         if bytes.Compare(buf, TEST_BLOCK) != 0 {
55                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
56         }
57 }
58
59 func TestGetNotFound(t *testing.T) {
60         v := TempUnixVolume(t, false)
61         defer _teardown(v)
62         _store(t, v, TEST_HASH, TEST_BLOCK)
63
64         buf, err := v.Get(TEST_HASH_2)
65         switch {
66         case os.IsNotExist(err):
67                 break
68         case err == nil:
69                 t.Errorf("Read should have failed, returned %s", string(buf))
70         default:
71                 t.Errorf("Read expected ErrNotExist, got: %s", err)
72         }
73 }
74
75 func TestPut(t *testing.T) {
76         v := TempUnixVolume(t, false)
77         defer _teardown(v)
78
79         err := v.Put(TEST_HASH, TEST_BLOCK)
80         if err != nil {
81                 t.Error(err)
82         }
83         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
84         if buf, err := ioutil.ReadFile(p); err != nil {
85                 t.Error(err)
86         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
87                 t.Errorf("Write should have stored %s, did store %s",
88                         string(TEST_BLOCK), string(buf))
89         }
90 }
91
92 func TestPutBadVolume(t *testing.T) {
93         v := TempUnixVolume(t, false)
94         defer _teardown(v)
95
96         os.Chmod(v.root, 000)
97         err := v.Put(TEST_HASH, TEST_BLOCK)
98         if err == nil {
99                 t.Error("Write should have failed")
100         }
101 }
102
103 // TestPutTouch
104 //     Test that when applying PUT to a block that already exists,
105 //     the block's modification time is updated.
106 func TestPutTouch(t *testing.T) {
107         v := TempUnixVolume(t, false)
108         defer _teardown(v)
109
110         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
111                 t.Error(err)
112         }
113         old_mtime, err := v.Mtime(TEST_HASH)
114         if err != nil {
115                 t.Error(err)
116         }
117         if old_mtime.IsZero() {
118                 t.Errorf("v.Mtime(%s) returned a zero mtime\n", TEST_HASH)
119         }
120         // Sleep for 1s, then put the block again.  The volume
121         // should report a more recent mtime.
122         //
123         // TODO(twp): this would be better handled with a mock Time object.
124         // Alternatively, set the mtime manually to some moment in the past
125         // (maybe a v.SetMtime method?)
126         //
127         time.Sleep(time.Second)
128         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
129                 t.Error(err)
130         }
131         new_mtime, err := v.Mtime(TEST_HASH)
132         if err != nil {
133                 t.Error(err)
134         }
135
136         if !new_mtime.After(old_mtime) {
137                 t.Errorf("v.Put did not update the block mtime:\nold_mtime = %v\nnew_mtime = %v\n",
138                         old_mtime, new_mtime)
139         }
140 }
141
142 // Serialization tests: launch a bunch of concurrent
143 //
144 // TODO(twp): show that the underlying Read/Write operations executed
145 // serially and not concurrently. The easiest way to do this is
146 // probably to activate verbose or debug logging, capture log output
147 // and examine it to confirm that Reads and Writes did not overlap.
148 //
149 // TODO(twp): a proper test of I/O serialization requires that a
150 // second request start while the first one is still underway.
151 // Guaranteeing that the test behaves this way requires some tricky
152 // synchronization and mocking.  For now we'll just launch a bunch of
153 // requests simultaenously in goroutines and demonstrate that they
154 // return accurate results.
155 //
156 func TestGetSerialized(t *testing.T) {
157         // Create a volume with I/O serialization enabled.
158         v := TempUnixVolume(t, true)
159         defer _teardown(v)
160
161         _store(t, v, TEST_HASH, TEST_BLOCK)
162         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
163         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
164
165         sem := make(chan int)
166         go func(sem chan int) {
167                 buf, err := v.Get(TEST_HASH)
168                 if err != nil {
169                         t.Errorf("err1: %v", err)
170                 }
171                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
172                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
173                 }
174                 sem <- 1
175         }(sem)
176
177         go func(sem chan int) {
178                 buf, err := v.Get(TEST_HASH_2)
179                 if err != nil {
180                         t.Errorf("err2: %v", err)
181                 }
182                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
183                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
184                 }
185                 sem <- 1
186         }(sem)
187
188         go func(sem chan int) {
189                 buf, err := v.Get(TEST_HASH_3)
190                 if err != nil {
191                         t.Errorf("err3: %v", err)
192                 }
193                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
194                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
195                 }
196                 sem <- 1
197         }(sem)
198
199         // Wait for all goroutines to finish
200         for done := 0; done < 3; {
201                 done += <-sem
202         }
203 }
204
205 func TestPutSerialized(t *testing.T) {
206         // Create a volume with I/O serialization enabled.
207         v := TempUnixVolume(t, true)
208         defer _teardown(v)
209
210         sem := make(chan int)
211         go func(sem chan int) {
212                 err := v.Put(TEST_HASH, TEST_BLOCK)
213                 if err != nil {
214                         t.Errorf("err1: %v", err)
215                 }
216                 sem <- 1
217         }(sem)
218
219         go func(sem chan int) {
220                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
221                 if err != nil {
222                         t.Errorf("err2: %v", err)
223                 }
224                 sem <- 1
225         }(sem)
226
227         go func(sem chan int) {
228                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
229                 if err != nil {
230                         t.Errorf("err3: %v", err)
231                 }
232                 sem <- 1
233         }(sem)
234
235         // Wait for all goroutines to finish
236         for done := 0; done < 2; {
237                 done += <-sem
238         }
239
240         // Double check that we actually wrote the blocks we expected to write.
241         buf, err := v.Get(TEST_HASH)
242         if err != nil {
243                 t.Errorf("Get #1: %v", err)
244         }
245         if bytes.Compare(buf, TEST_BLOCK) != 0 {
246                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
247         }
248
249         buf, err = v.Get(TEST_HASH_2)
250         if err != nil {
251                 t.Errorf("Get #2: %v", err)
252         }
253         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
254                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
255         }
256
257         buf, err = v.Get(TEST_HASH_3)
258         if err != nil {
259                 t.Errorf("Get #3: %v", err)
260         }
261         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
262                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
263         }
264 }
265
266 func TestIsFull(t *testing.T) {
267         v := TempUnixVolume(t, false)
268         defer _teardown(v)
269
270         full_path := v.root + "/full"
271         now := fmt.Sprintf("%d", time.Now().Unix())
272         os.Symlink(now, full_path)
273         if !v.IsFull() {
274                 t.Errorf("%s: claims not to be full", v)
275         }
276         os.Remove(full_path)
277
278         // Test with an expired /full link.
279         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
280         os.Symlink(expired, full_path)
281         if v.IsFull() {
282                 t.Errorf("%s: should no longer be full", v)
283         }
284 }