Added -serialize flag.
[arvados.git] / services / keep / src / keep / volume_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "testing"
9         "time"
10 )
11
12 func TempUnixVolume(t *testing.T, queue chan *IORequest) UnixVolume {
13         d, err := ioutil.TempDir("", "volume_test")
14         if err != nil {
15                 t.Fatal(err)
16         }
17         return MakeUnixVolume(d, queue)
18 }
19
20 func _teardown(v UnixVolume) {
21         if v.queue != nil {
22                 close(v.queue)
23         }
24         os.RemoveAll(v.root)
25 }
26
27 // store writes a Keep block directly into a UnixVolume, for testing
28 // UnixVolume methods.
29 //
30 func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
31         blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
32         if err := os.MkdirAll(blockdir, 0755); err != nil {
33                 t.Fatal(err)
34         }
35
36         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
37         if f, err := os.Create(blockpath); err == nil {
38                 f.Write(block)
39                 f.Close()
40         } else {
41                 t.Fatal(err)
42         }
43 }
44
45 func TestGet(t *testing.T) {
46         v := TempUnixVolume(t, nil)
47         defer _teardown(v)
48         _store(t, v, TEST_HASH, TEST_BLOCK)
49
50         buf, err := v.Get(TEST_HASH)
51         if err != nil {
52                 t.Error(err)
53         }
54         if bytes.Compare(buf, TEST_BLOCK) != 0 {
55                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
56         }
57 }
58
59 func TestGetNotFound(t *testing.T) {
60         v := TempUnixVolume(t, nil)
61         defer _teardown(v)
62         _store(t, v, TEST_HASH, TEST_BLOCK)
63
64         buf, err := v.Get(TEST_HASH_2)
65         switch {
66         case os.IsNotExist(err):
67                 break
68         case err == nil:
69                 t.Errorf("Read should have failed, returned %s", string(buf))
70         default:
71                 t.Errorf("Read expected ErrNotExist, got: %s", err)
72         }
73 }
74
75 func TestPut(t *testing.T) {
76         v := TempUnixVolume(t, nil)
77         defer _teardown(v)
78
79         err := v.Put(TEST_HASH, TEST_BLOCK)
80         if err != nil {
81                 t.Error(err)
82         }
83         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
84         if buf, err := ioutil.ReadFile(p); err != nil {
85                 t.Error(err)
86         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
87                 t.Errorf("Write should have stored %s, did store %s",
88                         string(TEST_BLOCK), string(buf))
89         }
90 }
91
92 func TestPutBadVolume(t *testing.T) {
93         v := TempUnixVolume(t, nil)
94         defer _teardown(v)
95
96         os.Chmod(v.root, 000)
97         err := v.Put(TEST_HASH, TEST_BLOCK)
98         if err == nil {
99                 t.Error("Write should have failed")
100         }
101 }
102
103 // Serialization tests.
104 //
105 // TODO(twp): a proper test of I/O serialization requires that
106 // a second request start while the first one is still executing.
107 // Doing this correctly requires some tricky synchronization.
108 // For now we'll just launch a bunch of requests in goroutines
109 // and demonstrate that they return accurate results.
110 //
111 func TestGetSerialized(t *testing.T) {
112         v := TempUnixVolume(t, make(chan *IORequest))
113         defer _teardown(v)
114
115         _store(t, v, TEST_HASH, TEST_BLOCK)
116         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
117         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
118
119         sem := make(chan int)
120         go func(sem chan int) {
121                 buf, err := v.Get(TEST_HASH)
122                 if err != nil {
123                         t.Errorf("err1: %v", err)
124                 }
125                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
126                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
127                 }
128                 sem <- 1
129         }(sem)
130
131         go func(sem chan int) {
132                 buf, err := v.Get(TEST_HASH_2)
133                 if err != nil {
134                         t.Errorf("err2: %v", err)
135                 }
136                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
137                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
138                 }
139                 sem <- 1
140         }(sem)
141
142         go func(sem chan int) {
143                 buf, err := v.Get(TEST_HASH_3)
144                 if err != nil {
145                         t.Errorf("err3: %v", err)
146                 }
147                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
148                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
149                 }
150                 sem <- 1
151         }(sem)
152
153         // Wait for all goroutines to finish
154         for done := 0; done < 2; {
155                 done += <-sem
156         }
157 }
158
159 func TestPutSerialized(t *testing.T) {
160         v := TempUnixVolume(t, make(chan *IORequest))
161         defer _teardown(v)
162
163         sem := make(chan int)
164         go func(sem chan int) {
165                 err := v.Put(TEST_HASH, TEST_BLOCK)
166                 if err != nil {
167                         t.Errorf("err1: %v", err)
168                 }
169                 sem <- 1
170         }(sem)
171
172         go func(sem chan int) {
173                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
174                 if err != nil {
175                         t.Errorf("err2: %v", err)
176                 }
177                 sem <- 1
178         }(sem)
179
180         go func(sem chan int) {
181                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
182                 if err != nil {
183                         t.Errorf("err3: %v", err)
184                 }
185                 sem <- 1
186         }(sem)
187
188         // Wait for all goroutines to finish
189         for done := 0; done < 2; {
190                 done += <-sem
191         }
192
193         // Double check that we actually wrote the blocks we expected to write.
194         buf, err := v.Get(TEST_HASH)
195         if err != nil {
196                 t.Errorf("Get #1: %v", err)
197         }
198         if bytes.Compare(buf, TEST_BLOCK) != 0 {
199                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
200         }
201
202         buf, err = v.Get(TEST_HASH_2)
203         if err != nil {
204                 t.Errorf("Get #2: %v", err)
205         }
206         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
207                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
208         }
209
210         buf, err = v.Get(TEST_HASH_3)
211         if err != nil {
212                 t.Errorf("Get #3: %v", err)
213         }
214         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
215                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
216         }
217 }
218
219 func TestIsFull(t *testing.T) {
220         v := TempUnixVolume(t, nil)
221         defer _teardown(v)
222
223         full_path := v.root + "/full"
224         now := fmt.Sprintf("%d", time.Now().Unix())
225         os.Symlink(now, full_path)
226         if !v.IsFull() {
227                 t.Errorf("%s: claims not to be full", v)
228         }
229         os.Remove(full_path)
230
231         // Test with an expired /full link.
232         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
233         os.Symlink(expired, full_path)
234         if v.IsFull() {
235                 t.Errorf("%s: should no longer be full", v)
236         }
237 }