From: Tom Clegg Date: Wed, 31 May 2017 18:16:43 +0000 (-0400) Subject: 9005: Keep service discovery up to date automatically. X-Git-Tag: 1.1.0~215^2~6 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/7c7dc19ffa0a20fff6d97e51c874bfaca9596b24 9005: Keep service discovery up to date automatically. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go index 8889c4beda..c5413d4a4b 100644 --- a/sdk/go/keepclient/discover.go +++ b/sdk/go/keepclient/discover.go @@ -6,99 +6,165 @@ import ( "log" "os" "os/signal" - "reflect" "strings" + "sync" "syscall" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" ) -// DiscoverKeepServers gets list of available keep services from the -// API server. -// -// If a list of services is provided in the arvadosclient (e.g., from -// an environment variable or local config), that list is used -// instead. -func (this *KeepClient) DiscoverKeepServers() error { - if this.Arvados.KeepServiceURIs != nil { - this.foundNonDiskSvc = true - this.replicasPerService = 0 - roots := make(map[string]string) - for i, uri := range this.Arvados.KeepServiceURIs { - roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri - } - this.SetServiceRoots(roots, roots, roots) - return nil +// ClearCache clears the Keep service discovery cache. +func ClearCache() { + svcListCacheMtx.Lock() + defer svcListCacheMtx.Unlock() + for _, ent := range svcListCache { + ent.clear <- struct{}{} } +} - // ArvadosClient did not provide a services list. Ask API - // server for a list of accessible services. - var list svcList - err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list) - if err != nil { - return err +// ClearCacheOnSIGHUP installs a signal handler that calls +// ClearCache when SIGHUP is received. +func ClearCacheOnSIGHUP() { + svcListCacheMtx.Lock() + defer svcListCacheMtx.Unlock() + if svcListCacheSignal != nil { + return } - return this.loadKeepServers(list) + svcListCacheSignal = make(chan os.Signal, 1) + signal.Notify(svcListCacheSignal, syscall.SIGHUP) + go func() { + for range svcListCacheSignal { + ClearCache() + } + }() } -// LoadKeepServicesFromJSON gets list of available keep services from given JSON -func (this *KeepClient) LoadKeepServicesFromJSON(services string) error { - var list svcList - - // Load keep services from given json - dec := json.NewDecoder(strings.NewReader(services)) - if err := dec.Decode(&list); err != nil { - return err - } +var ( + svcListCache = map[string]cachedSvcList{} + svcListCacheSignal chan os.Signal + svcListCacheMtx sync.Mutex +) - return this.loadKeepServers(list) +type cachedSvcList struct { + arv *arvadosclient.ArvadosClient + latest chan svcList + clear chan struct{} } -// RefreshServices calls DiscoverKeepServers to refresh the keep -// service list on SIGHUP; when the given interval has elapsed since -// the last refresh; and (if the last refresh failed) the given -// errInterval has elapsed. -func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) { - var previousRoots = []map[string]string{} - - timer := time.NewTimer(interval) - gotHUP := make(chan os.Signal, 1) - signal.Notify(gotHUP, syscall.SIGHUP) +// Check for new services list every few minutes. Send the latest list +// to the "latest" channel as needed. +func (ent *cachedSvcList) poll() { + wakeup := make(chan struct{}) + + replace := make(chan svcList) + go func() { + wakeup <- struct{}{} + current := <-replace + for { + select { + case <-ent.clear: + wakeup <- struct{}{} + // Wait here for the next success, in + // order to avoid returning stale + // results on the "latest" channel. + current = <-replace + case current = <-replace: + case ent.latest <- current: + } + } + }() + okDelay := 5 * time.Minute + errDelay := 3 * time.Second + timer := time.NewTimer(okDelay) for { select { - case <-gotHUP: case <-timer.C: + case <-wakeup: + if !timer.Stop() { + // Lost race stopping timer; skip extra firing + <-timer.C + } } - timer.Reset(interval) - - if err := kc.DiscoverKeepServers(); err != nil { - log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval) - timer.Reset(errInterval) + var next svcList + err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next) + if err != nil { + log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay) + timer.Reset(errDelay) continue } - newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()} + replace <- next + timer.Reset(okDelay) + } +} + +// discoverServices gets the list of available keep services from +// the API server. +// +// If a list of services is provided in the arvadosclient (e.g., from +// an environment variable or local config), that list is used +// instead. +// +// If an API call is made, the result is cached for 5 minutes or until +// ClearCache() is called, and during this interval it is reused by +// other KeepClients that use the same API server host. +func (kc *KeepClient) discoverServices() error { + if kc.disableDiscovery { + return nil + } - if !reflect.DeepEqual(previousRoots, newRoots) { - DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1]) - previousRoots = newRoots + if kc.Arvados.KeepServiceURIs != nil { + kc.disableDiscovery = true + kc.foundNonDiskSvc = true + kc.replicasPerService = 0 + roots := make(map[string]string) + for i, uri := range kc.Arvados.KeepServiceURIs { + roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri } + kc.setServiceRoots(roots, roots, roots) + return nil + } - if len(newRoots[0]) == 0 { - log.Printf("WARNING: No local services (retrying in %v)", errInterval) - timer.Reset(errInterval) + svcListCacheMtx.Lock() + cacheEnt, ok := svcListCache[kc.Arvados.ApiServer] + if !ok { + arv := *kc.Arvados + cacheEnt = cachedSvcList{ + latest: make(chan svcList), + clear: make(chan struct{}), + arv: &arv, } + go cacheEnt.poll() + svcListCache[kc.Arvados.ApiServer] = cacheEnt + } + svcListCacheMtx.Unlock() + + return kc.loadKeepServers(<-cacheEnt.latest) +} + +// LoadKeepServicesFromJSON gets list of available keep services from +// given JSON and disables automatic service discovery. +func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error { + kc.disableDiscovery = true + + var list svcList + dec := json.NewDecoder(strings.NewReader(services)) + if err := dec.Decode(&list); err != nil { + return err } + + return kc.loadKeepServers(list) } -// loadKeepServers -func (this *KeepClient) loadKeepServers(list svcList) error { +func (kc *KeepClient) loadKeepServers(list svcList) error { listed := make(map[string]bool) localRoots := make(map[string]string) gatewayRoots := make(map[string]string) writableLocalRoots := make(map[string]string) // replicasPerService is 1 for disks; unknown or unlimited otherwise - this.replicasPerService = 1 + kc.replicasPerService = 1 for _, service := range list.Items { scheme := "http" @@ -117,12 +183,12 @@ func (this *KeepClient) loadKeepServers(list svcList) error { if service.ReadOnly == false { writableLocalRoots[service.Uuid] = url if service.SvcType != "disk" { - this.replicasPerService = 0 + kc.replicasPerService = 0 } } if service.SvcType != "disk" { - this.foundNonDiskSvc = true + kc.foundNonDiskSvc = true } // Gateway services are only used when specified by @@ -133,6 +199,6 @@ func (this *KeepClient) loadKeepServers(list svcList) error { gatewayRoots[service.Uuid] = url } - this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots) + kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots) return nil } diff --git a/sdk/go/keepclient/discover_test.go b/sdk/go/keepclient/discover_test.go index 379d44c820..4065ce342e 100644 --- a/sdk/go/keepclient/discover_test.go +++ b/sdk/go/keepclient/discover_test.go @@ -3,28 +3,15 @@ package keepclient import ( "crypto/md5" "fmt" - "gopkg.in/check.v1" "net/http" "os" - "time" + + "gopkg.in/check.v1" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/arvadostest" ) -func ExampleKeepClient_RefreshServices() { - arv, err := arvadosclient.MakeArvadosClient() - if err != nil { - panic(err) - } - kc, err := MakeKeepClient(arv) - if err != nil { - panic(err) - } - go kc.RefreshServices(5*time.Minute, 3*time.Second) - fmt.Printf("LocalRoots: %#v\n", kc.LocalRoots()) -} - func (s *ServerRequiredSuite) TestOverrideDiscovery(c *check.C) { defer os.Setenv("ARVADOS_KEEP_SERVICES", "") diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 76ea17517f..029c6ee7f3 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -88,9 +88,9 @@ type HTTPClient interface { type KeepClient struct { Arvados *arvadosclient.ArvadosClient Want_replicas int - localRoots *map[string]string - writableLocalRoots *map[string]string - gatewayRoots *map[string]string + localRoots map[string]string + writableLocalRoots map[string]string + gatewayRoots map[string]string lock sync.RWMutex HTTPClient HTTPClient Retries int @@ -101,6 +101,9 @@ type KeepClient struct { // Any non-disk typed services found in the list of keepservers? foundNonDiskSvc bool + + // Disable automatic discovery of keep services + disableDiscovery bool } // MakeKeepClient creates a new KeepClient, calls @@ -108,12 +111,11 @@ type KeepClient struct { // use. func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) { kc := New(arv) - return kc, kc.DiscoverKeepServers() + return kc, kc.discoverServices() } -// New creates a new KeepClient. The caller must call -// DiscoverKeepServers() before using the returned client to read or -// write data. +// New creates a new KeepClient. Service discovery will occur on the +// next read/write operation. func New(arv *arvadosclient.ArvadosClient) *KeepClient { defaultReplicationLevel := 2 value, err := arv.Discovery("defaultCollectionReplication") @@ -349,55 +351,47 @@ func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error // LocalRoots() returns the map of local (i.e., disk and proxy) Keep // services: uuid -> baseURI. func (kc *KeepClient) LocalRoots() map[string]string { + kc.discoverServices() kc.lock.RLock() defer kc.lock.RUnlock() - return *kc.localRoots + return kc.localRoots } // GatewayRoots() returns the map of Keep remote gateway services: // uuid -> baseURI. func (kc *KeepClient) GatewayRoots() map[string]string { + kc.discoverServices() kc.lock.RLock() defer kc.lock.RUnlock() - return *kc.gatewayRoots + return kc.gatewayRoots } // WritableLocalRoots() returns the map of writable local Keep services: // uuid -> baseURI. func (kc *KeepClient) WritableLocalRoots() map[string]string { + kc.discoverServices() kc.lock.RLock() defer kc.lock.RUnlock() - return *kc.writableLocalRoots + return kc.writableLocalRoots } -// SetServiceRoots updates the localRoots and gatewayRoots maps, -// without risk of disrupting operations that are already in progress. +// SetServiceRoots disables service discovery and updates the +// localRoots and gatewayRoots maps, without disrupting operations +// that are already in progress. // -// The KeepClient makes its own copy of the supplied maps, so the -// caller can reuse/modify them after SetServiceRoots returns, but -// they should not be modified by any other goroutine while -// SetServiceRoots is running. -func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) { - locals := make(map[string]string) - for uuid, root := range newLocals { - locals[uuid] = root - } - - writables := make(map[string]string) - for uuid, root := range newWritableLocals { - writables[uuid] = root - } - - gateways := make(map[string]string) - for uuid, root := range newGateways { - gateways[uuid] = root - } +// The supplied maps must not be modified after calling +// SetServiceRoots. +func (kc *KeepClient) SetServiceRoots(locals, writables, gateways map[string]string) { + kc.disableDiscovery = true + kc.setServiceRoots(locals, writables, gateways) +} +func (kc *KeepClient) setServiceRoots(locals, writables, gateways map[string]string) { kc.lock.Lock() defer kc.lock.Unlock() - kc.localRoots = &locals - kc.writableLocalRoots = &writables - kc.gatewayRoots = &gateways + kc.localRoots = locals + kc.writableLocalRoots = writables + kc.gatewayRoots = gateways } // getSortedRoots returns a list of base URIs of Keep services, in the diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index d2b84e9425..ff0f57bbe6 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -35,6 +35,10 @@ type ServerRequiredSuite struct{} // Standalone tests type StandaloneSuite struct{} +func (s *StandaloneSuite) SetUpTest(c *C) { + ClearCache() +} + func pythonDir() string { cwd, _ := os.Getwd() return fmt.Sprintf("%s/../../python/tests", cwd) @@ -50,6 +54,10 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) { arvadostest.StopAPI() } +func (s *ServerRequiredSuite) SetUpTest(c *C) { + ClearCache() +} + func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { arv, err := arvadosclient.MakeArvadosClient() c.Assert(err, Equals, nil) @@ -1067,12 +1075,14 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) { defer ks.listener.Close() arv, err := arvadosclient.MakeArvadosClient() - kc, _ := MakeKeepClient(arv) + c.Assert(err, IsNil) + kc, err := MakeKeepClient(arv) + c.Assert(err, IsNil) arv.ApiToken = "abc123" kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", "") - c.Check(err, Equals, nil) + c.Check(err, IsNil) content, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) @@ -1098,7 +1108,7 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) { kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil) r, err := kc.GetIndex("x", hash[0:3]) - c.Check(err, Equals, nil) + c.Assert(err, Equals, nil) content, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) @@ -1237,6 +1247,7 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { &blobKeepService) c.Assert(err, Equals, nil) defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }() + ClearCache() // Make a keepclient and ensure that the testblobstore is included kc, err := MakeKeepClient(arv) diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go index 85ec93b8d9..4ec0a38f05 100644 --- a/services/keep-web/handler.go +++ b/services/keep-web/handler.go @@ -64,6 +64,7 @@ func parseCollectionIDFromURL(s string) string { func (h *handler) setup() { h.clientPool = arvadosclient.MakeClientPool() + keepclient.ClearCacheOnSIGHUP() } // ServeHTTP implements http.Handler. diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index f9239a0f15..a7aa0e69bf 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -133,7 +133,6 @@ func main() { if cfg.DefaultReplicas > 0 { kc.Want_replicas = cfg.DefaultReplicas } - go kc.RefreshServices(5*time.Minute, 3*time.Second) listener, err = net.Listen("tcp", cfg.Listen) if err != nil { diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go index c0a7c6f6a5..c0ea4fae21 100644 --- a/services/keepstore/pull_worker_integration_test.go +++ b/services/keepstore/pull_worker_integration_test.go @@ -29,24 +29,23 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe // start api and keep servers arvadostest.StartAPI() arvadostest.StartKeep(2, false) + keepclient.ClearCache() // make arvadosclient arv, err := arvadosclient.MakeArvadosClient() if err != nil { - t.Error("Error creating arv") + t.Fatalf("Error creating arv: %s", err) } // keep client - keepClient = &keepclient.KeepClient{ - Arvados: arv, - Want_replicas: 1, + keepClient, err = keepclient.MakeKeepClient(arv) + if err != nil { + t.Fatalf("error creating KeepClient: %s", err) } + keepClient.Want_replicas = 1 // discover keep services var servers []string - if err := keepClient.DiscoverKeepServers(); err != nil { - t.Error("Error discovering keep services") - } for _, host := range keepClient.LocalRoots() { servers = append(servers, host) } diff --git a/tools/keep-block-check/keep-block-check.go b/tools/keep-block-check/keep-block-check.go index 6cf11a7280..e57a9abed4 100644 --- a/tools/keep-block-check/keep-block-check.go +++ b/tools/keep-block-check/keep-block-check.go @@ -5,8 +5,6 @@ import ( "errors" "flag" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" "io/ioutil" "log" "net/http" @@ -14,6 +12,9 @@ import ( "regexp" "strings" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" ) func main() { @@ -153,7 +154,7 @@ func setupKeepClient(config apiConfig, keepServicesJSON string, blobSignatureTTL External: config.ExternalClient, } - // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers + // If keepServicesJSON is provided, use it instead of service discovery if keepServicesJSON == "" { kc, err = keepclient.MakeKeepClient(&arv) if err != nil { diff --git a/tools/keep-block-check/keep-block-check_test.go b/tools/keep-block-check/keep-block-check_test.go index e49fe68616..60c5fb51d1 100644 --- a/tools/keep-block-check/keep-block-check_test.go +++ b/tools/keep-block-check/keep-block-check_test.go @@ -64,6 +64,7 @@ func (s *DoMainTestSuite) SetUpSuite(c *C) { func (s *DoMainTestSuite) SetUpTest(c *C) { logOutput := io.MultiWriter(&logBuffer) log.SetOutput(logOutput) + keepclient.ClearCache() } func (s *DoMainTestSuite) TearDownTest(c *C) { @@ -89,6 +90,8 @@ func setupKeepBlockCheckWithTTL(c *C, enforcePermissions bool, keepServicesJSON kc, ttl, err = setupKeepClient(config, keepServicesJSON, ttl) c.Assert(ttl, Equals, blobSignatureTTL) c.Check(err, IsNil) + + keepclient.ClearCache() } // Setup test data @@ -144,9 +147,8 @@ func setupBlockHashFile(c *C, name string, blocks []string) string { func checkErrorLog(c *C, blocks []string, prefix, suffix string) { for _, hash := range blocks { - expected := prefix + `.*` + hash + `.*` + suffix - match, _ := regexp.MatchString(expected, logBuffer.String()) - c.Assert(match, Equals, true) + expected := `(?ms).*` + prefix + `.*` + hash + `.*` + suffix + `.*` + c.Check(logBuffer.String(), Matches, expected) } } diff --git a/tools/keep-rsync/keep-rsync.go b/tools/keep-rsync/keep-rsync.go index c6e7665caa..d21a624f64 100644 --- a/tools/keep-rsync/keep-rsync.go +++ b/tools/keep-rsync/keep-rsync.go @@ -6,8 +6,6 @@ import ( "errors" "flag" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" "io/ioutil" "log" "net/http" @@ -15,6 +13,9 @@ import ( "regexp" "strings" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" ) func main() { @@ -170,7 +171,7 @@ func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, repl External: config.ExternalClient, } - // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers + // If keepServicesJSON is provided, use it instead of service discovery if keepServicesJSON == "" { kc, err = keepclient.MakeKeepClient(&arv) if err != nil { diff --git a/tools/keep-rsync/keep-rsync_test.go b/tools/keep-rsync/keep-rsync_test.go index 09609eb749..6f128c69a1 100644 --- a/tools/keep-rsync/keep-rsync_test.go +++ b/tools/keep-rsync/keep-rsync_test.go @@ -58,6 +58,7 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) { dstKeepServicesJSON = "" kcSrc = &keepclient.KeepClient{} kcDst = &keepclient.KeepClient{} + keepclient.ClearCache() } func (s *ServerRequiredSuite) TearDownTest(c *C) { @@ -65,6 +66,7 @@ func (s *ServerRequiredSuite) TearDownTest(c *C) { } func (s *DoMainTestSuite) SetUpTest(c *C) { + keepclient.ClearCache() args := []string{"keep-rsync"} os.Args = args }