"net/http"
"os"
"os/exec"
- "sort"
"strings"
"testing"
)
c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
c.Assert(err, Equals, nil)
- c.Check(len(kc.Service_roots), Equals, 2)
- c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
- c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
+ c.Check(len(kc.ServiceRoots()), Equals, 2)
+ c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
+ c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
}
func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
- kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
+ kc := KeepClient{}
+ kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
// "foo" acbd18db4cc2f85cedef654fccc4a4d8
foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
kc.PutB([]byte("foo"))
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks := RunSomeFakeKeepServers(st, 5, 2990)
for i := 0; i < len(ks); i += 1 {
- kc.Service_roots[i] = ks[i].url
+ service_roots[i] = ks[i].url
defer ks[i].listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
reader, writer := io.Pipe()
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 4, 2990)
ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
kc.Want_replicas = 2
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
defer r.Close()
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = []string{url}
+ kc.SetServiceRoots([]string{url})
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
kc, _ := MakeKeepClient()
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 5)
+ service_roots := make([]string, 5)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- kc.Service_roots[len(ks1)+i] = k.url
+ service_roots[len(ks1)+i] = k.url
defer k.listener.Close()
}
- sort.Strings(kc.Service_roots)
+ kc.SetServiceRoots(service_roots)
r, n, url2, err := kc.Get(hash)
<-fh.handled
kc.Want_replicas = 2
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
+
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
kc.Want_replicas = 3
kc.Using_proxy = true
kc.ApiToken = "abc123"
- kc.Service_roots = make([]string, 1)
+ service_roots := make([]string, 1)
ks1 := RunSomeFakeKeepServers(st, 1, 2990)
for i, k := range ks1 {
- kc.Service_roots[i] = k.url
+ service_roots[i] = k.url
defer k.listener.Close()
}
+ kc.SetServiceRoots(service_roots)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
"log"
"net/http"
"os"
- "sort"
"strconv"
)
SvcType string `json:"service_type"`
}
-func (this *KeepClient) discoverKeepServers() error {
+func (this *KeepClient) DiscoverKeepServers() error {
if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- this.Service_roots = make([]string, 1)
- this.Service_roots[0] = prx
+ this.SetServiceRoots([]string{prx})
this.Using_proxy = true
return nil
}
}
listed := make(map[string]bool)
- this.Service_roots = make([]string, 0, len(m.Items))
+ service_roots := make([]string, 0, len(m.Items))
for _, element := range m.Items {
n := ""
// Skip duplicates
if !listed[url] {
listed[url] = true
- this.Service_roots = append(this.Service_roots, url)
+ service_roots = append(service_roots, url)
}
if element.SvcType == "proxy" {
this.Using_proxy = true
}
}
- // Must be sorted for ShuffledServiceRoots() to produce consistent
- // results.
- sort.Strings(this.Service_roots)
+ this.SetServiceRoots(service_roots)
return nil
}
seed := hash
// Keep servers still to be added to the ordering
- pool := make([]string, len(this.Service_roots))
- copy(pool, this.Service_roots)
+ service_roots := this.ServiceRoots()
+ pool := make([]string, len(service_roots))
+ copy(pool, service_roots)
// output probe sequence
- pseq = make([]string, 0, len(this.Service_roots))
+ pseq = make([]string, 0, len(service_roots))
// iterate while there are servers left to be assigned
for len(pool) > 0 {
return
}
+ go RefreshServicesList(&kc)
+
// Start listening for requests.
- http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
}
type ApiTokenCache struct {
expireTime int64
}
+// RefreshServicesList re-runs keep service discovery on kc every five
+// minutes. It loops forever and never returns, so it is meant to be
+// started in its own goroutine. The loop sleeps before each refresh, so
+// the first refresh happens five minutes after the call.
+//
+// A failed refresh is logged and then retried on the next cycle rather
+// than being silently dropped.
+func RefreshServicesList(kc *keepclient.KeepClient) {
+	for {
+		time.Sleep(300 * time.Second)
+		if err := kc.DiscoverKeepServers(); err != nil {
+			log.Printf("Error refreshing keep services list: %v", err)
+		}
+	}
+}
+
// Cache the token and set an expire time. If we already have an expire time
// on the token, it is not updated.
func (this *ApiTokenCache) RememberToken(token string) {
}
type GetBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient // embedded by pointer; ServeHTTP dereferences it into a local copy for each request
*ApiTokenCache
}
type PutBlockHandler struct {
- keepclient.KeepClient
+ *keepclient.KeepClient // embedded by pointer; NOTE(review): the put handler appears to copy it per request before setting Want_replicas — confirm
*ApiTokenCache
}
func MakeRESTRouter(
enable_get bool,
enable_put bool,
- kc keepclient.KeepClient) *mux.Router {
+ kc *keepclient.KeepClient) *mux.Router {
t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
var blocklen int64
if req.Method == "GET" {
- reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+ reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
defer reader.Close()
} else if req.Method == "HEAD" {
- blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+ blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
}
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
log.Print("PutBlockHandler start")
- if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+ kc := *this.KeepClient
+
+ if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
}
var r int
_, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
if err != nil {
- this.KeepClient.Want_replicas = r
+ kc.Want_replicas = r
}
}
// Now try to put the block through
- replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+ replicas, err := kc.PutHR(hash, req.Body, contentLength)
log.Printf("Replicas stored: %v err: %v", replicas, err)