1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
18 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
21 // RefreshServiceDiscovery clears the Keep service discovery cache.
//
// While holding svcListCacheMtx, it sends one value on the clear
// channel of every entry currently in svcListCache.
22 func RefreshServiceDiscovery() {
23 svcListCacheMtx.Lock()
24 defer svcListCacheMtx.Unlock()
25 for _, ent := range svcListCache {
// NOTE(review): discoverServices creates clear as an unbuffered
// channel, so this send blocks until it is received -- presumably
// by the entry's poll goroutine; confirm against the full file.
26 ent.clear <- struct{}{}
30 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
31 // RefreshServiceDiscovery when SIGHUP is received.
//
// NOTE(review): the svcListCacheSignal != nil check suggests that
// repeated calls install the handler only once -- the body of that
// branch is not visible here; confirm.
32 func RefreshServiceDiscoveryOnSIGHUP() {
33 svcListCacheMtx.Lock()
34 defer svcListCacheMtx.Unlock()
35 if svcListCacheSignal != nil {
// Buffer of 1: package signal does not block when delivering to the
// channel, so a signal arriving while the receiver is busy would be
// dropped if there were no buffer space.
38 svcListCacheSignal = make(chan os.Signal, 1)
39 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
// Each received SIGHUP triggers one RefreshServiceDiscovery call.
// NOTE(review): RefreshServiceDiscovery locks svcListCacheMtx, which
// this function holds until it returns, so this loop presumably runs
// in its own goroutine -- confirm against the full file.
41 for range svcListCacheSignal {
42 RefreshServiceDiscovery()
// svcListCache holds one cachedSvcList per API server;
// discoverServices keys it by kc.Arvados.ApiServer.
48 svcListCache = map[string]cachedSvcList{}
// svcListCacheSignal is the channel registered for SIGHUP by
// RefreshServiceDiscoveryOnSIGHUP; it is nil until that function
// creates it.
49 svcListCacheSignal chan os.Signal
// svcListCacheMtx is held while svcListCache and svcListCacheSignal
// are read or modified.
50 svcListCacheMtx sync.Mutex
// cachedSvcList is the entry type stored in svcListCache. In addition
// to arv, discoverServices initializes its latest and clear channel
// fields.
53 type cachedSvcList struct {
// arv is the API client that poll uses to request the keep_services
// list.
54 arv *arvadosclient.ArvadosClient
59 // poll checks for a new services list every few minutes and sends the
60 // latest list to the "latest" channel as needed.
61 func (ent *cachedSvcList) poll() {
// NOTE(review): wakeup and replace are internal channels; their full
// use is not visible in this excerpt. replace carries svcList values
// that become the current list (see the select case below).
62 wakeup := make(chan struct{})
64 replace := make(chan svcList)
72 // Wait here for the next success, in
73 // order to avoid returning stale
74 // results on the "latest" channel.
// Either adopt a newer list arriving on replace, or hand the current
// list to a reader waiting on ent.latest.
76 case current = <-replace:
77 case ent.latest <- current:
// okDelay is the interval the timer is first armed with; errDelay is
// the retry interval reported in the warning logged when the API call
// fails.
82 okDelay := 5 * time.Minute
83 errDelay := 3 * time.Second
84 timer := time.NewTimer(okDelay)
90 // Lost race stopping timer; skip extra firing
// Request the "accessible" keep_services list from the API server,
// decoding the response into next.
95 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
97 log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
106 // discoverServices gets the list of available keep services from
109 // If a list of services is provided in the arvadosclient (e.g., from
110 // an environment variable or local config), that list is used
113 // If an API call is made, the result is cached for 5 minutes or until
114 // RefreshServiceDiscovery() is called, and during this interval it is
115 // reused by other KeepClients that use the same API server host.
116 func (kc *KeepClient) discoverServices() error {
117 if kc.disableDiscovery {
121 if kc.Arvados.KeepServiceURIs != nil {
// A preconfigured list of URIs turns off discovery for this client.
122 kc.disableDiscovery = true
123 kc.foundNonDiskSvc = true
124 kc.replicasPerService = 0
125 roots := make(map[string]string)
126 for i, uri := range kc.Arvados.KeepServiceURIs {
// Key each configured URI by a placeholder UUID built from its index.
127 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
// The same map serves as the local, writable-local and gateway roots.
129 kc.setServiceRoots(roots, roots, roots)
// Look up the cache entry for this API server under the cache mutex.
133 svcListCacheMtx.Lock()
134 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
// Both channels are created unbuffered.
137 cacheEnt = cachedSvcList{
138 latest: make(chan svcList),
139 clear: make(chan struct{}),
143 svcListCache[kc.Arvados.ApiServer] = cacheEnt
145 svcListCacheMtx.Unlock()
// Block until the cache entry delivers a services list on latest,
// then load it into this client.
147 return kc.loadKeepServers(<-cacheEnt.latest)
150 // LoadKeepServicesFromJSON gets list of available keep services from
151 // given JSON and disables automatic service discovery.
//
// services is the JSON text; it is decoded into a list that is passed
// to loadKeepServers, whose error result is returned.
152 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
// Discovery is disabled before decoding, so it stays disabled even if
// the JSON turns out to be invalid.
153 kc.disableDiscovery = true
156 dec := json.NewDecoder(strings.NewReader(services))
157 if err := dec.Decode(&list); err != nil {
161 return kc.loadKeepServers(list)
// loadKeepServers builds the local, writable-local and gateway root
// maps (service UUID -> URL) from list.Items and installs them with
// kc.setServiceRoots. It also resets kc.replicasPerService and sets
// kc.foundNonDiskSvc when a non-"disk" service is seen.
164 func (kc *KeepClient) loadKeepServers(list svcList) error {
// NOTE(review): the use of listed is not visible in this excerpt;
// presumably it tracks services already seen -- confirm.
165 listed := make(map[string]bool)
166 localRoots := make(map[string]string)
167 gatewayRoots := make(map[string]string)
168 writableLocalRoots := make(map[string]string)
170 // replicasPerService is 1 for disks; unknown or unlimited otherwise
171 kc.replicasPerService = 1
173 for _, service := range list.Items {
// scheme is determined on lines not shown in this excerpt.
178 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
186 localRoots[service.Uuid] = url
// Only services that are not read-only are recorded as writable.
187 if service.ReadOnly == false {
188 writableLocalRoots[service.Uuid] = url
// A writable non-disk service makes the per-service replica count
// unknown or unlimited, recorded as 0.
189 if service.SvcType != "disk" {
190 kc.replicasPerService = 0
194 if service.SvcType != "disk" {
195 kc.foundNonDiskSvc = true
198 // Gateway services are only used when specified by
199 // UUID, so there's nothing to gain by filtering them
200 // by service type. Including all accessible services
201 // (gateway and otherwise) merely accommodates more
202 // service configurations.
203 gatewayRoots[service.Uuid] = url
206 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)