// KeepClient carries what is needed to address a set of Keep servers.
type KeepClient struct {
	// Service_roots holds one base URL per Keep service. MakeKeepClient
	// fills it with the sorted "http[s]://host:port" strings returned by
	// KeepDisks.
	Service_roots []string
}
// KeepDisk is one entry of the "items" array in the keep_disks API
// response. Only the fields needed to build a service URL are decoded.
type KeepDisk struct {
	// Hostname is the host part of the service address.
	Hostname string `json:"service_host"`
	// Port is the TCP port the service is reached on.
	Port int `json:"service_port"`
	// SSL selects "https" over "http" when the service URL is built.
	SSL bool `json:"service_ssl_flag"`
}
23 func KeepDisks() (service_roots []string, err error) {
24 tr := &http.Transport{
25 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
27 client := &http.Client{Transport: tr}
30 if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
34 var resp *http.Response
35 req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
36 if resp, err = client.Do(req); err != nil {
41 Items []KeepDisk `json:"items"`
43 dec := json.NewDecoder(resp.Body)
45 if err := dec.Decode(&m); err != nil {
49 service_roots = make([]string, len(m.Items))
50 for index, element := range m.Items {
55 service_roots[index] = fmt.Sprintf("http%s://%s:%d",
56 n, element.Hostname, element.Port)
58 sort.Strings(service_roots)
59 return service_roots, nil
62 func MakeKeepClient() (kc *KeepClient, err error) {
63 sv, err := KeepDisks()
67 return &KeepClient{sv}, nil
70 func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
71 // Build an ordering with which to query the Keep servers based on the
72 // contents of the hash. "hash" is a hex-encoded number at least 8
73 // digits (32 bits) long
75 // seed used to calculate the next keep server from 'pool' to be added
79 // Keep servers still to be added to the ordering
80 pool := make([]string, len(this.Service_roots))
81 copy(pool, this.Service_roots)
83 // output probe sequence
84 pseq = make([]string, 0, len(this.Service_roots))
86 // iterate while there are servers left to be assigned
90 // ran out of digits in the seed
91 if len(pseq) < (len(hash) / 4) {
92 // the number of servers added to the probe
93 // sequence is less than the number of 4-digit
94 // slices in 'hash' so refill the seed with the
96 seed = hash[len(hash)-4:]
101 // Take the next 8 digits (32 bytes) and interpret as an integer,
102 // then modulus with the size of the remaining pool to get the next
104 probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
105 probe %= uint64(len(pool))
107 // Append the selected server to the probe sequence and remove it
109 pseq = append(pseq, pool[probe])
110 pool = append(pool[:probe], pool[probe+1:]...)
112 // Remove the digits just used from the seed
// Fill reads from r; judging by its parameters it hands data to the caller
// over the channel c and reports failures over errorchan.
// NOTE(review): only the signature and the Read call are visible here, so
// how c, errorchan and buffer are actually used needs confirming against
// the full body.
118 func Fill(buffer []byte, r io.Reader, c chan []byte, errorchan chan error) {
// NOTE(review): ptr is not declared in the visible lines; presumably it is
// a slice of buffer. A zero-length ptr would make Read return 0 bytes.
122 n, err := r.Read(ptr)
132 func (this KeepClient) KeepPut(hash string, r io.Reader) {
133 //sv := this.ShuffledServiceRoots(hash)
135 buffer := make([]byte, 0, 1024*1024*64)
136 //success := make(chan int)
137 reads := make(chan []byte)
138 errorchan := make(chan error)
140 go Fill(buffer, r, reads, errorchan)