c5413d4a4b067c041141b89109afd81a144ed2a6
[arvados.git] / sdk / go / keepclient / discover.go
1 package keepclient
2
3 import (
4         "encoding/json"
5         "fmt"
6         "log"
7         "os"
8         "os/signal"
9         "strings"
10         "sync"
11         "syscall"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
17 // ClearCache clears the Keep service discovery cache.
18 func ClearCache() {
19         svcListCacheMtx.Lock()
20         defer svcListCacheMtx.Unlock()
21         for _, ent := range svcListCache {
22                 ent.clear <- struct{}{}
23         }
24 }
25
26 // ClearCacheOnSIGHUP installs a signal handler that calls
27 // ClearCache when SIGHUP is received.
28 func ClearCacheOnSIGHUP() {
29         svcListCacheMtx.Lock()
30         defer svcListCacheMtx.Unlock()
31         if svcListCacheSignal != nil {
32                 return
33         }
34         svcListCacheSignal = make(chan os.Signal, 1)
35         signal.Notify(svcListCacheSignal, syscall.SIGHUP)
36         go func() {
37                 for range svcListCacheSignal {
38                         ClearCache()
39                 }
40         }()
41 }
42
43 var (
44         svcListCache       = map[string]cachedSvcList{}
45         svcListCacheSignal chan os.Signal
46         svcListCacheMtx    sync.Mutex
47 )
48
49 type cachedSvcList struct {
50         arv    *arvadosclient.ArvadosClient
51         latest chan svcList
52         clear  chan struct{}
53 }
54
55 // Check for new services list every few minutes. Send the latest list
56 // to the "latest" channel as needed.
57 func (ent *cachedSvcList) poll() {
58         wakeup := make(chan struct{})
59
60         replace := make(chan svcList)
61         go func() {
62                 wakeup <- struct{}{}
63                 current := <-replace
64                 for {
65                         select {
66                         case <-ent.clear:
67                                 wakeup <- struct{}{}
68                                 // Wait here for the next success, in
69                                 // order to avoid returning stale
70                                 // results on the "latest" channel.
71                                 current = <-replace
72                         case current = <-replace:
73                         case ent.latest <- current:
74                         }
75                 }
76         }()
77
78         okDelay := 5 * time.Minute
79         errDelay := 3 * time.Second
80         timer := time.NewTimer(okDelay)
81         for {
82                 select {
83                 case <-timer.C:
84                 case <-wakeup:
85                         if !timer.Stop() {
86                                 // Lost race stopping timer; skip extra firing
87                                 <-timer.C
88                         }
89                 }
90                 var next svcList
91                 err := ent.arv.Call("GET", "keep_services", "", "accessible", nil, &next)
92                 if err != nil {
93                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errDelay)
94                         timer.Reset(errDelay)
95                         continue
96                 }
97                 replace <- next
98                 timer.Reset(okDelay)
99         }
100 }
101
102 // discoverServices gets the list of available keep services from
103 // the API server.
104 //
105 // If a list of services is provided in the arvadosclient (e.g., from
106 // an environment variable or local config), that list is used
107 // instead.
108 //
109 // If an API call is made, the result is cached for 5 minutes or until
110 // ClearCache() is called, and during this interval it is reused by
111 // other KeepClients that use the same API server host.
112 func (kc *KeepClient) discoverServices() error {
113         if kc.disableDiscovery {
114                 return nil
115         }
116
117         if kc.Arvados.KeepServiceURIs != nil {
118                 kc.disableDiscovery = true
119                 kc.foundNonDiskSvc = true
120                 kc.replicasPerService = 0
121                 roots := make(map[string]string)
122                 for i, uri := range kc.Arvados.KeepServiceURIs {
123                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
124                 }
125                 kc.setServiceRoots(roots, roots, roots)
126                 return nil
127         }
128
129         svcListCacheMtx.Lock()
130         cacheEnt, ok := svcListCache[kc.Arvados.ApiServer]
131         if !ok {
132                 arv := *kc.Arvados
133                 cacheEnt = cachedSvcList{
134                         latest: make(chan svcList),
135                         clear:  make(chan struct{}),
136                         arv:    &arv,
137                 }
138                 go cacheEnt.poll()
139                 svcListCache[kc.Arvados.ApiServer] = cacheEnt
140         }
141         svcListCacheMtx.Unlock()
142
143         return kc.loadKeepServers(<-cacheEnt.latest)
144 }
145
146 // LoadKeepServicesFromJSON gets list of available keep services from
147 // given JSON and disables automatic service discovery.
148 func (kc *KeepClient) LoadKeepServicesFromJSON(services string) error {
149         kc.disableDiscovery = true
150
151         var list svcList
152         dec := json.NewDecoder(strings.NewReader(services))
153         if err := dec.Decode(&list); err != nil {
154                 return err
155         }
156
157         return kc.loadKeepServers(list)
158 }
159
160 func (kc *KeepClient) loadKeepServers(list svcList) error {
161         listed := make(map[string]bool)
162         localRoots := make(map[string]string)
163         gatewayRoots := make(map[string]string)
164         writableLocalRoots := make(map[string]string)
165
166         // replicasPerService is 1 for disks; unknown or unlimited otherwise
167         kc.replicasPerService = 1
168
169         for _, service := range list.Items {
170                 scheme := "http"
171                 if service.SSL {
172                         scheme = "https"
173                 }
174                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
175
176                 // Skip duplicates
177                 if listed[url] {
178                         continue
179                 }
180                 listed[url] = true
181
182                 localRoots[service.Uuid] = url
183                 if service.ReadOnly == false {
184                         writableLocalRoots[service.Uuid] = url
185                         if service.SvcType != "disk" {
186                                 kc.replicasPerService = 0
187                         }
188                 }
189
190                 if service.SvcType != "disk" {
191                         kc.foundNonDiskSvc = true
192                 }
193
194                 // Gateway services are only used when specified by
195                 // UUID, so there's nothing to gain by filtering them
196                 // by service type. Including all accessible services
197                 // (gateway and otherwise) merely accommodates more
198                 // service configurations.
199                 gatewayRoots[service.Uuid] = url
200         }
201
202         kc.setServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
203         return nil
204 }