Merge branch 'master' into 1885-keep-proxy
[arvados.git] / sdk / go / src / arvados.org / keepclient / support.go
1 /* Internal methods to support keepclient.go */
2 package keepclient
3
4 import (
5         "arvados.org/streamer"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io"
10         "log"
11         "net/http"
12         "os"
13         "strconv"
14 )
15
16 type keepDisk struct {
17         Hostname string `json:"service_host"`
18         Port     int    `json:"service_port"`
19         SSL      bool   `json:"service_ssl_flag"`
20         SvcType  string `json:"service_type"`
21 }
22
23 func (this *KeepClient) DiscoverKeepServers() error {
24         if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
25                 this.SetServiceRoots([]string{prx})
26                 this.Using_proxy = true
27                 return nil
28         }
29
30         // Construct request of keep disk list
31         var req *http.Request
32         var err error
33
34         if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible?format=json", this.ApiServer), nil); err != nil {
35                 return err
36         }
37
38         // Add api token header
39         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
40         if this.External {
41                 req.Header.Add("X-External-Client", "1")
42         }
43
44         // Make the request
45         var resp *http.Response
46         if resp, err = this.Client.Do(req); err != nil {
47                 return err
48         }
49
50         if resp.StatusCode != 200 {
51                 // fall back on keep disks
52                 if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
53                         return err
54                 }
55                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
56                 if resp, err = this.Client.Do(req); err != nil {
57                         return err
58                 }
59         }
60
61         type svcList struct {
62                 Items []keepDisk `json:"items"`
63         }
64
65         // Decode json reply
66         dec := json.NewDecoder(resp.Body)
67         var m svcList
68         if err := dec.Decode(&m); err != nil {
69                 return err
70         }
71
72         listed := make(map[string]bool)
73         service_roots := make([]string, 0, len(m.Items))
74
75         for _, element := range m.Items {
76                 n := ""
77
78                 if element.SSL {
79                         n = "s"
80                 }
81
82                 // Construct server URL
83                 url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
84
85                 // Skip duplicates
86                 if !listed[url] {
87                         listed[url] = true
88                         service_roots = append(service_roots, url)
89                 }
90                 if element.SvcType == "proxy" {
91                         this.Using_proxy = true
92                 }
93         }
94
95         this.SetServiceRoots(service_roots)
96
97         return nil
98 }
99
100 func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
101         // Build an ordering with which to query the Keep servers based on the
102         // contents of the hash.  "hash" is a hex-encoded number at least 8
103         // digits (32 bits) long
104
105         // seed used to calculate the next keep server from 'pool' to be added
106         // to 'pseq'
107         seed := hash
108
109         // Keep servers still to be added to the ordering
110         service_roots := this.ServiceRoots()
111         pool := make([]string, len(service_roots))
112         copy(pool, service_roots)
113
114         // output probe sequence
115         pseq = make([]string, 0, len(service_roots))
116
117         // iterate while there are servers left to be assigned
118         for len(pool) > 0 {
119
120                 if len(seed) < 8 {
121                         // ran out of digits in the seed
122                         if len(pseq) < (len(hash) / 4) {
123                                 // the number of servers added to the probe
124                                 // sequence is less than the number of 4-digit
125                                 // slices in 'hash' so refill the seed with the
126                                 // last 4 digits.
127                                 seed = hash[len(hash)-4:]
128                         }
129                         seed += hash
130                 }
131
132                 // Take the next 8 digits (32 bytes) and interpret as an integer,
133                 // then modulus with the size of the remaining pool to get the next
134                 // selected server.
135                 probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
136                 probe %= uint64(len(pool))
137
138                 // Append the selected server to the probe sequence and remove it
139                 // from the pool.
140                 pseq = append(pseq, pool[probe])
141                 pool = append(pool[:probe], pool[probe+1:]...)
142
143                 // Remove the digits just used from the seed
144                 seed = seed[8:]
145         }
146         return pseq
147 }
148
149 type uploadStatus struct {
150         err             error
151         url             string
152         statusCode      int
153         replicas_stored int
154 }
155
156 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
157         upload_status chan<- uploadStatus, expectedLength int64) {
158
159         log.Printf("Uploading %s to %s", hash, host)
160
161         var req *http.Request
162         var err error
163         var url = fmt.Sprintf("%s/%s", host, hash)
164         if req, err = http.NewRequest("PUT", url, nil); err != nil {
165                 upload_status <- uploadStatus{err, url, 0, 0}
166                 body.Close()
167                 return
168         }
169
170         if expectedLength > 0 {
171                 req.ContentLength = expectedLength
172         }
173
174         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
175         req.Header.Add("Content-Type", "application/octet-stream")
176
177         if this.Using_proxy {
178                 req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
179         }
180
181         req.Body = body
182
183         var resp *http.Response
184         if resp, err = this.Client.Do(req); err != nil {
185                 upload_status <- uploadStatus{err, url, 0, 0}
186                 body.Close()
187                 return
188         }
189
190         rep := 1
191         if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
192                 fmt.Sscanf(xr, "%d", &rep)
193         }
194
195         if resp.StatusCode == http.StatusOK {
196                 upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
197         } else {
198                 upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
199         }
200 }
201
202 func (this KeepClient) putReplicas(
203         hash string,
204         tr *streamer.AsyncStream,
205         expectedLength int64) (replicas int, err error) {
206
207         // Calculate the ordering for uploading to servers
208         sv := this.shuffledServiceRoots(hash)
209
210         // The next server to try contacting
211         next_server := 0
212
213         // The number of active writers
214         active := 0
215
216         // Used to communicate status from the upload goroutines
217         upload_status := make(chan uploadStatus)
218         defer close(upload_status)
219
220         // Desired number of replicas
221
222         remaining_replicas := this.Want_replicas
223
224         for remaining_replicas > 0 {
225                 for active < remaining_replicas {
226                         // Start some upload requests
227                         if next_server < len(sv) {
228                                 go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
229                                 next_server += 1
230                                 active += 1
231                         } else {
232                                 if active == 0 {
233                                         return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
234                                 } else {
235                                         break
236                                 }
237                         }
238                 }
239
240                 // Now wait for something to happen.
241                 status := <-upload_status
242                 if status.statusCode == 200 {
243                         // good news!
244                         remaining_replicas -= status.replicas_stored
245                 } else {
246                         // writing to keep server failed for some reason
247                         log.Printf("Keep server put to %v failed with '%v'",
248                                 status.url, status.err)
249                 }
250                 active -= 1
251                 log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
252         }
253
254         return this.Want_replicas, nil
255 }