1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
22 // RefreshServiceDiscovery clears the Keep service discovery cache.
// It holds svcListCacheMtx for the duration of the call and visits every
// entry currently stored in svcListCache.
23 func RefreshServiceDiscovery() {
// Keep the cache mutex held until return so the map cannot change while
// it is being iterated.
26 svcListCacheMtx.Lock()
27 defer svcListCacheMtx.Unlock()
// NOTE(review): the loop body is outside this excerpt; presumably each
// entry is told to refresh through its "clear" channel -- confirm.
28 for _, ent := range svcListCache {
38 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
39 // RefreshServiceDiscovery when SIGHUP is received.
//
// svcListCacheSignal is inspected and assigned while svcListCacheMtx is
// held.
40 func RefreshServiceDiscoveryOnSIGHUP() {
41 svcListCacheMtx.Lock()
42 defer svcListCacheMtx.Unlock()
// In the visible code the channel is only ever assigned below, so a
// non-nil value means an earlier call already installed the handler.
// NOTE(review): the body of this branch is not shown; presumably it
// returns early -- confirm.
43 if svcListCacheSignal != nil {
// The channel is buffered (capacity 1) because package os/signal never
// blocks when delivering a signal; with no buffer space a SIGHUP that
// arrives while the receiver is busy would be dropped.
46 svcListCacheSignal = make(chan os.Signal, 1)
47 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
// Run one refresh per received signal.
// NOTE(review): RefreshServiceDiscovery locks svcListCacheMtx, which this
// function holds until it returns, so this loop presumably runs in a
// goroutine started on a line not shown here -- confirm.
49 for range svcListCacheSignal {
50 RefreshServiceDiscovery()
// svcListCache holds one cachedSvcList per API server; entries are looked
// up and stored under the client's Arvados.ApiServer value.
56 svcListCache = map[string]cachedSvcList{}
// svcListCacheSignal is the channel registered for SIGHUP by
// RefreshServiceDiscoveryOnSIGHUP. It stays nil until that function
// creates it.
57 svcListCacheSignal chan os.Signal
// svcListCacheMtx is held while svcListCache or svcListCacheSignal is
// read or written.
58 svcListCacheMtx sync.Mutex
// cachedSvcList is the value type stored in svcListCache.
// NOTE(review): only the arv field is visible in this excerpt; the
// "latest" and "clear" channel fields set by discoverServices are declared
// on lines not shown here.
61 type cachedSvcList struct {
// arv is the API client that poll uses to request the keep_services list.
62 arv *arvadosclient.ArvadosClient
67 // poll checks for a new services list every few minutes and sends the
68 // latest list to the "latest" channel as needed.
69 func (ent *cachedSvcList) poll() {
// NOTE(review): how wakeup is used is not visible in this excerpt.
70 wakeup := make(chan struct{})
// replace carries each newly fetched list to the code that serves
// ent.latest (see the receive on replace below).
72 replace := make(chan svcList)
80 // Wait here for the next success, in
81 // order to avoid returning stale
82 // results on the "latest" channel.
// Either adopt a freshly fetched list, or hand the current list to
// whoever is receiving on ent.latest.
84 case current = <-replace:
85 case ent.latest <- current:
// okDelay is the initial timer interval. NOTE(review): presumably it is
// also the interval used after a successful fetch -- that Reset call is
// not visible here.
90 okDelay := 5 * time.Minute
// errDelay is the retry interval applied after a failed API call.
91 errDelay := 3 * time.Second
92 timer := time.NewTimer(okDelay)
98 // Lost race stopping timer; skip extra firing
// Request the keep_services "accessible" listing from the API server,
// decoding the response into next.
103 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
// On failure, log the error and schedule a retry after the shorter
// errDelay.
105 log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
106 timer.Reset(errDelay)
114 // discoverServices gets the list of available keep services from
117 // If a list of services is provided in the arvadosclient (e.g., from
118 // an environment variable or local config), that list is used
121 // If an API call is made, the result is cached for 5 minutes or until
122 // RefreshServiceDiscovery() is called, and during this interval it is
123 // reused by other KeepClients that use the same API server host.
124 func (kc *KeepClient) discoverServices() error {
// NOTE(review): the body of this branch is not shown; presumably the
// method returns without doing anything once discovery has been
// disabled -- confirm.
125 if kc.disableDiscovery {
// A static service list was supplied on the client: use it and turn off
// any further discovery.
129 if kc.Arvados.KeepServiceURIs != nil {
130 kc.disableDiscovery = true
131 kc.foundNonDiskSvc = true
// 0 is the value loadKeepServers documents as "unknown or unlimited".
132 kc.replicasPerService = 0
133 roots := make(map[string]string)
134 for i, uri := range kc.Arvados.KeepServiceURIs {
// Give each URI a distinct key of the form 00000-bi6l4-<15-digit index>.
135 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
// The same map serves as the local, writable-local and gateway roots
// (argument order as in the loadKeepServers call to setServiceRoots).
137 kc.setServiceRoots(roots, roots, roots)
// Without an API server host there is nothing to query.
141 if kc.Arvados.ApiServer == "" {
142 return fmt.Errorf("Arvados client is not configured (target API host is not set). Maybe env var ARVADOS_API_HOST should be set first?")
// Look up the cache entry for this API server while holding the cache
// mutex; a new entry is built and stored below.
// NOTE(review): the condition guarding the creation is not shown;
// presumably it is "!ok" -- confirm.
145 svcListCacheMtx.Lock()
146 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
// New entries get unbuffered "latest" and "clear" channels.
149 cacheEnt = cachedSvcList{
150 latest: make(chan svcList),
151 clear: make(chan struct{}),
155 svcListCache[kc.Arvados.ApiServer] = cacheEnt
157 svcListCacheMtx.Unlock()
// Wait for a list on the entry's "latest" channel, giving up after one
// minute.
160 case <-time.After(time.Minute):
161 return errors.New("timed out while getting initial list of keep services")
162 case sl := <-cacheEnt.latest:
163 return kc.loadKeepServers(sl)
// RefreshServiceDiscovery sends a signal on the "clear" channel of the
// service-list cache entry belonging to this client's API server.
//
// The guard below covers three cases: no cache entry exists for the API
// server, the client was given a static KeepServiceURIs list, or discovery
// has been disabled. NOTE(review): the body of that branch is not shown;
// presumably the method returns without signalling -- confirm.
167 func (kc *KeepClient) RefreshServiceDiscovery() {
// The mutex is held only for the map lookup, not for the channel send.
168 svcListCacheMtx.Lock()
169 ent, ok := svcListCache[kc.Arvados.ApiServer]
170 svcListCacheMtx.Unlock()
171 if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
// Entries created by discoverServices use an unbuffered "clear" channel,
// so this send blocks until something receives it.
// NOTE(review): the receiver is not visible here; presumably the entry's
// poll loop -- confirm.
174 ent.clear <- struct{}{}
177 // LoadKeepServicesFromJSON gets list of available keep services from
178 // given JSON and disables automatic service discovery.
//
// The services argument is decoded as JSON into a service list, which is
// then passed to loadKeepServers; that call's result is returned.
179 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
// Discovery is switched off before the JSON is decoded.
// NOTE(review): nothing visible re-enables it on a decode error, so it
// presumably stays disabled even when the input is invalid -- confirm.
180 kc.disableDiscovery = true
// Decode reads a single JSON value from the string.
183 dec := json.NewDecoder(strings.NewReader(services))
// NOTE(review): the error branch body is not shown; presumably the decode
// error is returned to the caller -- confirm.
184 if err := dec.Decode(&list); err != nil {
188 return kc.loadKeepServers(list)
// loadKeepServers builds the local, writable-local and gateway root maps
// (service UUID -> URL) from list.Items and installs them with
// setServiceRoots. It also resets kc.replicasPerService and may set
// kc.foundNonDiskSvc based on the service types seen.
191 func (kc *KeepClient) loadKeepServers(list svcList) error {
// NOTE(review): the use of "listed" is not visible in this excerpt;
// presumably it records UUIDs already seen -- confirm.
192 listed := make(map[string]bool)
193 localRoots := make(map[string]string)
194 gatewayRoots := make(map[string]string)
195 writableLocalRoots := make(map[string]string)
197 // replicasPerService is 1 for disks; unknown or unlimited otherwise
198 kc.replicasPerService = 1
200 for _, service := range list.Items {
// Root URL for this service, in the form scheme://hostname:port.
205 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
// NOTE(review): the conditions that guard the assignments below are on
// lines not shown here.
213 localRoots[service.Uuid] = url
// Only services that are not read-only are recorded as writable.
214 if service.ReadOnly == false {
215 writableLocalRoots[service.Uuid] = url
// A writable service that is not a disk switches replicasPerService to 0
// (unknown or unlimited).
216 if service.SvcType != "disk" {
217 kc.replicasPerService = 0
// Remember that at least one non-disk service was listed.
221 if service.SvcType != "disk" {
222 kc.foundNonDiskSvc = true
225 // Gateway services are only used when specified by
226 // UUID, so there's nothing to gain by filtering them
227 // by service type. Including all accessible services
228 // (gateway and otherwise) merely accommodates more
229 // service configurations.
230 gatewayRoots[service.Uuid] = url
// Install the three maps on the client.
233 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)