9005: Keep service discovery up to date automatically.
[arvados.git] / sdk / go / keepclient / discover.go
index 8889c4bedae23f52d62667ba5b8747ed11c3f67b..c5413d4a4b067c041141b89109afd81a144ed2a6 100644 (file)
@@ -6,99 +6,165 @@ import (
        "log"
        "os"
        "os/signal"
-       "reflect"
        "strings"
+       "sync"
        "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
-// DiscoverKeepServers gets list of available keep services from the
-// API server.
-//
-// If a list of services is provided in the arvadosclient (e.g., from
-// an environment variable or local config), that list is used
-// instead.
-func (this *KeepClient) DiscoverKeepServers() error {
-       if this.Arvados.KeepServiceURIs != nil {
-               this.foundNonDiskSvc = true
-               this.replicasPerService = 0
-               roots := make(map[string]string)
-               for i, uri := range this.Arvados.KeepServiceURIs {
-                       roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
-               }
-               this.SetServiceRoots(roots, roots, roots)
-               return nil
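+// ClearCache clears the Keep service discovery cache. It signals each cached entry to fetch a fresh list, and blocks until every entry has accepted the signal.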
+func ClearCache() {
+       svcListCacheMtx.Lock()
+       defer svcListCacheMtx.Unlock()
+       for _, ent := range svcListCache {
+               ent.clear <- struct{}{}
        }
+}
 
-       // ArvadosClient did not provide a services list. Ask API
-       // server for a list of accessible services.
-       var list svcList
-       err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
-       if err != nil {
-               return err
+// ClearCacheOnSIGHUP installs a signal handler that calls
+// ClearCache when SIGHUP is received. Repeated calls are no-ops.
+func ClearCacheOnSIGHUP() {
+       svcListCacheMtx.Lock()
+       defer svcListCacheMtx.Unlock()
+       if svcListCacheSignal != nil {
+               return
        }
-       return this.loadKeepServers(list)
+       svcListCacheSignal = make(chan os.Signal, 1)
+       signal.Notify(svcListCacheSignal, syscall.SIGHUP)
+       go func() {
+               for range svcListCacheSignal {
+                       ClearCache()
+               }
+       }()
 }
 
-// LoadKeepServicesFromJSON gets list of available keep services from given JSON
-func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
-       var list svcList
-
-       // Load keep services from given json
-       dec := json.NewDecoder(strings.NewReader(services))
-       if err := dec.Decode(&list); err != nil {
-               return err
-       }
+var (
+       svcListCache       = map[string]cachedSvcList{}
+       svcListCacheSignal chan os.Signal
+       svcListCacheMtx    sync.Mutex
+)
 
-       return this.loadKeepServers(list)
+type cachedSvcList struct {
+       arv    *arvadosclient.ArvadosClient
+       latest chan svcList
+       clear  chan struct{}
 }
 
-// RefreshServices calls DiscoverKeepServers to refresh the keep
-// service list on SIGHUP; when the given interval has elapsed since
-// the last refresh; and (if the last refresh failed) the given
-// errInterval has elapsed.
-func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
-       var previousRoots = []map[string]string{}
-
-       timer := time.NewTimer(interval)
-       gotHUP := make(chan os.Signal, 1)
-       signal.Notify(gotHUP, syscall.SIGHUP)
+// poll fetches the services list at startup, after every clear signal, and every 5 minutes
+// (3 seconds after a failed fetch), and serves the most recent list on the "latest" channel.
+func (ent *cachedSvcList) poll() {
+       wakeup := make(chan struct{})
+
+       replace := make(chan svcList)
+       go func() {
+               wakeup <- struct{}{}
+               current := <-replace
+               for {
+                       select {
+                       case <-ent.clear:
+                               wakeup <- struct{}{}
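+                               // Wait here for the next successful fetch, so
+                               // that stale results are never sent on "latest".
+                               // NOTE(review): the wakeup send above looks like it can block forever if a fetch is already in flight and then succeeds (poller stuck on "replace <- next") — confirm.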
+                               current = <-replace
+                       case current = <-replace:
+                       case ent.latest <- current:
+                       }
+               }
+       }()
 
+       okDelay := 5 * time.Minute
+       errDelay := 3 * time.Second
+       timer := time.NewTimer(okDelay)
        for {
                select {
-               case <-gotHUP:
                case <-timer.C:
+               case <-wakeup:
+                       if !timer.Stop() {
+                               // Lost race stopping timer; skip extra firing
+                               <-timer.C
+                       }
                }
-               timer.Reset(interval)
-
-               if err := kc.DiscoverKeepServers(); err != nil {
-                       log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval)
-                       timer.Reset(errInterval)
+               var next svcList
+               err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
+               if err != nil {
+                       log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
+                       timer.Reset(errDelay)
                        continue
                }
-               newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+               replace <- next
+               timer.Reset(okDelay)
+       }
+}
+
+// discoverServices gets the list of available keep services from
+// the API server.
+//
+// If a list of services is provided in the arvadosclient (e.g., from
+// an environment variable or local config), that list is used
+// instead.
+//
+// If an API call is made, the result is cached for 5 minutes or until
+// ClearCache() is called, and during this interval it is reused by
+// other KeepClients that use the same API server host.
+func (kc *KeepClient) discoverServices() error {
+       if kc.disableDiscovery {
+               return nil
+       }
 
-               if !reflect.DeepEqual(previousRoots, newRoots) {
-                       DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
-                       previousRoots = newRoots
+       if kc.Arvados.KeepServiceURIs != nil {
+               kc.disableDiscovery = true
+               kc.foundNonDiskSvc = true
+               kc.replicasPerService = 0
+               roots := make(map[string]string)
+               for i, uri := range kc.Arvados.KeepServiceURIs {
+                       roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
                }
+               kc.setServiceRoots(roots, roots, roots)
+               return nil
+       }
 
-               if len(newRoots[0]) == 0 {
-                       log.Printf("WARNING: No local services (retrying in %v)", errInterval)
-                       timer.Reset(errInterval)
+       svcListCacheMtx.Lock()
+       cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
+       if !ok {
+               arv := *kc.Arvados
+               cacheEnt = cachedSvcList{
+                       latest: make(chan svcList),
+                       clear:  make(chan struct{}),
+                       arv:    &arv,
                }
+               go cacheEnt.poll()
+               svcListCache[kc.Arvados.ApiServer] = cacheEnt
+       }
+       svcListCacheMtx.Unlock()
+
+       return kc.loadKeepServers(<-cacheEnt.latest)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from
+// given JSON and disables automatic service discovery.
+func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
+       kc.disableDiscovery = true
+
+       var list svcList
+       dec := json.NewDecoder(strings.NewReader(services))
+       if err := dec.Decode(&list); err != nil {
+               return err
        }
+
+       return kc.loadKeepServers(list)
 }
 
-// loadKeepServers
-func (this *KeepClient) loadKeepServers(list svcList) error {
+func (kc *KeepClient) loadKeepServers(list svcList) error {
        listed := make(map[string]bool)
        localRoots := make(map[string]string)
        gatewayRoots := make(map[string]string)
        writableLocalRoots := make(map[string]string)
 
        // replicasPerService is 1 for disks; unknown or unlimited otherwise
-       this.replicasPerService = 1
+       kc.replicasPerService = 1
 
        for _, service := range list.Items {
                scheme := "http"
@@ -117,12 +183,12 @@ func (this *KeepClient) loadKeepServers(list svcList) error {
                if service.ReadOnly == false {
                        writableLocalRoots[service.Uuid] = url
                        if service.SvcType != "disk" {
-                               this.replicasPerService = 0
+                               kc.replicasPerService = 0
                        }
                }
 
                if service.SvcType != "disk" {
-                       this.foundNonDiskSvc = true
+                       kc.foundNonDiskSvc = true
                }
 
                // Gateway services are only used when specified by
@@ -133,6 +199,6 @@ func (this *KeepClient) loadKeepServers(list svcList) error {
                gatewayRoots[service.Uuid] = url
        }
 
-       this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
+       kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
        return nil
 }