Merge branch '13993-federated-collection' refs #13993
[arvados.git] / sdk / go / keepclient / discover.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "encoding/json"
9         "fmt"
10         "log"
11         "os"
12         "os/signal"
13         "strings"
14         "sync"
15         "syscall"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
19 )
20
21 // ClearCache clears the Keep service discovery cache.
22 func RefreshServiceDiscovery() {
23         svcListCacheMtx.Lock()
24         defer svcListCacheMtx.Unlock()
25         for _, ent := range svcListCache {
26                 ent.clear <- struct{}{}
27         }
28 }
29
30 // ClearCacheOnSIGHUP installs a signal handler that calls
31 // ClearCache when SIGHUP is received.
32 func RefreshServiceDiscoveryOnSIGHUP() {
33         svcListCacheMtx.Lock()
34         defer svcListCacheMtx.Unlock()
35         if svcListCacheSignal != nil {
36                 return
37         }
38         svcListCacheSignal = make(chan os.Signal, 1)
39         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
40         go func() {
41                 for range svcListCacheSignal {
42                         RefreshServiceDiscovery()
43                 }
44         }()
45 }
46
47 var (
48         svcListCache       = map[string]cachedSvcList{}
49         svcListCacheSignal chan os.Signal
50         svcListCacheMtx    sync.Mutex
51 )
52
53 type cachedSvcList struct {
54         arv    *arvadosclient.ArvadosClient
55         latest chan svcList
56         clear  chan struct{}
57 }
58
59 // Check for new services list every few minutes. Send the latest list
60 // to the "latest" channel as needed.
61 func (ent *cachedSvcList) poll() {
62         wakeup := make(chan struct{})
63
64         replace := make(chan svcList)
65         go func() {
66                 wakeup <- struct{}{}
67                 current := <-replace
68                 for {
69                         select {
70                         case <-ent.clear:
71                                 wakeup <- struct{}{}
72                                 // Wait here for the next success, in
73                                 // order to avoid returning stale
74                                 // results on the "latest" channel.
75                                 current = <-replace
76                         case current = <-replace:
77                         case ent.latest <- current:
78                         }
79                 }
80         }()
81
82         okDelay := 5 * time.Minute
83         errDelay := 3 * time.Second
84         timer := time.NewTimer(okDelay)
85         for {
86                 select {
87                 case <-timer.C:
88                 case <-wakeup:
89                         if !timer.Stop() {
90                                 // Lost race stopping timer; skip extra firing
91                                 <-timer.C
92                         }
93                 }
94                 var next svcList
95                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
96                 if err != nil {
97                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
98                         timer.Reset(errDelay)
99                         continue
100                 }
101                 replace <- next
102                 timer.Reset(okDelay)
103         }
104 }
105
106 // discoverServices gets the list of available keep services from
107 // the API server.
108 //
109 // If a list of services is provided in the arvadosclient (e.g., from
110 // an environment variable or local config), that list is used
111 // instead.
112 //
113 // If an API call is made, the result is cached for 5 minutes or until
114 // ClearCache() is called, and during this interval it is reused by
115 // other KeepClients that use the same API server host.
116 func (kc *KeepClient) discoverServices() error {
117         if kc.disableDiscovery {
118                 return nil
119         }
120
121         if kc.Arvados.KeepServiceURIs != nil {
122                 kc.disableDiscovery = true
123                 kc.foundNonDiskSvc = true
124                 kc.replicasPerService = 0
125                 roots := make(map[string]string)
126                 for i, uri := range kc.Arvados.KeepServiceURIs {
127                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
128                 }
129                 kc.setServiceRoots(roots, roots, roots)
130                 return nil
131         }
132
133         svcListCacheMtx.Lock()
134         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
135         if !ok {
136                 arv := *kc.Arvados
137                 cacheEnt = cachedSvcList{
138                         latest: make(chan svcList),
139                         clear:  make(chan struct{}),
140                         arv:    &arv,
141                 }
142                 go cacheEnt.poll()
143                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
144         }
145         svcListCacheMtx.Unlock()
146
147         return kc.loadKeepServers(<-cacheEnt.latest)
148 }
149
150 // LoadKeepServicesFromJSON gets list of available keep services from
151 // given JSON and disables automatic service discovery.
152 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
153         kc.disableDiscovery = true
154
155         var list svcList
156         dec := json.NewDecoder(strings.NewReader(services))
157         if err := dec.Decode(&list); err != nil {
158                 return err
159         }
160
161         return kc.loadKeepServers(list)
162 }
163
164 func (kc *KeepClient) loadKeepServers(list svcList) error {
165         listed := make(map[string]bool)
166         localRoots := make(map[string]string)
167         gatewayRoots := make(map[string]string)
168         writableLocalRoots := make(map[string]string)
169
170         // replicasPerService is 1 for disks; unknown or unlimited otherwise
171         kc.replicasPerService = 1
172
173         for _, service := range list.Items {
174                 scheme := "http"
175                 if service.SSL {
176                         scheme = "https"
177                 }
178                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
179
180                 // Skip duplicates
181                 if listed[url] {
182                         continue
183                 }
184                 listed[url] = true
185
186                 localRoots[service.Uuid] = url
187                 if service.ReadOnly == false {
188                         writableLocalRoots[service.Uuid] = url
189                         if service.SvcType != "disk" {
190                                 kc.replicasPerService = 0
191                         }
192                 }
193
194                 if service.SvcType != "disk" {
195                         kc.foundNonDiskSvc = true
196                 }
197
198                 // Gateway services are only used when specified by
199                 // UUID, so there's nothing to gain by filtering them
200                 // by service type. Including all accessible services
201                 // (gateway and otherwise) merely accommodates more
202                 // service configurations.
203                 gatewayRoots[service.Uuid] = url
204         }
205
206         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
207         return nil
208 }