"crypto/tls"
"encoding/json"
"fmt"
+ "io"
"net/http"
"sort"
+ "strconv"
)
+type KeepClient struct {
+ Service_roots []string
+}
+
type KeepDisk struct {
Hostname string `json:"service_host"`
Port int `json:"service_port"`
return service_roots, nil
}
-/*
-func ProbeSequence(service_roots []string) (pseq []string) {
- pseq = make([]string, 0, len(disks))
- pool := disks[:]
+func MakeKeepClient() (kc *KeepClient, err error) {
+ sv, err := KeepDisks()
+ if err != nil {
+ return nil, err
+ }
+ return &KeepClient{sv}, nil
+}
+
+func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
+ // Build an ordering with which to query the Keep servers based on the
+ // contents of the hash. "hash" is a hex-encoded number at least 8
+ // digits (32 bits) long
+
+ // seed used to calculate the next keep server from 'pool' to be added
+ // to 'pseq'
+ seed := hash
+
+ // Keep servers still to be added to the ordering
+ pool := make([]string, len(this.Service_roots))
+ copy(pool, this.Service_roots)
+
+ // output probe sequence
+ pseq = make([]string, 0, len(this.Service_roots))
+
+ // iterate while there are servers left to be assigned
+ for len(pool) > 0 {
+
+ if len(seed) < 8 {
+ // ran out of digits in the seed
+ if len(pseq) < (len(hash) / 4) {
+ // the number of servers added to the probe
+ // sequence is less than the number of 4-digit
+ // slices in 'hash' so refill the seed with the
+ // last 4 digits.
+ seed = hash[len(hash)-4:]
+ }
+ seed += hash
+ }
+
+ // Take the next 8 digits (32 bytes) and interpret as an integer,
+ // then modulus with the size of the remaining pool to get the next
+ // selected server.
+ probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+ probe %= uint64(len(pool))
+
+ // Append the selected server to the probe sequence and remove it
+ // from the pool.
+ pseq = append(pseq, pool[probe])
+ pool = append(pool[:probe], pool[probe+1:]...)
+
+ // Remove the digits just used from the seed
+ seed = seed[8:]
+ }
+ return pseq
+}
+
+func ReadIntoBuffer(buffer []byte, r io.Reader, c chan []byte, reader_error chan error) {
+ ptr := buffer[:]
+ for {
+ n, err := r.Read(ptr)
+ if err != nil {
+ reader_error <- err
+ return
+ }
+ c <- ptr[:n]
+ ptr = ptr[n:]
+ }
+}
+
+type Sink struct {
+ out io.Writer
+ err chan error
+}
+
+// Transfer data from a buffer into one or more 'sinks'.
+//
+// Forwards all data read to the writers in "Sinks", including any previous
+// reads into the buffer. Either one of buffer or io.Reader must be valid, and
+// the other must be nil. If 'source' is valid, it will read from the reader,
+// store the data in the buffer, and send the data to the sinks. Otherwise
+// 'buffer' must be valid, and it will send the contents of the buffer to the
+// sinks.
+func Transfer(buffer []byte, source io.Reader, sinks chan Sink, errorchan chan error) {
+ // currently buffered data
+ var ptr []byte
+
+ // for receiving slices from ReadIntoBuffer
+ var slices chan []byte
+
+ // indicates whether the buffered data is complete
+ var complete bool = false
+
+ // for receiving errors from ReadIntoBuffer
+ var reader_error chan error = nil
+
+ if source != nil {
+ // allocate the scratch buffer at 64 MiB
+ buffer = make([]byte, 1024*1024*64)
+
+ // 'ptr' is a slice indicating the buffer slice that has been
+ // read so far
+ ptr = buffer[0:0]
+
+ // used to communicate slices of the buffer as read
+ slices := make(chan []byte)
+
+ // communicate read errors
+ reader_error = make(chan error)
+
+ // Spin it off
+ go ReadIntoBuffer(buffer, source, slices, reader_error)
+ } else {
+ // use the whole buffer
+ ptr = buffer[:]
+
+ // that's it
+ complete = true
+ }
+
+ // list of sinks to send to
+ sinks_slice := make([]io.Writer, 0)
+
+ select {
+ case e := <-reader_error:
+ // barf
+ case s, valid := <-sinks:
+ if !valid {
+ // sinks channel closed
+ return
+ }
+ sinks_slice = append(sinks_slice, s)
+ go s.Write(ptr)
+ case bk := <-slices:
+ ptr = buffer[0 : len(ptr)+len(bk)]
+ for _, s := range sinks_slice {
+ go s.Write(bk)
+ }
+ }
+}
+
+func (this KeepClient) KeepPut(hash string, r io.Reader) {
+ //sv := this.ShuffledServiceRoots(hash)
+ //n := 0
+
+ //success := make(chan int)
+ sinks := make(chan []io.Writer)
+ errorchan := make(chan error)
+ go Transfer(nil, r, reads, errorchan)
}
-*/