2798: Fixed inconsistencies with Python implementation of ShuffledServiceRoots.
authorPeter Amstutz <peter.amstutz@curoverse.com>
Sun, 11 May 2014 19:11:59 +0000 (15:11 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Sun, 11 May 2014 19:11:59 +0000 (15:11 -0400)
Started working on KeepPut.

sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go
sdk/python/arvados/keep.py
sdk/python/run_test_server.py

index 6ca0eea908c89857b9010192481cff5987a34de8..9c1210dfd8882774175602396e47cf4e411106f2 100644 (file)
@@ -4,11 +4,16 @@ import (
        "crypto/tls"
        "encoding/json"
        "fmt"
+       "io"
        "net/http"
        "sort"
        "strconv"
 )
 
+type KeepClient struct {
+       Service_roots []string
+}
+
 type KeepDisk struct {
        Hostname string `json:"service_host"`
        Port     int    `json:"service_port"`
@@ -54,7 +59,15 @@ func KeepDisks() (service_roots []string, err error) {
        return service_roots, nil
 }
 
-func ShuffledServiceRoots(service_roots []string, hash string) (pseq []string) {
+func MakeKeepClient() (kc *KeepClient, err error) {
+       sv, err := KeepDisks()
+       if err != nil {
+               return nil, err
+       }
+       return &KeepClient{sv}, nil
+}
+
+func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
        // Build an ordering with which to query the Keep servers based on the
        // contents of the hash.  "hash" is a hex-encoded number at least 8
        // digits (32 bits) long
@@ -64,10 +77,11 @@ func ShuffledServiceRoots(service_roots []string, hash string) (pseq []string) {
        seed := hash
 
        // Keep servers still to be added to the ordering
-       pool := service_roots[:]
+       pool := make([]string, len(this.Service_roots))
+       copy(pool, this.Service_roots)
 
        // output probe sequence
-       pseq = make([]string, 0, len(service_roots))
+       pseq = make([]string, 0, len(this.Service_roots))
 
        // iterate while there are servers left to be assigned
        for len(pool) > 0 {
@@ -87,8 +101,8 @@ func ShuffledServiceRoots(service_roots []string, hash string) (pseq []string) {
                // Take the next 8 digits (32 bytes) and interpret as an integer,
                // then modulus with the size of the remaining pool to get the next
                // selected server.
-               probe, _ := strconv.ParseInt(seed[0:8], 16, 32)
-               probe %= int64(len(pool))
+               probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+               probe %= uint64(len(pool))
 
                // Append the selected server to the probe sequence and remove it
                // from the pool.
@@ -100,3 +114,28 @@ func ShuffledServiceRoots(service_roots []string, hash string) (pseq []string) {
        }
        return pseq
 }
+
+func Fill(buffer []byte, r io.Reader, c chan []byte, errorchan chan error) {
+       ptr := buffer[:]
+
+       for {
+               n, err := r.Read(ptr)
+               if err != nil {
+                       errorchan <- err
+                       return
+               }
+               c <- ptr[:n]
+               ptr = ptr[n:]
+       }
+}
+
+func (this KeepClient) KeepPut(hash string, r io.Reader) {
+       //sv := this.ShuffledServiceRoots(hash)
+       //n := 0
+       buffer := make([]byte, 0, 1024*1024*64)
+       //success := make(chan int)
+       reads := make(chan []byte)
+       errorchan := make(chan error)
+
+       go Fill(buffer, r, reads, errorchan)
+}
index 4152eb357622e9e7c398a3c4f76987d225beb004..bc719c0e7035dfd50ef74dd2ebcc0a370eca059d 100644 (file)
@@ -1,6 +1,7 @@
 package keepclient
 
 import (
+       "fmt"
        . "gopkg.in/check.v1"
        "testing"
 )
@@ -22,10 +23,11 @@ func (s *MySuite) TestGetKeepDisks(c *C) {
        service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
 
        // "foo" acbd18db4cc2f85cedef654fccc4a4d8
-       //foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
-       ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8")
+       foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+       c.Check(ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
 
        // "bar" 37b51d194a7513e45b56f6524f2d51f2
-       //bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+       bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+       c.Check(ShuffledServiceRoots(service_roots, "37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
 
 }
index fcb59ec805680167a9de22caf4d6dc27d30caa92..e414d267a1347a51c9ae48354082e14bf48da29d 100644 (file)
@@ -193,6 +193,8 @@ class KeepClient(object):
             # selected server.
             probe = int(seed[0:8], 16) % len(pool)
 
+            print seed[0:8], int(seed[0:8], 16), len(pool), probe
+
             # Append the selected server to the probe sequence and remove it
             # from the pool.
             pseq += [pool[probe]]
index 3c9d55b184b41fae8e3c40f4dc88810f5adc84af..9901e14123f92aea4672b52814481faa5327b409 100644 (file)
@@ -149,6 +149,10 @@ def run_keep():
     _start_keep(0)
     _start_keep(1)
 
+
+    os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
+    os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
+
     authorize_with("admin")
     api = arvados.api('v1', cache=False)
     a = api.keep_disks().list().execute()
@@ -210,3 +214,5 @@ if __name__ == "__main__":
         run_keep()
     elif args.action == 'stop_keep':
         stop_keep()
+    else:
+        print('Unrecognized action "{}", actions are "start", "stop", "start_keep", "stop_keep"'.format(args.action))