Merge branch '2853-rendezvous' closes #2853
[arvados.git] / sdk / go / keepclient / support.go
1 /* Internal methods to support keepclient.go */
2 package keepclient
3
4 import (
5         "crypto/md5"
6         "git.curoverse.com/arvados.git/sdk/go/streamer"
7         "errors"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "net/http"
13         "os"
14         "strings"
15 )
16
// keepDisk is one entry of the service listing decoded by
// DiscoverKeepServers. The JSON tags name the API response attributes
// each field is filled from.
type keepDisk struct {
	// Uuid keys the service in the service-roots map.
	Uuid string `json:"uuid"`
	// Hostname and Port form the authority part of the service URL.
	Hostname string `json:"service_host"`
	Port     int    `json:"service_port"`
	// SSL selects the "https" scheme instead of "http".
	SSL bool `json:"service_ssl_flag"`
	// SvcType is compared against "proxy" to detect a proxy service.
	SvcType string `json:"service_type"`
}
24
// Md5String returns the MD5 digest of s as a 32-character lowercase
// hexadecimal string.
func Md5String(s string) string {
	digest := md5.Sum([]byte(s))
	return fmt.Sprintf("%x", digest[:])
}
28
29 func (this *KeepClient) DiscoverKeepServers() error {
30         if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
31                 sr := map[string]string{"proxy":prx}
32                 this.SetServiceRoots(sr)
33                 this.Using_proxy = true
34                 return nil
35         }
36
37         type svcList struct {
38                 Items []keepDisk `json:"items"`
39         }
40         var m svcList
41
42         err := this.Arvados.Call("GET", "keep_services", "", "accessible", nil, &m)
43
44         if err != nil {
45                 if err := this.Arvados.List("keep_disks", nil, &m); err != nil {
46                         return err
47                 }
48         }
49
50         listed := make(map[string]bool)
51         service_roots := make(map[string]string)
52
53         for _, element := range m.Items {
54                 n := ""
55
56                 if element.SSL {
57                         n = "s"
58                 }
59
60                 // Construct server URL
61                 url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
62
63                 // Skip duplicates
64                 if !listed[url] {
65                         listed[url] = true
66                         service_roots[element.Uuid] = url
67                 }
68                 if element.SvcType == "proxy" {
69                         this.Using_proxy = true
70                 }
71         }
72
73         this.SetServiceRoots(service_roots)
74
75         return nil
76 }
77
// uploadStatus is the single message an upload goroutine sends back to
// putReplicas to describe how one PUT attempt ended.
//
// The field order is significant: values of this type are also built
// with positional composite literals.
type uploadStatus struct {
	// err is nil only when the server answered with HTTP 200.
	err error
	// url is the full request URL (service root + "/" + hash).
	url string
	// statusCode is the HTTP status, or 0 when no response was received.
	statusCode int
	// replicas_stored is taken from the X_Keep_Replicas_Stored response
	// header and defaults to 1 when the header is absent.
	replicas_stored int
	// response is the response body text (at most 4096 bytes are read).
	response string
}
85
86 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
87         upload_status chan<- uploadStatus, expectedLength int64) {
88
89         log.Printf("Uploading %s to %s", hash, host)
90
91         var req *http.Request
92         var err error
93         var url = fmt.Sprintf("%s/%s", host, hash)
94         if req, err = http.NewRequest("PUT", url, nil); err != nil {
95                 upload_status <- uploadStatus{err, url, 0, 0, ""}
96                 body.Close()
97                 return
98         }
99
100         if expectedLength > 0 {
101                 req.ContentLength = expectedLength
102         }
103
104         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
105         req.Header.Add("Content-Type", "application/octet-stream")
106
107         if this.Using_proxy {
108                 req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
109         }
110
111         req.Body = body
112
113         var resp *http.Response
114         if resp, err = this.Client.Do(req); err != nil {
115                 upload_status <- uploadStatus{err, url, 0, 0, ""}
116                 body.Close()
117                 return
118         }
119
120         rep := 1
121         if xr := resp.Header.Get(X_Keep_Replicas_Stored); xr != "" {
122                 fmt.Sscanf(xr, "%d", &rep)
123         }
124
125         defer resp.Body.Close()
126         defer io.Copy(ioutil.Discard, resp.Body)
127
128         respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
129         if err2 != nil && err2 != io.EOF {
130                 upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
131                 return
132         }
133
134         locator := strings.TrimSpace(string(respbody))
135
136         if resp.StatusCode == http.StatusOK {
137                 upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
138         } else {
139                 upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
140         }
141 }
142
143 func (this KeepClient) putReplicas(
144         hash string,
145         tr *streamer.AsyncStream,
146         expectedLength int64) (locator string, replicas int, err error) {
147
148         // Calculate the ordering for uploading to servers
149         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
150
151         // The next server to try contacting
152         next_server := 0
153
154         // The number of active writers
155         active := 0
156
157         // Used to communicate status from the upload goroutines
158         upload_status := make(chan uploadStatus)
159         defer close(upload_status)
160
161         // Desired number of replicas
162
163         remaining_replicas := this.Want_replicas
164
165         for remaining_replicas > 0 {
166                 for active < remaining_replicas {
167                         // Start some upload requests
168                         if next_server < len(sv) {
169                                 go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
170                                 next_server += 1
171                                 active += 1
172                         } else {
173                                 if active == 0 {
174                                         return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
175                                 } else {
176                                         break
177                                 }
178                         }
179                 }
180
181                 // Now wait for something to happen.
182                 status := <-upload_status
183                 if status.statusCode == 200 {
184                         // good news!
185                         remaining_replicas -= status.replicas_stored
186                         locator = status.response
187                 } else {
188                         // writing to keep server failed for some reason
189                         log.Printf("Keep server put to %v failed with '%v'",
190                                 status.url, status.err)
191                 }
192                 active -= 1
193                 log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
194         }
195
196         return locator, this.Want_replicas, nil
197 }