21842: set suggestions to stay open on select
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "crypto/md5"
9         "encoding/json"
10         "errors"
11         "fmt"
12         "log"
13         "os"
14         "os/signal"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21 )
22
23 // RefreshServiceDiscovery clears the Keep service discovery cache.
24 func RefreshServiceDiscovery() {
25         var wg sync.WaitGroup
26         defer wg.Wait()
27         svcListCacheMtx.Lock()
28         defer svcListCacheMtx.Unlock()
29         for _, ent := range svcListCache {
30                 wg.Add(1)
31                 clear := ent.clear
32                 go func() {
33                         clear <- struct{}{}
34                         wg.Done()
35                 }()
36         }
37 }
38
39 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
40 // RefreshServiceDiscovery when SIGHUP is received.
41 func RefreshServiceDiscoveryOnSIGHUP() {
42         svcListCacheMtx.Lock()
43         defer svcListCacheMtx.Unlock()
44         if svcListCacheSignal != nil {
45                 return
46         }
47         svcListCacheSignal = make(chan os.Signal, 1)
48         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
49         go func() {
50                 for range svcListCacheSignal {
51                         RefreshServiceDiscovery()
52                 }
53         }()
54 }
55
56 var (
57         svcListCache       = map[string]cachedSvcList{}
58         svcListCacheSignal chan os.Signal
59         svcListCacheMtx    sync.Mutex
60 )
61
62 type cachedSvcList struct {
63         arv    *arvadosclient.ArvadosClient
64         latest chan svcList
65         clear  chan struct{}
66 }
67
68 // Check for new services list every few minutes. Send the latest list
69 // to the "latest" channel as needed.
70 func (ent *cachedSvcList) poll() {
71         wakeup := make(chan struct{})
72
73         replace := make(chan svcList)
74         go func() {
75                 wakeup <- struct{}{}
76                 current := <-replace
77                 for {
78                         select {
79                         case <-ent.clear:
80                                 wakeup <- struct{}{}
81                                 // Wait here for the next success, in
82                                 // order to avoid returning stale
83                                 // results on the "latest" channel.
84                                 current = <-replace
85                         case current = <-replace:
86                         case ent.latest <- current:
87                         }
88                 }
89         }()
90
91         okDelay := 5 * time.Minute
92         errDelay := 3 * time.Second
93         timer := time.NewTimer(okDelay)
94         for {
95                 select {
96                 case <-timer.C:
97                 case <-wakeup:
98                         if !timer.Stop() {
99                                 // Lost race stopping timer; skip extra firing
100                                 <-timer.C
101                         }
102                 }
103                 var next svcList
104                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
105                 if err != nil {
106                         if ent.arv.Logger != nil {
107                                 ent.arv.Logger.WithError(err).Warnf("error retrieving services list (retrying in %v)", errDelay)
108                         } else {
109                                 log.Printf("WARNING: Error retrieving services list: %s (retrying in %v)", err, errDelay)
110                         }
111                         timer.Reset(errDelay)
112                         continue
113                 }
114                 replace <- next
115                 timer.Reset(okDelay)
116         }
117 }
118
119 // discoverServices gets the list of available keep services from
120 // the API server.
121 //
122 // If a list of services is provided in the arvadosclient (e.g., from
123 // an environment variable or local config), that list is used
124 // instead.
125 //
126 // If an API call is made, the result is cached for 5 minutes or until
127 // ClearCache() is called, and during this interval it is reused by
128 // other KeepClients that use the same API server host.
129 func (kc *KeepClient) discoverServices() error {
130         if kc.disableDiscovery {
131                 return nil
132         }
133
134         if kc.Arvados.KeepServiceURIs != nil {
135                 kc.disableDiscovery = true
136                 kc.foundNonDiskSvc = true
137                 kc.replicasPerService = 0
138                 roots := make(map[string]string)
139                 for i, uri := range kc.Arvados.KeepServiceURIs {
140                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
141                 }
142                 kc.setServiceRoots(roots, roots, roots)
143                 return nil
144         }
145
146         if kc.Arvados.Cluster != nil && os.Getenv("ARVADOS_USE_KEEP_ACCESSIBLE_API") == "" {
147                 kc.disableDiscovery = true
148                 roots := make(map[string]string)
149                 for url, info := range kc.Arvados.Cluster.Services.Keepstore.InternalURLs {
150                         rvz := info.Rendezvous
151                         if rvz == "" {
152                                 rvz = url.String()
153                         }
154                         // If info.Rendezvous is 15 ascii alphanums,
155                         // we use it verbatim as the last 15 chars of
156                         // the UUID. Otherwise, we hash
157                         // info.Rendezvous (or, if empty, the URL) and
158                         // use the first 15 chars of the hash as the
159                         // last 15 chars of the UUID. This matches the
160                         // behavior of
161                         // services/api/app/models/keep_service.rb.
162                         rvzhash := len(rvz) != 15
163                         for i := 0; i < len(rvz) && !rvzhash; i++ {
164                                 rvzhash = !(rvz[i] >= '0' && rvz[i] <= '9' ||
165                                         rvz[i] >= 'a' && rvz[i] <= 'z' ||
166                                         rvz[i] >= 'A' && rvz[i] <= 'Z')
167                         }
168                         if rvzhash {
169                                 rvz = fmt.Sprintf("%x", md5.Sum([]byte(rvz)))[:15]
170                         }
171                         uuid := kc.Arvados.Cluster.ClusterID + "-bi6l4-" + rvz
172                         roots[uuid] = strings.TrimSuffix(url.String(), "/")
173                 }
174                 kc.setServiceRoots(roots, roots, nil)
175                 return nil
176         }
177
178         if kc.Arvados.ApiServer == "" {
179                 return fmt.Errorf("Arvados client is not configured (target API host is not set). Maybe env var ARVADOS_API_HOST should be set first?")
180         }
181
182         svcListCacheMtx.Lock()
183         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
184         if !ok {
185                 arv := *kc.Arvados
186                 cacheEnt = cachedSvcList{
187                         latest: make(chan svcList),
188                         clear:  make(chan struct{}),
189                         arv:    &arv,
190                 }
191                 go cacheEnt.poll()
192                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
193         }
194         svcListCacheMtx.Unlock()
195
196         select {
197         case <-time.After(time.Minute):
198                 return errors.New("timed out while getting initial list of keep services")
199         case sl := <-cacheEnt.latest:
200                 return kc.loadKeepServers(sl)
201         }
202 }
203
204 func (kc *KeepClient) RefreshServiceDiscovery() {
205         svcListCacheMtx.Lock()
206         ent, ok := svcListCache[kc.Arvados.ApiServer]
207         svcListCacheMtx.Unlock()
208         if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
209                 return
210         }
211         ent.clear <- struct{}{}
212 }
213
214 // LoadKeepServicesFromJSON gets list of available keep services from
215 // given JSON and disables automatic service discovery.
216 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
217         kc.disableDiscovery = true
218
219         var list svcList
220         dec := json.NewDecoder(strings.NewReader(services))
221         if err := dec.Decode(&list); err != nil {
222                 return err
223         }
224
225         return kc.loadKeepServers(list)
226 }
227
228 func (kc *KeepClient) loadKeepServers(list svcList) error {
229         listed := make(map[string]bool)
230         localRoots := make(map[string]string)
231         gatewayRoots := make(map[string]string)
232         writableLocalRoots := make(map[string]string)
233
234         // replicasPerService is 1 for disks; unknown or unlimited otherwise
235         kc.replicasPerService = 1
236
237         for _, service := range list.Items {
238                 scheme := "http"
239                 if service.SSL {
240                         scheme = "https"
241                 }
242                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
243
244                 // Skip duplicates
245                 if listed[url] {
246                         continue
247                 }
248                 listed[url] = true
249
250                 localRoots[service.Uuid] = url
251                 if service.ReadOnly == false {
252                         writableLocalRoots[service.Uuid] = url
253                         if service.SvcType != "disk" {
254                                 kc.replicasPerService = 0
255                         }
256                 }
257
258                 if service.SvcType != "disk" {
259                         kc.foundNonDiskSvc = true
260                 }
261
262                 // Gateway services are only used when specified by
263                 // UUID, so there's nothing to gain by filtering them
264                 // by service type. Including all accessible services
265                 // (gateway and otherwise) merely accommodates more
266                 // service configurations.
267                 gatewayRoots[service.Uuid] = url
268         }
269
270         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
271         return nil
272 }