"log"
"os"
"os/signal"
- "reflect"
"strings"
+ "sync"
"syscall"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
-// DiscoverKeepServers gets list of available keep services from the
-// API server.
-//
-// If a list of services is provided in the arvadosclient (e.g., from
-// an environment variable or local config), that list is used
-// instead.
-func (this *KeepClient) DiscoverKeepServers() error {
- if this.Arvados.KeepServiceURIs != nil {
- this.foundNonDiskSvc = true
- this.replicasPerService = 0
- roots := make(map[string]string)
- for i, uri := range this.Arvados.KeepServiceURIs {
- roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
- }
- this.SetServiceRoots(roots, roots, roots)
- return nil
+// ClearCache clears the Keep service discovery cache.
+func ClearCache() {
+ svcListCacheMtx.Lock()
+ defer svcListCacheMtx.Unlock()
+ for _, ent := range svcListCache {
+ ent.clear <- struct{}{}
}
+}
- // ArvadosClient did not provide a services list. Ask API
- // server for a list of accessible services.
- var list svcList
- err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
- if err != nil {
- return err
+// ClearCacheOnSIGHUP installs a signal handler that calls
+// ClearCache when SIGHUP is received.
+func ClearCacheOnSIGHUP() {
+ svcListCacheMtx.Lock()
+ defer svcListCacheMtx.Unlock()
+ if svcListCacheSignal != nil {
+ return
}
- return this.loadKeepServers(list)
+ svcListCacheSignal = make(chan os.Signal, 1)
+ signal.Notify(svcListCacheSignal, syscall.SIGHUP)
+ go func() {
+ for range svcListCacheSignal {
+ ClearCache()
+ }
+ }()
}
-// LoadKeepServicesFromJSON gets list of available keep services from given JSON
-func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
- var list svcList
-
- // Load keep services from given json
- dec := json.NewDecoder(strings.NewReader(services))
- if err := dec.Decode(&list); err != nil {
- return err
- }
+var (
+ svcListCache = map[string]cachedSvcList{}
+ svcListCacheSignal chan os.Signal
+ svcListCacheMtx sync.Mutex
+)
- return this.loadKeepServers(list)
+type cachedSvcList struct {
+ arv *arvadosclient.ArvadosClient
+ latest chan svcList
+ clear chan struct{}
}
-// RefreshServices calls DiscoverKeepServers to refresh the keep
-// service list on SIGHUP; when the given interval has elapsed since
-// the last refresh; and (if the last refresh failed) the given
-// errInterval has elapsed.
-func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
- var previousRoots = []map[string]string{}
-
- timer := time.NewTimer(interval)
- gotHUP := make(chan os.Signal, 1)
- signal.Notify(gotHUP, syscall.SIGHUP)
+// Check for new services list every few minutes. Send the latest list
+// to the "latest" channel as needed.
+func (ent *cachedSvcList) poll() {
+ wakeup := make(chan struct{})
+
+ replace := make(chan svcList)
+ go func() {
+ wakeup <- struct{}{}
+ current := <-replace
+ for {
+ select {
+ case <-ent.clear:
+ wakeup <- struct{}{}
+ // Wait here for the next success, in
+ // order to avoid returning stale
+ // results on the "latest" channel.
+ current = <-replace
+ case current = <-replace:
+ case ent.latest <- current:
+ }
+ }
+ }()
+ okDelay := 5 * time.Minute
+ errDelay := 3 * time.Second
+ timer := time.NewTimer(okDelay)
for {
select {
- case <-gotHUP:
case <-timer.C:
+ case <-wakeup:
+ if !timer.Stop() {
+ // Lost race stopping timer; skip extra firing
+ <-timer.C
+ }
}
- timer.Reset(interval)
-
- if err := kc.DiscoverKeepServers(); err != nil {
- log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval)
- timer.Reset(errInterval)
+ var next svcList
+ err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
+ if err != nil {
+ log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
+ timer.Reset(errDelay)
continue
}
- newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
+ replace <- next
+ timer.Reset(okDelay)
+ }
+}
+
+// discoverServices gets the list of available keep services from
+// the API server.
+//
+// If a list of services is provided in the arvadosclient (e.g., from
+// an environment variable or local config), that list is used
+// instead.
+//
+// If an API call is made, the result is cached for 5 minutes or until
+// ClearCache() is called, and during this interval it is reused by
+// other KeepClients that use the same API server host.
+func (kc *KeepClient) discoverServices() error {
+ if kc.disableDiscovery {
+ return nil
+ }
- if !reflect.DeepEqual(previousRoots, newRoots) {
- DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
- previousRoots = newRoots
+ if kc.Arvados.KeepServiceURIs != nil {
+ kc.disableDiscovery = true
+ kc.foundNonDiskSvc = true
+ kc.replicasPerService = 0
+ roots := make(map[string]string)
+ for i, uri := range kc.Arvados.KeepServiceURIs {
+ roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
}
+ kc.setServiceRoots(roots, roots, roots)
+ return nil
+ }
- if len(newRoots[0]) == 0 {
- log.Printf("WARNING: No local services (retrying in %v)", errInterval)
- timer.Reset(errInterval)
+ svcListCacheMtx.Lock()
+ cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
+ if !ok {
+ arv := *kc.Arvados
+ cacheEnt = cachedSvcList{
+ latest: make(chan svcList),
+ clear: make(chan struct{}),
+ arv: &arv,
}
+ go cacheEnt.poll()
+ svcListCache[kc.Arvados.ApiServer] = cacheEnt
+ }
+ svcListCacheMtx.Unlock()
+
+ return kc.loadKeepServers(<-cacheEnt.latest)
+}
+
+// LoadKeepServicesFromJSON gets list of available keep services from
+// given JSON and disables automatic service discovery.
+func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
+ kc.disableDiscovery = true
+
+ var list svcList
+ dec := json.NewDecoder(strings.NewReader(services))
+ if err := dec.Decode(&list); err != nil {
+ return err
}
+
+ return kc.loadKeepServers(list)
}
-// loadKeepServers
-func (this *KeepClient) loadKeepServers(list svcList) error {
+func (kc *KeepClient) loadKeepServers(list svcList) error {
listed := make(map[string]bool)
localRoots := make(map[string]string)
gatewayRoots := make(map[string]string)
writableLocalRoots := make(map[string]string)
// replicasPerService is 1 for disks; unknown or unlimited otherwise
- this.replicasPerService = 1
+ kc.replicasPerService = 1
for _, service := range list.Items {
scheme := "http"
if service.ReadOnly == false {
writableLocalRoots[service.Uuid] = url
if service.SvcType != "disk" {
- this.replicasPerService = 0
+ kc.replicasPerService = 0
}
}
if service.SvcType != "disk" {
- this.foundNonDiskSvc = true
+ kc.foundNonDiskSvc = true
}
// Gateway services are only used when specified by
gatewayRoots[service.Uuid] = url
}
- this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
+ kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
return nil
}
import (
"crypto/md5"
"fmt"
- "gopkg.in/check.v1"
"net/http"
"os"
- "time"
+
+ "gopkg.in/check.v1"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
)
-func ExampleKeepClient_RefreshServices() {
- arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- panic(err)
- }
- kc, err := MakeKeepClient(arv)
- if err != nil {
- panic(err)
- }
- go kc.RefreshServices(5*time.Minute, 3*time.Second)
- fmt.Printf("LocalRoots: %#v\n", kc.LocalRoots())
-}
-
func (s *ServerRequiredSuite) TestOverrideDiscovery(c *check.C) {
defer os.Setenv("ARVADOS_KEEP_SERVICES", "")
type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
Want_replicas int
- localRoots *map[string]string
- writableLocalRoots *map[string]string
- gatewayRoots *map[string]string
+ localRoots map[string]string
+ writableLocalRoots map[string]string
+ gatewayRoots map[string]string
lock sync.RWMutex
HTTPClient HTTPClient
Retries int
// Any non-disk typed services found in the list of keepservers?
foundNonDiskSvc bool
+
+ // Disable automatic discovery of keep services
+ disableDiscovery bool
}
// MakeKeepClient creates a new KeepClient, calls
// use.
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
kc := New(arv)
- return kc, kc.DiscoverKeepServers()
+ return kc, kc.discoverServices()
}
-// New creates a new KeepClient. The caller must call
-// DiscoverKeepServers() before using the returned client to read or
-// write data.
+// New creates a new KeepClient. Service discovery will occur on the
+// next read/write operation.
func New(arv *arvadosclient.ArvadosClient) *KeepClient {
defaultReplicationLevel := 2
value, err := arv.Discovery("defaultCollectionReplication")
// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
// services: uuid -> baseURI.
func (kc *KeepClient) LocalRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.localRoots
+ return kc.localRoots
}
// GatewayRoots() returns the map of Keep remote gateway services:
// uuid -> baseURI.
func (kc *KeepClient) GatewayRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.gatewayRoots
+ return kc.gatewayRoots
}
// WritableLocalRoots() returns the map of writable local Keep services:
// uuid -> baseURI.
func (kc *KeepClient) WritableLocalRoots() map[string]string {
+ kc.discoverServices()
kc.lock.RLock()
defer kc.lock.RUnlock()
- return *kc.writableLocalRoots
+ return kc.writableLocalRoots
}
-// SetServiceRoots updates the localRoots and gatewayRoots maps,
-// without risk of disrupting operations that are already in progress.
+// SetServiceRoots disables service discovery and updates the
+// localRoots and gatewayRoots maps, without disrupting operations
+// that are already in progress.
//
-// The KeepClient makes its own copy of the supplied maps, so the
-// caller can reuse/modify them after SetServiceRoots returns, but
-// they should not be modified by any other goroutine while
-// SetServiceRoots is running.
-func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals, newGateways map[string]string) {
- locals := make(map[string]string)
- for uuid, root := range newLocals {
- locals[uuid] = root
- }
-
- writables := make(map[string]string)
- for uuid, root := range newWritableLocals {
- writables[uuid] = root
- }
-
- gateways := make(map[string]string)
- for uuid, root := range newGateways {
- gateways[uuid] = root
- }
+// The supplied maps must not be modified after calling
+// SetServiceRoots.
+func (kc *KeepClient) SetServiceRoots(locals, writables, gateways map[string]string) {
+ kc.disableDiscovery = true
+ kc.setServiceRoots(locals, writables, gateways)
+}
+func (kc *KeepClient) setServiceRoots(locals, writables, gateways map[string]string) {
kc.lock.Lock()
defer kc.lock.Unlock()
- kc.localRoots = &locals
- kc.writableLocalRoots = &writables
- kc.gatewayRoots = &gateways
+ kc.localRoots = locals
+ kc.writableLocalRoots = writables
+ kc.gatewayRoots = gateways
}
// getSortedRoots returns a list of base URIs of Keep services, in the
// Standalone tests
type StandaloneSuite struct{}
+func (s *StandaloneSuite) SetUpTest(c *C) {
+ ClearCache()
+}
+
func pythonDir() string {
cwd, _ := os.Getwd()
return fmt.Sprintf("%s/../../python/tests", cwd)
arvadostest.StopAPI()
}
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+ ClearCache()
+}
+
func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, Equals, nil)
defer ks.listener.Close()
arv, err := arvadosclient.MakeArvadosClient()
- kc, _ := MakeKeepClient(arv)
+ c.Assert(err, IsNil)
+ kc, err := MakeKeepClient(arv)
+ c.Assert(err, IsNil)
arv.ApiToken = "abc123"
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", "")
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", hash[0:3])
- c.Check(err, Equals, nil)
+ c.Assert(err, Equals, nil)
content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
&blobKeepService)
c.Assert(err, Equals, nil)
defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
+ ClearCache()
// Make a keepclient and ensure that the testblobstore is included
kc, err := MakeKeepClient(arv)
func (h *handler) setup() {
h.clientPool = arvadosclient.MakeClientPool()
+ keepclient.ClearCacheOnSIGHUP()
}
// ServeHTTP implements http.Handler.
if cfg.DefaultReplicas > 0 {
kc.Want_replicas = cfg.DefaultReplicas
}
- go kc.RefreshServices(5*time.Minute, 3*time.Second)
listener, err = net.Listen("tcp", cfg.Listen)
if err != nil {
// start api and keep servers
arvadostest.StartAPI()
arvadostest.StartKeep(2, false)
+ keepclient.ClearCache()
// make arvadosclient
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- t.Error("Error creating arv")
+ t.Fatalf("Error creating arv: %s", err)
}
// keep client
- keepClient = &keepclient.KeepClient{
- Arvados: arv,
- Want_replicas: 1,
+ keepClient, err = keepclient.MakeKeepClient(arv)
+ if err != nil {
+ t.Fatalf("error creating KeepClient: %s", err)
}
+ keepClient.Want_replicas = 1
// discover keep services
var servers []string
- if err := keepClient.DiscoverKeepServers(); err != nil {
- t.Error("Error discovering keep services")
- }
for _, host := range keepClient.LocalRoots() {
servers = append(servers, host)
}
"errors"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
"net/http"
"regexp"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
func main() {
External: config.ExternalClient,
}
- // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+ // If keepServicesJSON is provided, use it instead of service discovery
if keepServicesJSON == "" {
kc, err = keepclient.MakeKeepClient(&arv)
if err != nil {
func (s *DoMainTestSuite) SetUpTest(c *C) {
logOutput := io.MultiWriter(&logBuffer)
log.SetOutput(logOutput)
+ keepclient.ClearCache()
}
func (s *DoMainTestSuite) TearDownTest(c *C) {
kc, ttl, err = setupKeepClient(config, keepServicesJSON, ttl)
c.Assert(ttl, Equals, blobSignatureTTL)
c.Check(err, IsNil)
+
+ keepclient.ClearCache()
}
// Setup test data
func checkErrorLog(c *C, blocks []string, prefix, suffix string) {
for _, hash := range blocks {
- expected := prefix + `.*` + hash + `.*` + suffix
- match, _ := regexp.MatchString(expected, logBuffer.String())
- c.Assert(match, Equals, true)
+ expected := `(?ms).*` + prefix + `.*` + hash + `.*` + suffix + `.*`
+ c.Check(logBuffer.String(), Matches, expected)
}
}
"errors"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
"net/http"
"regexp"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
func main() {
External: config.ExternalClient,
}
- // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+ // If keepServicesJSON is provided, use it instead of service discovery
if keepServicesJSON == "" {
kc, err = keepclient.MakeKeepClient(&arv)
if err != nil {
dstKeepServicesJSON = ""
kcSrc = &keepclient.KeepClient{}
kcDst = &keepclient.KeepClient{}
+ keepclient.ClearCache()
}
func (s *ServerRequiredSuite) TearDownTest(c *C) {
}
func (s *DoMainTestSuite) SetUpTest(c *C) {
+ keepclient.ClearCache()
args := []string{"keep-rsync"}
os.Args = args
}