3114: Merge branch 'master' into 3114-copy-to-project
[arvados.git] / sdk / go / keepclient / support.go
1 /* Internal methods to support keepclient.go */
2 package keepclient
3
4 import (
5         "git.curoverse.com/arvados.git/sdk/go/streamer"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "net/http"
12         "os"
13         "strconv"
14         "strings"
15 )
16
// keepDisk is one Keep service record as decoded from the "items" array of
// an API server response.
type keepDisk struct {
	// Hostname is read from the "service_host" key.
	Hostname string `json:"service_host"`

	// Port is read from the "service_port" key.
	Port int `json:"service_port"`

	// SSL is read from the "service_ssl_flag" key; when true the service
	// URL is built with the https scheme.
	SSL bool `json:"service_ssl_flag"`

	// SvcType is read from the "service_type" key; the value "proxy"
	// switches the client into proxy mode.
	SvcType string `json:"service_type"`
}
23
24 func (this *KeepClient) DiscoverKeepServers() error {
25         if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
26                 this.SetServiceRoots([]string{prx})
27                 this.Using_proxy = true
28                 return nil
29         }
30
31         type svcList struct {
32                 Items []keepDisk `json:"items"`
33         }
34         var m svcList
35
36         err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
37
38         if err != nil {
39                 if err := this.Arvados.List("keep_disks", nil, &m); err != nil {
40                         return err
41                 }
42         }
43
44         listed := make(map[string]bool)
45         service_roots := make([]string, 0, len(m.Items))
46
47         for _, element := range m.Items {
48                 n := ""
49
50                 if element.SSL {
51                         n = "s"
52                 }
53
54                 // Construct server URL
55                 url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
56
57                 // Skip duplicates
58                 if !listed[url] {
59                         listed[url] = true
60                         service_roots = append(service_roots, url)
61                 }
62                 if element.SvcType == "proxy" {
63                         this.Using_proxy = true
64                 }
65         }
66
67         this.SetServiceRoots(service_roots)
68
69         return nil
70 }
71
72 func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
73         // Build an ordering with which to query the Keep servers based on the
74         // contents of the hash.  "hash" is a hex-encoded number at least 8
75         // digits (32 bits) long
76
77         // seed used to calculate the next keep server from 'pool' to be added
78         // to 'pseq'
79         seed := hash
80
81         // Keep servers still to be added to the ordering
82         service_roots := this.ServiceRoots()
83         pool := make([]string, len(service_roots))
84         copy(pool, service_roots)
85
86         // output probe sequence
87         pseq = make([]string, 0, len(service_roots))
88
89         // iterate while there are servers left to be assigned
90         for len(pool) > 0 {
91
92                 if len(seed) < 8 {
93                         // ran out of digits in the seed
94                         if len(pseq) < (len(hash) / 4) {
95                                 // the number of servers added to the probe
96                                 // sequence is less than the number of 4-digit
97                                 // slices in 'hash' so refill the seed with the
98                                 // last 4 digits.
99                                 seed = hash[len(hash)-4:]
100                         }
101                         seed += hash
102                 }
103
104                 // Take the next 8 digits (32 bytes) and interpret as an integer,
105                 // then modulus with the size of the remaining pool to get the next
106                 // selected server.
107                 probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
108                 probe %= uint64(len(pool))
109
110                 // Append the selected server to the probe sequence and remove it
111                 // from the pool.
112                 pseq = append(pseq, pool[probe])
113                 pool = append(pool[:probe], pool[probe+1:]...)
114
115                 // Remove the digits just used from the seed
116                 seed = seed[8:]
117         }
118         return pseq
119 }
120
// uploadStatus is the outcome of one upload attempt, sent by
// uploadToKeepServer to putReplicas. The field order is significant:
// the struct is also built with positional literals.
type uploadStatus struct {
	// err is nil when the server answered 200 OK; otherwise it holds the
	// request, transport or read error, or the HTTP status text.
	err error

	// url is the full URL the block was PUT to.
	url string

	// statusCode is the HTTP status code, or 0 if no response arrived.
	statusCode int

	// replicas_stored is the value of the replicas-stored response
	// header, 1 when the header is absent, or 0 if no response arrived.
	replicas_stored int

	// response is the whitespace-trimmed response body (the locator on
	// success), limited to the first 4096 bytes.
	response string
}
128
129 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
130         upload_status chan<- uploadStatus, expectedLength int64) {
131
132         log.Printf("Uploading %s to %s", hash, host)
133
134         var req *http.Request
135         var err error
136         var url = fmt.Sprintf("%s/%s", host, hash)
137         if req, err = http.NewRequest("PUT", url, nil); err != nil {
138                 upload_status <- uploadStatus{err, url, 0, 0, ""}
139                 body.Close()
140                 return
141         }
142
143         if expectedLength > 0 {
144                 req.ContentLength = expectedLength
145         }
146
147         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
148         req.Header.Add("Content-Type", "application/octet-stream")
149
150         if this.Using_proxy {
151                 req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
152         }
153
154         req.Body = body
155
156         var resp *http.Response
157         if resp, err = this.Client.Do(req); err != nil {
158                 upload_status <- uploadStatus{err, url, 0, 0, ""}
159                 body.Close()
160                 return
161         }
162
163         rep := 1
164         if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
165                 fmt.Sscanf(xr, "%d", &rep)
166         }
167
168         defer resp.Body.Close()
169         defer io.Copy(ioutil.Discard, resp.Body)
170
171         respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
172         if err2 != nil && err2 != io.EOF {
173                 upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
174                 return
175         }
176
177         locator := strings.TrimSpace(string(respbody))
178
179         if resp.StatusCode == http.StatusOK {
180                 upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
181         } else {
182                 upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
183         }
184 }
185
186 func (this KeepClient) putReplicas(
187         hash string,
188         tr *streamer.AsyncStream,
189         expectedLength int64) (locator string, replicas int, err error) {
190
191         // Calculate the ordering for uploading to servers
192         sv := this.shuffledServiceRoots(hash)
193
194         // The next server to try contacting
195         next_server := 0
196
197         // The number of active writers
198         active := 0
199
200         // Used to communicate status from the upload goroutines
201         upload_status := make(chan uploadStatus)
202         defer close(upload_status)
203
204         // Desired number of replicas
205
206         remaining_replicas := this.Want_replicas
207
208         for remaining_replicas > 0 {
209                 for active < remaining_replicas {
210                         // Start some upload requests
211                         if next_server < len(sv) {
212                                 go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
213                                 next_server += 1
214                                 active += 1
215                         } else {
216                                 if active == 0 {
217                                         return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
218                                 } else {
219                                         break
220                                 }
221                         }
222                 }
223
224                 // Now wait for something to happen.
225                 status := <-upload_status
226                 if status.statusCode == 200 {
227                         // good news!
228                         remaining_replicas -= status.replicas_stored
229                         locator = status.response
230                 } else {
231                         // writing to keep server failed for some reason
232                         log.Printf("Keep server put to %v failed with '%v'",
233                                 status.url, status.err)
234                 }
235                 active -= 1
236                 log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
237         }
238
239         return locator, this.Want_replicas, nil
240 }