1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
18 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
21 // ClearCache clears the Keep service discovery cache.
22 func RefreshServiceDiscovery() {
25 svcListCacheMtx.Lock()
26 defer svcListCacheMtx.Unlock()
27 for _, ent := range svcListCache {
30 ent.clear <- struct{}{}
36 // ClearCacheOnSIGHUP installs a signal handler that calls
37 // ClearCache when SIGHUP is received.
38 func RefreshServiceDiscoveryOnSIGHUP() {
39 svcListCacheMtx.Lock()
40 defer svcListCacheMtx.Unlock()
41 if svcListCacheSignal != nil {
44 svcListCacheSignal = make(chan os.Signal, 1)
45 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
47 for range svcListCacheSignal {
48 RefreshServiceDiscovery()
54 svcListCache = map[string]cachedSvcList{}
55 svcListCacheSignal chan os.Signal
56 svcListCacheMtx sync.Mutex
59 type cachedSvcList struct {
60 arv *arvadosclient.ArvadosClient
65 // Check for new services list every few minutes. Send the latest list
66 // to the "latest" channel as needed.
67 func (ent *cachedSvcList) poll() {
68 wakeup := make(chan struct{})
70 replace := make(chan svcList)
78 // Wait here for the next success, in
79 // order to avoid returning stale
80 // results on the "latest" channel.
82 case current = <-replace:
83 case ent.latest <- current:
88 okDelay := 5 * time.Minute
89 errDelay := 3 * time.Second
90 timer := time.NewTimer(okDelay)
96 // Lost race stopping timer; skip extra firing
101 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
103 log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
104 timer.Reset(errDelay)
112 // discoverServices gets the list of available keep services from
115 // If a list of services is provided in the arvadosclient (e.g., from
116 // an environment variable or local config), that list is used
119 // If an API call is made, the result is cached for 5 minutes or until
120 // ClearCache() is called, and during this interval it is reused by
121 // other KeepClients that use the same API server host.
122 func (kc *KeepClient) discoverServices() error {
123 if kc.disableDiscovery {
127 if kc.Arvados.KeepServiceURIs != nil {
128 kc.disableDiscovery = true
129 kc.foundNonDiskSvc = true
130 kc.replicasPerService = 0
131 roots := make(map[string]string)
132 for i, uri := range kc.Arvados.KeepServiceURIs {
133 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
135 kc.setServiceRoots(roots, roots, roots)
139 svcListCacheMtx.Lock()
140 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
143 cacheEnt = cachedSvcList{
144 latest: make(chan svcList),
145 clear: make(chan struct{}),
149 svcListCache[kc.Arvados.ApiServer] = cacheEnt
151 svcListCacheMtx.Unlock()
153 return kc.loadKeepServers(<-cacheEnt.latest)
156 func (kc *KeepClient) RefreshServiceDiscovery() {
157 svcListCacheMtx.Lock()
158 ent, ok := svcListCache[kc.Arvados.ApiServer]
159 svcListCacheMtx.Unlock()
160 if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
163 ent.clear <- struct{}{}
166 // LoadKeepServicesFromJSON gets list of available keep services from
167 // given JSON and disables automatic service discovery.
168 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
169 kc.disableDiscovery = true
172 dec := json.NewDecoder(strings.NewReader(services))
173 if err := dec.Decode(&list); err != nil {
177 return kc.loadKeepServers(list)
180 func (kc *KeepClient) loadKeepServers(list svcList) error {
181 listed := make(map[string]bool)
182 localRoots := make(map[string]string)
183 gatewayRoots := make(map[string]string)
184 writableLocalRoots := make(map[string]string)
186 // replicasPerService is 1 for disks; unknown or unlimited otherwise
187 kc.replicasPerService = 1
189 for _, service := range list.Items {
194 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
202 localRoots[service.Uuid] = url
203 if service.ReadOnly == false {
204 writableLocalRoots[service.Uuid] = url
205 if service.SvcType != "disk" {
206 kc.replicasPerService = 0
210 if service.SvcType != "disk" {
211 kc.foundNonDiskSvc = true
214 // Gateway services are only used when specified by
215 // UUID, so there's nothing to gain by filtering them
216 // by service type. Including all accessible services
217 // (gateway and otherwise) merely accommodates more
218 // service configurations.
219 gatewayRoots[service.Uuid] = url
222 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)