Merge branch '13994-proxy-remote'
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "encoding/json"
9         "fmt"
10         "log"
11         "os"
12         "os/signal"
13         "strings"
14         "sync"
15         "syscall"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
19 )
20
21 // ClearCache clears the Keep service discovery cache.
22 func RefreshServiceDiscovery() {
23         svcListCacheMtx.Lock()
24         defer svcListCacheMtx.Unlock()
25         var wg sync.WaitGroup
26         for _, ent := range svcListCache {
27                 wg.Add(1)
28                 go func() {
29                         ent.clear <- struct{}{}
30                         wg.Done()
31                 }()
32         }
33         wg.Wait()
34 }
35
36 // ClearCacheOnSIGHUP installs a signal handler that calls
37 // ClearCache when SIGHUP is received.
38 func RefreshServiceDiscoveryOnSIGHUP() {
39         svcListCacheMtx.Lock()
40         defer svcListCacheMtx.Unlock()
41         if svcListCacheSignal != nil {
42                 return
43         }
44         svcListCacheSignal = make(chan os.Signal, 1)
45         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
46         go func() {
47                 for range svcListCacheSignal {
48                         RefreshServiceDiscovery()
49                 }
50         }()
51 }
52
53 var (
54         svcListCache       = map[string]cachedSvcList{}
55         svcListCacheSignal chan os.Signal
56         svcListCacheMtx    sync.Mutex
57 )
58
59 type cachedSvcList struct {
60         arv    *arvadosclient.ArvadosClient
61         latest chan svcList
62         clear  chan struct{}
63 }
64
65 // Check for new services list every few minutes. Send the latest list
66 // to the "latest" channel as needed.
67 func (ent *cachedSvcList) poll() {
68         wakeup := make(chan struct{})
69
70         replace := make(chan svcList)
71         go func() {
72                 wakeup <- struct{}{}
73                 current := <-replace
74                 for {
75                         select {
76                         case <-ent.clear:
77                                 wakeup <- struct{}{}
78                                 // Wait here for the next success, in
79                                 // order to avoid returning stale
80                                 // results on the "latest" channel.
81                                 current = <-replace
82                         case current = <-replace:
83                         case ent.latest <- current:
84                         }
85                 }
86         }()
87
88         okDelay := 5 * time.Minute
89         errDelay := 3 * time.Second
90         timer := time.NewTimer(okDelay)
91         for {
92                 select {
93                 case <-timer.C:
94                 case <-wakeup:
95                         if !timer.Stop() {
96                                 // Lost race stopping timer; skip extra firing
97                                 <-timer.C
98                         }
99                 }
100                 var next svcList
101                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
102                 if err != nil {
103                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
104                         timer.Reset(errDelay)
105                         continue
106                 }
107                 replace <- next
108                 timer.Reset(okDelay)
109         }
110 }
111
112 // discoverServices gets the list of available keep services from
113 // the API server.
114 //
115 // If a list of services is provided in the arvadosclient (e.g., from
116 // an environment variable or local config), that list is used
117 // instead.
118 //
119 // If an API call is made, the result is cached for 5 minutes or until
120 // ClearCache() is called, and during this interval it is reused by
121 // other KeepClients that use the same API server host.
122 func (kc *KeepClient) discoverServices() error {
123         if kc.disableDiscovery {
124                 return nil
125         }
126
127         if kc.Arvados.KeepServiceURIs != nil {
128                 kc.disableDiscovery = true
129                 kc.foundNonDiskSvc = true
130                 kc.replicasPerService = 0
131                 roots := make(map[string]string)
132                 for i, uri := range kc.Arvados.KeepServiceURIs {
133                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
134                 }
135                 kc.setServiceRoots(roots, roots, roots)
136                 return nil
137         }
138
139         svcListCacheMtx.Lock()
140         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
141         if !ok {
142                 arv := *kc.Arvados
143                 cacheEnt = cachedSvcList{
144                         latest: make(chan svcList),
145                         clear:  make(chan struct{}),
146                         arv:    &arv,
147                 }
148                 go cacheEnt.poll()
149                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
150         }
151         svcListCacheMtx.Unlock()
152
153         return kc.loadKeepServers(<-cacheEnt.latest)
154 }
155
156 // LoadKeepServicesFromJSON gets list of available keep services from
157 // given JSON and disables automatic service discovery.
158 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
159         kc.disableDiscovery = true
160
161         var list svcList
162         dec := json.NewDecoder(strings.NewReader(services))
163         if err := dec.Decode(&list); err != nil {
164                 return err
165         }
166
167         return kc.loadKeepServers(list)
168 }
169
170 func (kc *KeepClient) loadKeepServers(list svcList) error {
171         listed := make(map[string]bool)
172         localRoots := make(map[string]string)
173         gatewayRoots := make(map[string]string)
174         writableLocalRoots := make(map[string]string)
175
176         // replicasPerService is 1 for disks; unknown or unlimited otherwise
177         kc.replicasPerService = 1
178
179         for _, service := range list.Items {
180                 scheme := "http"
181                 if service.SSL {
182                         scheme = "https"
183                 }
184                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
185
186                 // Skip duplicates
187                 if listed[url] {
188                         continue
189                 }
190                 listed[url] = true
191
192                 localRoots[service.Uuid] = url
193                 if service.ReadOnly == false {
194                         writableLocalRoots[service.Uuid] = url
195                         if service.SvcType != "disk" {
196                                 kc.replicasPerService = 0
197                         }
198                 }
199
200                 if service.SvcType != "disk" {
201                         kc.foundNonDiskSvc = true
202                 }
203
204                 // Gateway services are only used when specified by
205                 // UUID, so there's nothing to gain by filtering them
206                 // by service type. Including all accessible services
207                 // (gateway and otherwise) merely accommodates more
208                 // service configurations.
209                 gatewayRoots[service.Uuid] = url
210         }
211
212         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
213         return nil
214 }