Merge branch '5353-node-sizes' closes #5353
[arvados.git] / sdk / go / keepclient / discover.go
1 package keepclient
2
3 import (
4         "encoding/json"
5         "fmt"
6         "log"
7         "os"
8         "os/signal"
9         "reflect"
10         "strings"
11         "syscall"
12         "time"
13 )
14
15 // DiscoverKeepServers gets list of available keep services from api server
16 func (this *KeepClient) DiscoverKeepServers() error {
17         var list svcList
18
19         // Get keep services from api server
20         err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
21         if err != nil {
22                 return err
23         }
24
25         return this.loadKeepServers(list)
26 }
27
28 // LoadKeepServicesFromJSON gets list of available keep services from given JSON
29 func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
30         var list svcList
31
32         // Load keep services from given json
33         dec := json.NewDecoder(strings.NewReader(services))
34         if err := dec.Decode(&list); err != nil {
35                 return err
36         }
37
38         return this.loadKeepServers(list)
39 }
40
41 // RefreshServices calls DiscoverKeepServers to refresh the keep
42 // service list on SIGHUP; when the given interval has elapsed since
43 // the last refresh; and (if the last refresh failed) the given
44 // errInterval has elapsed.
45 func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
46         var previousRoots = []map[string]string{}
47
48         timer := time.NewTimer(interval)
49         gotHUP := make(chan os.Signal, 1)
50         signal.Notify(gotHUP, syscall.SIGHUP)
51
52         for {
53                 select {
54                 case <-gotHUP:
55                 case <-timer.C:
56                 }
57                 timer.Reset(interval)
58
59                 if err := kc.DiscoverKeepServers(); err != nil {
60                         log.Println("Error retrieving services list: %v (retrying in %v)", err, errInterval)
61                         timer.Reset(errInterval)
62                         continue
63                 }
64                 newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
65
66                 if !reflect.DeepEqual(previousRoots, newRoots) {
67                         log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
68                         previousRoots = newRoots
69                 }
70
71                 if len(newRoots[0]) == 0 {
72                         log.Printf("WARNING: No local services (retrying in %v)", errInterval)
73                         timer.Reset(errInterval)
74                 }
75         }
76 }
77
78 // loadKeepServers
79 func (this *KeepClient) loadKeepServers(list svcList) error {
80         listed := make(map[string]bool)
81         localRoots := make(map[string]string)
82         gatewayRoots := make(map[string]string)
83         writableLocalRoots := make(map[string]string)
84
85         // replicasPerService is 1 for disks; unknown or unlimited otherwise
86         this.replicasPerService = 1
87         this.Using_proxy = false
88
89         for _, service := range list.Items {
90                 scheme := "http"
91                 if service.SSL {
92                         scheme = "https"
93                 }
94                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
95
96                 // Skip duplicates
97                 if listed[url] {
98                         continue
99                 }
100                 listed[url] = true
101
102                 localRoots[service.Uuid] = url
103                 if service.SvcType == "proxy" {
104                         this.Using_proxy = true
105                 }
106
107                 if service.ReadOnly == false {
108                         writableLocalRoots[service.Uuid] = url
109                         if service.SvcType != "disk" {
110                                 this.replicasPerService = 0
111                         }
112                 }
113
114                 // Gateway services are only used when specified by
115                 // UUID, so there's nothing to gain by filtering them
116                 // by service type. Including all accessible services
117                 // (gateway and otherwise) merely accommodates more
118                 // service configurations.
119                 gatewayRoots[service.Uuid] = url
120         }
121
122         if this.Using_proxy {
123                 this.setClientSettingsProxy()
124         } else {
125                 this.setClientSettingsDisk()
126         }
127
128         this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
129         return nil
130 }