1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
22 // RefreshServiceDiscovery clears the Keep service discovery cache.
23 func RefreshServiceDiscovery() {
26 svcListCacheMtx.Lock()
27 defer svcListCacheMtx.Unlock()
28 for _, ent := range svcListCache {
38 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
39 // RefreshServiceDiscovery when SIGHUP is received.
40 func RefreshServiceDiscoveryOnSIGHUP() {
41 svcListCacheMtx.Lock()
42 defer svcListCacheMtx.Unlock()
43 if svcListCacheSignal != nil {
46 svcListCacheSignal = make(chan os.Signal, 1)
47 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
49 for range svcListCacheSignal {
50 RefreshServiceDiscovery()
56 svcListCache = map[string]cachedSvcList{}
57 svcListCacheSignal chan os.Signal
58 svcListCacheMtx sync.Mutex
61 type cachedSvcList struct {
62 arv *arvadosclient.ArvadosClient
67 // Check for new services list every few minutes. Send the latest list
68 // to the "latest" channel as needed.
69 func (ent *cachedSvcList) poll() {
70 wakeup := make(chan struct{})
72 replace := make(chan svcList)
80 // Wait here for the next success, in
81 // order to avoid returning stale
82 // results on the "latest" channel.
84 case current = <-replace:
85 case ent.latest <- current:
90 okDelay := 5 * time.Minute
91 errDelay := 3 * time.Second
92 timer := time.NewTimer(okDelay)
98 // Lost race stopping timer; skip extra firing
103 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
105 log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
106 timer.Reset(errDelay)
114 // discoverServices gets the list of available keep services from
117 // If a list of services is provided in the arvadosclient (e.g., from
118 // an environment variable or local config), that list is used
121 // If an API call is made, the result is cached for 5 minutes or until
122 // ClearCache() is called, and during this interval it is reused by
123 // other KeepClients that use the same API server host.
124 func (kc *KeepClient) discoverServices() error {
125 if kc.disableDiscovery {
129 if kc.Arvados.KeepServiceURIs != nil {
130 kc.disableDiscovery = true
131 kc.foundNonDiskSvc = true
132 kc.replicasPerService = 0
133 roots := make(map[string]string)
134 for i, uri := range kc.Arvados.KeepServiceURIs {
135 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
137 kc.setServiceRoots(roots, roots, roots)
141 svcListCacheMtx.Lock()
142 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
145 cacheEnt = cachedSvcList{
146 latest: make(chan svcList),
147 clear: make(chan struct{}),
151 svcListCache[kc.Arvados.ApiServer] = cacheEnt
153 svcListCacheMtx.Unlock()
156 case <-time.After(time.Minute):
157 return errors.New("timed out while getting initial list of keep services")
158 case sl := <-cacheEnt.latest:
159 return kc.loadKeepServers(sl)
163 func (kc *KeepClient) RefreshServiceDiscovery() {
164 svcListCacheMtx.Lock()
165 ent, ok := svcListCache[kc.Arvados.ApiServer]
166 svcListCacheMtx.Unlock()
167 if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
170 ent.clear <- struct{}{}
173 // LoadKeepServicesFromJSON gets list of available keep services from
174 // given JSON and disables automatic service discovery.
175 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
176 kc.disableDiscovery = true
179 dec := json.NewDecoder(strings.NewReader(services))
180 if err := dec.Decode(&list); err != nil {
184 return kc.loadKeepServers(list)
187 func (kc *KeepClient) loadKeepServers(list svcList) error {
188 listed := make(map[string]bool)
189 localRoots := make(map[string]string)
190 gatewayRoots := make(map[string]string)
191 writableLocalRoots := make(map[string]string)
193 // replicasPerService is 1 for disks; unknown or unlimited otherwise
194 kc.replicasPerService = 1
196 for _, service := range list.Items {
201 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
209 localRoots[service.Uuid] = url
210 if service.ReadOnly == false {
211 writableLocalRoots[service.Uuid] = url
212 if service.SvcType != "disk" {
213 kc.replicasPerService = 0
217 if service.SvcType != "disk" {
218 kc.foundNonDiskSvc = true
221 // Gateway services are only used when specified by
222 // UUID, so there's nothing to gain by filtering them
223 // by service type. Including all accessible services
224 // (gateway and otherwise) merely accommodates more
225 // service configurations.
226 gatewayRoots[service.Uuid] = url
229 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)