Merge branch '21611-log-chunk-delay'
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "log"
12         "os"
13         "os/signal"
14         "strings"
15         "sync"
16         "syscall"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20 )
21
22 // RefreshServiceDiscovery clears the Keep service discovery cache.
23 func RefreshServiceDiscovery() {
24         var wg sync.WaitGroup
25         defer wg.Wait()
26         svcListCacheMtx.Lock()
27         defer svcListCacheMtx.Unlock()
28         for _, ent := range svcListCache {
29                 wg.Add(1)
30                 clear := ent.clear
31                 go func() {
32                         clear <- struct{}{}
33                         wg.Done()
34                 }()
35         }
36 }
37
38 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
39 // RefreshServiceDiscovery when SIGHUP is received.
40 func RefreshServiceDiscoveryOnSIGHUP() {
41         svcListCacheMtx.Lock()
42         defer svcListCacheMtx.Unlock()
43         if svcListCacheSignal != nil {
44                 return
45         }
46         svcListCacheSignal = make(chan os.Signal, 1)
47         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
48         go func() {
49                 for range svcListCacheSignal {
50                         RefreshServiceDiscovery()
51                 }
52         }()
53 }
54
55 var (
56         svcListCache       = map[string]cachedSvcList{}
57         svcListCacheSignal chan os.Signal
58         svcListCacheMtx    sync.Mutex
59 )
60
// cachedSvcList holds the channels for one API server's shared
// service-discovery poller (see poll). It is stored by value in
// svcListCache; copies share the same channels and client pointer.
type cachedSvcList struct {
	// arv is the client poll uses to call the keep_services API.
	// discoverServices points it at a copy of the caller's client.
	arv    *arvadosclient.ArvadosClient
	// latest delivers the most recently fetched services list to
	// any receiver.
	latest chan svcList
	// clear requests an immediate refresh. Once poll accepts a value,
	// nothing more is sent on latest until a fetch succeeds.
	clear  chan struct{}
}
66
67 // Check for new services list every few minutes. Send the latest list
68 // to the "latest" channel as needed.
69 func (ent *cachedSvcList) poll() {
70         wakeup := make(chan struct{})
71
72         replace := make(chan svcList)
73         go func() {
74                 wakeup <- struct{}{}
75                 current := <-replace
76                 for {
77                         select {
78                         case <-ent.clear:
79                                 wakeup <- struct{}{}
80                                 // Wait here for the next success, in
81                                 // order to avoid returning stale
82                                 // results on the "latest" channel.
83                                 current = <-replace
84                         case current = <-replace:
85                         case ent.latest <- current:
86                         }
87                 }
88         }()
89
90         okDelay := 5 * time.Minute
91         errDelay := 3 * time.Second
92         timer := time.NewTimer(okDelay)
93         for {
94                 select {
95                 case <-timer.C:
96                 case <-wakeup:
97                         if !timer.Stop() {
98                                 // Lost race stopping timer; skip extra firing
99                                 <-timer.C
100                         }
101                 }
102                 var next svcList
103                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
104                 if err != nil {
105                         if ent.arv.Logger != nil {
106                                 ent.arv.Logger.WithError(err).Warnf("error retrieving services list (retrying in %v)", errDelay)
107                         } else {
108                                 log.Printf("WARNING: Error retrieving services list: %s (retrying in %v)", err, errDelay)
109                         }
110                         timer.Reset(errDelay)
111                         continue
112                 }
113                 replace <- next
114                 timer.Reset(okDelay)
115         }
116 }
117
118 // discoverServices gets the list of available keep services from
119 // the API server.
120 //
121 // If a list of services is provided in the arvadosclient (e.g., from
122 // an environment variable or local config), that list is used
123 // instead.
124 //
125 // If an API call is made, the result is cached for 5 minutes or until
126 // ClearCache() is called, and during this interval it is reused by
127 // other KeepClients that use the same API server host.
128 func (kc *KeepClient) discoverServices() error {
129         if kc.disableDiscovery {
130                 return nil
131         }
132
133         if kc.Arvados.KeepServiceURIs != nil {
134                 kc.disableDiscovery = true
135                 kc.foundNonDiskSvc = true
136                 kc.replicasPerService = 0
137                 roots := make(map[string]string)
138                 for i, uri := range kc.Arvados.KeepServiceURIs {
139                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
140                 }
141                 kc.setServiceRoots(roots, roots, roots)
142                 return nil
143         }
144
145         if kc.Arvados.ApiServer == "" {
146                 return fmt.Errorf("Arvados client is not configured (target API host is not set). Maybe env var ARVADOS_API_HOST should be set first?")
147         }
148
149         svcListCacheMtx.Lock()
150         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
151         if !ok {
152                 arv := *kc.Arvados
153                 cacheEnt = cachedSvcList{
154                         latest: make(chan svcList),
155                         clear:  make(chan struct{}),
156                         arv:    &arv,
157                 }
158                 go cacheEnt.poll()
159                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
160         }
161         svcListCacheMtx.Unlock()
162
163         select {
164         case <-time.After(time.Minute):
165                 return errors.New("timed out while getting initial list of keep services")
166         case sl := <-cacheEnt.latest:
167                 return kc.loadKeepServers(sl)
168         }
169 }
170
171 func (kc *KeepClient) RefreshServiceDiscovery() {
172         svcListCacheMtx.Lock()
173         ent, ok := svcListCache[kc.Arvados.ApiServer]
174         svcListCacheMtx.Unlock()
175         if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
176                 return
177         }
178         ent.clear <- struct{}{}
179 }
180
181 // LoadKeepServicesFromJSON gets list of available keep services from
182 // given JSON and disables automatic service discovery.
183 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
184         kc.disableDiscovery = true
185
186         var list svcList
187         dec := json.NewDecoder(strings.NewReader(services))
188         if err := dec.Decode(&list); err != nil {
189                 return err
190         }
191
192         return kc.loadKeepServers(list)
193 }
194
195 func (kc *KeepClient) loadKeepServers(list svcList) error {
196         listed := make(map[string]bool)
197         localRoots := make(map[string]string)
198         gatewayRoots := make(map[string]string)
199         writableLocalRoots := make(map[string]string)
200
201         // replicasPerService is 1 for disks; unknown or unlimited otherwise
202         kc.replicasPerService = 1
203
204         for _, service := range list.Items {
205                 scheme := "http"
206                 if service.SSL {
207                         scheme = "https"
208                 }
209                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
210
211                 // Skip duplicates
212                 if listed[url] {
213                         continue
214                 }
215                 listed[url] = true
216
217                 localRoots[service.Uuid] = url
218                 if service.ReadOnly == false {
219                         writableLocalRoots[service.Uuid] = url
220                         if service.SvcType != "disk" {
221                                 kc.replicasPerService = 0
222                         }
223                 }
224
225                 if service.SvcType != "disk" {
226                         kc.foundNonDiskSvc = true
227                 }
228
229                 // Gateway services are only used when specified by
230                 // UUID, so there's nothing to gain by filtering them
231                 // by service type. Including all accessible services
232                 // (gateway and otherwise) merely accommodates more
233                 // service configurations.
234                 gatewayRoots[service.Uuid] = url
235         }
236
237         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
238         return nil
239 }