X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f4ca9ad94a6bb006d1f3c7ba207837f1736d1247..7407f41105f8000bb3908d41a31daaf3a30d9440:/sdk/go/keepclient/discover.go diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go index f039c21810..726c3fb30c 100644 --- a/sdk/go/keepclient/discover.go +++ b/sdk/go/keepclient/discover.go @@ -1,89 +1,197 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package keepclient import ( "encoding/json" + "errors" "fmt" "log" "os" "os/signal" - "reflect" "strings" + "sync" "syscall" "time" -) -// DiscoverKeepServers gets list of available keep services from api server -func (this *KeepClient) DiscoverKeepServers() error { - var list svcList + "git.arvados.org/arvados.git/sdk/go/arvadosclient" +) - // Get keep services from api server - err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list) - if err != nil { - return err +// RefreshServiceDiscovery clears the Keep service discovery cache. +func RefreshServiceDiscovery() { + var wg sync.WaitGroup + defer wg.Wait() + svcListCacheMtx.Lock() + defer svcListCacheMtx.Unlock() + for _, ent := range svcListCache { + wg.Add(1) + clear := ent.clear + go func() { + clear <- struct{}{} + wg.Done() + }() } - - return this.loadKeepServers(list) } -// LoadKeepServicesFromJSON gets list of available keep services from given JSON -func (this *KeepClient) LoadKeepServicesFromJSON(services string) error { - var list svcList - - // Load keep services from given json - dec := json.NewDecoder(strings.NewReader(services)) - if err := dec.Decode(&list); err != nil { - return err +// RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls +// RefreshServiceDiscovery when SIGHUP is received. 
+func RefreshServiceDiscoveryOnSIGHUP() { + svcListCacheMtx.Lock() + defer svcListCacheMtx.Unlock() + if svcListCacheSignal != nil { + return } - - return this.loadKeepServers(list) + svcListCacheSignal = make(chan os.Signal, 1) + signal.Notify(svcListCacheSignal, syscall.SIGHUP) + go func() { + for range svcListCacheSignal { + RefreshServiceDiscovery() + } + }() } -// RefreshServices calls DiscoverKeepServers to refresh the keep -// service list on SIGHUP; when the given interval has elapsed since -// the last refresh; and (if the last refresh failed) the given -// errInterval has elapsed. -func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) { - var previousRoots = []map[string]string{} +var ( + svcListCache = map[string]cachedSvcList{} + svcListCacheSignal chan os.Signal + svcListCacheMtx sync.Mutex +) + +type cachedSvcList struct { + arv *arvadosclient.ArvadosClient + latest chan svcList + clear chan struct{} +} - timer := time.NewTimer(interval) - gotHUP := make(chan os.Signal, 1) - signal.Notify(gotHUP, syscall.SIGHUP) +// Check for new services list every few minutes. Send the latest list +// to the "latest" channel as needed. +func (ent *cachedSvcList) poll() { + wakeup := make(chan struct{}) + + replace := make(chan svcList) + go func() { + wakeup <- struct{}{} + current := <-replace + for { + select { + case <-ent.clear: + wakeup <- struct{}{} + // Wait here for the next success, in + // order to avoid returning stale + // results on the "latest" channel. 
+ current = <-replace + case current = <-replace: + case ent.latest <- current: + } + } + }() + okDelay := 5 * time.Minute + errDelay := 3 * time.Second + timer := time.NewTimer(okDelay) for { select { - case <-gotHUP: case <-timer.C: + case <-wakeup: + if !timer.Stop() { + // Lost race stopping timer; skip extra firing + <-timer.C + } } - timer.Reset(interval) - - if err := kc.DiscoverKeepServers(); err != nil { - log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval) - timer.Reset(errInterval) + var next svcList + err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next) + if err != nil { + log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay) + timer.Reset(errDelay) continue } - newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()} + replace <- next + timer.Reset(okDelay) + } +} + +// discoverServices gets the list of available keep services from +// the API server. +// +// If a list of services is provided in the arvadosclient (e.g., from +// an environment variable or local config), that list is used +// instead. +// +// If an API call is made, the result is cached for 5 minutes or until +// RefreshServiceDiscovery() is called, and during this interval it is reused by +// other KeepClients that use the same API server host. 
+func (kc *KeepClient) discoverServices() error { + if kc.disableDiscovery { + return nil + } - if !reflect.DeepEqual(previousRoots, newRoots) { - DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1]) - previousRoots = newRoots + if kc.Arvados.KeepServiceURIs != nil { + kc.disableDiscovery = true + kc.foundNonDiskSvc = true + kc.replicasPerService = 0 + roots := make(map[string]string) + for i, uri := range kc.Arvados.KeepServiceURIs { + roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri } + kc.setServiceRoots(roots, roots, roots) + return nil + } - if len(newRoots[0]) == 0 { - log.Printf("WARNING: No local services (retrying in %v)", errInterval) - timer.Reset(errInterval) + svcListCacheMtx.Lock() + cacheEnt, ok := svcListCache[kc.Arvados.ApiServer] + if !ok { + arv := *kc.Arvados + cacheEnt = cachedSvcList{ + latest: make(chan svcList), + clear: make(chan struct{}), + arv: &arv, } + go cacheEnt.poll() + svcListCache[kc.Arvados.ApiServer] = cacheEnt + } + svcListCacheMtx.Unlock() + + select { + case <-time.After(time.Minute): + return errors.New("timed out while getting initial list of keep services") + case sl := <-cacheEnt.latest: + return kc.loadKeepServers(sl) + } +} + +func (kc *KeepClient) RefreshServiceDiscovery() { + svcListCacheMtx.Lock() + ent, ok := svcListCache[kc.Arvados.ApiServer] + svcListCacheMtx.Unlock() + if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery { + return + } + ent.clear <- struct{}{} +} + +// LoadKeepServicesFromJSON gets list of available keep services from +// given JSON and disables automatic service discovery. 
+func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error { + kc.disableDiscovery = true + + var list svcList + dec := json.NewDecoder(strings.NewReader(services)) + if err := dec.Decode(&list); err != nil { + return err } + + return kc.loadKeepServers(list) } -// loadKeepServers -func (this *KeepClient) loadKeepServers(list svcList) error { +func (kc *KeepClient) loadKeepServers(list svcList) error { listed := make(map[string]bool) localRoots := make(map[string]string) gatewayRoots := make(map[string]string) writableLocalRoots := make(map[string]string) // replicasPerService is 1 for disks; unknown or unlimited otherwise - this.replicasPerService = 1 + kc.replicasPerService = 1 for _, service := range list.Items { scheme := "http" @@ -102,12 +210,12 @@ func (this *KeepClient) loadKeepServers(list svcList) error { if service.ReadOnly == false { writableLocalRoots[service.Uuid] = url if service.SvcType != "disk" { - this.replicasPerService = 0 + kc.replicasPerService = 0 } } if service.SvcType != "disk" { - this.foundNonDiskSvc = true + kc.foundNonDiskSvc = true } // Gateway services are only used when specified by @@ -118,12 +226,6 @@ func (this *KeepClient) loadKeepServers(list svcList) error { gatewayRoots[service.Uuid] = url } - if this.foundNonDiskSvc { - this.setClientSettingsNonDisk() - } else { - this.setClientSettingsDisk() - } - - this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots) + kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots) return nil }