Merge branch 'master' into origin-8019-crunchrun-log-throttle
[arvados.git] / sdk / go / keepclient / discover.go
1 package keepclient
2
3 import (
4         "encoding/json"
5         "fmt"
6         "log"
7         "net/http"
8         "os"
9         "os/signal"
10         "reflect"
11         "strings"
12         "syscall"
13         "time"
14 )
15
16 // DiscoverKeepServers gets list of available keep services from the
17 // API server.
18 //
19 // If a list of services is provided in the arvadosclient (e.g., from
20 // an environment variable or local config), that list is used
21 // instead.
22 func (this *KeepClient) DiscoverKeepServers() error {
23         if this.Arvados.KeepServiceURIs != nil {
24                 this.foundNonDiskSvc = true
25                 this.replicasPerService = 0
26                 if c, ok := this.Client.(*http.Client); ok {
27                         this.setClientSettingsNonDisk(c)
28                 }
29                 roots := make(map[string]string)
30                 for i, uri := range this.Arvados.KeepServiceURIs {
31                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
32                 }
33                 this.SetServiceRoots(roots, roots, roots)
34                 return nil
35         }
36
37         // ArvadosClient did not provide a services list. Ask API
38         // server for a list of accessible services.
39         var list svcList
40         err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
41         if err != nil {
42                 return err
43         }
44         return this.loadKeepServers(list)
45 }
46
47 // LoadKeepServicesFromJSON gets list of available keep services from given JSON
48 func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
49         var list svcList
50
51         // Load keep services from given json
52         dec := json.NewDecoder(strings.NewReader(services))
53         if err := dec.Decode(&list); err != nil {
54                 return err
55         }
56
57         return this.loadKeepServers(list)
58 }
59
60 // RefreshServices calls DiscoverKeepServers to refresh the keep
61 // service list on SIGHUP; when the given interval has elapsed since
62 // the last refresh; and (if the last refresh failed) the given
63 // errInterval has elapsed.
64 func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
65         var previousRoots = []map[string]string{}
66
67         timer := time.NewTimer(interval)
68         gotHUP := make(chan os.Signal, 1)
69         signal.Notify(gotHUP, syscall.SIGHUP)
70
71         for {
72                 select {
73                 case <-gotHUP:
74                 case <-timer.C:
75                 }
76                 timer.Reset(interval)
77
78                 if err := kc.DiscoverKeepServers(); err != nil {
79                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval)
80                         timer.Reset(errInterval)
81                         continue
82                 }
83                 newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
84
85                 if !reflect.DeepEqual(previousRoots, newRoots) {
86                         DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
87                         previousRoots = newRoots
88                 }
89
90                 if len(newRoots[0]) == 0 {
91                         log.Printf("WARNING: No local services (retrying in %v)", errInterval)
92                         timer.Reset(errInterval)
93                 }
94         }
95 }
96
97 // loadKeepServers
98 func (this *KeepClient) loadKeepServers(list svcList) error {
99         listed := make(map[string]bool)
100         localRoots := make(map[string]string)
101         gatewayRoots := make(map[string]string)
102         writableLocalRoots := make(map[string]string)
103
104         // replicasPerService is 1 for disks; unknown or unlimited otherwise
105         this.replicasPerService = 1
106
107         for _, service := range list.Items {
108                 scheme := "http"
109                 if service.SSL {
110                         scheme = "https"
111                 }
112                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
113
114                 // Skip duplicates
115                 if listed[url] {
116                         continue
117                 }
118                 listed[url] = true
119
120                 localRoots[service.Uuid] = url
121                 if service.ReadOnly == false {
122                         writableLocalRoots[service.Uuid] = url
123                         if service.SvcType != "disk" {
124                                 this.replicasPerService = 0
125                         }
126                 }
127
128                 if service.SvcType != "disk" {
129                         this.foundNonDiskSvc = true
130                 }
131
132                 // Gateway services are only used when specified by
133                 // UUID, so there's nothing to gain by filtering them
134                 // by service type. Including all accessible services
135                 // (gateway and otherwise) merely accommodates more
136                 // service configurations.
137                 gatewayRoots[service.Uuid] = url
138         }
139
140         if client, ok := this.Client.(*http.Client); ok {
141                 if this.foundNonDiskSvc {
142                         this.setClientSettingsNonDisk(client)
143                 } else {
144                         this.setClientSettingsDisk(client)
145                 }
146         }
147
148         this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
149         return nil
150 }