Merge branch 'master' into 1885-keep-proxy refs #1885
[arvados.git] / sdk / go / src / arvados.org / keepclient / support.go
1 /* Internal methods to support keepclient.go */
2 package keepclient
3
4 import (
5         "arvados.org/streamer"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io"
10         "log"
11         "net/http"
12         "os"
13         "sort"
14         "strconv"
15 )
16
17 type keepDisk struct {
18         Hostname string `json:"service_host"`
19         Port     int    `json:"service_port"`
20         SSL      bool   `json:"service_ssl_flag"`
21         SvcType  string `json:"service_type"`
22 }
23
24 func (this *KeepClient) discoverKeepServers() error {
25         if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
26                 this.Service_roots = make([]string, 1)
27                 this.Service_roots[0] = prx
28                 this.Using_proxy = true
29                 return nil
30         }
31
32         // Construct request of keep disk list
33         var req *http.Request
34         var err error
35
36         if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible?format=json", this.ApiServer), nil); err != nil {
37                 return err
38         }
39
40         // Add api token header
41         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
42         if this.External {
43                 req.Header.Add("X-External-Client", "1")
44         }
45
46         // Make the request
47         var resp *http.Response
48         if resp, err = this.Client.Do(req); err != nil {
49                 return err
50         }
51
52         if resp.StatusCode != 200 {
53                 // fall back on keep disks
54                 if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
55                         return err
56                 }
57                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
58                 if resp, err = this.Client.Do(req); err != nil {
59                         return err
60                 }
61         }
62
63         type svcList struct {
64                 Items []keepDisk `json:"items"`
65         }
66
67         // Decode json reply
68         dec := json.NewDecoder(resp.Body)
69         var m svcList
70         if err := dec.Decode(&m); err != nil {
71                 return err
72         }
73
74         listed := make(map[string]bool)
75         this.Service_roots = make([]string, 0, len(m.Items))
76
77         for _, element := range m.Items {
78                 n := ""
79
80                 if element.SSL {
81                         n = "s"
82                 }
83
84                 // Construct server URL
85                 url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
86
87                 // Skip duplicates
88                 if !listed[url] {
89                         listed[url] = true
90                         this.Service_roots = append(this.Service_roots, url)
91                 }
92                 if element.SvcType == "proxy" {
93                         this.Using_proxy = true
94                 }
95         }
96
97         // Must be sorted for ShuffledServiceRoots() to produce consistent
98         // results.
99         sort.Strings(this.Service_roots)
100
101         return nil
102 }
103
104 func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
105         // Build an ordering with which to query the Keep servers based on the
106         // contents of the hash.  "hash" is a hex-encoded number at least 8
107         // digits (32 bits) long
108
109         // seed used to calculate the next keep server from 'pool' to be added
110         // to 'pseq'
111         seed := hash
112
113         // Keep servers still to be added to the ordering
114         pool := make([]string, len(this.Service_roots))
115         copy(pool, this.Service_roots)
116
117         // output probe sequence
118         pseq = make([]string, 0, len(this.Service_roots))
119
120         // iterate while there are servers left to be assigned
121         for len(pool) > 0 {
122
123                 if len(seed) < 8 {
124                         // ran out of digits in the seed
125                         if len(pseq) < (len(hash) / 4) {
126                                 // the number of servers added to the probe
127                                 // sequence is less than the number of 4-digit
128                                 // slices in 'hash' so refill the seed with the
129                                 // last 4 digits.
130                                 seed = hash[len(hash)-4:]
131                         }
132                         seed += hash
133                 }
134
135                 // Take the next 8 digits (32 bytes) and interpret as an integer,
136                 // then modulus with the size of the remaining pool to get the next
137                 // selected server.
138                 probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
139                 probe %= uint64(len(pool))
140
141                 // Append the selected server to the probe sequence and remove it
142                 // from the pool.
143                 pseq = append(pseq, pool[probe])
144                 pool = append(pool[:probe], pool[probe+1:]...)
145
146                 // Remove the digits just used from the seed
147                 seed = seed[8:]
148         }
149         return pseq
150 }
151
152 type uploadStatus struct {
153         err             error
154         url             string
155         statusCode      int
156         replicas_stored int
157 }
158
159 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
160         upload_status chan<- uploadStatus, expectedLength int64) {
161
162         log.Printf("Uploading to %s", host)
163
164         var req *http.Request
165         var err error
166         var url = fmt.Sprintf("%s/%s", host, hash)
167         if req, err = http.NewRequest("PUT", url, nil); err != nil {
168                 upload_status <- uploadStatus{err, url, 0, 0}
169                 body.Close()
170                 return
171         }
172
173         if expectedLength > 0 {
174                 req.ContentLength = expectedLength
175         }
176
177         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
178         req.Header.Add("Content-Type", "application/octet-stream")
179
180         if this.Using_proxy {
181                 req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas))
182         }
183
184         req.Body = body
185
186         var resp *http.Response
187         if resp, err = this.Client.Do(req); err != nil {
188                 upload_status <- uploadStatus{err, url, 0, 0}
189                 body.Close()
190                 return
191         }
192
193         rep := 1
194         if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" {
195                 fmt.Sscanf(xr, "%d", &rep)
196         }
197
198         if resp.StatusCode == http.StatusOK {
199                 upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
200         } else {
201                 upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
202         }
203 }
204
205 func (this KeepClient) putReplicas(
206         hash string,
207         tr *streamer.AsyncStream,
208         expectedLength int64) (replicas int, err error) {
209
210         // Calculate the ordering for uploading to servers
211         sv := this.shuffledServiceRoots(hash)
212
213         // The next server to try contacting
214         next_server := 0
215
216         // The number of active writers
217         active := 0
218
219         // Used to communicate status from the upload goroutines
220         upload_status := make(chan uploadStatus)
221         defer close(upload_status)
222
223         // Desired number of replicas
224
225         remaining_replicas := this.Want_replicas
226
227         for remaining_replicas > 0 {
228                 for active < remaining_replicas {
229                         // Start some upload requests
230                         if next_server < len(sv) {
231                                 go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
232                                 next_server += 1
233                                 active += 1
234                         } else {
235                                 if active == 0 {
236                                         return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
237                                 } else {
238                                         break
239                                 }
240                         }
241                 }
242
243                 // Now wait for something to happen.
244                 status := <-upload_status
245                 if status.statusCode == 200 {
246                         // good news!
247                         remaining_replicas -= status.replicas_stored
248                 } else {
249                         // writing to keep server failed for some reason
250                         log.Printf("Keep server put to %v failed with '%v'",
251                                 status.url, status.err)
252                 }
253                 active -= 1
254                 log.Printf("Upload status code: %v remaining replicas: %v active: %v", status.statusCode, remaining_replicas, active)
255         }
256
257         return this.Want_replicas, nil
258 }