2798: Merged branch with code to read environment variables with branch working on...
author: Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 13 May 2014 13:07:05 +0000 (09:07 -0400)
committer: Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 13 May 2014 13:07:05 +0000 (09:07 -0400)
Merge remote-tracking branch 'origin/2798-go-keep-client' into 2798-go-keep-client

Conflicts:
sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go

1  2 
sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go

index 6d75425331e899e6a951b08ee337f971b67da234,073a76e48ea23be2bf895fed0f3f9ad8501518f3..f9dce5f7e86d55532b8ff2096a325f1baad83f75
  package keepclient
  
  import (
-       //"net/http"
+       "crypto/tls"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
 +      "os"
+       "sort"
+       "strconv"
  )
  
  type KeepClient struct {
-       apiServer   string
-       apiToken    string
-       apiInsecure bool
 -      Service_roots []string
++      ApiServer     string
+       ApiToken      string
++      ApiInsecure   bool
++      Service_roots []string
  }
  
- func InitKeepClient() *KeepClient {
-       kc := KeepClient{os.Getenv("ARVADOS_API_HOST"),
-               os.Getenv("ARVADOS_API_TOKEN"),
-               os.Getenv("ARVADOS_API_HOST_INSECURE") != ""}
+ type KeepDisk struct {
+       Hostname string `json:"service_host"`
+       Port     int    `json:"service_port"`
+       SSL      bool   `json:"service_ssl_flag"`
+ }
+ func MakeKeepClient() (kc *KeepClient, err error) {
 -      kc := KeepClient{}
 -      err := kc.DiscoverKeepDisks()
 -      if err != nil {
++      kc = &KeepClient{
++              ApiServer:   os.Getenv("ARVADOS_API_HOST"),
++              ApiToken:    os.Getenv("ARVADOS_API_TOKEN"),
++              ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")}
++
++      if err := kc.DiscoverKeepDisks(); err != nil {
+               return nil, err
+       }
 -      return &kc, nil
++
++      return kc, nil
+ }
+ func (this *KeepClient) DiscoverKeepDisks() error {
+       tr := &http.Transport{
 -              TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
++              TLSClientConfig: &tls.Config{InsecureSkipVerify: this.ApiInsecure},
+       }
+       client := &http.Client{Transport: tr}
+       var req *http.Request
++      var err error
+       if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
 -              return nil, err
++              return err
+       }
+       var resp *http.Response
+       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+       if resp, err = client.Do(req); err != nil {
 -              return nil, err
++              return err
+       }
+       type SvcList struct {
+               Items []KeepDisk `json:"items"`
+       }
+       dec := json.NewDecoder(resp.Body)
+       var m SvcList
+       if err := dec.Decode(&m); err != nil {
 -              return nil, err
++              return err
+       }
 -      this.service_roots = make([]string, len(m.Items))
++      this.Service_roots = make([]string, len(m.Items))
+       for index, element := range m.Items {
+               n := ""
+               if element.SSL {
+                       n = "s"
+               }
 -              this.service_roots[index] = fmt.Sprintf("http%s://%s:%d",
++              this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
+                       n, element.Hostname, element.Port)
+       }
 -      sort.Strings(this.service_roots)
++      sort.Strings(this.Service_roots)
+       return nil
+ }
+ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
+       // Build an ordering with which to query the Keep servers based on the
+       // contents of the hash.  "hash" is a hex-encoded number at least 8
+       // digits (32 bits) long
+       // seed used to calculate the next keep server from 'pool' to be added
+       // to 'pseq'
+       seed := hash
+       // Keep servers still to be added to the ordering
+       pool := make([]string, len(this.Service_roots))
+       copy(pool, this.Service_roots)
+       // output probe sequence
+       pseq = make([]string, 0, len(this.Service_roots))
+       // iterate while there are servers left to be assigned
+       for len(pool) > 0 {
+               if len(seed) < 8 {
+                       // ran out of digits in the seed
+                       if len(pseq) < (len(hash) / 4) {
+                               // the number of servers added to the probe
+                               // sequence is less than the number of 4-digit
+                               // slices in 'hash' so refill the seed with the
+                               // last 4 digits.
+                               seed = hash[len(hash)-4:]
+                       }
+                       seed += hash
+               }
+               // Take the next 8 digits (32 bytes) and interpret as an integer,
+               // then modulus with the size of the remaining pool to get the next
+               // selected server.
+               probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+               probe %= uint64(len(pool))
+               // Append the selected server to the probe sequence and remove it
+               // from the pool.
+               pseq = append(pseq, pool[probe])
+               pool = append(pool[:probe], pool[probe+1:]...)
+               // Remove the digits just used from the seed
+               seed = seed[8:]
+       }
+       return pseq
+ }
+ type ReaderSlice struct {
+       slice        []byte
+       reader_error error
+ }
+ type Source <-chan ReaderSlice
+ type Sink chan<- ReaderSlice
+ type Status chan error
+ // Read repeatedly from the reader into the specified buffer, and report each
+ // read to channel 'c'.  Completes when Reader 'r' reports an error and closes
+ // channel 'c'.
+ func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
+       defer close(c)
+       // Initially use entire buffer as scratch space
+       ptr := buffer[:]
+       for len(ptr) > 0 {
 -              v // Read into the scratch space
++              // Read into the scratch space
+               n, err := r.Read(ptr)
+               // End on error (includes EOF)
+               if err != nil {
+                       c <- ReaderSlice{nil, err}
+                       return
+               }
+               // Make a slice with the contents of the read
+               c <- ReaderSlice{ptr[:n], nil}
+               // Adjust the scratch space slice
+               ptr = ptr[n:]
+       }
+       if len(ptr) == 0 {
+               c <- ReaderSlice{nil, io.ErrShortBuffer}
+       }
+ }
+ // Take slices from 'source' channel and write them to Writer 'w'.  Reports read
+ // or write errors on 'status'.  Completes when 'source' channel is closed.
 -func SinkWriter(source Source, w io.Writer, status Status) {
++/*func SinkWriter(source Source, w io.Writer, status Status) {
+       can_write = true
+       for {
+               // Get the next block from the source
+               rs, valid := <-source
+               if valid {
+                       if rs.error != nil {
+                               // propagate reader status (should only be EOF)
+                               status <- rs.error
+                       } else if can_write {
+                               buf := rs.slice[:]
+                               for len(buf) > 0 {
+                                       n, err := w.Write(buf)
+                                       buf = buf[n:]
+                                       if err == io.ErrShortWrite {
+                                               // short write, so go around again
+                                       } else if err != nil {
+                                               // some other write error,
+                                               // propagate error and stop
+                                               // further writes
+                                               status <- err
+                                               can_write = false
+                                       }
+                               }
+                       }
+               } else {
+                       // source channel closed
+                       break
+               }
+       }
 -}
++}*/
+ func closeSinks(sinks_slice []Sink) {
+       for _, s := range sinks_slice {
+               close(s)
+       }
+ }
+ // Transfer data from a source (either an already-filled buffer, or a reader)
+ // into one or more 'sinks'.  If 'source' is valid, it will read from the
+ // reader into the buffer and send the data to the sinks.  Otherwise 'buffer'
+ // it will just send the contents of the buffer to the sinks.  Completes when
+ // the 'sinks' channel is closed.
+ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
+       // currently buffered data
+       var body []byte
+       // for receiving slices from ReadIntoBuffer
+       var slices chan []byte = nil
+       // indicates whether the buffered data is complete
+       var complete bool = false
 -      if source != nil {
++      if source_reader != nil {
+               // 'body' is the buffer slice representing the body content read so far
+               body = source_buffer[:0]
+               // used to communicate slices of the buffer as read
+               reader_slices := make(chan []ReaderSlice)
+               // Spin it off
+               go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
+       } else {
+               // use the whole buffer
+               body = source_buffer[:]
+               // that's it
+               complete = true
+       }
+       // list of sinks to send to
+       sinks_slice := make([]Sink, 0)
+       defer closeSinks(sinks_slice)
+       for {
+               select {
+               case s, valid := <-sinks:
+                       if valid {
+                               // add to the sinks slice
+                               sinks_slice = append(sinks_slice, s)
+                               // catch up the sink with the current body contents
+                               if len(body) > 0 {
+                                       s <- ReaderSlice{body, nil}
+                                       if complete {
+                                               s <- ReaderSlice{nil, io.EOF}
+                                       }
+                               }
+                       } else {
+                               // closed 'sinks' channel indicates we're done
+                               return
+                       }
+               case bk, valid := <-slices:
+                       if valid {
+                               if bk.err != nil {
+                                       reader_error <- bk.err
+                                       if bk.err == io.EOF {
+                                               // EOF indicates the reader is done
+                                               // sending, so our buffer is complete.
+                                               complete = true
+                                       } else {
+                                               // some other reader error
+                                               return
+                                       }
+                               }
+                               if bk.slice != nil {
+                                       // adjust body bounds now that another slice has been read
+                                       body = source_buffer[0 : len(body)+len(bk.slice)]
+                               }
+                               // send the new slice to the sinks
+                               for _, s := range sinks_slice {
+                                       s <- bk
+                               }
+                               if complete {
+                                       // got an EOF, so close the sinks
+                                       closeSinks(sinks_slice)
+                                       // truncate sinks slice
+                                       sinks_slice = sinks_slice[:0]
+                               }
+                       } else {
+                               // no more reads
+                               slices = nil
+                       }
+               }
+       }
+ }
+ func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
+       pipereader, pipewriter := io.Pipe()
+       var req *http.Request
+       if req, err = http.NewRequest("POST", url, nil); err != nil {
+               write_status <- err
+       }
+       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+       req.Body = pipereader
+       // create channel to transfer slices from reader to writer
+       tr := make(chan ReaderSlice)
+       // start the writer goroutine
+       go SinkWriter(tr, pipewriter, write_status)
+       // now transfer the channel to the reader goroutine
+       sinks <- tr
+       var resp *http.Response
+       if resp, err = client.Do(req); err != nil {
+               return nil, err
+       }
+ }
+ var KeepWriteError = errors.new("Could not write sufficient replicas")
+ func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
+       // Calculate the ordering to try writing to servers
+       sv := this.ShuffledServiceRoots(hash)
+       // The next server to try contacting
+       n := 0
+       // The number of active writers
+       active := 0
+       // Used to buffer reads from 'r'
+       buffer := make([]byte, 64*1024*1024)
+       // Used to send writers to the reader goroutine
+       sinks := make(chan Sink)
+       defer close(sinks)
+       // Used to communicate status from the reader goroutine
+       reader_status := make(chan error)
+       // Start the reader goroutine
+       go Transfer(buffer, r, sinks, reader_status)
+       // Used to communicate status from the writer goroutines
+       write_status := make(chan error)
+       for want_replicas > 0 {
+               for active < want_replicas {
+                       // Start some writers
+                       if n < len(sv) {
+                               go this.ConnectToKeepServer(sv[n], sinks, write_status)
+                               n += 1
+                               active += 1
+                       } else {
+                               return KeepWriteError
+                       }
+               }
  
-       return &kc
+               // Now wait for something to happen.
+               select {
+               case status := <-reader_status:
+                       if status == io.EOF {
+                               // good news!
+                       } else {
+                               // bad news
+                               return status
+                       }
+               case status := <-write_status:
+                       if status == io.EOF {
+                               // good news!
+                               want_replicas -= 1
+                       } else {
+                               // writing to keep server failed for some reason.
+                       }
+                       active -= 1
+               }
+       }
  }
index 733cc26a330fae88ff2639101c8b34dc74a237af,bc719c0e7035dfd50ef74dd2ebcc0a370eca059d..13a203d45206e5c2b22d3cfde5c7ce5fdad557ce
@@@ -1,43 -1,33 +1,62 @@@
  package keepclient
  
  import (
+       "fmt"
        . "gopkg.in/check.v1"
 +      "os"
 +      "os/exec"
        "testing"
  )
  
 -// Hook up gocheck into the "go test" runner.
 +// Gocheck boilerplate
  func Test(t *testing.T) { TestingT(t) }
  
 +// Gocheck boilerplate
 +var _ = Suite(&MySuite{})
 +
 +// Our test fixture
  type MySuite struct{}
  
 -var _ = Suite(&MySuite{})
 +func (s *MySuite) SetUpSuite(c *C) {
 +      os.Chdir(os.ExpandEnv("$GOPATH../python"))
 +      exec.Command("python", "run_test_server.py", "start").Run()
 +}
 +
 +func (s *MySuite) TearDownSuite(c *C) {
 +      os.Chdir(os.ExpandEnv("$GOPATH../python"))
 +      exec.Command("python", "run_test_server.py", "stop").Run()
 +}
 +
 +func (s *MySuite) TestInit(c *C) {
 +      os.Setenv("ARVADOS_API_HOST", "localhost:3001")
 +      os.Setenv("ARVADOS_API_TOKEN", "12345")
 +      os.Setenv("ARVADOS_API_HOST_INSECURE", "")
 +      kc := InitKeepClient()
 +      c.Assert(kc.apiServer, Equals, "localhost:3001")
 +      c.Assert(kc.apiToken, Equals, "12345")
 +      c.Assert(kc.apiInsecure, Equals, false)
 +
 +      os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
 +      kc = InitKeepClient()
 +      c.Assert(kc.apiServer, Equals, "localhost:3001")
 +      c.Assert(kc.apiToken, Equals, "12345")
 +      c.Assert(kc.apiInsecure, Equals, true)
 +}
+ func (s *MySuite) TestGetKeepDisks(c *C) {
+       sr, err := KeepDisks()
+       c.Assert(err, Equals, nil)
+       c.Assert(len(sr), Equals, 2)
+       c.Assert(sr[0], Equals, "http://localhost:25107")
+       c.Assert(sr[1], Equals, "http://localhost:25108")
+       service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
+       // "foo" acbd18db4cc2f85cedef654fccc4a4d8
+       foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+       c.Check(ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
+       // "bar" 37b51d194a7513e45b56f6524f2d51f2
+       bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+       c.Check(ShuffledServiceRoots(service_roots, "37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
 -
+ }