2798: Working on Transfer function, which will be the core function for
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient.go
index 2bb5ff3eb19e163c4e1a52fca1c79507a2189f4b..ba735235f12b524840f58a7cf5201e40c93e4e4d 100644 (file)
@@ -4,10 +4,16 @@ import (
        "crypto/tls"
        "encoding/json"
        "fmt"
+       "io"
        "net/http"
        "sort"
+       "strconv"
 )
 
+type KeepClient struct {
+       Service_roots []string
+}
+
 type KeepDisk struct {
        Hostname string `json:"service_host"`
        Port     int    `json:"service_port"`
@@ -53,10 +59,153 @@ func KeepDisks() (service_roots []string, err error) {
        return service_roots, nil
 }
 
-/*
-func ProbeSequence(service_roots []string) (pseq []string) {
-       pseq = make([]string, 0, len(disks))
-       pool := disks[:]
+func MakeKeepClient() (kc *KeepClient, err error) {
+       sv, err := KeepDisks()
+       if err != nil {
+               return nil, err
+       }
+       return &KeepClient{sv}, nil
+}
+
+func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
+       // Build an ordering with which to query the Keep servers based on the
+       // contents of the hash.  "hash" is a hex-encoded number at least 8
+       // digits (32 bits) long
+
+       // seed used to calculate the next keep server from 'pool' to be added
+       // to 'pseq'
+       seed := hash
+
+       // Keep servers still to be added to the ordering
+       pool := make([]string, len(this.Service_roots))
+       copy(pool, this.Service_roots)
+
+       // output probe sequence
+       pseq = make([]string, 0, len(this.Service_roots))
+
+       // iterate while there are servers left to be assigned
+       for len(pool) > 0 {
+
+               if len(seed) < 8 {
+                       // ran out of digits in the seed
+                       if len(pseq) < (len(hash) / 4) {
+                               // the number of servers added to the probe
+                               // sequence is less than the number of 4-digit
+                               // slices in 'hash' so refill the seed with the
+                               // last 4 digits.
+                               seed = hash[len(hash)-4:]
+                       }
+                       seed += hash
+               }
+
+               // Take the next 8 digits (32 bytes) and interpret as an integer,
+               // then modulus with the size of the remaining pool to get the next
+               // selected server.
+               probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+               probe %= uint64(len(pool))
+
+               // Append the selected server to the probe sequence and remove it
+               // from the pool.
+               pseq = append(pseq, pool[probe])
+               pool = append(pool[:probe], pool[probe+1:]...)
+
+               // Remove the digits just used from the seed
+               seed = seed[8:]
+       }
+       return pseq
+}
+
+func ReadIntoBuffer(buffer []byte, r io.Reader, c chan []byte, reader_error chan error) {
+       ptr := buffer[:]
+       for {
+               n, err := r.Read(ptr)
+               if err != nil {
+                       reader_error <- err
+                       return
+               }
+               c <- ptr[:n]
+               ptr = ptr[n:]
+       }
+}
+
+type Sink struct {
+       out io.Writer
+       err chan error
+}
+
+// Transfer data from a buffer into one or more 'sinks'.
+//
+// Forwards all data read to the writers in "Sinks", including any previous
+// reads into the buffer.  Either one of buffer or io.Reader must be valid, and
+// the other must be nil.  If 'source' is valid, it will read from the reader,
+// store the data in the buffer, and send the data to the sinks.  Otherwise
+// 'buffer' must be valid, and it will send the contents of the buffer to the
+// sinks.
+func Transfer(buffer []byte, source io.Reader, sinks chan Sink, errorchan chan error) {
+       // currently buffered data
+       var ptr []byte
+
+       // for receiving slices from ReadIntoBuffer
+       var slices chan []byte
+
+       // indicates whether the buffered data is complete
+       var complete bool = false
+
+       // for receiving errors from ReadIntoBuffer
+       var reader_error chan error = nil
+
+       if source != nil {
+               // allocate the scratch buffer at 64 MiB
+               buffer = make([]byte, 1024*1024*64)
+
+               // 'ptr' is a slice indicating the buffer slice that has been
+               // read so far
+               ptr = buffer[0:0]
+
+               // used to communicate slices of the buffer as read
+               slices := make(chan []byte)
+
+               // communicate read errors
+               reader_error = make(chan error)
+
+               // Spin it off
+               go ReadIntoBuffer(buffer, source, slices, reader_error)
+       } else {
+               // use the whole buffer
+               ptr = buffer[:]
+
+               // that's it
+               complete = true
+       }
+
+       // list of sinks to send to
+       sinks_slice := make([]io.Writer, 0)
+
+       select {
+       case e := <-reader_error:
+               // barf
+       case s, valid := <-sinks:
+               if !valid {
+                       // sinks channel closed
+                       return
+               }
+               sinks_slice = append(sinks_slice, s)
+               go s.Write(ptr)
+       case bk := <-slices:
+               ptr = buffer[0 : len(ptr)+len(bk)]
+               for _, s := range sinks_slice {
+                       go s.Write(bk)
+               }
+       }
+}
+
+func (this KeepClient) KeepPut(hash string, r io.Reader) {
+       //sv := this.ShuffledServiceRoots(hash)
+       //n := 0
+
+       //success := make(chan int)
+       sinks := make(chan []io.Writer)
+       errorchan := make(chan error)
 
+       go Transfer(nil, r, reads, errorchan)
 }
-*/