2798: Fixed inconsistencies with Python implementation of ShuffledServiceRoots.
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient.go
1 package keepclient
2
3 import (
4         "crypto/tls"
5         "encoding/json"
6         "fmt"
7         "io"
8         "net/http"
9         "sort"
10         "strconv"
11 )
12
13 type KeepClient struct {
14         Service_roots []string
15 }
16
17 type KeepDisk struct {
18         Hostname string `json:"service_host"`
19         Port     int    `json:"service_port"`
20         SSL      bool   `json:"service_ssl_flag"`
21 }
22
23 func KeepDisks() (service_roots []string, err error) {
24         tr := &http.Transport{
25                 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
26         }
27         client := &http.Client{Transport: tr}
28
29         var req *http.Request
30         if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
31                 return nil, err
32         }
33
34         var resp *http.Response
35         req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
36         if resp, err = client.Do(req); err != nil {
37                 return nil, err
38         }
39
40         type SvcList struct {
41                 Items []KeepDisk `json:"items"`
42         }
43         dec := json.NewDecoder(resp.Body)
44         var m SvcList
45         if err := dec.Decode(&m); err != nil {
46                 return nil, err
47         }
48
49         service_roots = make([]string, len(m.Items))
50         for index, element := range m.Items {
51                 n := ""
52                 if element.SSL {
53                         n = "s"
54                 }
55                 service_roots[index] = fmt.Sprintf("http%s://%s:%d",
56                         n, element.Hostname, element.Port)
57         }
58         sort.Strings(service_roots)
59         return service_roots, nil
60 }
61
62 func MakeKeepClient() (kc *KeepClient, err error) {
63         sv, err := KeepDisks()
64         if err != nil {
65                 return nil, err
66         }
67         return &KeepClient{sv}, nil
68 }
69
70 func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
71         // Build an ordering with which to query the Keep servers based on the
72         // contents of the hash.  "hash" is a hex-encoded number at least 8
73         // digits (32 bits) long
74
75         // seed used to calculate the next keep server from 'pool' to be added
76         // to 'pseq'
77         seed := hash
78
79         // Keep servers still to be added to the ordering
80         pool := make([]string, len(this.Service_roots))
81         copy(pool, this.Service_roots)
82
83         // output probe sequence
84         pseq = make([]string, 0, len(this.Service_roots))
85
86         // iterate while there are servers left to be assigned
87         for len(pool) > 0 {
88
89                 if len(seed) < 8 {
90                         // ran out of digits in the seed
91                         if len(pseq) < (len(hash) / 4) {
92                                 // the number of servers added to the probe
93                                 // sequence is less than the number of 4-digit
94                                 // slices in 'hash' so refill the seed with the
95                                 // last 4 digits.
96                                 seed = hash[len(hash)-4:]
97                         }
98                         seed += hash
99                 }
100
101                 // Take the next 8 digits (32 bytes) and interpret as an integer,
102                 // then modulus with the size of the remaining pool to get the next
103                 // selected server.
104                 probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
105                 probe %= uint64(len(pool))
106
107                 // Append the selected server to the probe sequence and remove it
108                 // from the pool.
109                 pseq = append(pseq, pool[probe])
110                 pool = append(pool[:probe], pool[probe+1:]...)
111
112                 // Remove the digits just used from the seed
113                 seed = seed[8:]
114         }
115         return pseq
116 }
117
118 func Fill(buffer []byte, r io.Reader, c chan []byte, errorchan chan error) {
119         ptr := buffer[:]
120
121         for {
122                 n, err := r.Read(ptr)
123                 if err != nil {
124                         errorchan <- err
125                         return
126                 }
127                 c <- ptr[:n]
128                 ptr = ptr[n:]
129         }
130 }
131
132 func (this KeepClient) KeepPut(hash string, r io.Reader) {
133         //sv := this.ShuffledServiceRoots(hash)
134         //n := 0
135         buffer := make([]byte, 0, 1024*1024*64)
136         //success := make(chan int)
137         reads := make(chan []byte)
138         errorchan := make(chan error)
139
140         go Fill(buffer, r, reads, errorchan)
141 }