5f189fc8aa6fa97f58f62cf7c2e209f92bfe29cc
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient_test.go
1 package keepclient
2
3 import (
4         "arvados.org/buffer"
5         "crypto/md5"
6         "flag"
7         "fmt"
8         . "gopkg.in/check.v1"
9         "io"
10         "io/ioutil"
11         "log"
12         "net"
13         "net/http"
14         "os"
15         "os/exec"
16         "sort"
17         "strings"
18         "testing"
19 )
20
21 // Gocheck boilerplate
22 func Test(t *testing.T) { TestingT(t) }
23
24 // Gocheck boilerplate
25 var _ = Suite(&ServerRequiredSuite{})
26 var _ = Suite(&StandaloneSuite{})
27
28 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
29
30 // Tests that require the Keep server running
31 type ServerRequiredSuite struct{}
32
33 // Standalone tests
34 type StandaloneSuite struct{}
35
// pythonDir returns the location of the Python SDK directory, expressed
// relative to the first entry of the colon-separated GOPATH environment
// variable ("<first entry>/../python").
func pythonDir() string {
	entries := strings.Split(os.Getenv("GOPATH"), ":")
	return fmt.Sprintf("%s/../python", entries[0])
}
40
41 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
42         if *no_server {
43                 c.Skip("Skipping tests that require server")
44         } else {
45                 os.Chdir(pythonDir())
46                 exec.Command("python", "run_test_server.py", "start").Run()
47                 exec.Command("python", "run_test_server.py", "start_keep").Run()
48         }
49 }
50
51 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
52         os.Chdir(pythonDir())
53         exec.Command("python", "run_test_server.py", "stop_keep").Run()
54         exec.Command("python", "run_test_server.py", "stop").Run()
55 }
56
57 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
58         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
59         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
60         os.Setenv("ARVADOS_API_HOST_INSECURE", "")
61
62         kc, err := MakeKeepClient()
63         c.Assert(kc.ApiServer, Equals, "localhost:3001")
64         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
65         c.Assert(kc.ApiInsecure, Equals, false)
66
67         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
68
69         kc, err = MakeKeepClient()
70         c.Assert(kc.ApiServer, Equals, "localhost:3001")
71         c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
72         c.Assert(kc.ApiInsecure, Equals, true)
73
74         c.Assert(err, Equals, nil)
75         c.Assert(len(kc.Service_roots), Equals, 2)
76         c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
77         c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
78 }
79
80 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
81         kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
82
83         // "foo" acbd18db4cc2f85cedef654fccc4a4d8
84         foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
85         c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
86
87         // "bar" 37b51d194a7513e45b56f6524f2d51f2
88         bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
89         c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
90 }
91
92 type StubPutHandler struct {
93         c              *C
94         expectPath     string
95         expectApiToken string
96         expectBody     string
97         handled        chan string
98 }
99
100 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
101         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
102         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
103         body, err := ioutil.ReadAll(req.Body)
104         this.c.Check(err, Equals, nil)
105         this.c.Check(body, DeepEquals, []byte(this.expectBody))
106         resp.WriteHeader(200)
107         this.handled <- fmt.Sprintf("http://%s", req.Host)
108 }
109
// RunBogusKeepServer starts an HTTP server on the given TCP port (on all
// local addresses) that dispatches every request to st.
//
// It returns the listener, which the caller must Close to release the port,
// and the base URL of the server in the form "http://localhost:<port>". The
// port in the URL is read back from the listener, so passing port 0 yields
// the port that was actually assigned.
//
// It panics if the port cannot be opened; the panic message includes the
// underlying error so that failures such as "address already in use" can be
// diagnosed.
func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
	tcp, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
	if err != nil {
		panic(fmt.Sprintf("Could not listen on tcp port %v: %v", port, err))
	}

	url = fmt.Sprintf("http://localhost:%d", tcp.Addr().(*net.TCPAddr).Port)

	// Serve returns once the listener is closed by the caller.
	server := &http.Server{Handler: st}
	go server.Serve(tcp)

	return tcp, url
}
124
125 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
126         io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
127
128         listener, url := RunBogusKeepServer(st, 2990)
129         defer listener.Close()
130
131         kc, _ := MakeKeepClient()
132         kc.ApiToken = "abc123"
133
134         reader, writer := io.Pipe()
135         upload_status := make(chan UploadStatus)
136
137         f(kc, url, reader, writer, upload_status)
138 }
139
140 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
141         st := StubPutHandler{
142                 c,
143                 "acbd18db4cc2f85cedef654fccc4a4d8",
144                 "abc123",
145                 "foo",
146                 make(chan string)}
147
148         UploadToStubHelper(c, st,
149                 func(kc KeepClient, url string, reader io.ReadCloser,
150                         writer io.WriteCloser, upload_status chan UploadStatus) {
151
152                         go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
153
154                         writer.Write([]byte("foo"))
155                         writer.Close()
156
157                         <-st.handled
158                         status := <-upload_status
159                         c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
160                 })
161 }
162
163 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
164         st := StubPutHandler{
165                 c,
166                 "acbd18db4cc2f85cedef654fccc4a4d8",
167                 "abc123",
168                 "foo",
169                 make(chan string)}
170
171         UploadToStubHelper(c, st,
172                 func(kc KeepClient, url string, reader io.ReadCloser,
173                         writer io.WriteCloser, upload_status chan UploadStatus) {
174
175                         // Buffer for reads from 'r'
176                         buf := make([]byte, 512)
177
178                         // Read requests on Transfer() buffer
179                         requests := make(chan buffer.ReadRequest)
180                         defer close(requests)
181
182                         // Reporting reader error states
183                         reader_status := make(chan error)
184
185                         go buffer.Transfer(buf, reader, requests, reader_status)
186
187                         br1 := buffer.MakeBufferReader(requests)
188
189                         go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
190
191                         writer.Write([]byte("foo"))
192                         writer.Close()
193
194                         <-reader_status
195                         <-st.handled
196
197                         status := <-upload_status
198                         c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
199
200                         //c.Check(true, Equals, false)
201                 })
202 }
203
// FailHandler is an http.Handler standing in for a Keep server that rejects
// every request.
type FailHandler struct {
	handled chan string // receives "http://<host>" for each rejected request
}

// ServeHTTP answers with status 400 and then announces the request's host on
// the handled channel. The send blocks until the channel can accept the value.
func (h FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.WriteHeader(http.StatusBadRequest)

	origin := fmt.Sprintf("http://%s", req.Host)
	h.handled <- origin
}
212
213 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
214         st := FailHandler{
215                 make(chan string)}
216
217         hash := "acbd18db4cc2f85cedef654fccc4a4d8"
218
219         UploadToStubHelper(c, st,
220                 func(kc KeepClient, url string, reader io.ReadCloser,
221                         writer io.WriteCloser, upload_status chan UploadStatus) {
222
223                         go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
224
225                         writer.Write([]byte("foo"))
226                         writer.Close()
227
228                         <-st.handled
229
230                         status := <-upload_status
231                         c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
232                         c.Check(status.StatusCode, Equals, 400)
233                 })
234
235 }
236
237 type KeepServer struct {
238         listener net.Listener
239         url      string
240 }
241
242 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
243         ks = make([]KeepServer, n)
244
245         for i := 0; i < n; i += 1 {
246                 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
247                 ks[i] = KeepServer{boguslistener, bogusurl}
248         }
249
250         return ks
251 }
252
253 func (s *StandaloneSuite) TestPutB(c *C) {
254         log.Printf("TestPutB")
255
256         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
257
258         st := StubPutHandler{
259                 c,
260                 hash,
261                 "abc123",
262                 "foo",
263                 make(chan string, 2)}
264
265         kc, _ := MakeKeepClient()
266
267         kc.Want_replicas = 2
268         kc.ApiToken = "abc123"
269         kc.Service_roots = make([]string, 5)
270
271         ks := RunSomeFakeKeepServers(st, 5, 2990)
272
273         for i := 0; i < len(ks); i += 1 {
274                 kc.Service_roots[i] = ks[i].url
275                 defer ks[i].listener.Close()
276         }
277
278         sort.Strings(kc.Service_roots)
279
280         kc.PutB([]byte("foo"))
281
282         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
283
284         c.Check(<-st.handled, Equals, shuff[0])
285         c.Check(<-st.handled, Equals, shuff[1])
286 }
287
288 func (s *StandaloneSuite) TestPutHR(c *C) {
289         log.Printf("TestPutHR")
290
291         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
292
293         st := StubPutHandler{
294                 c,
295                 hash,
296                 "abc123",
297                 "foo",
298                 make(chan string, 2)}
299
300         kc, _ := MakeKeepClient()
301
302         kc.Want_replicas = 2
303         kc.ApiToken = "abc123"
304         kc.Service_roots = make([]string, 5)
305
306         ks := RunSomeFakeKeepServers(st, 5, 2990)
307
308         for i := 0; i < len(ks); i += 1 {
309                 kc.Service_roots[i] = ks[i].url
310                 defer ks[i].listener.Close()
311         }
312
313         sort.Strings(kc.Service_roots)
314
315         reader, writer := io.Pipe()
316
317         go func() {
318                 writer.Write([]byte("foo"))
319                 writer.Close()
320         }()
321
322         kc.PutHR(hash, reader, 3)
323
324         shuff := kc.ShuffledServiceRoots(hash)
325         log.Print(shuff)
326
327         s1 := <-st.handled
328         s2 := <-st.handled
329
330         c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
331                 (s1 == shuff[1] && s2 == shuff[0]),
332                 Equals,
333                 true)
334 }
335
336 func (s *StandaloneSuite) TestPutWithFail(c *C) {
337         log.Printf("TestPutWithFail")
338
339         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
340
341         st := StubPutHandler{
342                 c,
343                 hash,
344                 "abc123",
345                 "foo",
346                 make(chan string, 2)}
347
348         fh := FailHandler{
349                 make(chan string, 1)}
350
351         kc, _ := MakeKeepClient()
352
353         kc.Want_replicas = 2
354         kc.ApiToken = "abc123"
355         kc.Service_roots = make([]string, 5)
356
357         ks1 := RunSomeFakeKeepServers(st, 4, 2990)
358         ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
359
360         for i, k := range ks1 {
361                 kc.Service_roots[i] = k.url
362                 defer k.listener.Close()
363         }
364         for i, k := range ks2 {
365                 kc.Service_roots[len(ks1)+i] = k.url
366                 defer k.listener.Close()
367         }
368
369         sort.Strings(kc.Service_roots)
370
371         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
372
373         phash, replicas, err := kc.PutB([]byte("foo"))
374
375         <-fh.handled
376
377         c.Check(err, Equals, nil)
378         c.Check(phash, Equals, hash)
379         c.Check(replicas, Equals, 2)
380         c.Check(<-st.handled, Equals, shuff[1])
381         c.Check(<-st.handled, Equals, shuff[2])
382 }
383
384 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
385         log.Printf("TestPutWithTooManyFail")
386
387         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
388
389         st := StubPutHandler{
390                 c,
391                 hash,
392                 "abc123",
393                 "foo",
394                 make(chan string, 1)}
395
396         fh := FailHandler{
397                 make(chan string, 4)}
398
399         kc, _ := MakeKeepClient()
400
401         kc.Want_replicas = 2
402         kc.ApiToken = "abc123"
403         kc.Service_roots = make([]string, 5)
404
405         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
406         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
407
408         for i, k := range ks1 {
409                 kc.Service_roots[i] = k.url
410                 defer k.listener.Close()
411         }
412         for i, k := range ks2 {
413                 kc.Service_roots[len(ks1)+i] = k.url
414                 defer k.listener.Close()
415         }
416
417         sort.Strings(kc.Service_roots)
418
419         shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
420
421         _, replicas, err := kc.PutB([]byte("foo"))
422
423         c.Check(err, Equals, InsufficientReplicasError)
424         c.Check(replicas, Equals, 1)
425         c.Check(<-st.handled, Equals, shuff[1])
426 }
427
428 type StubGetHandler struct {
429         c              *C
430         expectPath     string
431         expectApiToken string
432         returnBody     []byte
433 }
434
435 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
436         this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
437         this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
438         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
439         resp.Write(this.returnBody)
440 }
441
442 func (s *StandaloneSuite) TestGet(c *C) {
443
444         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
445
446         st := StubGetHandler{
447                 c,
448                 hash,
449                 "abc123",
450                 []byte("foo")}
451
452         listener, url := RunBogusKeepServer(st, 2990)
453         defer listener.Close()
454
455         kc, _ := MakeKeepClient()
456         kc.ApiToken = "abc123"
457         kc.Service_roots = []string{url}
458
459         r, n, url2, err := kc.Get(hash)
460         c.Check(err, Equals, nil)
461         c.Check(n, Equals, int64(3))
462         c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
463
464         content, err2 := ioutil.ReadAll(r)
465         c.Check(err2, Equals, nil)
466         c.Check(content, DeepEquals, []byte("foo"))
467 }
468
469 func (s *StandaloneSuite) TestGetFail(c *C) {
470         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
471
472         st := FailHandler{make(chan string, 1)}
473
474         listener, url := RunBogusKeepServer(st, 2990)
475         defer listener.Close()
476
477         kc, _ := MakeKeepClient()
478         kc.ApiToken = "abc123"
479         kc.Service_roots = []string{url}
480
481         r, n, url2, err := kc.Get(hash)
482         c.Check(err, Equals, BlockNotFound)
483         c.Check(n, Equals, int64(0))
484         c.Check(url2, Equals, "")
485         c.Check(r, Equals, nil)
486 }
487
488 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
489
490         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
491
492         fh := FailHandler{
493                 make(chan string, 1)}
494
495         st := StubGetHandler{
496                 c,
497                 hash,
498                 "abc123",
499                 []byte("foo")}
500
501         kc, _ := MakeKeepClient()
502         kc.ApiToken = "abc123"
503         kc.Service_roots = make([]string, 5)
504
505         ks1 := RunSomeFakeKeepServers(st, 1, 2990)
506         ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
507
508         for i, k := range ks1 {
509                 kc.Service_roots[i] = k.url
510                 defer k.listener.Close()
511         }
512         for i, k := range ks2 {
513                 kc.Service_roots[len(ks1)+i] = k.url
514                 defer k.listener.Close()
515         }
516
517         sort.Strings(kc.Service_roots)
518
519         r, n, url2, err := kc.Get(hash)
520         <-fh.handled
521         c.Check(err, Equals, nil)
522         c.Check(n, Equals, int64(3))
523         c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
524
525         content, err2 := ioutil.ReadAll(r)
526         c.Check(err2, Equals, nil)
527         c.Check(content, DeepEquals, []byte("foo"))
528 }
529
530 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
531         os.Setenv("ARVADOS_API_HOST", "localhost:3001")
532         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
533         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
534
535         kc, err := MakeKeepClient()
536         c.Assert(err, Equals, nil)
537
538         hash, replicas, err := kc.PutB([]byte("foo"))
539         c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
540         c.Check(replicas, Equals, 2)
541         c.Check(err, Equals, nil)
542
543         {
544                 r, n, url2, err := kc.Get(hash)
545                 c.Check(err, Equals, nil)
546                 c.Check(n, Equals, int64(3))
547                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
548
549                 content, err2 := ioutil.ReadAll(r)
550                 c.Check(err2, Equals, nil)
551                 c.Check(content, DeepEquals, []byte("foo"))
552         }
553
554         {
555                 n, url2, err := kc.Ask(hash)
556                 c.Check(err, Equals, nil)
557                 c.Check(n, Equals, int64(3))
558                 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
559         }
560 }