1885: Made ServiceRoots atomically updatable, so that KeepProxy can poll for
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 22 May 2014 18:51:56 +0000 (14:51 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 22 May 2014 18:51:56 +0000 (14:51 -0400)
changes in the services list without disrupting any active flows.

sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go
sdk/go/src/arvados.org/keepclient/support.go
services/keep/src/arvados.org/keepproxy/keepproxy.go
services/keep/src/arvados.org/keepproxy/keepproxy_test.go

index 91989bd4794fcde574a9c0849e9979ad0dc86cb5..dadf8bf740f1f31857bbf9c388a816f652cb973f 100644 (file)
@@ -11,6 +11,10 @@ import (
        "io/ioutil"
        "net/http"
        "os"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "unsafe"
 )
 
 // A Keep "block" is 64MB.
@@ -25,11 +29,12 @@ type KeepClient struct {
        ApiServer     string
        ApiToken      string
        ApiInsecure   bool
-       Service_roots []string
        Want_replicas int
        Client        *http.Client
        Using_proxy   bool
        External      bool
+       service_roots *[]string
+       lock          sync.Mutex
 }
 
 // Create a new KeepClient, initialized with standard Arvados environment
@@ -50,7 +55,7 @@ func MakeKeepClient() (kc KeepClient, err error) {
                Using_proxy: false,
                External:    external}
 
-       err = (&kc).discoverKeepServers()
+       err = (&kc).DiscoverKeepServers()
 
        return kc, err
 }
@@ -207,3 +212,22 @@ func (this KeepClient) AuthorizedAsk(hash string, signature string,
        return 0, "", BlockNotFound
 
 }
+
+// Atomically read the service_roots field.
+func (this *KeepClient) ServiceRoots() []string {
+       r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
+       return *r
+}
+
+// Atomically update the service_roots field.  Enables you to update
+// service_roots without disrupting any GET or PUT operations that might
+// already be in progress.
+func (this *KeepClient) SetServiceRoots(svc []string) {
+       // Must be sorted for ShuffledServiceRoots() to produce consistent
+       // results.
+       roots := make([]string, len(svc))
+       copy(roots, svc)
+       sort.Strings(roots)
+       atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
+               unsafe.Pointer(&roots))
+}
index 600d7393c519190ca849a4ad33fd5594cdec55c7..395603d5ad3b5c3f92363763c2be719fd723d494 100644 (file)
@@ -13,7 +13,6 @@ import (
        "net/http"
        "os"
        "os/exec"
-       "sort"
        "strings"
        "testing"
 )
@@ -75,13 +74,14 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
        c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true)
 
        c.Assert(err, Equals, nil)
-       c.Check(len(kc.Service_roots), Equals, 2)
-       c.Check(kc.Service_roots[0], Equals, "http://localhost:25107")
-       c.Check(kc.Service_roots[1], Equals, "http://localhost:25108")
+       c.Check(len(kc.ServiceRoots()), Equals, 2)
+       c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
+       c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
 }
 
 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
-       kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
+       kc := KeepClient{}
+       kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
 
        // "foo" acbd18db4cc2f85cedef654fccc4a4d8
        foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
@@ -265,16 +265,16 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 
        kc.Want_replicas = 2
        kc.ApiToken = "abc123"
-       kc.Service_roots = make([]string, 5)
+       service_roots := make([]string, 5)
 
        ks := RunSomeFakeKeepServers(st, 5, 2990)
 
        for i := 0; i < len(ks); i += 1 {
-               kc.Service_roots[i] = ks[i].url
+               service_roots[i] = ks[i].url
                defer ks[i].listener.Close()
        }
 
-       sort.Strings(kc.Service_roots)
+       kc.SetServiceRoots(service_roots)
 
        kc.PutB([]byte("foo"))
 
@@ -306,16 +306,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
        kc.Want_replicas = 2
        kc.ApiToken = "abc123"
-       kc.Service_roots = make([]string, 5)
+       service_roots := make([]string, 5)
 
        ks := RunSomeFakeKeepServers(st, 5, 2990)
 
        for i := 0; i < len(ks); i += 1 {
-               kc.Service_roots[i] = ks[i].url
+               service_roots[i] = ks[i].url
                defer ks[i].listener.Close()
        }
 
-       sort.Strings(kc.Service_roots)
+       kc.SetServiceRoots(service_roots)
 
        reader, writer := io.Pipe()
 
@@ -359,21 +359,21 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 
        kc.Want_replicas = 2
        kc.ApiToken = "abc123"
-       kc.Service_roots = make([]string, 5)
+       service_roots := make([]string, 5)
 
        ks1 := RunSomeFakeKeepServers(st, 4, 2990)
        ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
 
        for i, k := range ks1 {
-               kc.Service_roots[i] = k.url
+               service_roots[i] = k.url
                defer k.listener.Close()
        }
        for i, k := range ks2 {
-               kc.Service_roots[len(ks1)+i] = k.url
+               service_roots[len(ks1)+i] = k.url
                defer k.listener.Close()
        }
 
-       sort.Strings(kc.Service_roots)
+       kc.SetServiceRoots(service_roots)
 
        shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
 
@@ -407,21 +407,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
        kc.Want_replicas = 2
        kc.ApiToken = "abc123"
-       kc.Service_roots = make([]string, 5)
+       service_roots := make([]string, 5)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
        ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
        for i, k := range ks1 {
-               kc.Service_roots[i] = k.url
+               service_roots[i] = k.url
                defer k.listener.Close()
        }
        for i, k := range ks2 {
-               kc.Service_roots[len(ks1)+i] = k.url
+               service_roots[len(ks1)+i] = k.url
                defer k.listener.Close()
        }
 
-       sort.Strings(kc.Service_roots)
+       kc.SetServiceRoots(service_roots)
 
        shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
 
@@ -464,7 +464,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 
        kc, _ := MakeKeepClient()
        kc.ApiToken = "abc123"
-       kc.Service_roots = []string{url}
+       kc.SetServiceRoots([]string{url})
 
        r, n, url2, err := kc.Get(hash)
        defer r.Close()
@@ -489,7 +489,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 
        kc, _ := MakeKeepClient()
        kc.ApiToken = "abc123"
-       kc.Service_roots = []string{url}
+       kc.SetServiceRoots([]string{url})
 
        r, n, url2, err := kc.Get(hash)
        c.Check(err, Equals, BlockNotFound)
@@ -518,7 +518,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 
        kc, _ := MakeKeepClient()
        kc.ApiToken = "abc123"
-       kc.Service_roots = []string{url}
+       kc.SetServiceRoots([]string{url})
 
        r, n, _, err := kc.Get(barhash)
        _, err = ioutil.ReadAll(r)
@@ -550,21 +550,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 
        kc, _ := MakeKeepClient()
        kc.ApiToken = "abc123"
-       kc.Service_roots = make([]string, 5)
+       service_roots := make([]string, 5)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
        ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
 
        for i, k := range ks1 {
-               kc.Service_roots[i] = k.url
+               service_roots[i] = k.url
                defer k.listener.Close()
        }
        for i, k := range ks2 {
-               kc.Service_roots[len(ks1)+i] = k.url
+               service_roots[len(ks1)+i] = k.url
                defer k.listener.Close()
        }
 
-       sort.Strings(kc.Service_roots)
+       kc.SetServiceRoots(service_roots)
 
        r, n, url2, err := kc.Get(hash)
        <-fh.handled
@@ -635,15 +635,17 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
        kc.Want_replicas = 2
        kc.Using_proxy = true
        kc.ApiToken = "abc123"
-       kc.Service_roots = make([]string, 1)
+       service_roots := make([]string, 1)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
        for i, k := range ks1 {
-               kc.Service_roots[i] = k.url
+               service_roots[i] = k.url
                defer k.listener.Close()
        }
 
+       kc.SetServiceRoots(service_roots)
+
        _, replicas, err := kc.PutB([]byte("foo"))
        <-st.handled
 
@@ -663,14 +665,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
        kc.Want_replicas = 3
        kc.Using_proxy = true
        kc.ApiToken = "abc123"
-       kc.Service_roots = make([]string, 1)
+       service_roots := make([]string, 1)
 
        ks1 := RunSomeFakeKeepServers(st, 1, 2990)
 
        for i, k := range ks1 {
-               kc.Service_roots[i] = k.url
+               service_roots[i] = k.url
                defer k.listener.Close()
        }
+       kc.SetServiceRoots(service_roots)
 
        _, replicas, err := kc.PutB([]byte("foo"))
        <-st.handled
index ef4a8e1133b8674dc84d696cc01b04ae1e6269b4..38669a11dbfbda40c32df35c7702b3c8458a4e2c 100644 (file)
@@ -10,7 +10,6 @@ import (
        "log"
        "net/http"
        "os"
-       "sort"
        "strconv"
 )
 
@@ -21,10 +20,9 @@ type keepDisk struct {
        SvcType  string `json:"service_type"`
 }
 
-func (this *KeepClient) discoverKeepServers() error {
+func (this *KeepClient) DiscoverKeepServers() error {
        if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
-               this.Service_roots = make([]string, 1)
-               this.Service_roots[0] = prx
+               this.SetServiceRoots([]string{prx})
                this.Using_proxy = true
                return nil
        }
@@ -72,7 +70,7 @@ func (this *KeepClient) discoverKeepServers() error {
        }
 
        listed := make(map[string]bool)
-       this.Service_roots = make([]string, 0, len(m.Items))
+       service_roots := make([]string, 0, len(m.Items))
 
        for _, element := range m.Items {
                n := ""
@@ -87,16 +85,14 @@ func (this *KeepClient) discoverKeepServers() error {
                // Skip duplicates
                if !listed[url] {
                        listed[url] = true
-                       this.Service_roots = append(this.Service_roots, url)
+                       service_roots = append(service_roots, url)
                }
                if element.SvcType == "proxy" {
                        this.Using_proxy = true
                }
        }
 
-       // Must be sorted for ShuffledServiceRoots() to produce consistent
-       // results.
-       sort.Strings(this.Service_roots)
+       this.SetServiceRoots(service_roots)
 
        return nil
 }
@@ -111,11 +107,12 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
        seed := hash
 
        // Keep servers still to be added to the ordering
-       pool := make([]string, len(this.Service_roots))
-       copy(pool, this.Service_roots)
+       service_roots := this.ServiceRoots()
+       pool := make([]string, len(service_roots))
+       copy(pool, service_roots)
 
        // output probe sequence
-       pseq = make([]string, 0, len(this.Service_roots))
+       pseq = make([]string, 0, len(service_roots))
 
        // iterate while there are servers left to be assigned
        for len(pool) > 0 {
index ed33ac9bbd62a79a59e293f9170af9a2b1b3cf2a..b914f47d5cd669ed805d12c28c4932cdc775881c 100644 (file)
@@ -92,8 +92,10 @@ func main() {
                return
        }
 
+       go RefreshServicesList(&kc)
+
        // Start listening for requests.
-       http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
+       http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
 }
 
 type ApiTokenCache struct {
@@ -102,6 +104,14 @@ type ApiTokenCache struct {
        expireTime int64
 }
 
+// Refresh the keep service list every five minutes.
+func RefreshServicesList(kc *keepclient.KeepClient) {
+       for {
+               time.Sleep(300 * time.Second)
+               kc.DiscoverKeepServers()
+       }
+}
+
 // Cache the token and set an expire time.  If we already have an expire time
 // on the token, it is not updated.
 func (this *ApiTokenCache) RememberToken(token string) {
@@ -181,12 +191,12 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
 }
 
 type GetBlockHandler struct {
-       keepclient.KeepClient
+       *keepclient.KeepClient
        *ApiTokenCache
 }
 
 type PutBlockHandler struct {
-       keepclient.KeepClient
+       *keepclient.KeepClient
        *ApiTokenCache
 }
 
@@ -197,7 +207,7 @@ type PutBlockHandler struct {
 func MakeRESTRouter(
        enable_get bool,
        enable_put bool,
-       kc keepclient.KeepClient) *mux.Router {
+       kc *keepclient.KeepClient) *mux.Router {
 
        t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
 
@@ -222,7 +232,9 @@ func MakeRESTRouter(
 
 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 
-       if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+       kc := *this.KeepClient
+
+       if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
                http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
        }
 
@@ -235,10 +247,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
        var blocklen int64
 
        if req.Method == "GET" {
-               reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+               reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
                defer reader.Close()
        } else if req.Method == "HEAD" {
-               blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+               blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
        }
 
        resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@ -259,7 +271,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
        log.Print("PutBlockHandler start")
 
-       if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+       kc := *this.KeepClient
+
+       if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
                http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
        }
 
@@ -284,12 +298,12 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                var r int
                _, err := fmt.Sscanf(req.Header.Get("X-Keep-Desired-Replicas"), "%d", &r)
                if err != nil {
-                       this.KeepClient.Want_replicas = r
+                       kc.Want_replicas = r
                }
        }
 
        // Now try to put the block through
-       replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+       replicas, err := kc.PutHR(hash, req.Body, contentLength)
 
        log.Printf("Replicas stored: %v err: %v", replicas, err)
 
index af1377b249c6fca6e811f258f00aad5a965927a1..d8abda73965e77f88cf6cc054dd058b3f3fa2025 100644 (file)
@@ -109,8 +109,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        kc, err := keepclient.MakeKeepClient()
        c.Check(kc.External, Equals, true)
        c.Check(kc.Using_proxy, Equals, true)
-       c.Check(len(kc.Service_roots), Equals, 1)
-       c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+       c.Check(len(kc.ServiceRoots()), Equals, 1)
+       c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
        c.Check(err, Equals, nil)
        os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 
@@ -159,8 +159,8 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
        kc.ApiToken = "123xyz"
        c.Check(kc.External, Equals, true)
        c.Check(kc.Using_proxy, Equals, true)
-       c.Check(len(kc.Service_roots), Equals, 1)
-       c.Check(kc.Service_roots[0], Equals, "http://localhost:29950")
+       c.Check(len(kc.ServiceRoots()), Equals, 1)
+       c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
        c.Check(err, Equals, nil)
        os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")