2798: Work in progress connecting data read from input Reader to POST requests.
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient.go
1 package keepclient
2
import (
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"sort"
	"strconv"
)
12
// KeepClient holds what is needed to talk to a set of Keep servers.
type KeepClient struct {
	// Service_roots lists the base URLs of the known Keep servers; it is
	// the pool that ShuffledServiceRoots orders.
	Service_roots []string

	// ApiToken is sent as the OAuth2 credential in the Authorization
	// header of outgoing requests.
	ApiToken string
}
17
// KeepDisk is one entry of the "items" list decoded by DiscoverKeepDisks.
type KeepDisk struct {
	// Hostname comes from the "service_host" JSON key.
	Hostname string `json:"service_host"`

	// Port comes from the "service_port" JSON key.
	Port int `json:"service_port"`

	// SSL comes from the "service_ssl_flag" JSON key; when set the
	// service root is built with the https scheme.
	SSL bool `json:"service_ssl_flag"`
}
23
24 func MakeKeepClient() (kc *KeepClient, err error) {
25         kc := KeepClient{}
26         err := kc.DiscoverKeepDisks()
27         if err != nil {
28                 return nil, err
29         }
30         return &kc, nil
31 }
32
33 func (this *KeepClient) DiscoverKeepDisks() error {
34         tr := &http.Transport{
35                 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
36         }
37         client := &http.Client{Transport: tr}
38
39         var req *http.Request
40         if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
41                 return nil, err
42         }
43
44         var resp *http.Response
45         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
46         if resp, err = client.Do(req); err != nil {
47                 return nil, err
48         }
49
50         type SvcList struct {
51                 Items []KeepDisk `json:"items"`
52         }
53         dec := json.NewDecoder(resp.Body)
54         var m SvcList
55         if err := dec.Decode(&m); err != nil {
56                 return nil, err
57         }
58
59         this.service_roots = make([]string, len(m.Items))
60         for index, element := range m.Items {
61                 n := ""
62                 if element.SSL {
63                         n = "s"
64                 }
65                 this.service_roots[index] = fmt.Sprintf("http%s://%s:%d",
66                         n, element.Hostname, element.Port)
67         }
68         sort.Strings(this.service_roots)
69         return nil
70 }
71
72 func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
73         // Build an ordering with which to query the Keep servers based on the
74         // contents of the hash.  "hash" is a hex-encoded number at least 8
75         // digits (32 bits) long
76
77         // seed used to calculate the next keep server from 'pool' to be added
78         // to 'pseq'
79         seed := hash
80
81         // Keep servers still to be added to the ordering
82         pool := make([]string, len(this.Service_roots))
83         copy(pool, this.Service_roots)
84
85         // output probe sequence
86         pseq = make([]string, 0, len(this.Service_roots))
87
88         // iterate while there are servers left to be assigned
89         for len(pool) > 0 {
90
91                 if len(seed) < 8 {
92                         // ran out of digits in the seed
93                         if len(pseq) < (len(hash) / 4) {
94                                 // the number of servers added to the probe
95                                 // sequence is less than the number of 4-digit
96                                 // slices in 'hash' so refill the seed with the
97                                 // last 4 digits.
98                                 seed = hash[len(hash)-4:]
99                         }
100                         seed += hash
101                 }
102
103                 // Take the next 8 digits (32 bytes) and interpret as an integer,
104                 // then modulus with the size of the remaining pool to get the next
105                 // selected server.
106                 probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
107                 probe %= uint64(len(pool))
108
109                 // Append the selected server to the probe sequence and remove it
110                 // from the pool.
111                 pseq = append(pseq, pool[probe])
112                 pool = append(pool[:probe], pool[probe+1:]...)
113
114                 // Remove the digits just used from the seed
115                 seed = seed[8:]
116         }
117         return pseq
118 }
119
// ReaderSlice is the message passed along the reader/writer channels: either
// a piece of data or the error that ended the read.  The field order matters
// because values are built with positional literals.
type ReaderSlice struct {
	// slice is the data of one read; nil when the message carries an error.
	slice []byte

	// reader_error is the error reported by the reader (io.EOF at the end
	// of the data); nil when the message carries data.
	reader_error error
}
124
125 type Source <-chan ReaderSlice
126 type Sink chan<- ReaderSlice
127 type Status chan error
128
129 // Read repeatedly from the reader into the specified buffer, and report each
130 // read to channel 'c'.  Completes when Reader 'r' reports an error and closes
131 // channel 'c'.
132 func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
133         defer close(c)
134
135         // Initially use entire buffer as scratch space
136         ptr := buffer[:]
137         for len(ptr) > 0 {
138                 v // Read into the scratch space
139                 n, err := r.Read(ptr)
140
141                 // End on error (includes EOF)
142                 if err != nil {
143                         c <- ReaderSlice{nil, err}
144                         return
145                 }
146
147                 // Make a slice with the contents of the read
148                 c <- ReaderSlice{ptr[:n], nil}
149
150                 // Adjust the scratch space slice
151                 ptr = ptr[n:]
152         }
153         if len(ptr) == 0 {
154                 c <- ReaderSlice{nil, io.ErrShortBuffer}
155         }
156 }
157
158 // Take slices from 'source' channel and write them to Writer 'w'.  Reports read
159 // or write errors on 'status'.  Completes when 'source' channel is closed.
160 func SinkWriter(source Source, w io.Writer, status Status) {
161         can_write = true
162
163         for {
164                 // Get the next block from the source
165                 rs, valid := <-source
166
167                 if valid {
168                         if rs.error != nil {
169                                 // propagate reader status (should only be EOF)
170                                 status <- rs.error
171                         } else if can_write {
172                                 buf := rs.slice[:]
173                                 for len(buf) > 0 {
174                                         n, err := w.Write(buf)
175                                         buf = buf[n:]
176                                         if err == io.ErrShortWrite {
177                                                 // short write, so go around again
178                                         } else if err != nil {
179                                                 // some other write error,
180                                                 // propagate error and stop
181                                                 // further writes
182                                                 status <- err
183                                                 can_write = false
184                                         }
185                                 }
186                         }
187                 } else {
188                         // source channel closed
189                         break
190                 }
191         }
192 }
193
194 func closeSinks(sinks_slice []Sink) {
195         for _, s := range sinks_slice {
196                 close(s)
197         }
198 }
199
200 // Transfer data from a source (either an already-filled buffer, or a reader)
201 // into one or more 'sinks'.  If 'source' is valid, it will read from the
202 // reader into the buffer and send the data to the sinks.  Otherwise 'buffer'
203 // it will just send the contents of the buffer to the sinks.  Completes when
204 // the 'sinks' channel is closed.
205 func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
206         // currently buffered data
207         var body []byte
208
209         // for receiving slices from ReadIntoBuffer
210         var slices chan []byte = nil
211
212         // indicates whether the buffered data is complete
213         var complete bool = false
214
215         if source != nil {
216                 // 'body' is the buffer slice representing the body content read so far
217                 body = source_buffer[:0]
218
219                 // used to communicate slices of the buffer as read
220                 reader_slices := make(chan []ReaderSlice)
221
222                 // Spin it off
223                 go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
224         } else {
225                 // use the whole buffer
226                 body = source_buffer[:]
227
228                 // that's it
229                 complete = true
230         }
231
232         // list of sinks to send to
233         sinks_slice := make([]Sink, 0)
234         defer closeSinks(sinks_slice)
235
236         for {
237                 select {
238                 case s, valid := <-sinks:
239                         if valid {
240                                 // add to the sinks slice
241                                 sinks_slice = append(sinks_slice, s)
242
243                                 // catch up the sink with the current body contents
244                                 if len(body) > 0 {
245                                         s <- ReaderSlice{body, nil}
246                                         if complete {
247                                                 s <- ReaderSlice{nil, io.EOF}
248                                         }
249                                 }
250                         } else {
251                                 // closed 'sinks' channel indicates we're done
252                                 return
253                         }
254
255                 case bk, valid := <-slices:
256                         if valid {
257                                 if bk.err != nil {
258                                         reader_error <- bk.err
259                                         if bk.err == io.EOF {
260                                                 // EOF indicates the reader is done
261                                                 // sending, so our buffer is complete.
262                                                 complete = true
263                                         } else {
264                                                 // some other reader error
265                                                 return
266                                         }
267                                 }
268
269                                 if bk.slice != nil {
270                                         // adjust body bounds now that another slice has been read
271                                         body = source_buffer[0 : len(body)+len(bk.slice)]
272                                 }
273
274                                 // send the new slice to the sinks
275                                 for _, s := range sinks_slice {
276                                         s <- bk
277                                 }
278
279                                 if complete {
280                                         // got an EOF, so close the sinks
281                                         closeSinks(sinks_slice)
282
283                                         // truncate sinks slice
284                                         sinks_slice = sinks_slice[:0]
285                                 }
286                         } else {
287                                 // no more reads
288                                 slices = nil
289                         }
290                 }
291         }
292 }
293
294 func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
295         pipereader, pipewriter := io.Pipe()
296
297         var req *http.Request
298         if req, err = http.NewRequest("POST", url, nil); err != nil {
299                 write_status <- err
300         }
301         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
302         req.Body = pipereader
303
304         // create channel to transfer slices from reader to writer
305         tr := make(chan ReaderSlice)
306
307         // start the writer goroutine
308         go SinkWriter(tr, pipewriter, write_status)
309
310         // now transfer the channel to the reader goroutine
311         sinks <- tr
312
313         var resp *http.Response
314
315         if resp, err = client.Do(req); err != nil {
316                 return nil, err
317         }
318 }
319
// KeepWriteError is returned by KeepPut when the data could not be written to
// the requested number of Keep servers.
var KeepWriteError = errors.New("Could not write sufficient replicas")
321
322 func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
323         // Calculate the ordering to try writing to servers
324         sv := this.ShuffledServiceRoots(hash)
325
326         // The next server to try contacting
327         n := 0
328
329         // The number of active writers
330         active := 0
331
332         // Used to buffer reads from 'r'
333         buffer := make([]byte, 64*1024*1024)
334
335         // Used to send writers to the reader goroutine
336         sinks := make(chan Sink)
337         defer close(sinks)
338
339         // Used to communicate status from the reader goroutine
340         reader_status := make(chan error)
341
342         // Start the reader goroutine
343         go Transfer(buffer, r, sinks, reader_status)
344
345         // Used to communicate status from the writer goroutines
346         write_status := make(chan error)
347
348         for want_replicas > 0 {
349                 for active < want_replicas {
350                         // Start some writers
351                         if n < len(sv) {
352                                 go this.ConnectToKeepServer(sv[n], sinks, write_status)
353                                 n += 1
354                                 active += 1
355                         } else {
356                                 return KeepWriteError
357                         }
358                 }
359
360                 // Now wait for something to happen.
361                 select {
362                 case status := <-reader_status:
363                         if status == io.EOF {
364                                 // good news!
365                         } else {
366                                 // bad news
367                                 return status
368                         }
369                 case status := <-write_status:
370                         if status == io.EOF {
371                                 // good news!
372                                 want_replicas -= 1
373                         } else {
374                                 // writing to keep server failed for some reason.
375                         }
376                         active -= 1
377                 }
378         }
379 }