8889c4bedae23f52d62667ba5b8747ed11c3f67b
[arvados.git] / sdk / go / keepclient / discover.go
1 package keepclient
2
3 import (
4         "encoding/json"
5         "fmt"
6         "log"
7         "os"
8         "os/signal"
9         "reflect"
10         "strings"
11         "syscall"
12         "time"
13 )
14
15 // DiscoverKeepServers gets list of available keep services from the
16 // API server.
17 //
18 // If a list of services is provided in the arvadosclient (e.g., from
19 // an environment variable or local config), that list is used
20 // instead.
21 func (this *KeepClient) DiscoverKeepServers() error {
22         if this.Arvados.KeepServiceURIs != nil {
23                 this.foundNonDiskSvc = true
24                 this.replicasPerService = 0
25                 roots := make(map[string]string)
26                 for i, uri := range this.Arvados.KeepServiceURIs {
27                         roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
28                 }
29                 this.SetServiceRoots(roots, roots, roots)
30                 return nil
31         }
32
33         // ArvadosClient did not provide a services list. Ask API
34         // server for a list of accessible services.
35         var list svcList
36         err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &list)
37         if err != nil {
38                 return err
39         }
40         return this.loadKeepServers(list)
41 }
42
43 // LoadKeepServicesFromJSON gets list of available keep services from given JSON
44 func (this *KeepClient) LoadKeepServicesFromJSON(services string) error {
45         var list svcList
46
47         // Load keep services from given json
48         dec := json.NewDecoder(strings.NewReader(services))
49         if err := dec.Decode(&list); err != nil {
50                 return err
51         }
52
53         return this.loadKeepServers(list)
54 }
55
56 // RefreshServices calls DiscoverKeepServers to refresh the keep
57 // service list on SIGHUP; when the given interval has elapsed since
58 // the last refresh; and (if the last refresh failed) the given
59 // errInterval has elapsed.
60 func (kc *KeepClient) RefreshServices(interval, errInterval time.Duration) {
61         var previousRoots = []map[string]string{}
62
63         timer := time.NewTimer(interval)
64         gotHUP := make(chan os.Signal, 1)
65         signal.Notify(gotHUP, syscall.SIGHUP)
66
67         for {
68                 select {
69                 case <-gotHUP:
70                 case <-timer.C:
71                 }
72                 timer.Reset(interval)
73
74                 if err := kc.DiscoverKeepServers(); err != nil {
75                         log.Printf("WARNING: Error retrieving services list: %v (retrying in %v)", err, errInterval)
76                         timer.Reset(errInterval)
77                         continue
78                 }
79                 newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()}
80
81                 if !reflect.DeepEqual(previousRoots, newRoots) {
82                         DebugPrintf("DEBUG: Updated services list: locals %v gateways %v", newRoots[0], newRoots[1])
83                         previousRoots = newRoots
84                 }
85
86                 if len(newRoots[0]) == 0 {
87                         log.Printf("WARNING: No local services (retrying in %v)", errInterval)
88                         timer.Reset(errInterval)
89                 }
90         }
91 }
92
93 // loadKeepServers
94 func (this *KeepClient) loadKeepServers(list svcList) error {
95         listed := make(map[string]bool)
96         localRoots := make(map[string]string)
97         gatewayRoots := make(map[string]string)
98         writableLocalRoots := make(map[string]string)
99
100         // replicasPerService is 1 for disks; unknown or unlimited otherwise
101         this.replicasPerService = 1
102
103         for _, service := range list.Items {
104                 scheme := "http"
105                 if service.SSL {
106                         scheme = "https"
107                 }
108                 url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
109
110                 // Skip duplicates
111                 if listed[url] {
112                         continue
113                 }
114                 listed[url] = true
115
116                 localRoots[service.Uuid] = url
117                 if service.ReadOnly == false {
118                         writableLocalRoots[service.Uuid] = url
119                         if service.SvcType != "disk" {
120                                 this.replicasPerService = 0
121                         }
122                 }
123
124                 if service.SvcType != "disk" {
125                         this.foundNonDiskSvc = true
126                 }
127
128                 // Gateway services are only used when specified by
129                 // UUID, so there's nothing to gain by filtering them
130                 // by service type. Including all accessible services
131                 // (gateway and otherwise) merely accommodates more
132                 // service configurations.
133                 gatewayRoots[service.Uuid] = url
134         }
135
136         this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
137         return nil
138 }