// KeepClient holds the client-side state needed to talk to a set of
// Keep storage services.
type KeepClient struct {
	// Service_roots lists the known Keep service endpoints as
	// "scheme://host:port" strings, kept sorted (see DiscoverKeepDisks).
	// NOTE(review): code below assigns to a lowercase `service_roots`
	// field — the two spellings cannot both compile; confirm which one
	// is intended.
	Service_roots []string
// KeepDisk mirrors one entry of the Arvados "keep_disks" API response;
// the json tags bind the fields to the service's wire names.
type KeepDisk struct {
	Hostname string `json:"service_host"`     // host to contact
	Port     int    `json:"service_port"`     // TCP port of the service
	SSL      bool   `json:"service_ssl_flag"` // true => reach it over https
// MakeKeepClient constructs a KeepClient and performs initial discovery
// of the available Keep services before returning it.
func MakeKeepClient() (kc *KeepClient, err error) {
	// NOTE(review): `err :=` re-declares the named return value in this
	// scope, and `kc` is still nil here unless it is allocated on a line
	// not shown in this excerpt — verify both against the full source.
	err := kc.DiscoverKeepDisks()
// DiscoverKeepDisks queries the Arvados API server for its list of
// keep_disks and rebuilds this client's sorted list of service roots
// from the response.
func (this *KeepClient) DiscoverKeepDisks() error {
	tr := &http.Transport{
		// NOTE(review): TLS certificate verification is disabled —
		// acceptable against a local dev API server only; confirm
		// before any production use.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	client := &http.Client{Transport: tr}

	if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {

	var resp *http.Response
	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
	if resp, err = client.Do(req); err != nil {

	// Decode only the "items" array of the response envelope.
	// NOTE(review): no `resp.Body.Close()` is visible in this excerpt —
	// verify the body is closed so the connection can be reused.
	Items []KeepDisk `json:"items"`
	dec := json.NewDecoder(resp.Body)
	if err := dec.Decode(&m); err != nil {

	// NOTE(review): the struct declares `Service_roots`; the lowercase
	// `service_roots` used below will not compile — confirm spelling.
	this.service_roots = make([]string, len(m.Items))
	for index, element := range m.Items {
		this.service_roots[index] = fmt.Sprintf("http%s://%s:%d",
			n, element.Hostname, element.Port)
	// Keep the roots sorted so every client derives the same
	// deterministic probe ordering (see ShuffledServiceRoots).
	sort.Strings(this.service_roots)
// ShuffledServiceRoots returns a probe sequence: a pseudo-random but
// deterministic permutation of Service_roots derived from the block
// hash, so that all clients probe servers for a given block in the
// same order.
func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
	// Build an ordering with which to query the Keep servers based on the
	// contents of the hash. "hash" is a hex-encoded number at least 8
	// digits (32 bits) long

	// seed used to calculate the next keep server from 'pool' to be added

	// Keep servers still to be added to the ordering
	pool := make([]string, len(this.Service_roots))
	copy(pool, this.Service_roots)

	// output probe sequence
	pseq = make([]string, 0, len(this.Service_roots))

	// iterate while there are servers left to be assigned
		// ran out of digits in the seed
		if len(pseq) < (len(hash) / 4) {
			// the number of servers added to the probe
			// sequence is less than the number of 4-digit
			// slices in 'hash' so refill the seed with the
			seed = hash[len(hash)-4:]

		// Take the next 8 digits (32 bits) and interpret as an integer,
		// then modulus with the size of the remaining pool to get the next
		// NOTE(review): the ParseUint error is discarded — a non-hex
		// seed silently yields probe 0; confirm the hash is validated
		// upstream.
		probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
		probe %= uint64(len(pool))

		// Append the selected server to the probe sequence and remove it
		pseq = append(pseq, pool[probe])
		pool = append(pool[:probe], pool[probe+1:]...)

		// Remove the digits just used from the seed
// ReaderSlice carries the result of one Read: a slice of the bytes just
// read and/or the error encountered (fields are declared on lines not
// shown in this excerpt).
type ReaderSlice struct {
// Source is the receive-only side of a ReaderSlice stream.
type Source <-chan ReaderSlice

// Sink is the send-only side of a ReaderSlice stream.
type Sink chan<- ReaderSlice

// Status carries terminal read/write errors out of worker goroutines.
type Status chan error
// Read repeatedly from the reader into the specified buffer, and report each
// read to channel 'c'. Completes when Reader 'r' reports an error and closes
func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
	// Initially use entire buffer as scratch space

	// NOTE(review): the stray `v` token below looks like an editing
	// artifact and will not compile — confirm against the full source.
	v // Read into the scratch space
	n, err := r.Read(ptr)

	// End on error (includes EOF)
	c <- ReaderSlice{nil, err}

	// Make a slice with the contents of the read
	c <- ReaderSlice{ptr[:n], nil}

	// Adjust the scratch space slice

	// Scratch space exhausted before the reader finished: report
	// io.ErrShortBuffer to the sink.
	c <- ReaderSlice{nil, io.ErrShortBuffer}
// Take slices from 'source' channel and write them to Writer 'w'. Reports read
// or write errors on 'status'. Completes when 'source' channel is closed.
func SinkWriter(source Source, w io.Writer, status Status) {
	// Get the next block from the source
	rs, valid := <-source

	// propagate reader status (should only be EOF)
	} else if can_write {
		n, err := w.Write(buf)

		if err == io.ErrShortWrite {
			// short write, so go around again
		} else if err != nil {
			// some other write error,
			// propagate error and stop

	// source channel closed
// closeSinks closes every sink channel in the slice, signalling
// end-of-stream to their consumers.
func closeSinks(sinks_slice []Sink) {
	for _, s := range sinks_slice {
// Transfer data from a source (either an already-filled buffer, or a reader)
// into one or more 'sinks'. If 'source' is valid, it will read from the
// reader into the buffer and send the data to the sinks. Otherwise 'buffer'
// it will just send the contents of the buffer to the sinks. Completes when
// the 'sinks' channel is closed.
func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
	// currently buffered data

	// for receiving slices from ReadIntoBuffer
	// NOTE(review): declared as `chan []byte` but the receive below
	// accesses `bk.err`/`bk.slice` struct fields — element type looks
	// wrong; verify against the full source.
	var slices chan []byte = nil

	// indicates whether the buffered data is complete
	var complete bool = false

	// 'body' is the buffer slice representing the body content read so far
	body = source_buffer[:0]

	// used to communicate slices of the buffer as read
	// NOTE(review): ReadIntoBuffer takes a Sink (chan<- ReaderSlice);
	// `chan []ReaderSlice` would not match — verify the element type.
	reader_slices := make(chan []ReaderSlice)

	go ReadIntoBuffer(source_buffer, source_reader, reader_slices)

	// use the whole buffer
	body = source_buffer[:]

	// list of sinks to send to
	sinks_slice := make([]Sink, 0)
	defer closeSinks(sinks_slice)

	case s, valid := <-sinks:
		// add to the sinks slice
		sinks_slice = append(sinks_slice, s)

		// catch up the sink with the current body contents
		s <- ReaderSlice{body, nil}
		// Buffer already complete: this late-arriving sink gets an
		// immediate EOF after the catch-up slice.
		s <- ReaderSlice{nil, io.EOF}

	// closed 'sinks' channel indicates we're done

	case bk, valid := <-slices:
		reader_error <- bk.err
		if bk.err == io.EOF {
			// EOF indicates the reader is done
			// sending, so our buffer is complete.

			// some other reader error

		// adjust body bounds now that another slice has been read
		body = source_buffer[0 : len(body)+len(bk.slice)]

		// send the new slice to the sinks
		for _, s := range sinks_slice {

		// got an EOF, so close the sinks
		closeSinks(sinks_slice)

		// truncate sinks slice
		sinks_slice = sinks_slice[:0]
// ConnectToKeepServer streams a block to the Keep server at 'url' via a
// POST whose body is fed through an io.Pipe by a SinkWriter goroutine;
// the write outcome is reported on 'write_status'.
func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
	pipereader, pipewriter := io.Pipe()

	var req *http.Request
	// NOTE(review): `err` is not declared in the lines shown here —
	// verify against the full source.
	if req, err = http.NewRequest("POST", url, nil); err != nil {

	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
	// Stream the request body through the pipe: the SinkWriter goroutine
	// below feeds pipewriter while the HTTP client drains pipereader.
	req.Body = pipereader

	// create channel to transfer slices from reader to writer
	tr := make(chan ReaderSlice)

	// start the writer goroutine
	go SinkWriter(tr, pipewriter, write_status)

	// now transfer the channel to the reader goroutine

	var resp *http.Response
	// NOTE(review): `client` is not visible in this excerpt, and
	// resp.Body should be closed after use — verify both.
	if resp, err = client.Do(req); err != nil {
// KeepWriteError is returned by KeepPut when the requested number of
// replicas could not be written before running out of servers to try.
// (Bug fix: the constructor is errors.New — Go identifiers are
// case-sensitive and the errors package exports no `new`.)
var KeepWriteError = errors.New("Could not write sufficient replicas")
// KeepPut writes the data from 'r' to Keep servers, following the
// hash-derived probe order, until 'want_replicas' replicas have been
// stored; it returns KeepWriteError if the server list is exhausted
// first.
func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
	// Calculate the ordering to try writing to servers
	sv := this.ShuffledServiceRoots(hash)

	// The next server to try contacting

	// The number of active writers

	// Used to buffer reads from 'r'
	// 64 MiB — sized to hold one full Keep block in memory.
	buffer := make([]byte, 64*1024*1024)

	// Used to send writers to the reader goroutine
	sinks := make(chan Sink)

	// Used to communicate status from the reader goroutine
	reader_status := make(chan error)

	// Start the reader goroutine
	go Transfer(buffer, r, sinks, reader_status)

	// Used to communicate status from the writer goroutines
	write_status := make(chan error)

	for want_replicas > 0 {
		for active < want_replicas {
			// Start some writers
			go this.ConnectToKeepServer(sv[n], sinks, write_status)

			// Ran out of servers before reaching the desired
			// replication level.
			return KeepWriteError

		// Now wait for something to happen.
		case status := <-reader_status:
			if status == io.EOF {

		case status := <-write_status:
			if status == io.EOF {

			// writing to keep server failed for some reason.