+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package keepclient
import (
"encoding/json"
+ "errors"
"fmt"
"log"
"os"
"os/signal"
- "reflect"
"strings"
+ "sync"
"syscall"
"time"
-)
-// DiscoverKeepServers gets list of available keep services from api server
-func (this *KeepClient) DiscoverKeepServers() error {
- var list svcList
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+)
- // Get keep services from api server
- err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
- if err != nil {
- return err
+// RefreshServiceDiscovery clears the Keep service discovery cache.
+func RefreshServiceDiscovery() {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ svcListCacheMtx.Lock()
+ defer svcListCacheMtx.Unlock()
+ for _, ent := range svcListCache {
+ wg.Add(1)
+ clear := ent.clear
+ go func() {
+ clear <- struct{}{}
+ wg.Done()
+ }()
}
-
- return this.loadKeepServers(list)
}
-// LoadKeepServicesFromJSON gets list of available keep services from given JSON
-func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
- var list svcList
-
- // Load keep services from given json
- dec := json.NewDecoder(strings.NewReader(services))
- if err := dec.Decode(&list); err != nil {
- return err
+// RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
+// RefreshServiceDiscovery when SIGHUP is received.
+func RefreshServiceDiscoveryOnSIGHUP() {
+ svcListCacheMtx.Lock()
+ defer svcListCacheMtx.Unlock()
+ if svcListCacheSignal != nil {
+ return
}
-
- return this.loadKeepServers(list)
+ svcListCacheSignal = make(chan os.Signal, 1)
+ signal.Notify(svcListCacheSignal, syscall.SIGHUP)
+ go func() {
+ for range svcListCacheSignal {
+ RefreshServiceDiscovery()
+ }
+ }()
}
-// RefreshServices calls DiscoverKeepServers to refresh the keep
-// service list on SIGHUP; when the given interval has elapsed since
-// the last refresh; and (if the last refresh failed) the given
-// errInterval has elapsed.
-func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
- var previousRoots = []map[string]string{}
+var (
+ svcListCache = map[string]cachedSvcList{}
+ svcListCacheSignal chan os.Signal
+ svcListCacheMtx sync.Mutex
+)
+
+type cachedSvcList struct {
+ arv *arvadosclient.ArvadosClient
+ latest chan svcList
+ clear chan struct{}
+}
- timer := time.NewTimer(interval)
- gotHUP := make(chan os.Signal, 1)
- signal.Notify(gotHUP, syscall.SIGHUP)
+// Check for new services list every few minutes. Send the latest list
+// to the "latest" channel as needed.
+func (ent *cachedSvcList) poll() {
+ wakeup := make(chan struct{})
+
+ replace := make(chan svcList)
+ go func() {
+ wakeup <- struct{}{}
+ current := <-replace
+ for {
+ select {
+ case <-ent.clear:
+ wakeup <- struct{}{}
+ // Wait here for the next success, in
+ // order to avoid returning stale
+ // results on the "latest" channel.
+ current = <-replace
+ case current = <-replace:
+ case ent.latest <- current:
+ }
+ }
+ }()
+ okDelay := 5 * time.Minute
+ errDelay := 3 * time.Second
+ timer := time.NewTimer(okDelay)
for {
select {
- case <-gotHUP:
case <-timer.C:
+ case <-wakeup:
+ if !timer.Stop() {
+ // Lost race stopping timer; skip extra firing
+ <-timer.C
+ }
}
- timer.Reset(interval)
-
- if err := kc.DiscoverKeepServers(); err != nil {
- log.Println("Error retrieving services list: %v (retrying in %v)", err, errInterval)
- timer.Reset(errInterval)
+ var next svcList
+ err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
+ if err != nil {
+ log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
+ timer.Reset(errDelay)
continue
}
- newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+ replace <- next
+ timer.Reset(okDelay)
+ }
+}
- if !reflect.DeepEqual(previousRoots, newRoots) {
- log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
- previousRoots = newRoots
+// discoverServices gets the list of available keep services from
+// the API server.
+//
+// If a list of services is provided in the arvadosclient (e.g., from
+// an environment variable or local config), that list is used
+// instead.
+//
+// If an API call is made, the result is cached for 5 minutes or until
+// ClearCache() is called, and during this interval it is reused by
+// other KeepClients that use the same API server host.
+func (kc *KeepClient) discoverServices() error {
+ if kc.disableDiscovery {
+ return nil
+ }
+
+ if kc.Arvados.KeepServiceURIs != nil {
+ kc.disableDiscovery = true
+ kc.foundNonDiskSvc = true
+ kc.replicasPerService = 0
+ roots := make(map[string]string)
+ for i, uri := range kc.Arvados.KeepServiceURIs {
+ roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
}
+ kc.setServiceRoots(roots, roots, roots)
+ return nil
+ }
- if len(newRoots[0]) == 0 {
- log.Printf("WARNING: No local services (retrying in %v)", errInterval)
- timer.Reset(errInterval)
+ svcListCacheMtx.Lock()
+ cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
+ if !ok {
+ arv := *kc.Arvados
+ cacheEnt = cachedSvcList{
+ latest: make(chan svcList),
+ clear: make(chan struct{}),
+ arv: &arv,
}
+ go cacheEnt.poll()
+ svcListCache[kc.Arvados.ApiServer] = cacheEnt
+ }
+ svcListCacheMtx.Unlock()
+
+ select {
+ case <-time.After(time.Minute):
+ return errors.New("timed out while getting initial list of keep services")
+ case sl := <-cacheEnt.latest:
+ return kc.loadKeepServers(sl)
+ }
+}
+
+func (kc *KeepClient) RefreshServiceDiscovery() {
+ svcListCacheMtx.Lock()
+ ent, ok := svcListCache[kc.Arvados.ApiServer]
+ svcListCacheMtx.Unlock()
+ if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
+ return
+ }
+ ent.clear <- struct{}{}
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from
+// given JSON and disables automatic service discovery.
+func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
+ kc.disableDiscovery = true
+
+ var list svcList
+ dec := json.NewDecoder(strings.NewReader(services))
+ if err := dec.Decode(&list); err != nil {
+ return err
}
+
+ return kc.loadKeepServers(list)
}
-// loadKeepServers
-func (this *KeepClient) loadKeepServers(list svcList) error {
+func (kc *KeepClient) loadKeepServers(list svcList) error {
listed := make(map[string]bool)
localRoots := make(map[string]string)
gatewayRoots := make(map[string]string)
writableLocalRoots := make(map[string]string)
// replicasPerService is 1 for disks; unknown or unlimited otherwise
- this.replicasPerService = 1
- this.Using_proxy = false
+ kc.replicasPerService = 1
for _, service := range list.Items {
scheme := "http"
listed[url] = true
localRoots[service.Uuid] = url
- if service.SvcType == "proxy" {
- this.Using_proxy = true
- }
-
if service.ReadOnly == false {
writableLocalRoots[service.Uuid] = url
if service.SvcType != "disk" {
- this.replicasPerService = 0
+ kc.replicasPerService = 0
}
}
+ if service.SvcType != "disk" {
+ kc.foundNonDiskSvc = true
+ }
+
// Gateway services are only used when specified by
// UUID, so there's nothing to gain by filtering them
// by service type. Including all accessible services
gatewayRoots[service.Uuid] = url
}
- if this.Using_proxy {
- this.setClientSettingsProxy()
- } else {
- this.setClientSettingsDisk()
- }
-
- this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
+ kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
return nil
}