Started working on KeepPut.
"crypto/tls"
"encoding/json"
"fmt"
+ "io"
"net/http"
"sort"
"strconv"
)
+type KeepClient struct { // KeepClient carries the list of service roots that its methods work from.
+ Service_roots []string // Service root strings; MakeKeepClient fills this from the result of KeepDisks().
+}
+
type KeepDisk struct {
Hostname string `json:"service_host"`
Port int `json:"service_port"`
return service_roots, nil
}
-func ShuffledServiceRoots(service_roots []string, hash string) (pseq []string) {
+// MakeKeepClient builds a KeepClient whose service roots are whatever
+// KeepDisks returns. If KeepDisks reports an error, that error is returned
+// together with a nil client.
+func MakeKeepClient() (kc *KeepClient, err error) {
+	roots, err := KeepDisks()
+	if err == nil {
+		kc = &KeepClient{Service_roots: roots}
+	}
+	return kc, err
+}
+
+func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
// Build an ordering with which to query the Keep servers based on the
// contents of the hash. "hash" is a hex-encoded number at least 8
// digits (32 bits) long
seed := hash
// Keep servers still to be added to the ordering
- pool := service_roots[:]
+ pool := make([]string, len(this.Service_roots))
+ copy(pool, this.Service_roots)
// output probe sequence
- pseq = make([]string, 0, len(service_roots))
+ pseq = make([]string, 0, len(this.Service_roots))
// iterate while there are servers left to be assigned
for len(pool) > 0 {
// Take the next 8 digits (32 bits) and interpret as an integer,
// then modulus with the size of the remaining pool to get the next
// selected server.
- probe, _ := strconv.ParseInt(seed[0:8], 16, 32)
- probe %= int64(len(pool))
+ probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+ probe %= uint64(len(pool))
// Append the selected server to the probe sequence and remove it
// from the pool.
}
return pseq
}
+
+func Fill(buffer []byte, r io.Reader, c chan []byte, errorchan chan error) {
+ ptr := buffer[:]
+
+ for {
+ n, err := r.Read(ptr)
+ if err != nil {
+ errorchan <- err
+ return
+ }
+ c <- ptr[:n]
+ ptr = ptr[n:]
+ }
+}
+
+// KeepPut is the (still unfinished) entry point for storing the data read
+// from r under the given hash. At present it only allocates a 64 MiB buffer
+// and starts a goroutine that reads r into it via Fill; hash is not used yet.
+func (this KeepClient) KeepPut(hash string, r io.Reader) {
+	//sv := this.ShuffledServiceRoots(hash)
+	//n := 0
+
+	// Fill reads into the slice it is handed (buffer[:]), so the buffer needs
+	// a non-zero length. A zero-length slice with 64 MiB of capacity would
+	// give every Read call an empty destination and no data would be read.
+	buffer := make([]byte, 1024*1024*64)
+	//success := make(chan int)
+	reads := make(chan []byte)
+	errorchan := make(chan error)
+
+	// TODO: nothing receives from reads or errorchan yet, so the goroutine
+	// blocks on its first send and is never cleaned up.
+	go Fill(buffer, r, reads, errorchan)
+}
package keepclient
import (
+ "fmt"
. "gopkg.in/check.v1"
"testing"
)
service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
// "foo" acbd18db4cc2f85cedef654fccc4a4d8
- //foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
- ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8")
+ foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+ // ShuffledServiceRoots is a KeepClient method that takes only the hash, so
+ // the service roots are supplied through a KeepClient value.
+ c.Check(KeepClient{Service_roots: service_roots}.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
// "bar" 37b51d194a7513e45b56f6524f2d51f2
- //bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+ bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+ c.Check(KeepClient{Service_roots: service_roots}.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
}
# selected server.
probe = int(seed[0:8], 16) % len(pool)
+ print seed[0:8], int(seed[0:8], 16), len(pool), probe
+
# Append the selected server to the probe sequence and remove it
# from the pool.
pseq += [pool[probe]]
_start_keep(0)
_start_keep(1)
+
+ os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
+ os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
+
authorize_with("admin")
api = arvados.api('v1', cache=False)
a = api.keep_disks().list().execute()
run_keep()
elif args.action == 'stop_keep':
stop_keep()
+ else:
+ print('Unrecognized action "{}", actions are "start", "stop", "start_keep", "stop_keep"'.format(args.action))