From 66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 12 May 2014 16:54:23 -0400 Subject: [PATCH] 2798: Work in progress connecting data read from input Reader to POST requests. --- .../src/arvados.org/keepclient/keepclient.go | 308 ++++++++++++++---- 1 file changed, 238 insertions(+), 70 deletions(-) diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index ba735235f1..073a76e48e 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -12,6 +12,7 @@ import ( type KeepClient struct { Service_roots []string + ApiToken string } type KeepDisk struct { @@ -20,7 +21,16 @@ type KeepDisk struct { SSL bool `json:"service_ssl_flag"` } -func KeepDisks() (service_roots []string, err error) { +func MakeKeepClient() (kc *KeepClient, err error) { + kc := KeepClient{} + err := kc.DiscoverKeepDisks() + if err != nil { + return nil, err + } + return &kc, nil +} + +func (this *KeepClient) DiscoverKeepDisks() error { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } @@ -32,7 +42,7 @@ func KeepDisks() (service_roots []string, err error) { } var resp *http.Response - req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) if resp, err = client.Do(req); err != nil { return nil, err } @@ -46,25 +56,17 @@ func KeepDisks() (service_roots []string, err error) { return nil, err } - service_roots = make([]string, len(m.Items)) + this.service_roots = make([]string, len(m.Items)) for index, element := range m.Items { n := "" if element.SSL { n = "s" } - service_roots[index] = fmt.Sprintf("http%s://%s:%d", + this.service_roots[index] = fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port) } - sort.Strings(service_roots) - return service_roots, nil -} - -func MakeKeepClient() (kc *KeepClient, err error) { - sv, err := KeepDisks() - if err != nil { - return nil, err - } - return &KeepClient{sv}, nil + sort.Strings(this.service_roots) + return nil } func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) { @@ -115,97 +117,263 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) { return pseq } -func ReadIntoBuffer(buffer []byte, r io.Reader, c chan []byte, reader_error chan error) { +type ReaderSlice struct { + slice []byte + reader_error error +} + +type Source <-chan ReaderSlice +type Sink chan<- ReaderSlice +type Status chan error + +// Read repeatedly from the reader into the specified buffer, and report each +// read to channel 'c'. Completes when Reader 'r' reports an error and closes +// channel 'c'. +func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) { + defer close(c) + + // Initially use entire buffer as scratch space ptr := buffer[:] - for { + for len(ptr) > 0 { + v // Read into the scratch space n, err := r.Read(ptr) + + // End on error (includes EOF) if err != nil { - reader_error <- err + c <- ReaderSlice{nil, err} return } - c <- ptr[:n] + + // Make a slice with the contents of the read + c <- ReaderSlice{ptr[:n], nil} + + // Adjust the scratch space slice ptr = ptr[n:] } + if len(ptr) == 0 { + c <- ReaderSlice{nil, io.ErrShortBuffer} + } } -type Sink struct { - out io.Writer - err chan error +// Take slices from 'source' channel and write them to Writer 'w'. Reports read +// or write errors on 'status'. Completes when 'source' channel is closed. +func SinkWriter(source Source, w io.Writer, status Status) { + can_write = true + + for { + // Get the next block from the source + rs, valid := <-source + + if valid { + if rs.error != nil { + // propagate reader status (should only be EOF) + status <- rs.error + } else if can_write { + buf := rs.slice[:] + for len(buf) > 0 { + n, err := w.Write(buf) + buf = buf[n:] + if err == io.ErrShortWrite { + // short write, so go around again + } else if err != nil { + // some other write error, + // propagate error and stop + // further writes + status <- err + can_write = false + } + } + } + } else { + // source channel closed + break + } + } } -// Transfer data from a buffer into one or more 'sinks'. -// -// Forwards all data read to the writers in "Sinks", including any previous -// reads into the buffer. Either one of buffer or io.Reader must be valid, and -// the other must be nil. If 'source' is valid, it will read from the reader, -// store the data in the buffer, and send the data to the sinks. Otherwise -// 'buffer' must be valid, and it will send the contents of the buffer to the -// sinks. -func Transfer(buffer []byte, source io.Reader, sinks chan Sink, errorchan chan error) { +func closeSinks(sinks_slice []Sink) { + for _, s := range sinks_slice { + close(s) + } +} + +// Transfer data from a source (either an already-filled buffer, or a reader) +// into one or more 'sinks'. If 'source' is valid, it will read from the +// reader into the buffer and send the data to the sinks. Otherwise 'buffer' +// it will just send the contents of the buffer to the sinks. Completes when +// the 'sinks' channel is closed. +func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) { // currently buffered data - var ptr []byte + var body []byte // for receiving slices from ReadIntoBuffer - var slices chan []byte + var slices chan []byte = nil // indicates whether the buffered data is complete var complete bool = false - // for receiving errors from ReadIntoBuffer - var reader_error chan error = nil - if source != nil { - // allocate the scratch buffer at 64 MiB - buffer = make([]byte, 1024*1024*64) - - // 'ptr' is a slice indicating the buffer slice that has been - // read so far - ptr = buffer[0:0] + // 'body' is the buffer slice representing the body content read so far + body = source_buffer[:0] // used to communicate slices of the buffer as read - slices := make(chan []byte) - - // communicate read errors - reader_error = make(chan error) + reader_slices := make(chan []ReaderSlice) // Spin it off - go ReadIntoBuffer(buffer, source, slices, reader_error) + go ReadIntoBuffer(source_buffer, source_reader, reader_slices) } else { // use the whole buffer - ptr = buffer[:] + body = source_buffer[:] // that's it complete = true } // list of sinks to send to - sinks_slice := make([]io.Writer, 0) - - select { - case e := <-reader_error: - // barf - case s, valid := <-sinks: - if !valid { - // sinks channel closed - return - } - sinks_slice = append(sinks_slice, s) - go s.Write(ptr) - case bk := <-slices: - ptr = buffer[0 : len(ptr)+len(bk)] - for _, s := range sinks_slice { - go s.Write(bk) + sinks_slice := make([]Sink, 0) + defer closeSinks(sinks_slice) + + for { + select { + case s, valid := <-sinks: + if valid { + // add to the sinks slice + sinks_slice = append(sinks_slice, s) + + // catch up the sink with the current body contents + if len(body) > 0 { + s <- ReaderSlice{body, nil} + if complete { + s <- ReaderSlice{nil, io.EOF} + } + } + } else { + // closed 'sinks' channel indicates we're done + return + } + + case bk, valid := <-slices: + if valid { + if bk.err != nil { + reader_error <- bk.err + if bk.err == io.EOF { + // EOF indicates the reader is done + // sending, so our buffer is complete. + complete = true + } else { + // some other reader error + return + } + } + + if bk.slice != nil { + // adjust body bounds now that another slice has been read + body = source_buffer[0 : len(body)+len(bk.slice)] + } + + // send the new slice to the sinks + for _, s := range sinks_slice { + s <- bk + } + + if complete { + // got an EOF, so close the sinks + closeSinks(sinks_slice) + + // truncate sinks slice + sinks_slice = sinks_slice[:0] + } + } else { + // no more reads + slices = nil + } } } } -func (this KeepClient) KeepPut(hash string, r io.Reader) { - //sv := this.ShuffledServiceRoots(hash) - //n := 0 +func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) { + pipereader, pipewriter := io.Pipe() + + var req *http.Request + if req, err = http.NewRequest("POST", url, nil); err != nil { + write_status <- err + } + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + req.Body = pipereader + + // create channel to transfer slices from reader to writer + tr := make(chan ReaderSlice) - //success := make(chan int) - sinks := make(chan []io.Writer) - errorchan := make(chan error) + // start the writer goroutine + go SinkWriter(tr, pipewriter, write_status) + + // now transfer the channel to the reader goroutine + sinks <- tr + + var resp *http.Response + + if resp, err = client.Do(req); err != nil { + return nil, err + } +} - go Transfer(nil, r, reads, errorchan) +var KeepWriteError = errors.new("Could not write sufficient replicas") + +func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error { + // Calculate the ordering to try writing to servers + sv := this.ShuffledServiceRoots(hash) + + // The next server to try contacting + n := 0 + + // The number of active writers + active := 0 + + // Used to buffer reads from 'r' + buffer := make([]byte, 64*1024*1024) + + // Used to send writers to the reader goroutine + sinks := make(chan Sink) + defer close(sinks) + + // Used to communicate status from the reader goroutine + reader_status := make(chan error) + + // Start the reader goroutine + go Transfer(buffer, r, sinks, reader_status) + + // Used to communicate status from the writer goroutines + write_status := make(chan error) + + for want_replicas > 0 { + for active < want_replicas { + // Start some writers + if n < len(sv) { + go this.ConnectToKeepServer(sv[n], sinks, write_status) + n += 1 + active += 1 + } else { + return KeepWriteError + } + } + + // Now wait for something to happen. + select { + case status := <-reader_status: + if status == io.EOF { + // good news! + } else { + // bad news + return status + } + case status := <-write_status: + if status == io.EOF { + // good news! + want_replicas -= 1 + } else { + // writing to keep server failed for some reason. + } + active -= 1 + } + } } -- 2.30.2