15652: Eliminate a malloc when writing a one-segment block.
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "log"
12         "os"
13         "os/signal"
14         "strings"
15         "sync"
16         "syscall"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
20 )
21
22 // ClearCache clears the Keep service discovery cache.
23 func RefreshServiceDiscovery() {
24         var wg sync.WaitGroup
25         defer wg.Wait()
26         svcListCacheMtx.Lock()
27         defer svcListCacheMtx.Unlock()
28         for _, ent := range svcListCache {
29                 wg.Add(1)
30                 go func() {
31                         ent.clear <- struct{}{}
32                         wg.Done()
33                 }()
34         }
35 }
36
37 // ClearCacheOnSIGHUP installs a signal handler that calls
38 // ClearCache when SIGHUP is received.
39 func RefreshServiceDiscoveryOnSIGHUP() {
40         svcListCacheMtx.Lock()
41         defer svcListCacheMtx.Unlock()
42         if svcListCacheSignal != nil {
43                 return
44         }
45         svcListCacheSignal = make(chan os.Signal, 1)
46         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
47         go func() {
48                 for range svcListCacheSignal {
49                         RefreshServiceDiscovery()
50                 }
51         }()
52 }
53
54 var (
55         svcListCache       = map[string]cachedSvcList{}
56         svcListCacheSignal chan os.Signal
57         svcListCacheMtx    sync.Mutex
58 )
59
// cachedSvcList is the discovery cache entry for one API server host.
// Its poll method fetches the keep_services list in the background and
// offers the most recent successful result on latest.
type cachedSvcList struct {
	// arv is the client poll uses to call keep_services/accessible;
	// discoverServices gives each entry its own copy of the client.
	arv    *arvadosclient.ArvadosClient
	// latest is received from by discoverServices to get the current list.
	latest chan svcList
	// clear asks the poller to fetch a new list; poll stops offering the
	// old list on latest until that fetch succeeds.
	clear  chan struct{}
}
65
66 // Check for new services list every few minutes. Send the latest list
67 // to the "latest" channel as needed.
68 func (ent *cachedSvcList) poll() {
69         wakeup := make(chan struct{})
70
71         replace := make(chan svcList)
72         go func() {
73                 wakeup <- struct{}{}
74                 current := <-replace
75                 for {
76                         select {
77                         case <-ent.clear:
78                                 wakeup <- struct{}{}
79                                 // Wait here for the next success, in
80                                 // order to avoid returning stale
81                                 // results on the "latest" channel.
82                                 current = <-replace
83                         case current = <-replace:
84                         case ent.latest <- current:
85                         }
86                 }
87         }()
88
89         okDelay := 5 * time.Minute
90         errDelay := 3 * time.Second
91         timer := time.NewTimer(okDelay)
92         for {
93                 select {
94                 case <-timer.C:
95                 case <-wakeup:
96                         if !timer.Stop() {
97                                 // Lost race stopping timer; skip extra firing
98                                 <-timer.C
99                         }
100                 }
101                 var next svcList
102                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
103                 if err != nil {
104                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
105                         timer.Reset(errDelay)
106                         continue
107                 }
108                 replace <- next
109                 timer.Reset(okDelay)
110         }
111 }
112
113 // discoverServices gets the list of available keep services from
114 // the API server.
115 //
116 // If a list of services is provided in the arvadosclient (e.g., from
117 // an environment variable or local config), that list is used
118 // instead.
119 //
120 // If an API call is made, the result is cached for 5 minutes or until
121 // ClearCache() is called, and during this interval it is reused by
122 // other KeepClients that use the same API server host.
123 func (kc *KeepClient) discoverServices() error {
124         if kc.disableDiscovery {
125                 return nil
126         }
127
128         if kc.Arvados.KeepServiceURIs != nil {
129                 kc.disableDiscovery = true
130                 kc.foundNonDiskSvc = true
131                 kc.replicasPerService = 0
132                 roots := make(map[string]string)
133                 for i, uri := range kc.Arvados.KeepServiceURIs {
134                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
135                 }
136                 kc.setServiceRoots(roots, roots, roots)
137                 return nil
138         }
139
140         svcListCacheMtx.Lock()
141         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
142         if !ok {
143                 arv := *kc.Arvados
144                 cacheEnt = cachedSvcList{
145                         latest: make(chan svcList),
146                         clear:  make(chan struct{}),
147                         arv:    &arv,
148                 }
149                 go cacheEnt.poll()
150                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
151         }
152         svcListCacheMtx.Unlock()
153
154         select {
155         case <-time.After(time.Minute):
156                 return errors.New("timed out while getting initial list of keep services")
157         case sl := <-cacheEnt.latest:
158                 return kc.loadKeepServers(sl)
159         }
160 }
161
162 func (kc *KeepClient) RefreshServiceDiscovery() {
163         svcListCacheMtx.Lock()
164         ent, ok := svcListCache[kc.Arvados.ApiServer]
165         svcListCacheMtx.Unlock()
166         if !ok || kc.Arvados.KeepServiceURIs != nil || kc.disableDiscovery {
167                 return
168         }
169         ent.clear <- struct{}{}
170 }
171
172 // LoadKeepServicesFromJSON gets list of available keep services from
173 // given JSON and disables automatic service discovery.
174 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
175         kc.disableDiscovery = true
176
177         var list svcList
178         dec := json.NewDecoder(strings.NewReader(services))
179         if err := dec.Decode(&list); err != nil {
180                 return err
181         }
182
183         return kc.loadKeepServers(list)
184 }
185
186 func (kc *KeepClient) loadKeepServers(list svcList) error {
187         listed := make(map[string]bool)
188         localRoots := make(map[string]string)
189         gatewayRoots := make(map[string]string)
190         writableLocalRoots := make(map[string]string)
191
192         // replicasPerService is 1 for disks; unknown or unlimited otherwise
193         kc.replicasPerService = 1
194
195         for _, service := range list.Items {
196                 scheme := "http"
197                 if service.SSL {
198                         scheme = "https"
199                 }
200                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
201
202                 // Skip duplicates
203                 if listed[url] {
204                         continue
205                 }
206                 listed[url] = true
207
208                 localRoots[service.Uuid] = url
209                 if service.ReadOnly == false {
210                         writableLocalRoots[service.Uuid] = url
211                         if service.SvcType != "disk" {
212                                 kc.replicasPerService = 0
213                         }
214                 }
215
216                 if service.SvcType != "disk" {
217                         kc.foundNonDiskSvc = true
218                 }
219
220                 // Gateway services are only used when specified by
221                 // UUID, so there's nothing to gain by filtering them
222                 // by service type. Including all accessible services
223                 // (gateway and otherwise) merely accommodates more
224                 // service configurations.
225                 gatewayRoots[service.Uuid] = url
226         }
227
228         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
229         return nil
230 }