9005: Keep service discovery up to date automatically.
authorTom Clegg <tom@curoverse.com>
Wed, 31 May 2017 18:16:43 +0000 (14:16 -0400)
committerTom Clegg <tom@curoverse.com>
Wed, 31 May 2017 20:14:55 +0000 (16:14 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curoverse.com>

sdk/go/keepclient/discover.go
sdk/go/keepclient/discover_test.go
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
services/keep-web/handler.go
services/keepproxy/keepproxy.go
services/keepstore/pull_worker_integration_test.go
tools/keep-block-check/keep-block-check.go
tools/keep-block-check/keep-block-check_test.go
tools/keep-rsync/keep-rsync.go
tools/keep-rsync/keep-rsync_test.go

index 8889c4bedae23f52d62667ba5b8747ed11c3f67b..c5413d4a4b067c041141b89109afd81a144ed2a6 100644 (file)
@@ -6,99 +6,165 @@ import (
        "log"
        "os"
        "os/signal"
-       "reflect"
        "strings"
+       "sync"
        "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
-// DiscoverKeepServers gets list of available keep services from the
-// API server.
-//
-// If a list of services is provided in the arvadosclient (e.g., from
-// an environment variable or local config), that list is used
-// instead.
-func (this *KeepClient) DiscoverKeepServers() error {
-       if this.Arvados.KeepServiceURIs != nil {
-               this.foundNonDiskSvc = true
-               this.replicasPerService = 0
-               roots := make(map[string]string)
-               for i, uri := range this.Arvados.KeepServiceURIs {
-                       roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
-               }
-               this.SetServiceRoots(roots, roots, roots)
-               return nil
+// ClearCache clears the Keep service discovery cache.
+func ClearCache() {
+       svcListCacheMtx.Lock()
+       defer svcListCacheMtx.Unlock()
+       for _, ent := range svcListCache {
+               ent.clear <- struct{}{}
        }
+}
 
-       // ArvadosClient did not provide a services list. Ask API
-       // server for a list of accessible services.
-       var list svcList
-       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
-       if err != nil {
-               return err
+// ClearCacheOnSIGHUP installs a signal handler that calls
+// ClearCache when SIGHUP is received.
+func ClearCacheOnSIGHUP() {
+       svcListCacheMtx.Lock()
+       defer svcListCacheMtx.Unlock()
+       if svcListCacheSignal != nil {
+               return
        }
-       return this.loadKeepServers(list)
+       svcListCacheSignal = make(chan os.Signal, 1)
+       signal.Notify(svcListCacheSignal, syscall.SIGHUP)
+       go func() {
+               for range svcListCacheSignal {
+                       ClearCache()
+               }
+       }()
 }
 
-// LoadKeepServicesFromJSON gets list of available keep services from given JSON
-func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
-       var list svcList
-
-       // Load keep services from given json
-       dec := json.NewDecoder(strings.NewReader(services))
-       if err := dec.Decode(&list); err != nil {
-               return err
-       }
+var (
+       svcListCache       = map[string]cachedSvcList{}
+       svcListCacheSignal chan os.Signal
+       svcListCacheMtx    sync.Mutex
+)
 
-       return this.loadKeepServers(list)
+type cachedSvcList struct {
+       arv    *arvadosclient.ArvadosClient
+       latest chan svcList
+       clear  chan struct{}
 }
 
-// RefreshServices calls DiscoverKeepServers to refresh the keep
-// service list on SIGHUP; when the given interval has elapsed since
-// the last refresh; and (if the last refresh failed) the given
-// errInterval has elapsed.
-func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
-       var previousRoots = []map[string]string{}
-
-       timer := time.NewTimer(interval)
-       gotHUP := make(chan os.Signal, 1)
-       signal.Notify(gotHUP, syscall.SIGHUP)
+// Check for new services list every few minutes. Send the latest list
+// to the "latest" channel as needed.
+func (ent *cachedSvcList) poll() {
+       wakeup := make(chan struct{})
+
+       replace := make(chan svcList)
+       go func() {
+               wakeup <- struct{}{}
+               current := <-replace
+               for {
+                       select {
+                       case <-ent.clear:
+                               wakeup <- struct{}{}
+                               // Wait here for the next success, in
+                               // order to avoid returning stale
+                               // results on the "latest" channel.
+                               current = <-replace
+                       case current = <-replace:
+                       case ent.latest <- current:
+                       }
+               }
+       }()
 
+       okDelay := 5 * time.Minute
+       errDelay := 3 * time.Second
+       timer := time.NewTimer(okDelay)
        for {
                select {
-               case <-gotHUP:
                case <-timer.C:
+               case <-wakeup:
+                       if !timer.Stop() {
+                               // Lost race stopping timer; skip extra firing
+                               <-timer.C
+                       }
                }
-               timer.Reset(interval)
-
-               if err := kc.DiscoverKeepServers(); err != nil {
-                       log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval)
-                       timer.Reset(errInterval)
+               var next svcList
+               err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
+               if err != nil {
+                       log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
+                       timer.Reset(errDelay)
                        continue
                }
-               newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+               replace <- next
+               timer.Reset(okDelay)
+       }
+}
+
+// discoverServices gets the list of available keep services from
+// the API server.
+//
+// If a list of services is provided in the arvadosclient (e.g., from
+// an environment variable or local config), that list is used
+// instead.
+//
+// If an API call is made, the result is cached for 5 minutes or until
+// ClearCache() is called, and during this interval it is reused by
+// other KeepClients that use the same API server host.
+func (kc *KeepClient) discoverServices() error {
+       if kc.disableDiscovery {
+               return nil
+       }
 
-               if !reflect.DeepEqual(previousRoots, newRoots) {
-                       DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
-                       previousRoots = newRoots
+       if kc.Arvados.KeepServiceURIs != nil {
+               kc.disableDiscovery = true
+               kc.foundNonDiskSvc = true
+               kc.replicasPerService = 0
+               roots := make(map[string]string)
+               for i, uri := range kc.Arvados.KeepServiceURIs {
+                       roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
                }
+               kc.setServiceRoots(roots, roots, roots)
+               return nil
+       }
 
-               if len(newRoots[0]) == 0 {
-                       log.Printf("WARNING: No local services (retrying in %v)", errInterval)
-                       timer.Reset(errInterval)
+       svcListCacheMtx.Lock()
+       cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
+       if !ok {
+               arv := *kc.Arvados
+               cacheEnt = cachedSvcList{
+                       latest: make(chan svcList),
+                       clear:  make(chan struct{}),
+                       arv:    &arv,
                }
+               go cacheEnt.poll()
+               svcListCache[kc.Arvados.ApiServer] = cacheEnt
+       }
+       svcListCacheMtx.Unlock()
+
+       return kc.loadKeepServers(<-cacheEnt.latest)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from
+// given JSON and disables automatic service discovery.
+func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
+       kc.disableDiscovery = true
+
+       var list svcList
+       dec := json.NewDecoder(strings.NewReader(services))
+       if err := dec.Decode(&list); err != nil {
+               return err
        }
+
+       return kc.loadKeepServers(list)
 }
 
-// loadKeepServers
-func (this *KeepClient) loadKeepServers(list svcList) error {
+func (kc *KeepClient) loadKeepServers(list svcList) error {
        listed := make(map[string]bool)
        localRoots := make(map[string]string)
        gatewayRoots := make(map[string]string)
        writableLocalRoots := make(map[string]string)
 
        // replicasPerService is 1 for disks; unknown or unlimited otherwise
-       this.replicasPerService = 1
+       kc.replicasPerService = 1
 
        for _, service := range list.Items {
                scheme := "http"
@@ -117,12 +183,12 @@ func (this *KeepClient) loadKeepServers(list svcList) error {
                if service.ReadOnly == false {
                        writableLocalRoots[service.Uuid] = url
                        if service.SvcType != "disk" {
-                               this.replicasPerService = 0
+                               kc.replicasPerService = 0
                        }
                }
 
                if service.SvcType != "disk" {
-                       this.foundNonDiskSvc = true
+                       kc.foundNonDiskSvc = true
                }
 
                // Gateway services are only used when specified by
@@ -133,6 +199,6 @@ func (this *KeepClient) loadKeepServers(list svcList) error {
                gatewayRoots[service.Uuid] = url
        }
 
-       this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
+       kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
        return nil
 }
index 379d44c820aec0e0b88d84b3c52e5fd316480844..4065ce342e43dfaa5172ca48f4a3ec3282045296 100644 (file)
@@ -3,28 +3,15 @@ package keepclient
 import (
        "crypto/md5"
        "fmt"
-       "gopkg.in/check.v1"
        "net/http"
        "os"
-       "time"
+
+       "gopkg.in/check.v1"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
-func ExampleKeepClient_RefreshServices() {
-       arv, err := arvadosclient.MakeArvadosClient()
-       if err != nil {
-               panic(err)
-       }
-       kc, err := MakeKeepClient(arv)
-       if err != nil {
-               panic(err)
-       }
-       go kc.RefreshServices(5*time.Minute, 3*time.Second)
-       fmt.Printf("LocalRoots: %#v\n", kc.LocalRoots())
-}
-
 func (s *ServerRequiredSuite) TestOverrideDiscovery(c *check.C) {
        defer os.Setenv("ARVADOS_KEEP_SERVICES", "")
 
index 76ea17517fc6d2b59e0c96e2a33fb3561fcd38df..029c6ee7f3a5834a8af149b64917aff2d5dc4bcb 100644 (file)
@@ -88,9 +88,9 @@ type HTTPClient interface {
 type KeepClient struct {
        Arvados            *arvadosclient.ArvadosClient
        Want_replicas      int
-       localRoots         *map[string]string
-       writableLocalRoots *map[string]string
-       gatewayRoots       *map[string]string
+       localRoots         map[string]string
+       writableLocalRoots map[string]string
+       gatewayRoots       map[string]string
        lock               sync.RWMutex
        HTTPClient         HTTPClient
        Retries            int
@@ -101,6 +101,9 @@ type KeepClient struct {
 
        // Any non-disk typed services found in the list of keepservers?
        foundNonDiskSvc bool
+
+       // Disable automatic discovery of keep services
+       disableDiscovery bool
 }
 
 // MakeKeepClient creates a new KeepClient, calls
@@ -108,12 +111,11 @@ type KeepClient struct {
 // use.
 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
        kc := New(arv)
-       return kc, kc.DiscoverKeepServers()
+       return kc, kc.discoverServices()
 }
 
-// New creates a new KeepClient. The caller must call
-// DiscoverKeepServers() before using the returned client to read or
-// write data.
+// New creates a new KeepClient. Service discovery will occur on the
+// next read/write operation.
 func New(arv *arvadosclient.ArvadosClient) *KeepClient {
        defaultReplicationLevel := 2
        value, err := arv.Discovery("defaultCollectionReplication")
@@ -349,55 +351,47 @@ func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error
 // LocalRoots() returns the map of local (i.e., disk and proxy) Keep
 // services: uuid -> baseURI.
 func (kc *KeepClient) LocalRoots() map[string]string {
+       kc.discoverServices()
        kc.lock.RLock()
        defer kc.lock.RUnlock()
-       return *kc.localRoots
+       return kc.localRoots
 }
 
 // GatewayRoots() returns the map of Keep remote gateway services:
 // uuid -> baseURI.
 func (kc *KeepClient) GatewayRoots() map[string]string {
+       kc.discoverServices()
        kc.lock.RLock()
        defer kc.lock.RUnlock()
-       return *kc.gatewayRoots
+       return kc.gatewayRoots
 }
 
 // WritableLocalRoots() returns the map of writable local Keep services:
 // uuid -> baseURI.
 func (kc *KeepClient) WritableLocalRoots() map[string]string {
+       kc.discoverServices()
        kc.lock.RLock()
        defer kc.lock.RUnlock()
-       return *kc.writableLocalRoots
+       return kc.writableLocalRoots
 }
 
-// SetServiceRoots updates the localRoots and gatewayRoots maps,
-// without risk of disrupting operations that are already in progress.
+// SetServiceRoots disables service discovery and updates the
+// localRoots and gatewayRoots maps, without disrupting operations
+// that are already in progress.
 //
-// The KeepClient makes its own copy of the supplied maps, so the
-// caller can reuse/modify them after SetServiceRoots returns, but
-// they should not be modified by any other goroutine while
-// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) {
-       locals := make(map[string]string)
-       for uuid, root := range newLocals {
-               locals[uuid] = root
-       }
-
-       writables := make(map[string]string)
-       for uuid, root := range newWritableLocals {
-               writables[uuid] = root
-       }
-
-       gateways := make(map[string]string)
-       for uuid, root := range newGateways {
-               gateways[uuid] = root
-       }
+// The supplied maps must not be modified after calling
+// SetServiceRoots.
+func (kc *KeepClient) SetServiceRoots(locals, writables, gateways map[string]string) {
+       kc.disableDiscovery = true
+       kc.setServiceRoots(locals, writables, gateways)
+}
 
+func (kc *KeepClient) setServiceRoots(locals, writables, gateways map[string]string) {
        kc.lock.Lock()
        defer kc.lock.Unlock()
-       kc.localRoots = &locals
-       kc.writableLocalRoots = &writables
-       kc.gatewayRoots = &gateways
+       kc.localRoots = locals
+       kc.writableLocalRoots = writables
+       kc.gatewayRoots = gateways
 }
 
 // getSortedRoots returns a list of base URIs of Keep services, in the
index d2b84e94259892bcba2efe07e1952126530c8f7c..ff0f57bbe6ce58c091e9c5a94b98cda67e077424 100644 (file)
@@ -35,6 +35,10 @@ type ServerRequiredSuite struct{}
 // Standalone tests
 type StandaloneSuite struct{}
 
+func (s *StandaloneSuite) SetUpTest(c *C) {
+       ClearCache()
+}
+
 func pythonDir() string {
        cwd, _ := os.Getwd()
        return fmt.Sprintf("%s/../../python/tests", cwd)
@@ -50,6 +54,10 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
 }
 
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+       ClearCache()
+}
+
 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        c.Assert(err, Equals, nil)
@@ -1067,12 +1075,14 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
-       kc, _ := MakeKeepClient(arv)
+       c.Assert(err, IsNil)
+       kc, err := MakeKeepClient(arv)
+       c.Assert(err, IsNil)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", "")
-       c.Check(err, Equals, nil)
+       c.Check(err, IsNil)
 
        content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
@@ -1098,7 +1108,7 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", hash[0:3])
-       c.Check(err, Equals, nil)
+       c.Assert(err, Equals, nil)
 
        content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
@@ -1237,6 +1247,7 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
                &blobKeepService)
        c.Assert(err, Equals, nil)
        defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
+       ClearCache()
 
        // Make a keepclient and ensure that the testblobstore is included
        kc, err := MakeKeepClient(arv)
index 85ec93b8d9b325f00029613902d3dbda9a125adf..4ec0a38f059cd95781e2e4e215939891383965ed 100644 (file)
@@ -64,6 +64,7 @@ func parseCollectionIDFromURL(s string) string {
 
 func (h *handler) setup() {
        h.clientPool = arvadosclient.MakeClientPool()
+       keepclient.ClearCacheOnSIGHUP()
 }
 
 // ServeHTTP implements http.Handler.
index f9239a0f15648b0078a8ea8a00fce3dfc27d9366..a7aa0e69bfefc960d998a4d0dd976b25056e7c7d 100644 (file)
@@ -133,7 +133,6 @@ func main() {
        if cfg.DefaultReplicas > 0 {
                kc.Want_replicas = cfg.DefaultReplicas
        }
-       go kc.RefreshServices(5*time.Minute, 3*time.Second)
 
        listener, err = net.Listen("tcp", cfg.Listen)
        if err != nil {
index c0a7c6f6a523725098596e5fce89fcbffa7fdc94..c0ea4fae210bd4d22daede36840d5d9d18181002 100644 (file)
@@ -29,24 +29,23 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe
        // start api and keep servers
        arvadostest.StartAPI()
        arvadostest.StartKeep(2, false)
+       keepclient.ClearCache()
 
        // make arvadosclient
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               t.Error("Error creating arv")
+               t.Fatalf("Error creating arv: %s", err)
        }
 
        // keep client
-       keepClient = &keepclient.KeepClient{
-               Arvados:       arv,
-               Want_replicas: 1,
+       keepClient, err = keepclient.MakeKeepClient(arv)
+       if err != nil {
+               t.Fatalf("error creating KeepClient: %s", err)
        }
+       keepClient.Want_replicas = 1
 
        // discover keep services
        var servers []string
-       if err := keepClient.DiscoverKeepServers(); err != nil {
-               t.Error("Error discovering keep services")
-       }
        for _, host := range keepClient.LocalRoots() {
                servers = append(servers, host)
        }
index 6cf11a728075c60990c1b049c0fb916cea5cd5d5..e57a9abed4b336ca345808ec85bcbf8733b02a08 100644 (file)
@@ -5,8 +5,6 @@ import (
        "errors"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
        "log"
        "net/http"
@@ -14,6 +12,9 @@ import (
        "regexp"
        "strings"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
 func main() {
@@ -153,7 +154,7 @@ func setupKeepClient(config apiConfig, keepServicesJSON string, blobSignatureTTL
                External: config.ExternalClient,
        }
 
-       // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+       // If keepServicesJSON is provided, use it instead of service discovery
        if keepServicesJSON == "" {
                kc, err = keepclient.MakeKeepClient(&arv)
                if err != nil {
index e49fe68616626275f00ba5bce21877dd969207dd..60c5fb51d135dd9e58f87e106457978ea2aa44ef 100644 (file)
@@ -64,6 +64,7 @@ func (s *DoMainTestSuite) SetUpSuite(c *C) {
 func (s *DoMainTestSuite) SetUpTest(c *C) {
        logOutput := io.MultiWriter(&logBuffer)
        log.SetOutput(logOutput)
+       keepclient.ClearCache()
 }
 
 func (s *DoMainTestSuite) TearDownTest(c *C) {
@@ -89,6 +90,8 @@ func setupKeepBlockCheckWithTTL(c *C, enforcePermissions bool, keepServicesJSON
        kc, ttl, err = setupKeepClient(config, keepServicesJSON, ttl)
        c.Assert(ttl, Equals, blobSignatureTTL)
        c.Check(err, IsNil)
+
+       keepclient.ClearCache()
 }
 
 // Setup test data
@@ -144,9 +147,8 @@ func setupBlockHashFile(c *C, name string, blocks []string) string {
 
 func checkErrorLog(c *C, blocks []string, prefix, suffix string) {
        for _, hash := range blocks {
-               expected := prefix + `.*` + hash + `.*` + suffix
-               match, _ := regexp.MatchString(expected, logBuffer.String())
-               c.Assert(match, Equals, true)
+               expected := `(?ms).*` + prefix + `.*` + hash + `.*` + suffix + `.*`
+               c.Check(logBuffer.String(), Matches, expected)
        }
 }
 
index c6e7665caa2a312c327b8a603159a7da07941450..d21a624f64d6ae855eec513777e55987f8fcc452 100644 (file)
@@ -6,8 +6,6 @@ import (
        "errors"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
        "log"
        "net/http"
@@ -15,6 +13,9 @@ import (
        "regexp"
        "strings"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
 func main() {
@@ -170,7 +171,7 @@ func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, repl
                External: config.ExternalClient,
        }
 
-       // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+       // If keepServicesJSON is provided, use it instead of service discovery
        if keepServicesJSON == "" {
                kc, err = keepclient.MakeKeepClient(&arv)
                if err != nil {
index 09609eb7498bb8dc28d95bc41892f4cca9ec8563..6f128c69a1a6a1c52d16b7c5d95bd7560f37ab5d 100644 (file)
@@ -58,6 +58,7 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) {
        dstKeepServicesJSON = ""
        kcSrc = &keepclient.KeepClient{}
        kcDst = &keepclient.KeepClient{}
+       keepclient.ClearCache()
 }
 
 func (s *ServerRequiredSuite) TearDownTest(c *C) {
@@ -65,6 +66,7 @@ func (s *ServerRequiredSuite) TearDownTest(c *C) {
 }
 
 func (s *DoMainTestSuite) SetUpTest(c *C) {
+       keepclient.ClearCache()
        args := []string{"keep-rsync"}
        os.Args = args
 }