Go Keep client correctly closes response body on client requests, should fix
[arvados.git] / sdk / go / src / arvados.org / keepclient / support.go
1 /* Internal methods to support keepclient.go */
2 package keepclient
3
4 import (
5         "arvados.org/streamer"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "net/http"
13         "os"
14         "strconv"
15         "strings"
16 )
17
18 type keepDisk struct {
19         Hostname string `json:"service_host"`
20         Port     int    `json:"service_port"`
21         SSL      bool   `json:"service_ssl_flag"`
22         SvcType  string `json:"service_type"`
23 }
24
25 func (this *KeepClient) DiscoverKeepServers() error {
26         if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
27                 this.SetServiceRoots([]string{prx})
28                 this.Using_proxy = true
29                 return nil
30         }
31
32         // Construct request of keep disk list
33         var req *http.Request
34         var err error
35
36         if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible?format=json", this.ApiServer), nil); err != nil {
37                 return err
38         }
39
40         // Add api token header
41         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
42         if this.External {
43                 req.Header.Add("X-External-Client", "1")
44         }
45
46         // Make the request
47         var resp *http.Response
48         if resp, err = this.Client.Do(req); err != nil {
49                 return err
50         }
51
52         if resp.StatusCode != http.StatusOK {
53                 // fall back on keep disks
54                 if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
55                         return err
56                 }
57                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
58                 if resp, err = this.Client.Do(req); err != nil {
59                         return err
60                 }
61                 if resp.StatusCode != http.StatusOK {
62                         return errors.New(resp.Status)
63                 }
64         }
65
66         // 'defer' is a stack, so it will drain the Body before closing it.
67         defer resp.Body.Close()
68         defer io.Copy(ioutil.Discard, resp.Body)
69
70         type svcList struct {
71                 Items []keepDisk `json:"items"`
72         }
73
74         // Decode json reply
75         dec := json.NewDecoder(resp.Body)
76         var m svcList
77         if err := dec.Decode(&m); err != nil {
78                 return err
79         }
80
81         listed := make(map[string]bool)
82         service_roots := make([]string, 0, len(m.Items))
83
84         for _, element := range m.Items {
85                 n := ""
86
87                 if element.SSL {
88                         n = "s"
89                 }
90
91                 // Construct server URL
92                 url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
93
94                 // Skip duplicates
95                 if !listed[url] {
96                         listed[url] = true
97                         service_roots = append(service_roots, url)
98                 }
99                 if element.SvcType == "proxy" {
100                         this.Using_proxy = true
101                 }
102         }
103
104         this.SetServiceRoots(service_roots)
105
106         return nil
107 }
108
109 func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
110         // Build an ordering with which to query the Keep servers based on the
111         // contents of the hash.  "hash" is a hex-encoded number at least 8
112         // digits (32 bits) long
113
114         // seed used to calculate the next keep server from 'pool' to be added
115         // to 'pseq'
116         seed := hash
117
118         // Keep servers still to be added to the ordering
119         service_roots := this.ServiceRoots()
120         pool := make([]string, len(service_roots))
121         copy(pool, service_roots)
122
123         // output probe sequence
124         pseq = make([]string, 0, len(service_roots))
125
126         // iterate while there are servers left to be assigned
127         for len(pool) > 0 {
128
129                 if len(seed) < 8 {
130                         // ran out of digits in the seed
131                         if len(pseq) < (len(hash) / 4) {
132                                 // the number of servers added to the probe
133                                 // sequence is less than the number of 4-digit
134                                 // slices in 'hash' so refill the seed with the
135                                 // last 4 digits.
136                                 seed = hash[len(hash)-4:]
137                         }
138                         seed += hash
139                 }
140
141                 // Take the next 8 digits (32 bytes) and interpret as an integer,
142                 // then modulus with the size of the remaining pool to get the next
143                 // selected server.
144                 probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
145                 probe %= uint64(len(pool))
146
147                 // Append the selected server to the probe sequence and remove it
148                 // from the pool.
149                 pseq = append(pseq, pool[probe])
150                 pool = append(pool[:probe], pool[probe+1:]...)
151
152                 // Remove the digits just used from the seed
153                 seed = seed[8:]
154         }
155         return pseq
156 }
157
158 type uploadStatus struct {
159         err             error
160         url             string
161         statusCode      int
162         replicas_stored int
163         response        string
164 }
165
166 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
167         upload_status chan<- uploadStatus, expectedLength int64) {
168
169         log.Printf("Uploading %s to %s", hash, host)
170
171         var req *http.Request
172         var err error
173         var url = fmt.Sprintf("%s/%s", host, hash)
174         if req, err = http.NewRequest("PUT", url, nil); err != nil {
175                 upload_status <- uploadStatus{err, url, 0, 0, ""}
176                 body.Close()
177                 return
178         }
179
180         if expectedLength > 0 {
181                 req.ContentLength = expectedLength
182         }
183
184         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
185         req.Header.Add("Content-Type", "application/octet-stream")
186
187         if this.Using_proxy {
188                 req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
189         }
190
191         req.Body = body
192
193         var resp *http.Response
194         if resp, err = this.Client.Do(req); err != nil {
195                 upload_status <- uploadStatus{err, url, 0, 0, ""}
196                 body.Close()
197                 return
198         }
199
200         rep := 1
201         if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
202                 fmt.Sscanf(xr, "%d", &rep)
203         }
204
205         defer resp.Body.Close()
206         defer io.Copy(ioutil.Discard, resp.Body)
207
208         respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
209         if err2 != nil && err2 != io.EOF {
210                 upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
211                 return
212         }
213
214         locator := strings.TrimSpace(string(respbody))
215
216         if resp.StatusCode == http.StatusOK {
217                 upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
218         } else {
219                 upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
220         }
221 }
222
223 func (this KeepClient) putReplicas(
224         hash string,
225         tr *streamer.AsyncStream,
226         expectedLength int64) (locator string, replicas int, err error) {
227
228         // Calculate the ordering for uploading to servers
229         sv := this.shuffledServiceRoots(hash)
230
231         // The next server to try contacting
232         next_server := 0
233
234         // The number of active writers
235         active := 0
236
237         // Used to communicate status from the upload goroutines
238         upload_status := make(chan uploadStatus)
239         defer close(upload_status)
240
241         // Desired number of replicas
242
243         remaining_replicas := this.Want_replicas
244
245         for remaining_replicas > 0 {
246                 for active < remaining_replicas {
247                         // Start some upload requests
248                         if next_server < len(sv) {
249                                 go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
250                                 next_server += 1
251                                 active += 1
252                         } else {
253                                 if active == 0 {
254                                         return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
255                                 } else {
256                                         break
257                                 }
258                         }
259                 }
260
261                 // Now wait for something to happen.
262                 status := <-upload_status
263                 if status.statusCode == 200 {
264                         // good news!
265                         remaining_replicas -= status.replicas_stored
266                         locator = status.response
267                 } else {
268                         // writing to keep server failed for some reason
269                         log.Printf("Keep server put to %v failed with '%v'",
270                                 status.url, status.err)
271                 }
272                 active -= 1
273                 log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
274         }
275
276         return locator, this.Want_replicas, nil
277 }