Add 'sdk/java-v2/' from commit '55f103e336ca9fb8bf1720d2ef4ee8dd4e221118'
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "encoding/json"
9         "fmt"
10         "log"
11         "os"
12         "os/signal"
13         "strings"
14         "sync"
15         "syscall"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
19 )
20
21 // ClearCache clears the Keep service discovery cache.
22 func RefreshServiceDiscovery() {
23         var wg sync.WaitGroup
24         defer wg.Wait()
25         svcListCacheMtx.Lock()
26         defer svcListCacheMtx.Unlock()
27         for _, ent := range svcListCache {
28                 wg.Add(1)
29                 go func() {
30                         ent.clear <- struct{}{}
31                         wg.Done()
32                 }()
33         }
34 }
35
36 // ClearCacheOnSIGHUP installs a signal handler that calls
37 // ClearCache when SIGHUP is received.
38 func RefreshServiceDiscoveryOnSIGHUP() {
39         svcListCacheMtx.Lock()
40         defer svcListCacheMtx.Unlock()
41         if svcListCacheSignal != nil {
42                 return
43         }
44         svcListCacheSignal = make(chan os.Signal, 1)
45         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
46         go func() {
47                 for range svcListCacheSignal {
48                         RefreshServiceDiscovery()
49                 }
50         }()
51 }
52
53 var (
54         svcListCache       = map[string]cachedSvcList{}
55         svcListCacheSignal chan os.Signal
56         svcListCacheMtx    sync.Mutex
57 )
58
// cachedSvcList is the shared discovery cache entry for one API server
// host. Its poll method keeps the service list up to date; clients read
// the current list from latest and request an immediate refetch by
// sending on clear.
type cachedSvcList struct {
	// arv is used to fetch the list; discoverServices stores a private
	// copy of the first requesting KeepClient's Arvados client here.
	arv *arvadosclient.ArvadosClient
	// latest repeatedly offers the most recent successfully fetched list.
	latest chan svcList
	// clear asks poll to fetch a new list right away; poll stops
	// offering the old list until its next successful fetch.
	clear chan struct{}
}
64
65 // Check for new services list every few minutes. Send the latest list
66 // to the "latest" channel as needed.
67 func (ent *cachedSvcList) poll() {
68         wakeup := make(chan struct{})
69
70         replace := make(chan svcList)
71         go func() {
72                 wakeup <- struct{}{}
73                 current := <-replace
74                 for {
75                         select {
76                         case <-ent.clear:
77                                 wakeup <- struct{}{}
78                                 // Wait here for the next success, in
79                                 // order to avoid returning stale
80                                 // results on the "latest" channel.
81                                 current = <-replace
82                         case current = <-replace:
83                         case ent.latest <- current:
84                         }
85                 }
86         }()
87
88         okDelay := 5 * time.Minute
89         errDelay := 3 * time.Second
90         timer := time.NewTimer(okDelay)
91         for {
92                 select {
93                 case <-timer.C:
94                 case <-wakeup:
95                         if !timer.Stop() {
96                                 // Lost race stopping timer; skip extra firing
97                                 <-timer.C
98                         }
99                 }
100                 var next svcList
101                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
102                 if err != nil {
103                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
104                         timer.Reset(errDelay)
105                         continue
106                 }
107                 replace <- next
108                 timer.Reset(okDelay)
109         }
110 }
111
112 // discoverServices gets the list of available keep services from
113 // the API server.
114 //
115 // If a list of services is provided in the arvadosclient (e.g., from
116 // an environment variable or local config), that list is used
117 // instead.
118 //
119 // If an API call is made, the result is cached for 5 minutes or until
120 // ClearCache() is called, and during this interval it is reused by
121 // other KeepClients that use the same API server host.
122 func (kc *KeepClient) discoverServices() error {
123         if kc.disableDiscovery {
124                 return nil
125         }
126
127         if kc.Arvados.KeepServiceURIs != nil {
128                 kc.disableDiscovery = true
129                 kc.foundNonDiskSvc = true
130                 kc.replicasPerService = 0
131                 roots := make(map[string]string)
132                 for i, uri := range kc.Arvados.KeepServiceURIs {
133                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
134                 }
135                 kc.setServiceRoots(roots, roots, roots)
136                 return nil
137         }
138
139         svcListCacheMtx.Lock()
140         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
141         if !ok {
142                 arv := *kc.Arvados
143                 cacheEnt = cachedSvcList{
144                         latest: make(chan svcList),
145                         clear:  make(chan struct{}),
146                         arv:    &arv,
147                 }
148                 go cacheEnt.poll()
149                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
150         }
151         svcListCacheMtx.Unlock()
152
153         return kc.loadKeepServers(<-cacheEnt.latest)
154 }
155
156 func (kc *KeepClient) RefreshServiceDiscovery() {
157         svcListCacheMtx.Lock()
158         ent, ok := svcListCache[kc.Arvados.ApiServer]
159         svcListCacheMtx.Unlock()
160         if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
161                 return
162         }
163         ent.clear <- struct{}{}
164 }
165
166 // LoadKeepServicesFromJSON gets list of available keep services from
167 // given JSON and disables automatic service discovery.
168 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
169         kc.disableDiscovery = true
170
171         var list svcList
172         dec := json.NewDecoder(strings.NewReader(services))
173         if err := dec.Decode(&list); err != nil {
174                 return err
175         }
176
177         return kc.loadKeepServers(list)
178 }
179
180 func (kc *KeepClient) loadKeepServers(list svcList) error {
181         listed := make(map[string]bool)
182         localRoots := make(map[string]string)
183         gatewayRoots := make(map[string]string)
184         writableLocalRoots := make(map[string]string)
185
186         // replicasPerService is 1 for disks; unknown or unlimited otherwise
187         kc.replicasPerService = 1
188
189         for _, service := range list.Items {
190                 scheme := "http"
191                 if service.SSL {
192                         scheme = "https"
193                 }
194                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
195
196                 // Skip duplicates
197                 if listed[url] {
198                         continue
199                 }
200                 listed[url] = true
201
202                 localRoots[service.Uuid] = url
203                 if service.ReadOnly == false {
204                         writableLocalRoots[service.Uuid] = url
205                         if service.SvcType != "disk" {
206                                 kc.replicasPerService = 0
207                         }
208                 }
209
210                 if service.SvcType != "disk" {
211                         kc.foundNonDiskSvc = true
212                 }
213
214                 // Gateway services are only used when specified by
215                 // UUID, so there's nothing to gain by filtering them
216                 // by service type. Including all accessible services
217                 // (gateway and otherwise) merely accommodates more
218                 // service configurations.
219                 gatewayRoots[service.Uuid] = url
220         }
221
222         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
223         return nil
224 }