2798: Work in progress connecting data read from input Reader to POST requests.
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 12 May 2014 20:54:23 +0000 (16:54 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 12 May 2014 20:54:23 +0000 (16:54 -0400)
sdk/go/src/arvados.org/keepclient/keepclient.go

index ba735235f12b524840f58a7cf5201e40c93e4e4d..073a76e48ea23be2bf895fed0f3f9ad8501518f3 100644 (file)
@@ -12,6 +12,7 @@ import (
 
 type KeepClient struct {
        Service_roots []string
+       ApiToken      string // sent as "OAuth2 <ApiToken>" in Authorization headers
 }
 
 type KeepDisk struct {
@@ -20,7 +21,16 @@ type KeepDisk struct {
        SSL      bool   `json:"service_ssl_flag"`
 }
 
-func KeepDisks() (service_roots []string, err error) {
+func MakeKeepClient() (kc *KeepClient, err error) {
+       // NOTE(review): ApiToken is never set here, so discovery runs with an empty token.
+       kc = &KeepClient{}
+       if err = kc.DiscoverKeepDisks(); err != nil {
+               return nil, err
+       }
+       return kc, nil
+}
+
+func (this *KeepClient) DiscoverKeepDisks() (err error) {
        tr := &http.Transport{
                TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
        }
@@ -32,7 +42,7 @@ func KeepDisks() (service_roots []string, err error) {
        }
 
        var resp *http.Response
-       req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
        if resp, err = client.Do(req); err != nil {
-               return nil, err
+               return err
        }
@@ -46,25 +56,17 @@ func KeepDisks() (service_roots []string, err error) {
-               return nil, err
+               return err
        }
 
-       service_roots = make([]string, len(m.Items))
+       this.Service_roots = make([]string, len(m.Items))
        for index, element := range m.Items {
                n := ""
                if element.SSL {
                        n = "s"
                }
-               service_roots[index] = fmt.Sprintf("http%s://%s:%d",
+               this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
                        n, element.Hostname, element.Port)
        }
-       sort.Strings(service_roots)
-       return service_roots, nil
-}
-
-func MakeKeepClient() (kc *KeepClient, err error) {
-       sv, err := KeepDisks()
-       if err != nil {
-               return nil, err
-       }
-       return &KeepClient{sv}, nil
+       sort.Strings(this.Service_roots)
+       return nil
 }
 
 func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
@@ -115,97 +117,263 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
        return pseq
 }
 
-func ReadIntoBuffer(buffer []byte, r io.Reader, c chan []byte, reader_error chan error) {
+type ReaderSlice struct {
+       slice        []byte // a chunk of data read from the source, or nil
+       reader_error error  // error returned by the source (e.g. io.EOF), or nil
+}
+
+type Source <-chan ReaderSlice
+type Sink chan<- ReaderSlice
+type Status chan error
+
+// Read repeatedly from the reader into the specified buffer, and report each
+// read to channel 'c'.  Completes, closing 'c', when Reader 'r' reports an
+// error or the buffer is full (reported as io.ErrShortBuffer).
+func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
+       defer close(c)
+
+       // Initially use entire buffer as scratch space
        ptr := buffer[:]
-       for {
+       for len(ptr) > 0 {
+               // Read into the scratch space
                n, err := r.Read(ptr)
+
+               // Forward data before the error: an io.Reader may return both
+               if n > 0 {
+                       c <- ReaderSlice{ptr[:n], nil}
+               }
+
+               // End on error (includes EOF)
                if err != nil {
-                       reader_error <- err
+                       c <- ReaderSlice{nil, err}
                        return
                }
-               c <- ptr[:n]
+
+               // Adjust the scratch space slice
                ptr = ptr[n:]
        }
+       c <- ReaderSlice{nil, io.ErrShortBuffer} // the loop only ends here once the buffer is full
 }
 
-type Sink struct {
-       out io.Writer
-       err chan error
+// Take slices from 'source' and write them to Writer 'w' until 'source' is
+// closed.  Reports the first write error, or else the reader status, on 'status'.
+func SinkWriter(source Source, w io.Writer, status chan<- error) {
+       can_write := true
+
+       for {
+               // Get the next block from the source
+               rs, valid := <-source
+
+               if valid {
+                       if !can_write {
+                               // a write error was already reported; just drain
+                       } else if rs.reader_error != nil {
+                               // propagate reader status (should only be EOF)
+                               status <- rs.reader_error
+                       } else {
+                               buf := rs.slice[:]
+                               for can_write && len(buf) > 0 {
+                                       n, err := w.Write(buf)
+                                       buf = buf[n:]
+                                       if err == io.ErrShortWrite {
+                                               // short write, so go around again
+                                       } else if err != nil {
+                                               // other write error: report it and stop writing
+                                               status <- err
+                                               can_write = false
+                                       }
+                               }
+                       }
+               } else {
+                       // source channel closed
+                       break
+               }
+       }
 }
 
-// Transfer data from a buffer into one or more 'sinks'.
-//
-// Forwards all data read to the writers in "Sinks", including any previous
-// reads into the buffer.  Either one of buffer or io.Reader must be valid, and
-// the other must be nil.  If 'source' is valid, it will read from the reader,
-// store the data in the buffer, and send the data to the sinks.  Otherwise
-// 'buffer' must be valid, and it will send the contents of the buffer to the
-// sinks.
-func Transfer(buffer []byte, source io.Reader, sinks chan Sink, errorchan chan error) {
+// closeSinks closes every channel in 'sinks_slice'.
+func closeSinks(sinks_slice []Sink) {
+       for _, s := range sinks_slice {
+               close(s)
+       }
+}
+
+// Transfer data from a source (either an already-filled buffer, or a reader)
+// into one or more 'sinks'.  If 'source_reader' is non-nil it is read into
+// 'source_buffer'; otherwise 'source_buffer' is sent as-is.  Reader status goes
+// to 'reader_error'.  Completes when 'sinks' is closed or a read fails.
+func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
        // currently buffered data
-       var ptr []byte
+       var body []byte
 
        // for receiving slices from ReadIntoBuffer
-       var slices chan []byte
+       var slices chan ReaderSlice = nil
 
        // indicates whether the buffered data is complete
        var complete bool = false
 
-       // for receiving errors from ReadIntoBuffer
-       var reader_error chan error = nil
-
-       if source != nil {
-               // allocate the scratch buffer at 64 MiB
-               buffer = make([]byte, 1024*1024*64)
-
-               // 'ptr' is a slice indicating the buffer slice that has been
-               // read so far
-               ptr = buffer[0:0]
+       if source_reader != nil {
+               // 'body' is the buffer slice representing the body content read so far
+               body = source_buffer[:0]
 
                // used to communicate slices of the buffer as read
-               slices := make(chan []byte)
-
-               // communicate read errors
-               reader_error = make(chan error)
+               slices = make(chan ReaderSlice)
 
                // Spin it off
-               go ReadIntoBuffer(buffer, source, slices, reader_error)
+               go ReadIntoBuffer(source_buffer, source_reader, slices)
        } else {
                // use the whole buffer
-               ptr = buffer[:]
+               body = source_buffer[:]
 
                // that's it
                complete = true
        }
 
        // list of sinks to send to
-       sinks_slice := make([]io.Writer, 0)
-
-       select {
-       case e := <-reader_error:
-               // barf
-       case s, valid := <-sinks:
-               if !valid {
-                       // sinks channel closed
-                       return
-               }
-               sinks_slice = append(sinks_slice, s)
-               go s.Write(ptr)
-       case bk := <-slices:
-               ptr = buffer[0 : len(ptr)+len(bk)]
-               for _, s := range sinks_slice {
-                       go s.Write(bk)
+       sinks_slice := make([]Sink, 0)
+       defer func() { closeSinks(sinks_slice) }() // closure, so the sinks present at return are closed
+
+       for {
+               select {
+               case s, valid := <-sinks:
+                       if valid {
+                               // catch up the sink with the current body contents
+                               if len(body) > 0 {
+                                       s <- ReaderSlice{body, nil}
+                               }
+                               if complete {
+                                       s <- ReaderSlice{nil, io.EOF}
+                                       close(s) // nothing more will be read, so finish it now
+                               } else {
+                                       sinks_slice = append(sinks_slice, s)
+                               }
+                       } else {
+                               // closed 'sinks' channel indicates we're done
+                               return
+                       }
+
+               case bk, valid := <-slices:
+                       if valid {
+                               if bk.reader_error != nil {
+                                       reader_error <- bk.reader_error
+                                       if bk.reader_error == io.EOF {
+                                               // EOF indicates the reader is done
+                                               // sending, so our buffer is complete.
+                                               complete = true
+                                       } else {
+                                               // some other reader error
+                                               return
+                                       }
+                               }
+
+                               if bk.slice != nil {
+                                       // adjust body bounds now that another slice has been read
+                                       body = source_buffer[0 : len(body)+len(bk.slice)]
+                               }
+
+                               // send the new slice to the sinks
+                               for _, s := range sinks_slice {
+                                       s <- bk
+                               }
+
+                               if complete {
+                                       // got an EOF, so close the sinks
+                                       closeSinks(sinks_slice)
+
+                                       // truncate sinks slice
+                                       sinks_slice = sinks_slice[:0]
+                               }
+                       } else {
+                               // no more reads
+                               slices = nil
+                       }
                }
        }
 }
 
-func (this KeepClient) KeepPut(hash string, r io.Reader) {
-       //sv := this.ShuffledServiceRoots(hash)
-       //n := 0
+// ConnectToKeepServer streams the data delivered to a new sink to 'url' as a
+// POST body.  Request errors and SinkWriter statuses go to 'write_status'.
+func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
+       req, err := http.NewRequest("POST", url, nil)
+       if err != nil {
+               write_status <- err
+               return
+       }
+       pipereader, pipewriter := io.Pipe()
+       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+       req.Body = pipereader
+       tr := make(chan ReaderSlice) // carries slices from the reader to the writer
 
-       //success := make(chan int)
-       sinks := make(chan []io.Writer)
-       errorchan := make(chan error)
+       // start the writer; closing the pipe afterwards ends the request body
+       go func() { SinkWriter(tr, pipewriter, write_status); pipewriter.Close() }()
+       sinks <- tr // hand the sink to the Transfer goroutine
+       // NOTE(review): mirrors DiscoverKeepDisks' TLS settings (no certificate checks)
+       client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
+       resp, err := client.Do(req)
+       if err != nil {
+               pipereader.CloseWithError(err) // fail any pending writes so SinkWriter reports an error
+               return
+       }
+       resp.Body.Close() // NOTE(review): resp.StatusCode is not checked or reported yet
+}
 
-       go Transfer(nil, r, reads, errorchan)
+// KeepWriteError is returned by KeepPut when too few replicas could be written.
+var KeepWriteError = fmt.Errorf("Could not write sufficient replicas")
+
+func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
+       // Calculate the ordering to try writing to servers
+       sv := this.ShuffledServiceRoots(hash)
+
+       // The next server to try contacting
+       n := 0
+
+       // The number of active writers
+       active := 0
+
+       // Used to buffer reads from 'r'
+       buffer := make([]byte, 64*1024*1024)
+
+       // Used to send writers to the reader goroutine
+       sinks := make(chan Sink)
+       // NOTE(review): a ConnectToKeepServer goroutine that has not yet sent on 'sinks' panics after this close
+       defer close(sinks)
+
+       // Reader status; buffered so Transfer never blocks after an early return
+       reader_status := make(chan error, 1)
+
+       // Start the reader goroutine
+       go Transfer(buffer, r, sinks, reader_status)
+
+       // Writer status; buffered so late writers never block after an early return
+       write_status := make(chan error, 2*len(sv))
+
+       for want_replicas > 0 {
+               // Start writers until enough are active or we run out of servers
+               for active < want_replicas && n < len(sv) {
+                       go this.ConnectToKeepServer(sv[n], sinks, write_status)
+                       n += 1
+                       active += 1
+               }
+               if active == 0 {
+                       // no writer in flight and no server left to try
+                       return KeepWriteError
+               }
+
+               // Now wait for something to happen.
+               select {
+               case status := <-reader_status:
+                       if status != io.EOF {
+                               // bad news: the data could not be read
+                               return status
+                       }
+               case status := <-write_status:
+                       if status == io.EOF {
+                               // good news! NOTE(review): EOF only means the body reached the HTTP client
+                               want_replicas -= 1
+                       }
+                       // any other status means this keep server failed; try another
+                       active -= 1
+               }
+       }
+       return nil
 }