9550: Allow overriding keep services discovery with ARVADOS_KEEP_SERVICES env var.
[arvados.git] / sdk / go / keepclient / discover.go
1 package keepclient
2
3 import (
4         "encoding/json"
5         "fmt"
6         "log"
7         "os"
8         "os/signal"
9         "reflect"
10         "strings"
11         "syscall"
12         "time"
13 )
14
15 // DiscoverKeepServers gets list of available keep services from the
16 // API server.
17 //
18 // If a list of services is provided in the arvadosclient (e.g., from
19 // an environment variable or local config), that list is used
20 // instead.
21 func (this *KeepClient) DiscoverKeepServers() error {
22         if this.Arvados.KeepServiceURIs != nil {
23                 this.foundNonDiskSvc = true
24                 this.replicasPerService = 0
25                 this.setClientSettingsNonDisk()
26                 roots := make(map[string]string)
27                 for i, uri := range this.Arvados.KeepServiceURIs {
28                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
29                 }
30                 this.SetServiceRoots(roots, roots, roots)
31                 return nil
32         }
33
34         var list svcList
35
36         // Get keep services from api server
37         err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
38         if err != nil {
39                 return err
40         }
41
42         return this.loadKeepServers(list)
43 }
44
45 // LoadKeepServicesFromJSON gets list of available keep services from given JSON
46 func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
47         var list svcList
48
49         // Load keep services from given json
50         dec := json.NewDecoder(strings.NewReader(services))
51         if err := dec.Decode(&list); err != nil {
52                 return err
53         }
54
55         return this.loadKeepServers(list)
56 }
57
58 // RefreshServices calls DiscoverKeepServers to refresh the keep
59 // service list on SIGHUP; when the given interval has elapsed since
60 // the last refresh; and (if the last refresh failed) the given
61 // errInterval has elapsed.
62 func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
63         var previousRoots = []map[string]string{}
64
65         timer := time.NewTimer(interval)
66         gotHUP := make(chan os.Signal, 1)
67         signal.Notify(gotHUP, syscall.SIGHUP)
68
69         for {
70                 select {
71                 case <-gotHUP:
72                 case <-timer.C:
73                 }
74                 timer.Reset(interval)
75
76                 if err := kc.DiscoverKeepServers(); err != nil {
77                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval)
78                         timer.Reset(errInterval)
79                         continue
80                 }
81                 newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
82
83                 if !reflect.DeepEqual(previousRoots, newRoots) {
84                         DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
85                         previousRoots = newRoots
86                 }
87
88                 if len(newRoots[0]) == 0 {
89                         log.Printf("WARNING: No local services (retrying in %v)", errInterval)
90                         timer.Reset(errInterval)
91                 }
92         }
93 }
94
95 // loadKeepServers
96 func (this *KeepClient) loadKeepServers(list svcList) error {
97         listed := make(map[string]bool)
98         localRoots := make(map[string]string)
99         gatewayRoots := make(map[string]string)
100         writableLocalRoots := make(map[string]string)
101
102         // replicasPerService is 1 for disks; unknown or unlimited otherwise
103         this.replicasPerService = 1
104
105         for _, service := range list.Items {
106                 scheme := "http"
107                 if service.SSL {
108                         scheme = "https"
109                 }
110                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
111
112                 // Skip duplicates
113                 if listed[url] {
114                         continue
115                 }
116                 listed[url] = true
117
118                 localRoots[service.Uuid] = url
119                 if service.ReadOnly == false {
120                         writableLocalRoots[service.Uuid] = url
121                         if service.SvcType != "disk" {
122                                 this.replicasPerService = 0
123                         }
124                 }
125
126                 if service.SvcType != "disk" {
127                         this.foundNonDiskSvc = true
128                 }
129
130                 // Gateway services are only used when specified by
131                 // UUID, so there's nothing to gain by filtering them
132                 // by service type. Including all accessible services
133                 // (gateway and otherwise) merely accommodates more
134                 // service configurations.
135                 gatewayRoots[service.Uuid] = url
136         }
137
138         if this.foundNonDiskSvc {
139                 this.setClientSettingsNonDisk()
140         } else {
141                 this.setClientSettingsDisk()
142         }
143
144         this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
145         return nil
146 }