1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
22 // RefreshServiceDiscovery clears the Keep service discovery cache by
// sending a signal on the clear channel of every entry in svcListCache,
// while holding svcListCacheMtx.
//
// NOTE(review): some lines of this function are not visible in this view.
// Each entry's clear channel is unbuffered (see discoverServices). Unless
// the elided lines make the send below asynchronous, it blocks, with
// svcListCacheMtx held, until each entry's receiver accepts it -- confirm.
23 func RefreshServiceDiscovery() {
26 svcListCacheMtx.Lock()
27 defer svcListCacheMtx.Unlock()
28 for _, ent := range svcListCache {
31 ent.clear <- struct{}{}
37 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
38 // RefreshServiceDiscovery when SIGHUP is received.
//
// The handler's channel is stored in svcListCacheSignal, guarded by
// svcListCacheMtx. The nil check below stops a repeated call from
// installing a second handler. The lines taken when the channel already
// exists are not visible in this view; presumably they return.
39 func RefreshServiceDiscoveryOnSIGHUP() {
40 svcListCacheMtx.Lock()
41 defer svcListCacheMtx.Unlock()
42 if svcListCacheSignal != nil {
// signal.Notify never blocks when delivering, so the channel is buffered.
// This way a SIGHUP that arrives during a refresh is not lost.
45 svcListCacheSignal = make(chan os.Signal, 1)
46 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
// NOTE(review): RefreshServiceDiscovery acquires svcListCacheMtx, which
// this function holds until it returns. This loop must therefore run
// outside the locked region, e.g. in a goroutine started on a line not
// visible here -- confirm.
48 for range svcListCacheSignal {
49 RefreshServiceDiscovery()
// svcListCache maps an API server host (kc.Arvados.ApiServer) to the
// service-list cache entry shared by every KeepClient using that host.
55 svcListCache = map[string]cachedSvcList{}
// svcListCacheSignal receives SIGHUP once RefreshServiceDiscoveryOnSIGHUP
// has installed the handler; it is nil before that.
56 svcListCacheSignal chan os.Signal
// svcListCacheMtx is held around every access to svcListCache and
// svcListCacheSignal shown in this file.
57 svcListCacheMtx sync.Mutex
// cachedSvcList is one svcListCache entry. It holds:
//   - arv, the API client that poll uses to fetch the keep_services list;
//   - the "latest" and "clear" channels that discoverServices creates for
//     each new entry (these fields are not visible in this view).
60 type cachedSvcList struct {
61 arv *arvadosclient.ArvadosClient
66 // Check for new services list every few minutes. Send the latest list
67 // to the "latest" channel as needed.
//
// poll fetches the "accessible" keep_services list through ent.arv.
// Timing:
//   - The first scheduled fetch is okDelay (5 minutes) after start.
//   - After a failed API call it retries after errDelay (3 seconds).
//   - The rescheduling after a successful fetch is on lines not visible
//     in this view (presumably okDelay again).
68 func (ent *cachedSvcList) poll() {
69 wakeup := make(chan struct{})
// replace hands each newly fetched list to the select below. That select
// keeps the most recent list in current and serves it on ent.latest.
71 replace := make(chan svcList)
79 // Wait here for the next success, in
80 // order to avoid returning stale
81 // results on the "latest" channel.
83 case current = <-replace:
84 case ent.latest <- current:
89 okDelay := 5 * time.Minute
90 errDelay := 3 * time.Second
91 timer := time.NewTimer(okDelay)
97 // Lost race stopping timer; skip extra firing
102 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
// On failure, log and re-arm the timer with the short retry delay.
104 log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
105 timer.Reset(errDelay)
113 // discoverServices gets the list of available keep services from
116 // If a list of services is provided in the arvadosclient (e.g., from
117 // an environment variable or local config), that list is used
120 // If an API call is made, the result is cached for 5 minutes or until
121 // RefreshServiceDiscovery() is called, and during this interval it is reused by
122 // other KeepClients that use the same API server host.
123 func (kc *KeepClient) discoverServices() error {
124 if kc.disableDiscovery {
// An explicit URI list bypasses the API. Discovery is disabled, and each
// URI is installed as a local, writable and gateway root.
// replicasPerService = 0 means the replica count per service is unknown
// (loadKeepServers uses 1 only for disk services).
128 if kc.Arvados.KeepServiceURIs != nil {
129 kc.disableDiscovery = true
130 kc.foundNonDiskSvc = true
131 kc.replicasPerService = 0
132 roots := make(map[string]string)
133 for i, uri := range kc.Arvados.KeepServiceURIs {
// Build placeholder keep_service UUIDs from the index, zero-padded to
// 15 digits, so the fixed URIs can be keyed like discovered services.
134 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
136 kc.setServiceRoots(roots, roots, roots)
// Look up, or create, the cache entry shared by all clients of this API
// server host.
140 svcListCacheMtx.Lock()
141 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
144 cacheEnt = cachedSvcList{
145 latest: make(chan svcList),
146 clear: make(chan struct{}),
150 svcListCache[kc.Arvados.ApiServer] = cacheEnt
152 svcListCacheMtx.Unlock()
// Wait at most one minute for the entry to deliver a list on its latest
// channel. The entry's poller is presumably started on lines not visible
// here.
155 case <-time.After(time.Minute):
156 return errors.New("timed out while getting initial list of keep services")
157 case sl := <-cacheEnt.latest:
158 return kc.loadKeepServers(sl)
// RefreshServiceDiscovery signals the clear channel of the svcListCache
// entry for kc's API server. That entry is shared with every other
// KeepClient using the same host.
//
// The condition below covers the cases with nothing to refresh:
//   - no cache entry exists yet;
//   - an explicit KeepServiceURIs list is in use;
//   - discovery is disabled.
// The lines taken in those cases are not visible here; presumably they
// return.
//
// svcListCacheMtx is released before the send. The send itself blocks
// until the receiver accepts it, because clear is created unbuffered in
// discoverServices.
162 func (kc *KeepClient) RefreshServiceDiscovery() {
163 svcListCacheMtx.Lock()
164 ent, ok := svcListCache[kc.Arvados.ApiServer]
165 svcListCacheMtx.Unlock()
166 if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
169 ent.clear <- struct{}{}
172 // LoadKeepServicesFromJSON gets the list of available keep services from
173 // the given JSON and disables automatic service discovery.
//
// NOTE(review): disableDiscovery is set before the JSON is decoded.
// Discovery therefore stays disabled even when this returns a decode
// error -- confirm that is intended.
174 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
175 kc.disableDiscovery = true
178 dec := json.NewDecoder(strings.NewReader(services))
179 if err := dec.Decode(&list); err != nil {
183 return kc.loadKeepServers(list)
// loadKeepServers builds the local, writable-local and gateway root maps
// from list.Items and installs them with setServiceRoots. Each map is
// keyed by service UUID, with a "scheme://host:port" URL as the value.
// It also recomputes kc.replicasPerService and sets kc.foundNonDiskSvc
// when a non-disk service is seen.
186 func (kc *KeepClient) loadKeepServers(list svcList) error {
// listed is used on lines not visible here (presumably to skip duplicate
// entries) -- confirm.
187 listed := make(map[string]bool)
188 localRoots := make(map[string]string)
189 gatewayRoots := make(map[string]string)
190 writableLocalRoots := make(map[string]string)
192 // replicasPerService is 1 for disks; unknown or unlimited otherwise
193 kc.replicasPerService = 1
195 for _, service := range list.Items {
200 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
208 localRoots[service.Uuid] = url
209 if service.ReadOnly == false {
210 writableLocalRoots[service.Uuid] = url
// One writable non-disk service makes the per-service replica count
// unknown (0) for the whole list.
211 if service.SvcType != "disk" {
212 kc.replicasPerService = 0
216 if service.SvcType != "disk" {
217 kc.foundNonDiskSvc = true
220 // Gateway services are only used when specified by
221 // UUID, so there's nothing to gain by filtering them
222 // by service type. Including all accessible services
223 // (gateway and otherwise) merely accommodates more
224 // service configurations.
225 gatewayRoots[service.Uuid] = url
228 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)