1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
22 // RefreshServiceDiscovery clears the Keep service discovery cache.
23 func RefreshServiceDiscovery() {
// NOTE(review): this excerpt omits part of the function body; the
// comments below describe only the visible lines.
//
// Hold svcListCacheMtx until the function returns so svcListCache is
// not modified while it is being iterated.
26 svcListCacheMtx.Lock()
27 defer svcListCacheMtx.Unlock()
// Visit every cached entry (one per API server host). The loop body is
// not visible here; presumably it signals each entry's clear channel,
// as KeepClient.RefreshServiceDiscovery does for a single entry — confirm.
28 for _, ent := range svcListCache {
38 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
39 // RefreshServiceDiscovery when SIGHUP is received.
40 func RefreshServiceDiscoveryOnSIGHUP() {
// svcListCacheMtx guards svcListCacheSignal as well as svcListCache.
41 svcListCacheMtx.Lock()
42 defer svcListCacheMtx.Unlock()
// A non-nil channel means the handler is already installed. The body
// of this branch is not visible in this excerpt; presumably it returns
// early so repeated calls are no-ops — confirm.
43 if svcListCacheSignal != nil {
// signal.Notify never blocks when delivering a signal, so the channel
// needs buffer space. One slot is enough for a channel that is notified
// of a single signal type.
46 svcListCacheSignal = make(chan os.Signal, 1)
47 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
// Refresh once per received SIGHUP. NOTE(review): the enclosing lines
// are not visible here. RefreshServiceDiscovery acquires
// svcListCacheMtx, and this function holds that mutex until it returns,
// so this loop must run in a separate goroutine to avoid deadlock —
// confirm.
49 for range svcListCacheSignal {
50 RefreshServiceDiscovery()
// svcListCache maps an API server host (KeepClient.Arvados.ApiServer)
// to the shared discovery state for that host.
56 svcListCache = map[string]cachedSvcList{}
// svcListCacheSignal receives SIGHUP after
// RefreshServiceDiscoveryOnSIGHUP has installed the handler. It is nil
// until then.
57 svcListCacheSignal chan os.Signal
// svcListCacheMtx guards svcListCache and svcListCacheSignal.
58 svcListCacheMtx sync.Mutex
// cachedSvcList is the service-discovery state shared by all
// KeepClients that use the same API server host. NOTE(review): only
// part of the struct is visible in this excerpt. discoverServices also
// initializes latest and clear channel fields.
61 type cachedSvcList struct {
// arv is the client used to fetch the keep_services list.
62 arv *arvadosclient.ArvadosClient
67 // poll checks for a new services list every few minutes and sends the
68 // latest list to the "latest" channel as needed.
//
// After a failed fetch it retries after errDelay (3s) instead of
// waiting okDelay (5m). NOTE(review): parts of this function are not
// visible in this excerpt; these comments describe the visible lines only.
69 func (ent *cachedSvcList) poll() {
// wakeup presumably requests a fetch without waiting for the timer
// (compare the timer-stop race handling below). Its senders and
// receivers are not visible here.
70 wakeup := make(chan struct{})
// replace carries each successfully fetched list to the select below,
// which hands it to readers of ent.latest.
72 replace := make(chan svcList)
80 // Wait here for the next success, in
81 // order to avoid returning stale
82 // results on the "latest" channel.
// Otherwise, either accept a newer list or give the current one to a
// reader of ent.latest, whichever is ready first.
84 case current = <-replace:
85 case ent.latest <- current:
90 okDelay := 5 * time.Minute
91 errDelay := 3 * time.Second
// The timer is initially armed with okDelay.
92 timer := time.NewTimer(okDelay)
98 // Lost race stopping timer; skip extra firing
// Fetch the list from the API server's keep_services "accessible" action.
103 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
// Prefer the client's structured logger; otherwise fall back to the
// standard log package.
105 if ent.arv.Logger != nil {
106 ent.arv.Logger.WithError(err).Warnf("error retrieving services list (retrying in %v)", errDelay)
108 log.Printf("WARNING: Error retrieving services list: %s (retrying in %v)", err, errDelay)
// Retry sooner after an error.
110 timer.Reset(errDelay)
118 // discoverServices gets the list of available keep services from
121 // If a list of services is provided in the arvadosclient (e.g., from
122 // an environment variable or local config), that list is used
125 // If an API call is made, the result is cached for 5 minutes or until
126 // ClearCache() is called, and during this interval it is reused by
127 // other KeepClients that use the same API server host.
//
// NOTE(review): no ClearCache function is visible in this excerpt. The
// visible ways to reset the cache are RefreshServiceDiscovery and
// KeepClient.RefreshServiceDiscovery — confirm, and update the name above.
128 func (kc *KeepClient) discoverServices() error {
// NOTE(review): the body of this branch is not visible in this excerpt.
// Presumably it returns early because the service roots were already
// set (e.g. by LoadKeepServicesFromJSON) — confirm.
129 if kc.disableDiscovery {
// Explicitly configured service URIs bypass discovery. Each URI gets a
// synthetic UUID built from its zero-padded index. The same map is used
// for reading, writing and gateway roots. The service types are
// unknown, so the services are flagged as non-disk (foundNonDiskSvc)
// and replicasPerService is 0, meaning "unknown or unlimited".
133 if kc.Arvados.KeepServiceURIs != nil {
134 kc.disableDiscovery = true
135 kc.foundNonDiskSvc = true
136 kc.replicasPerService = 0
137 roots := make(map[string]string)
138 for i, uri := range kc.Arvados.KeepServiceURIs {
139 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
141 kc.setServiceRoots(roots, roots, roots)
145 if kc.Arvados.ApiServer == "" {
146 return fmt.Errorf("Arvados client is not configured (target API host is not set). Maybe env var ARVADOS_API_HOST should be set first?")
// Find or create the shared cache entry for this API host while holding
// svcListCacheMtx. The lines that fill in the remaining fields (and
// presumably start cacheEnt.poll) are not visible here.
149 svcListCacheMtx.Lock()
150 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
153 cacheEnt = cachedSvcList{
154 latest: make(chan svcList),
155 clear: make(chan struct{}),
159 svcListCache[kc.Arvados.ApiServer] = cacheEnt
161 svcListCacheMtx.Unlock()
// Wait at most one minute for a list from the cache entry, then load it
// into this client.
164 case <-time.After(time.Minute):
165 return errors.New("timed out while getting initial list of keep services")
166 case sl := <-cacheEnt.latest:
167 return kc.loadKeepServers(sl)
// RefreshServiceDiscovery signals the shared discovery cache entry for
// this client's API server host over its clear channel. The send is
// skipped when there is no cache entry, when explicit KeepServiceURIs
// are configured, or when discovery is disabled. (The body of that
// branch is not visible in this excerpt; presumably it returns.)
171 func (kc *KeepClient) RefreshServiceDiscovery() {
172 svcListCacheMtx.Lock()
173 ent, ok := svcListCache[kc.Arvados.ApiServer]
174 svcListCacheMtx.Unlock()
175 if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
// clear is created unbuffered in discoverServices, so this send blocks
// until a receiver (presumably the entry's poll goroutine) takes it.
// svcListCacheMtx was released above, so a blocked send does not hold
// the mutex.
178 ent.clear <- struct{}{}
181 // LoadKeepServicesFromJSON gets the list of available keep services from
182 // the given JSON and disables automatic service discovery.
//
// Discovery is disabled before the JSON is decoded, so it stays
// disabled even if decoding fails. Only the first JSON value in
// services is decoded.
183 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
184 kc.disableDiscovery = true
// NOTE(review): the declaration of list is not visible in this excerpt.
// Presumably it is a svcList, matching loadKeepServers' parameter.
187 dec := json.NewDecoder(strings.NewReader(services))
188 if err := dec.Decode(&list); err != nil {
192 return kc.loadKeepServers(list)
// loadKeepServers builds three maps from service UUID to base URL:
// local roots, writable local roots and gateway roots. It updates
// kc.replicasPerService and kc.foundNonDiskSvc, then installs the maps
// with setServiceRoots. NOTE(review): several lines are not visible in
// this excerpt, including how listed and scheme are used or set and the
// return statement — confirm against the full file.
195 func (kc *KeepClient) loadKeepServers(list svcList) error {
// listed is presumably used to skip duplicate services (its use is not
// visible here).
196 listed := make(map[string]bool)
197 localRoots := make(map[string]string)
198 gatewayRoots := make(map[string]string)
199 writableLocalRoots := make(map[string]string)
201 // replicasPerService is 1 for disks; unknown or unlimited otherwise
202 kc.replicasPerService = 1
204 for _, service := range list.Items {
// Base URL of the form scheme://hostname:port.
209 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
217 localRoots[service.Uuid] = url
// A single writable non-disk service makes the per-service replica
// count unknown (0) for the whole client.
218 if service.ReadOnly == false {
219 writableLocalRoots[service.Uuid] = url
220 if service.SvcType != "disk" {
221 kc.replicasPerService = 0
// NOTE(review): in the visible lines foundNonDiskSvc is only ever set to
// true, never reset. After reloading a list with no non-disk services it
// may keep a stale true value — confirm whether that is intended.
225 if service.SvcType != "disk" {
226 kc.foundNonDiskSvc = true
229 // Gateway services are only used when specified by
230 // UUID, so there's nothing to gain by filtering them
231 // by service type. Including all accessible services
232 // (gateway and otherwise) merely accommodates more
233 // service configurations.
234 gatewayRoots[service.Uuid] = url
237 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)