package keepclient
import (
- //"net/http"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "sort"
+ "strconv"
)
type KeepClient struct {
- apiServer string
- apiToken string
- apiInsecure bool
- Service_roots []string
++ ApiServer string
+ ApiToken string
++ ApiInsecure bool
++ Service_roots []string
}
- func InitKeepClient() *KeepClient {
- kc := KeepClient{os.Getenv("ARVADOS_API_HOST"),
- os.Getenv("ARVADOS_API_TOKEN"),
- os.Getenv("ARVADOS_API_HOST_INSECURE") != ""}
+ type KeepDisk struct {
+ Hostname string `json:"service_host"`
+ Port int `json:"service_port"`
+ SSL bool `json:"service_ssl_flag"`
+ }
+
+ func MakeKeepClient() (kc *KeepClient, err error) {
- kc := KeepClient{}
- err := kc.DiscoverKeepDisks()
- if err != nil {
++ kc = &KeepClient{
++ ApiServer: os.Getenv("ARVADOS_API_HOST"),
++ ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
++ ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")}
++
++ if err := kc.DiscoverKeepDisks(); err != nil {
+ return nil, err
+ }
- return &kc, nil
++
++ return kc, nil
+ }
+
+ func (this *KeepClient) DiscoverKeepDisks() error {
+ tr := &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
++ TLSClientConfig: &tls.Config{InsecureSkipVerify: this.ApiInsecure},
+ }
+ client := &http.Client{Transport: tr}
+
+ var req *http.Request
++ var err error
+ if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
- return nil, err
++ return err
+ }
+
+ var resp *http.Response
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ if resp, err = client.Do(req); err != nil {
- return nil, err
++ return err
+ }
+
+ type SvcList struct {
+ Items []KeepDisk `json:"items"`
+ }
+ dec := json.NewDecoder(resp.Body)
+ var m SvcList
+ if err := dec.Decode(&m); err != nil {
- return nil, err
++ return err
+ }
+
- this.service_roots = make([]string, len(m.Items))
++ this.Service_roots = make([]string, len(m.Items))
+ for index, element := range m.Items {
+ n := ""
+ if element.SSL {
+ n = "s"
+ }
- this.service_roots[index] = fmt.Sprintf("http%s://%s:%d",
++ this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
+ n, element.Hostname, element.Port)
+ }
- sort.Strings(this.service_roots)
++ sort.Strings(this.Service_roots)
+ return nil
+ }
+
+ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
+ // Build an ordering with which to query the Keep servers based on the
+ // contents of the hash. "hash" is a hex-encoded number at least 8
+ // digits (32 bits) long
+
+ // seed used to calculate the next keep server from 'pool' to be added
+ // to 'pseq'
+ seed := hash
+
+ // Keep servers still to be added to the ordering
+ pool := make([]string, len(this.Service_roots))
+ copy(pool, this.Service_roots)
+
+ // output probe sequence
+ pseq = make([]string, 0, len(this.Service_roots))
+
+ // iterate while there are servers left to be assigned
+ for len(pool) > 0 {
+
+ if len(seed) < 8 {
+ // ran out of digits in the seed
+ if len(pseq) < (len(hash) / 4) {
+ // the number of servers added to the probe
+ // sequence is less than the number of 4-digit
+ // slices in 'hash' so refill the seed with the
+ // last 4 digits.
+ seed = hash[len(hash)-4:]
+ }
+ seed += hash
+ }
+
+ // Take the next 8 digits (32 bytes) and interpret as an integer,
+ // then modulus with the size of the remaining pool to get the next
+ // selected server.
+ probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+ probe %= uint64(len(pool))
+
+ // Append the selected server to the probe sequence and remove it
+ // from the pool.
+ pseq = append(pseq, pool[probe])
+ pool = append(pool[:probe], pool[probe+1:]...)
+
+ // Remove the digits just used from the seed
+ seed = seed[8:]
+ }
+ return pseq
+ }
+
+ type ReaderSlice struct {
+ slice []byte
+ reader_error error
+ }
+
+ type Source <-chan ReaderSlice
+ type Sink chan<- ReaderSlice
+ type Status chan error
+
+ // Read repeatedly from the reader into the specified buffer, and report each
+ // read to channel 'c'. Completes when Reader 'r' reports an error and closes
+ // channel 'c'.
+ func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
+ defer close(c)
+
+ // Initially use entire buffer as scratch space
+ ptr := buffer[:]
+ for len(ptr) > 0 {
- v // Read into the scratch space
++ // Read into the scratch space
+ n, err := r.Read(ptr)
+
+ // End on error (includes EOF)
+ if err != nil {
+ c <- ReaderSlice{nil, err}
+ return
+ }
+
+ // Make a slice with the contents of the read
+ c <- ReaderSlice{ptr[:n], nil}
+
+ // Adjust the scratch space slice
+ ptr = ptr[n:]
+ }
+ if len(ptr) == 0 {
+ c <- ReaderSlice{nil, io.ErrShortBuffer}
+ }
+ }
+
+ // Take slices from 'source' channel and write them to Writer 'w'. Reports read
+ // or write errors on 'status'. Completes when 'source' channel is closed.
-func SinkWriter(source Source, w io.Writer, status Status) {
++/*func SinkWriter(source Source, w io.Writer, status Status) {
+ can_write = true
+
+ for {
+ // Get the next block from the source
+ rs, valid := <-source
+
+ if valid {
+ if rs.error != nil {
+ // propagate reader status (should only be EOF)
+ status <- rs.error
+ } else if can_write {
+ buf := rs.slice[:]
+ for len(buf) > 0 {
+ n, err := w.Write(buf)
+ buf = buf[n:]
+ if err == io.ErrShortWrite {
+ // short write, so go around again
+ } else if err != nil {
+ // some other write error,
+ // propagate error and stop
+ // further writes
+ status <- err
+ can_write = false
+ }
+ }
+ }
+ } else {
+ // source channel closed
+ break
+ }
+ }
-}
++}*/
+
+ func closeSinks(sinks_slice []Sink) {
+ for _, s := range sinks_slice {
+ close(s)
+ }
+ }
+
+ // Transfer data from a source (either an already-filled buffer, or a reader)
+ // into one or more 'sinks'. If 'source' is valid, it will read from the
+ // reader into the buffer and send the data to the sinks. Otherwise 'buffer'
+ // it will just send the contents of the buffer to the sinks. Completes when
+ // the 'sinks' channel is closed.
+ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
+ // currently buffered data
+ var body []byte
+
+ // for receiving slices from ReadIntoBuffer
+ var slices chan []byte = nil
+
+ // indicates whether the buffered data is complete
+ var complete bool = false
+
- if source != nil {
++ if source_reader != nil {
+ // 'body' is the buffer slice representing the body content read so far
+ body = source_buffer[:0]
+
+ // used to communicate slices of the buffer as read
+ reader_slices := make(chan []ReaderSlice)
+
+ // Spin it off
+ go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
+ } else {
+ // use the whole buffer
+ body = source_buffer[:]
+
+ // that's it
+ complete = true
+ }
+
+ // list of sinks to send to
+ sinks_slice := make([]Sink, 0)
+ defer closeSinks(sinks_slice)
+
+ for {
+ select {
+ case s, valid := <-sinks:
+ if valid {
+ // add to the sinks slice
+ sinks_slice = append(sinks_slice, s)
+
+ // catch up the sink with the current body contents
+ if len(body) > 0 {
+ s <- ReaderSlice{body, nil}
+ if complete {
+ s <- ReaderSlice{nil, io.EOF}
+ }
+ }
+ } else {
+ // closed 'sinks' channel indicates we're done
+ return
+ }
+
+ case bk, valid := <-slices:
+ if valid {
+ if bk.err != nil {
+ reader_error <- bk.err
+ if bk.err == io.EOF {
+ // EOF indicates the reader is done
+ // sending, so our buffer is complete.
+ complete = true
+ } else {
+ // some other reader error
+ return
+ }
+ }
+
+ if bk.slice != nil {
+ // adjust body bounds now that another slice has been read
+ body = source_buffer[0 : len(body)+len(bk.slice)]
+ }
+
+ // send the new slice to the sinks
+ for _, s := range sinks_slice {
+ s <- bk
+ }
+
+ if complete {
+ // got an EOF, so close the sinks
+ closeSinks(sinks_slice)
+
+ // truncate sinks slice
+ sinks_slice = sinks_slice[:0]
+ }
+ } else {
+ // no more reads
+ slices = nil
+ }
+ }
+ }
+ }
+
+ func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
+ pipereader, pipewriter := io.Pipe()
+
+ var req *http.Request
+ if req, err = http.NewRequest("POST", url, nil); err != nil {
+ write_status <- err
+ }
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ req.Body = pipereader
+
+ // create channel to transfer slices from reader to writer
+ tr := make(chan ReaderSlice)
+
+ // start the writer goroutine
+ go SinkWriter(tr, pipewriter, write_status)
+
+ // now transfer the channel to the reader goroutine
+ sinks <- tr
+
+ var resp *http.Response
+
+ if resp, err = client.Do(req); err != nil {
+ return nil, err
+ }
+ }
+
+ var KeepWriteError = errors.new("Could not write sufficient replicas")
+
+ func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
+ // Calculate the ordering to try writing to servers
+ sv := this.ShuffledServiceRoots(hash)
+
+ // The next server to try contacting
+ n := 0
+
+ // The number of active writers
+ active := 0
+
+ // Used to buffer reads from 'r'
+ buffer := make([]byte, 64*1024*1024)
+
+ // Used to send writers to the reader goroutine
+ sinks := make(chan Sink)
+ defer close(sinks)
+
+ // Used to communicate status from the reader goroutine
+ reader_status := make(chan error)
+
+ // Start the reader goroutine
+ go Transfer(buffer, r, sinks, reader_status)
+
+ // Used to communicate status from the writer goroutines
+ write_status := make(chan error)
+
+ for want_replicas > 0 {
+ for active < want_replicas {
+ // Start some writers
+ if n < len(sv) {
+ go this.ConnectToKeepServer(sv[n], sinks, write_status)
+ n += 1
+ active += 1
+ } else {
+ return KeepWriteError
+ }
+ }
- return &kc
+ // Now wait for something to happen.
+ select {
+ case status := <-reader_status:
+ if status == io.EOF {
+ // good news!
+ } else {
+ // bad news
+ return status
+ }
+ case status := <-write_status:
+ if status == io.EOF {
+ // good news!
+ want_replicas -= 1
+ } else {
+ // writing to keep server failed for some reason.
+ }
+ active -= 1
+ }
+ }
}
package keepclient
import (
+ "fmt"
. "gopkg.in/check.v1"
+ "os"
+ "os/exec"
"testing"
)
-// Hook up gocheck into the "go test" runner.
+// Gocheck boilerplate
func Test(t *testing.T) { TestingT(t) }
+// Gocheck boilerplate
+var _ = Suite(&MySuite{})
+
+// Our test fixture
type MySuite struct{}
-var _ = Suite(&MySuite{})
+func (s *MySuite) SetUpSuite(c *C) {
+ os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "start").Run()
+}
+
+func (s *MySuite) TearDownSuite(c *C) {
+ os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "stop").Run()
+}
+
+func (s *MySuite) TestInit(c *C) {
+ os.Setenv("ARVADOS_API_HOST", "localhost:3001")
+ os.Setenv("ARVADOS_API_TOKEN", "12345")
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+ kc := InitKeepClient()
+ c.Assert(kc.apiServer, Equals, "localhost:3001")
+ c.Assert(kc.apiToken, Equals, "12345")
+ c.Assert(kc.apiInsecure, Equals, false)
+
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+ kc = InitKeepClient()
+ c.Assert(kc.apiServer, Equals, "localhost:3001")
+ c.Assert(kc.apiToken, Equals, "12345")
+ c.Assert(kc.apiInsecure, Equals, true)
+}
+
+ func (s *MySuite) TestGetKeepDisks(c *C) {
+ sr, err := KeepDisks()
+ c.Assert(err, Equals, nil)
+ c.Assert(len(sr), Equals, 2)
+ c.Assert(sr[0], Equals, "http://localhost:25107")
+ c.Assert(sr[1], Equals, "http://localhost:25108")
+
+ service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
+
+ // "foo" acbd18db4cc2f85cedef654fccc4a4d8
+ foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+ c.Check(ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
+
+ // "bar" 37b51d194a7513e45b56f6524f2d51f2
+ bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+ c.Check(ShuffledServiceRoots(service_roots, "37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
-
+ }