2853: Use rendezvous hashing to select probe order.
[arvados.git] / sdk / go / keepclient / support.go
1 /* Internal methods to support keepclient.go */
2 package keepclient
3
4 import (
5         "git.curoverse.com/arvados.git/sdk/go/streamer"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "net/http"
12         "os"
13         "strings"
14 )
15
// keepDisk is one entry of the service listing decoded by
// DiscoverKeepServers. Each field is filled from the JSON key named in
// its struct tag.
type keepDisk struct {
	// Uuid is used as the key under which the service's URL is registered.
	Uuid string `json:"uuid"`
	// Hostname and Port form the host:port part of the service URL.
	Hostname string `json:"service_host"`
	Port     int    `json:"service_port"`
	// SSL selects the "https" scheme instead of "http" for the service URL.
	SSL bool `json:"service_ssl_flag"`
	// SvcType is the advertised service type; the value "proxy" marks
	// the service as a proxy.
	SvcType string `json:"service_type"`
}
23
24 func (this *KeepClient) DiscoverKeepServers() error {
25         if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
26                 sr := map[string]string{"proxy":prx}
27                 this.SetServiceRoots(sr)
28                 this.Using_proxy = true
29                 return nil
30         }
31
32         type svcList struct {
33                 Items []keepDisk `json:"items"`
34         }
35         var m svcList
36
37         err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
38
39         if err != nil {
40                 if err := this.Arvados.List("keep_disks", nil, &m); err != nil {
41                         return err
42                 }
43         }
44
45         listed := make(map[string]bool)
46         service_roots := make(map[string]string)
47
48         for _, element := range m.Items {
49                 n := ""
50
51                 if element.SSL {
52                         n = "s"
53                 }
54
55                 // Construct server URL
56                 url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
57
58                 // Skip duplicates
59                 if !listed[url] {
60                         listed[url] = true
61                         service_roots[element.Uuid] = url
62                 }
63                 if element.SvcType == "proxy" {
64                         this.Using_proxy = true
65                 }
66         }
67
68         this.SetServiceRoots(service_roots)
69
70         return nil
71 }
72
73 func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
74         return NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
75 }
76
// uploadStatus is the outcome of a single upload attempt, sent by
// uploadToKeepServer over its status channel.
//
// The field order is significant: values of this type are built with
// positional composite literals.
type uploadStatus struct {
	// err is nil when the server answered 200 OK; otherwise it holds the
	// request/transport error or the HTTP status text.
	err error
	// url is the full URL the block was PUT to.
	url string
	// statusCode is the HTTP status code, or 0 if no response was received.
	statusCode int
	// replicas_stored is the replica count reported by the server.
	replicas_stored int
	// response is the response body text (at most the first 4096 bytes).
	response string
}
84
85 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
86         upload_status chan<- uploadStatus, expectedLength int64) {
87
88         log.Printf("Uploading %s to %s", hash, host)
89
90         var req *http.Request
91         var err error
92         var url = fmt.Sprintf("%s/%s", host, hash)
93         if req, err = http.NewRequest("PUT", url, nil); err != nil {
94                 upload_status <- uploadStatus{err, url, 0, 0, ""}
95                 body.Close()
96                 return
97         }
98
99         if expectedLength > 0 {
100                 req.ContentLength = expectedLength
101         }
102
103         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
104         req.Header.Add("Content-Type", "application/octet-stream")
105
106         if this.Using_proxy {
107                 req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
108         }
109
110         req.Body = body
111
112         var resp *http.Response
113         if resp, err = this.Client.Do(req); err != nil {
114                 upload_status <- uploadStatus{err, url, 0, 0, ""}
115                 body.Close()
116                 return
117         }
118
119         rep := 1
120         if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
121                 fmt.Sscanf(xr, "%d", &rep)
122         }
123
124         defer resp.Body.Close()
125         defer io.Copy(ioutil.Discard, resp.Body)
126
127         respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
128         if err2 != nil && err2 != io.EOF {
129                 upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
130                 return
131         }
132
133         locator := strings.TrimSpace(string(respbody))
134
135         if resp.StatusCode == http.StatusOK {
136                 upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
137         } else {
138                 upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
139         }
140 }
141
142 func (this KeepClient) putReplicas(
143         hash string,
144         tr *streamer.AsyncStream,
145         expectedLength int64) (locator string, replicas int, err error) {
146
147         // Calculate the ordering for uploading to servers
148         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
149
150         // The next server to try contacting
151         next_server := 0
152
153         // The number of active writers
154         active := 0
155
156         // Used to communicate status from the upload goroutines
157         upload_status := make(chan uploadStatus)
158         defer close(upload_status)
159
160         // Desired number of replicas
161
162         remaining_replicas := this.Want_replicas
163
164         for remaining_replicas > 0 {
165                 for active < remaining_replicas {
166                         // Start some upload requests
167                         if next_server < len(sv) {
168                                 go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
169                                 next_server += 1
170                                 active += 1
171                         } else {
172                                 if active == 0 {
173                                         return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
174                                 } else {
175                                         break
176                                 }
177                         }
178                 }
179
180                 // Now wait for something to happen.
181                 status := <-upload_status
182                 if status.statusCode == 200 {
183                         // good news!
184                         remaining_replicas -= status.replicas_stored
185                         locator = status.response
186                 } else {
187                         // writing to keep server failed for some reason
188                         log.Printf("Keep server put to %v failed with '%v'",
189                                 status.url, status.err)
190                 }
191                 active -= 1
192                 log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
193         }
194
195         return locator, this.Want_replicas, nil
196 }