package keepclient
import (
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
"crypto/md5"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
. "gopkg.in/check.v1"
"io"
"io/ioutil"
"net"
"net/http"
"os"
- "os/exec"
"testing"
)
c.Skip("Skipping tests that require server")
return
}
- os.Chdir(pythonDir())
- {
- cmd := exec.Command("python", "run_test_server.py", "start")
- stderr, err := cmd.StderrPipe()
- if err != nil {
- log.Fatalf("Setting up stderr pipe: %s", err)
- }
- go io.Copy(os.Stderr, stderr)
- if err := cmd.Run(); err != nil {
- panic(fmt.Sprintf("'python run_test_server.py start' returned error %s", err))
- }
- }
- {
- cmd := exec.Command("python", "run_test_server.py", "start_keep")
- stderr, err := cmd.StderrPipe()
- if err != nil {
- log.Fatalf("Setting up stderr pipe: %s", err)
- }
- go io.Copy(os.Stderr, stderr)
- if err := cmd.Run(); err != nil {
- panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
- }
- }
+ arvadostest.StartAPI()
+ arvadostest.StartKeep()
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
- os.Chdir(pythonDir())
- exec.Command("python", "run_test_server.py", "stop_keep").Run()
- exec.Command("python", "run_test_server.py", "stop").Run()
+ if *no_server {
+ return
+ }
+ arvadostest.StopKeep()
+ arvadostest.StopAPI()
}
func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
- os.Setenv("ARVADOS_API_HOST", "localhost:3000")
- os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
- os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
-
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, Equals, nil)
c.Assert(err, Equals, nil)
c.Check(len(kc.ServiceRoots()), Equals, 2)
- c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
- c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
-}
-
-func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
- kc := KeepClient{}
- kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
-
- // "foo" acbd18db4cc2f85cedef654fccc4a4d8
- foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
- c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
-
- // "bar" 37b51d194a7513e45b56f6524f2d51f2
- bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
- c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
+ for _, root := range kc.ServiceRoots() {
+ c.Check(root, Matches, "http://localhost:\\d+")
+ }
}
type StubPutHandler struct {
this.handled <- fmt.Sprintf("http://%s", req.Host)
}
-func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
+func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
var err error
- listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
+ ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
if err != nil {
- panic(fmt.Sprintf("Could not listen on tcp port %v", port))
+ panic(fmt.Sprintf("Could not listen on any port"))
}
-
- url = fmt.Sprintf("http://localhost:%d", port)
-
- go http.Serve(listener, st)
- return listener, url
+ ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
+ go http.Serve(ks.listener, st)
+ return
}
func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
- listener, url := RunBogusKeepServer(st, 2990)
- defer listener.Close()
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
arv, _ := arvadosclient.MakeArvadosClient()
arv.ApiToken = "abc123"
reader, writer := io.Pipe()
upload_status := make(chan uploadStatus)
- f(kc, url, reader, writer, upload_status)
+ f(kc, ks.url, reader, writer, upload_status)
}
func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
func(kc KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
- go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
+ go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
writer.Write([]byte("foo"))
writer.Close()
br1 := tr.MakeStreamReader()
- go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
+ go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
writer.Write([]byte("foo"))
writer.Close()
func(kc KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
- go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
+ go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
writer.Write([]byte("foo"))
writer.Close()
url string
}
-func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
+func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
ks = make([]KeepServer, n)
for i := 0; i < n; i += 1 {
- boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
- ks[i] = KeepServer{boguslistener, bogusurl}
+ ks[i] = RunFakeKeepServer(st)
}
return ks
func (s *StandaloneSuite) TestPutB(c *C) {
log.Printf("TestPutB")
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := Md5String("foo")
st := StubPutHandler{
c,
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
- ks := RunSomeFakeKeepServers(st, 5, 2990)
+ ks := RunSomeFakeKeepServers(st, 5)
- for i := 0; i < len(ks); i += 1 {
- service_roots[i] = ks[i].url
- defer ks[i].listener.Close()
+ for i, k := range ks {
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
kc.PutB([]byte("foo"))
- shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+ shuff := NewRootSorter(
+ kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
s1 := <-st.handled
s2 := <-st.handled
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
- ks := RunSomeFakeKeepServers(st, 5, 2990)
+ ks := RunSomeFakeKeepServers(st, 5)
- for i := 0; i < len(ks); i += 1 {
- service_roots[i] = ks[i].url
- defer ks[i].listener.Close()
+ for i, k := range ks {
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
kc.PutHR(hash, reader, 3)
- shuff := kc.shuffledServiceRoots(hash)
+ shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
log.Print(shuff)
s1 := <-st.handled
hash,
"abc123",
"foo",
- make(chan string, 2)}
+ make(chan string, 4)}
fh := FailHandler{
make(chan string, 1)}
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
- ks1 := RunSomeFakeKeepServers(st, 4, 2990)
- ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
+ ks1 := RunSomeFakeKeepServers(st, 4)
+ ks2 := RunSomeFakeKeepServers(fh, 1)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
- shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+ shuff := NewRootSorter(
+ kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
phash, replicas, err := kc.PutB([]byte("foo"))
c.Check(err, Equals, nil)
c.Check(phash, Equals, "")
c.Check(replicas, Equals, 2)
- c.Check(<-st.handled, Equals, shuff[1])
- c.Check(<-st.handled, Equals, shuff[2])
+
+ s1 := <-st.handled
+ s2 := <-st.handled
+
+ c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
+ (s1 == shuff[2] && s2 == shuff[1]),
+ Equals,
+ true)
}
func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
- ks1 := RunSomeFakeKeepServers(st, 1, 2990)
- ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
+ ks1 := RunSomeFakeKeepServers(st, 1)
+ ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
- shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
-
_, replicas, err := kc.PutB([]byte("foo"))
c.Check(err, Equals, InsufficientReplicasError)
c.Check(replicas, Equals, 1)
- c.Check(<-st.handled, Equals, shuff[1])
+ c.Check(<-st.handled, Equals, ks1[0].url)
log.Printf("TestPutWithTooManyFail done")
}
"abc123",
[]byte("foo")}
- listener, url := RunBogusKeepServer(st, 2990)
- defer listener.Close()
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url})
r, n, url2, err := kc.Get(hash)
defer r.Close()
c.Check(err, Equals, nil)
c.Check(n, Equals, int64(3))
- c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
+ c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
st := FailHandler{make(chan string, 1)}
- listener, url := RunBogusKeepServer(st, 2990)
- defer listener.Close()
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
st := BarHandler{make(chan string, 1)}
- listener, url := RunBogusKeepServer(st, 2990)
- defer listener.Close()
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots([]string{url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
}
func (s *StandaloneSuite) TestGetWithFailures(c *C) {
-
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ content := []byte("waz")
+ hash := fmt.Sprintf("%x", md5.Sum(content))
fh := FailHandler{
- make(chan string, 1)}
+ make(chan string, 4)}
st := StubGetHandler{
c,
hash,
"abc123",
- []byte("foo")}
+ content}
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- service_roots := make([]string, 5)
+ service_roots := make(map[string]string)
- ks1 := RunSomeFakeKeepServers(st, 1, 2990)
- ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
+ ks1 := RunSomeFakeKeepServers(st, 1)
+ ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[len(ks1)+i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)
+ // This test works only if one of the failing services is
+ // attempted before the succeeding service. Otherwise,
+ // <-fh.handled below will just hang! (Probe order depends on
+ // the choice of block content "waz" and the UUIDs of the fake
+ // servers, so we just tried different strings until we found
+ // an example that passes this Assert.)
+ c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
+
r, n, url2, err := kc.Get(hash)
+
<-fh.handled
c.Check(err, Equals, nil)
c.Check(n, Equals, int64(3))
c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
- content, err2 := ioutil.ReadAll(r)
+ read_content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
- c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(read_content, DeepEquals, content)
}
func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
- os.Setenv("ARVADOS_API_HOST", "localhost:3000")
- os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
- os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+ content := []byte("TestPutGetHead")
arv, err := arvadosclient.MakeArvadosClient()
kc, err := MakeKeepClient(&arv)
c.Assert(err, Equals, nil)
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x", md5.Sum(content))
{
n, _, err := kc.Ask(hash)
c.Check(n, Equals, int64(0))
}
{
- hash2, replicas, err := kc.PutB([]byte("foo"))
- c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
+ hash2, replicas, err := kc.PutB(content)
+ c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
c.Check(replicas, Equals, 2)
c.Check(err, Equals, nil)
}
{
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, nil)
- c.Check(n, Equals, int64(3))
- c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+ c.Check(n, Equals, int64(len(content)))
+ c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
- content, err2 := ioutil.ReadAll(r)
+ read_content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
- c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(read_content, DeepEquals, content)
}
{
n, url2, err := kc.Ask(hash)
c.Check(err, Equals, nil)
- c.Check(n, Equals, int64(3))
- c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+ c.Check(n, Equals, int64(len(content)))
+ c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
}
}
kc.Want_replicas = 2
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make([]string, 1)
+ service_roots := make(map[string]string)
- ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+ ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
kc.Want_replicas = 3
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make([]string, 1)
+ service_roots := make(map[string]string)
- ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+ ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
- service_roots[i] = k.url
+ service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
kc.SetServiceRoots(service_roots)