1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
20 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
23 // RefreshServiceDiscovery clears the Keep service discovery cache.
//
// It holds svcListCacheMtx for the whole call while it visits every
// entry in svcListCache.
24 func RefreshServiceDiscovery() {
27 svcListCacheMtx.Lock()
28 defer svcListCacheMtx.Unlock()
29 for _, ent := range svcListCache {
39 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
40 // RefreshServiceDiscovery when SIGHUP is received.
//
// svcListCacheSignal doubles as the "already installed" marker: it is
// checked for nil and then assigned while svcListCacheMtx is held.
41 func RefreshServiceDiscoveryOnSIGHUP() {
42 svcListCacheMtx.Lock()
43 defer svcListCacheMtx.Unlock()
44 if svcListCacheSignal != nil {
// Buffer of 1: the signal package does not block when delivering to
// the channel, so an unbuffered channel could miss a SIGHUP that
// arrives while the receiver is busy.
47 svcListCacheSignal = make(chan os.Signal, 1)
48 signal.Notify(svcListCacheSignal, syscall.SIGHUP)
// Each received signal triggers one package-level cache refresh.
// NOTE(review): RefreshServiceDiscovery takes svcListCacheMtx, which
// this function holds until it returns, so this loop has to run on
// its own goroutine -- confirm.
50 for range svcListCacheSignal {
51 RefreshServiceDiscovery()
// Cache of service lists, one entry per API server host
// (discoverServices uses kc.Arvados.ApiServer as the key).
57 svcListCache = map[string]cachedSvcList{}
// Channel registered for SIGHUP by RefreshServiceDiscoveryOnSIGHUP;
// nil until that function has run.
58 svcListCacheSignal chan os.Signal
// Held around reads and writes of svcListCache and svcListCacheSignal.
59 svcListCacheMtx sync.Mutex
// cachedSvcList is the value type stored in svcListCache.
62 type cachedSvcList struct {
// Client that poll uses to call the keep_services API.
63 arv *arvadosclient.ArvadosClient
68 // poll checks for a new services list every few minutes and sends the
69 // latest list to the "latest" channel as needed.
70 func (ent *cachedSvcList) poll() {
71 wakeup := make(chan struct{})
73 replace := make(chan svcList)
81 // Wait here for the next success, in
82 // order to avoid returning stale
83 // results on the "latest" channel.
// Cases below: accept a newly fetched list from replace, or hand the
// current list to a receiver waiting on ent.latest.
85 case current = <-replace:
86 case ent.latest <- current:
// okDelay is the normal polling interval; errDelay is the shorter
// retry interval used after a failed API call.
91 okDelay := 5 * time.Minute
92 errDelay := 3 * time.Second
93 timer := time.NewTimer(okDelay)
99 // Lost race stopping timer; skip extra firing
// Fetch the services list with a GET keep_services "accessible" call.
104 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
// Prefer the client's own logger when one is configured; otherwise
// fall back to the standard log package.
106 if ent.arv.Logger != nil {
107 ent.arv.Logger.WithError(err).Warnf("error retrieving services list (retrying in %v)", errDelay)
109 log.Printf("WARNING: Error retrieving services list: %s (retrying in %v)", err, errDelay)
111 timer.Reset(errDelay)
119 // discoverServices gets the list of available keep services from
122 // If a list of services is provided in the arvadosclient (e.g., from
123 // an environment variable or local config), that list is used
126 // If an API call is made, the result is cached for 5 minutes or until
127 // RefreshServiceDiscovery() is called, and during this interval it is
128 // reused by other KeepClients that use the same API server host.
129 func (kc *KeepClient) discoverServices() error {
130 if kc.disableDiscovery {
// A fixed list of service URIs on the client takes precedence: use it
// as-is and turn discovery off for this KeepClient.
134 if kc.Arvados.KeepServiceURIs != nil {
135 kc.disableDiscovery = true
136 kc.foundNonDiskSvc = true
// 0 means the number of replicas per service is unknown or unlimited.
137 kc.replicasPerService = 0
138 roots := make(map[string]string)
139 for i, uri := range kc.Arvados.KeepServiceURIs {
// Synthesize a UUID-shaped key from the list index.
140 roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
// The same map is passed for all three root arguments.
142 kc.setServiceRoots(roots, roots, roots)
// With a cluster config loaded, and unless the
// ARVADOS_USE_KEEP_ACCESSIBLE_API environment variable is non-empty,
// build the service list from the configured keepstore internal URLs.
146 if kc.Arvados.Cluster != nil && os.Getenv("ARVADOS_USE_KEEP_ACCESSIBLE_API") == "" {
147 kc.disableDiscovery = true
148 roots := make(map[string]string)
149 for url, info := range kc.Arvados.Cluster.Services.Keepstore.InternalURLs {
150 rvz := info.Rendezvous
154 // If info.Rendezvous is 15 ascii alphanums,
155 // we use it verbatim as the last 15 chars of
156 // the UUID. Otherwise, we hash
157 // info.Rendezvous (or, if empty, the URL) and
158 // use the first 15 chars of the hash as the
159 // last 15 chars of the UUID. This matches the
161 // services/api/app/models/keep_service.rb.
// rvzhash ends up true unless rvz is exactly 15 ASCII letters/digits.
162 rvzhash := len(rvz) != 15
163 for i := 0; i < len(rvz) && !rvzhash; i++ {
164 rvzhash = !(rvz[i] >= '0' && rvz[i] <= '9' ||
165 rvz[i] >= 'a' && rvz[i] <= 'z' ||
166 rvz[i] >= 'A' && rvz[i] <= 'Z')
// First 15 hex digits of the MD5 digest.
169 rvz = fmt.Sprintf("%x", md5.Sum([]byte(rvz)))[:15]
171 uuid := kc.Arvados.Cluster.ClusterID + "-bi6l4-" + rvz
// Store the URL without a trailing slash.
172 roots[uuid] = strings.TrimSuffix(url.String(), "/")
// No gateway roots are registered in this mode.
174 kc.setServiceRoots(roots, roots, nil)
178 if kc.Arvados.ApiServer == "" {
179 return fmt.Errorf("Arvados client is not configured (target API host is not set). Maybe env var ARVADOS_API_HOST should be set first?")
// Look up the shared cache entry for this API server under the cache
// mutex; a new entry is built and stored when none exists yet.
182 svcListCacheMtx.Lock()
183 cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
186 cacheEnt = cachedSvcList{
187 latest: make(chan svcList),
188 clear: make(chan struct{}),
192 svcListCache[kc.Arvados.ApiServer] = cacheEnt
194 svcListCacheMtx.Unlock()
// Give up if no list arrives on the entry's latest channel within a
// minute.
197 case <-time.After(time.Minute):
198 return errors.New("timed out while getting initial list of keep services")
199 case sl := <-cacheEnt.latest:
200 return kc.loadKeepServers(sl)
// RefreshServiceDiscovery signals the shared service list cache entry
// for this client's API server by sending on its clear channel.
//
// The guard below applies to clients that have no cache entry, use a
// fixed KeepServiceURIs list, or have discovery disabled.
204 func (kc *KeepClient) RefreshServiceDiscovery() {
205 svcListCacheMtx.Lock()
206 ent, ok := svcListCache[kc.Arvados.ApiServer]
207 svcListCacheMtx.Unlock()
208 if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
// The cache mutex has already been released, so this send does not
// hold up other users of svcListCache.
211 ent.clear <- struct{}{}
214 // LoadKeepServicesFromJSON gets the list of available keep services from
215 // the given JSON and disables automatic service discovery.
//
// disableDiscovery is set before the JSON is decoded. The decoded list
// is installed with loadKeepServers, whose error is returned.
216 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
217 kc.disableDiscovery = true
220 dec := json.NewDecoder(strings.NewReader(services))
221 if err := dec.Decode(&list); err != nil {
225 return kc.loadKeepServers(list)
// loadKeepServers builds the local, writable-local and gateway root
// maps (service UUID -> URL) from list and installs them with
// setServiceRoots. It also updates kc.replicasPerService and
// kc.foundNonDiskSvc.
228 func (kc *KeepClient) loadKeepServers(list svcList) error {
229 listed := make(map[string]bool)
230 localRoots := make(map[string]string)
231 gatewayRoots := make(map[string]string)
232 writableLocalRoots := make(map[string]string)
234 // replicasPerService is 1 for disks; unknown or unlimited otherwise
235 kc.replicasPerService = 1
237 for _, service := range list.Items {
242 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
250 localRoots[service.Uuid] = url
// Only services that are not read-only are recorded as writable.
251 if service.ReadOnly == false {
252 writableLocalRoots[service.Uuid] = url
// A writable non-disk service makes the replica count unknown.
253 if service.SvcType != "disk" {
254 kc.replicasPerService = 0
// Remember that at least one listed service is not a plain disk.
258 if service.SvcType != "disk" {
259 kc.foundNonDiskSvc = true
262 // Gateway services are only used when specified by
263 // UUID, so there's nothing to gain by filtering them
264 // by service type. Including all accessible services
265 // (gateway and otherwise) merely accommodates more
266 // service configurations.
267 gatewayRoots[service.Uuid] = url
270 kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)