Merge branch 'patch-1' of https://github.com/mr-c/arvados into mr-c-patch-1
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "log"
12         "os"
13         "os/signal"
14         "strings"
15         "sync"
16         "syscall"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20 )
21
22 // RefreshServiceDiscovery clears the Keep service discovery cache.
23 func RefreshServiceDiscovery() {
24         var wg sync.WaitGroup
25         defer wg.Wait()
26         svcListCacheMtx.Lock()
27         defer svcListCacheMtx.Unlock()
28         for _, ent := range svcListCache {
29                 wg.Add(1)
30                 clear := ent.clear
31                 go func() {
32                         clear <- struct{}{}
33                         wg.Done()
34                 }()
35         }
36 }
37
38 // RefreshServiceDiscoveryOnSIGHUP installs a signal handler that calls
39 // RefreshServiceDiscovery when SIGHUP is received.
40 func RefreshServiceDiscoveryOnSIGHUP() {
41         svcListCacheMtx.Lock()
42         defer svcListCacheMtx.Unlock()
43         if svcListCacheSignal != nil {
44                 return
45         }
46         svcListCacheSignal = make(chan os.Signal, 1)
47         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
48         go func() {
49                 for range svcListCacheSignal {
50                         RefreshServiceDiscovery()
51                 }
52         }()
53 }
54
var (
        // svcListCache holds one cachedSvcList per API server, keyed
        // by the ApiServer field of the client that created the entry.
        svcListCache       = map[string]cachedSvcList{}
        // svcListCacheSignal receives SIGHUP notifications once
        // RefreshServiceDiscoveryOnSIGHUP has been called; it is nil
        // until then.
        svcListCacheSignal chan os.Signal
        // svcListCacheMtx is held while reading or writing
        // svcListCache and svcListCacheSignal.
        svcListCacheMtx    sync.Mutex
)
60
// cachedSvcList is the value type stored in svcListCache. Its channels
// connect callers to the goroutines started by poll.
type cachedSvcList struct {
        // arv is the client poll uses to fetch the services list.
        arv    *arvadosclient.ArvadosClient
        // latest delivers the most recently fetched services list to
        // whoever receives from it.
        latest chan svcList
        // clear carries requests to discard the current list and
        // fetch a new one.
        clear  chan struct{}
}
66
// poll checks for a new services list every few minutes and sends the
// latest list to the "latest" channel as needed.
//
// It runs two cooperating loops and never returns:
//
//   - a helper goroutine that owns the current list, offers it on
//     ent.latest, and reacts to requests arriving on ent.clear;
//   - the calling goroutine, which fetches the list from the API
//     server whenever its timer fires or the helper asks for a
//     refresh, and hands each successful result to the helper.
func (ent *cachedSvcList) poll() {
        // wakeup: helper -> fetch loop, "fetch now".
        wakeup := make(chan struct{})

        // replace: fetch loop -> helper, "here is a newly fetched list".
        replace := make(chan svcList)
        go func() {
                // Trigger the initial fetch and wait for its result, so
                // nothing is offered on ent.latest before the first
                // successful fetch.
                wakeup <- struct{}{}
                current := <-replace
                for {
                        select {
                        case <-ent.clear:
                                wakeup <- struct{}{}
                                // Wait here for the next success, in
                                // order to avoid returning stale
                                // results on the "latest" channel.
                                current = <-replace
                        case current = <-replace:
                                // Periodic refresh succeeded: adopt the
                                // new list.
                        case ent.latest <- current:
                                // A caller took a copy of the current
                                // list.
                        }
                }
        }()

        // okDelay is the interval between fetches after a success;
        // errDelay is the retry interval after a failed fetch.
        okDelay := 5 * time.Minute
        errDelay := 3 * time.Second
        timer := time.NewTimer(okDelay)
        for {
                select {
                case <-timer.C:
                case <-wakeup:
                        // Refresh requested before the timer fired. Stop
                        // the timer so the Reset calls below start from
                        // a clean state.
                        if !timer.Stop() {
                                // Lost race stopping timer; skip extra firing
                                <-timer.C
                        }
                }
                var next svcList
                err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
                if err != nil {
                        log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
                        timer.Reset(errDelay)
                        continue
                }
                // Blocks until the helper goroutine accepts the new list.
                replace <- next
                timer.Reset(okDelay)
        }
}
113
114 // discoverServices gets the list of available keep services from
115 // the API server.
116 //
117 // If a list of services is provided in the arvadosclient (e.g., from
118 // an environment variable or local config), that list is used
119 // instead.
120 //
121 // If an API call is made, the result is cached for 5 minutes or until
122 // ClearCache() is called, and during this interval it is reused by
123 // other KeepClients that use the same API server host.
124 func (kc *KeepClient) discoverServices() error {
125         if kc.disableDiscovery {
126                 return nil
127         }
128
129         if kc.Arvados.KeepServiceURIs != nil {
130                 kc.disableDiscovery = true
131                 kc.foundNonDiskSvc = true
132                 kc.replicasPerService = 0
133                 roots := make(map[string]string)
134                 for i, uri := range kc.Arvados.KeepServiceURIs {
135                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
136                 }
137                 kc.setServiceRoots(roots, roots, roots)
138                 return nil
139         }
140
141         svcListCacheMtx.Lock()
142         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
143         if !ok {
144                 arv := *kc.Arvados
145                 cacheEnt = cachedSvcList{
146                         latest: make(chan svcList),
147                         clear:  make(chan struct{}),
148                         arv:    &arv,
149                 }
150                 go cacheEnt.poll()
151                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
152         }
153         svcListCacheMtx.Unlock()
154
155         select {
156         case <-time.After(time.Minute):
157                 return errors.New("timed out while getting initial list of keep services")
158         case sl := <-cacheEnt.latest:
159                 return kc.loadKeepServers(sl)
160         }
161 }
162
163 func (kc *KeepClient) RefreshServiceDiscovery() {
164         svcListCacheMtx.Lock()
165         ent, ok := svcListCache[kc.Arvados.ApiServer]
166         svcListCacheMtx.Unlock()
167         if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
168                 return
169         }
170         ent.clear <- struct{}{}
171 }
172
173 // LoadKeepServicesFromJSON gets list of available keep services from
174 // given JSON and disables automatic service discovery.
175 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
176         kc.disableDiscovery = true
177
178         var list svcList
179         dec := json.NewDecoder(strings.NewReader(services))
180         if err := dec.Decode(&list); err != nil {
181                 return err
182         }
183
184         return kc.loadKeepServers(list)
185 }
186
187 func (kc *KeepClient) loadKeepServers(list svcList) error {
188         listed := make(map[string]bool)
189         localRoots := make(map[string]string)
190         gatewayRoots := make(map[string]string)
191         writableLocalRoots := make(map[string]string)
192
193         // replicasPerService is 1 for disks; unknown or unlimited otherwise
194         kc.replicasPerService = 1
195
196         for _, service := range list.Items {
197                 scheme := "http"
198                 if service.SSL {
199                         scheme = "https"
200                 }
201                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
202
203                 // Skip duplicates
204                 if listed[url] {
205                         continue
206                 }
207                 listed[url] = true
208
209                 localRoots[service.Uuid] = url
210                 if service.ReadOnly == false {
211                         writableLocalRoots[service.Uuid] = url
212                         if service.SvcType != "disk" {
213                                 kc.replicasPerService = 0
214                         }
215                 }
216
217                 if service.SvcType != "disk" {
218                         kc.foundNonDiskSvc = true
219                 }
220
221                 // Gateway services are only used when specified by
222                 // UUID, so there's nothing to gain by filtering them
223                 // by service type. Including all accessible services
224                 // (gateway and otherwise) merely accommodates more
225                 // service configurations.
226                 gatewayRoots[service.Uuid] = url
227         }
228
229         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
230         return nil
231 }