Merge branch '17830-reqid-header-propagation-fix' into main. Closes #17830
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "log"
12         "os"
13         "os/signal"
14         "strings"
15         "sync"
16         "syscall"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20 )
21
22 // RefreshServiceDiscovery clears the Keep service discovery cache.
23 func RefreshServiceDiscovery() {
24         var wg sync.WaitGroup
25         defer wg.Wait()
26         svcListCacheMtx.Lock()
27         defer svcListCacheMtx.Unlock()
28         for _, ent := range svcListCache {
29                 wg.Add(1)
30                 clear := ent.clear
31                 go func() {
32                         clear <- struct{}{}
33                         wg.Done()
34                 }()
35         }
36 }
37
38 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
39 // RefreshServiceDiscovery when SIGHUP is received.
40 func RefreshServiceDiscoveryOnSIGHUP() {
41         svcListCacheMtx.Lock()
42         defer svcListCacheMtx.Unlock()
43         if svcListCacheSignal != nil {
44                 return
45         }
46         svcListCacheSignal = make(chan os.Signal, 1)
47         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
48         go func() {
49                 for range svcListCacheSignal {
50                         RefreshServiceDiscovery()
51                 }
52         }()
53 }
54
55 var (
56         svcListCache       = map[string]cachedSvcList{}
57         svcListCacheSignal chan os.Signal
58         svcListCacheMtx    sync.Mutex
59 )
60
61 type cachedSvcList struct {
62         arv    *arvadosclient.ArvadosClient
63         latest chan svcList
64         clear  chan struct{}
65 }
66
67 // Check for new services list every few minutes. Send the latest list
68 // to the "latest" channel as needed.
69 func (ent *cachedSvcList) poll() {
70         wakeup := make(chan struct{})
71
72         replace := make(chan svcList)
73         go func() {
74                 wakeup <- struct{}{}
75                 current := <-replace
76                 for {
77                         select {
78                         case <-ent.clear:
79                                 wakeup <- struct{}{}
80                                 // Wait here for the next success, in
81                                 // order to avoid returning stale
82                                 // results on the "latest" channel.
83                                 current = <-replace
84                         case current = <-replace:
85                         case ent.latest <- current:
86                         }
87                 }
88         }()
89
90         okDelay := 5 * time.Minute
91         errDelay := 3 * time.Second
92         timer := time.NewTimer(okDelay)
93         for {
94                 select {
95                 case <-timer.C:
96                 case <-wakeup:
97                         if !timer.Stop() {
98                                 // Lost race stopping timer; skip extra firing
99                                 <-timer.C
100                         }
101                 }
102                 var next svcList
103                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
104                 if err != nil {
105                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
106                         timer.Reset(errDelay)
107                         continue
108                 }
109                 replace <- next
110                 timer.Reset(okDelay)
111         }
112 }
113
114 // discoverServices gets the list of available keep services from
115 // the API server.
116 //
117 // If a list of services is provided in the arvadosclient (e.g., from
118 // an environment variable or local config), that list is used
119 // instead.
120 //
121 // If an API call is made, the result is cached for 5 minutes or until
122 // ClearCache() is called, and during this interval it is reused by
123 // other KeepClients that use the same API server host.
124 func (kc *KeepClient) discoverServices() error {
125         if kc.disableDiscovery {
126                 return nil
127         }
128
129         if kc.Arvados.KeepServiceURIs != nil {
130                 kc.disableDiscovery = true
131                 kc.foundNonDiskSvc = true
132                 kc.replicasPerService = 0
133                 roots := make(map[string]string)
134                 for i, uri := range kc.Arvados.KeepServiceURIs {
135                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
136                 }
137                 kc.setServiceRoots(roots, roots, roots)
138                 return nil
139         }
140
141         if kc.Arvados.ApiServer == "" {
142                 return fmt.Errorf("Arvados client is not configured (target API host is not set). Maybe env var ARVADOS_API_HOST should be set first?")
143         }
144
145         svcListCacheMtx.Lock()
146         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
147         if !ok {
148                 arv := *kc.Arvados
149                 cacheEnt = cachedSvcList{
150                         latest: make(chan svcList),
151                         clear:  make(chan struct{}),
152                         arv:    &arv,
153                 }
154                 go cacheEnt.poll()
155                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
156         }
157         svcListCacheMtx.Unlock()
158
159         select {
160         case <-time.After(time.Minute):
161                 return errors.New("timed out while getting initial list of keep services")
162         case sl := <-cacheEnt.latest:
163                 return kc.loadKeepServers(sl)
164         }
165 }
166
167 func (kc *KeepClient) RefreshServiceDiscovery() {
168         svcListCacheMtx.Lock()
169         ent, ok := svcListCache[kc.Arvados.ApiServer]
170         svcListCacheMtx.Unlock()
171         if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
172                 return
173         }
174         ent.clear <- struct{}{}
175 }
176
177 // LoadKeepServicesFromJSON gets list of available keep services from
178 // given JSON and disables automatic service discovery.
179 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
180         kc.disableDiscovery = true
181
182         var list svcList
183         dec := json.NewDecoder(strings.NewReader(services))
184         if err := dec.Decode(&list); err != nil {
185                 return err
186         }
187
188         return kc.loadKeepServers(list)
189 }
190
191 func (kc *KeepClient) loadKeepServers(list svcList) error {
192         listed := make(map[string]bool)
193         localRoots := make(map[string]string)
194         gatewayRoots := make(map[string]string)
195         writableLocalRoots := make(map[string]string)
196
197         // replicasPerService is 1 for disks; unknown or unlimited otherwise
198         kc.replicasPerService = 1
199
200         for _, service := range list.Items {
201                 scheme := "http"
202                 if service.SSL {
203                         scheme = "https"
204                 }
205                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
206
207                 // Skip duplicates
208                 if listed[url] {
209                         continue
210                 }
211                 listed[url] = true
212
213                 localRoots[service.Uuid] = url
214                 if service.ReadOnly == false {
215                         writableLocalRoots[service.Uuid] = url
216                         if service.SvcType != "disk" {
217                                 kc.replicasPerService = 0
218                         }
219                 }
220
221                 if service.SvcType != "disk" {
222                         kc.foundNonDiskSvc = true
223                 }
224
225                 // Gateway services are only used when specified by
226                 // UUID, so there's nothing to gain by filtering them
227                 // by service type. Including all accessible services
228                 // (gateway and otherwise) merely accommodates more
229                 // service configurations.
230                 gatewayRoots[service.Uuid] = url
231         }
232
233         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
234         return nil
235 }