3761: Expand DiscoverKeepServers method to return service_roots and use it in test
[arvados.git] / sdk / go / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/streamer"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "regexp"
15         "strings"
16         "sync"
17         "sync/atomic"
18         "time"
19         "unsafe"
20 )
21
22 // A Keep "block" is 64MB.
23 const BLOCKSIZE = 64 * 1024 * 1024
24
25 var BlockNotFound = errors.New("Block not found")
26 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
27 var OversizeBlockError = errors.New("Block too big")
28 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
29 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
30
31 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
32 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
33
34 // Information about Arvados and Keep servers.
35 type KeepClient struct {
36         Arvados       *arvadosclient.ArvadosClient
37         Want_replicas int
38         Using_proxy   bool
39         service_roots *map[string]string
40         lock          sync.Mutex
41         Client        *http.Client
42 }
43
44 // Create a new KeepClient.  This will contact the API server to discover Keep
45 // servers.
46 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
47         kc = KeepClient{
48                 Arvados:       arv,
49                 Want_replicas: 2,
50                 Using_proxy:   false,
51                 Client:        &http.Client{},
52         }
53         _, err = (&kc).DiscoverKeepServers()
54
55         return kc, err
56 }
57
58 // Put a block given the block hash, a reader with the block data, and the
59 // expected length of that data.  The desired number of replicas is given in
60 // KeepClient.Want_replicas.  Returns the number of replicas that were written
61 // and if there was an error.  Note this will return InsufficientReplias
62 // whenever 0 <= replicas < this.Wants_replicas.
63 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
64
65         // Buffer for reads from 'r'
66         var bufsize int
67         if expectedLength > 0 {
68                 if expectedLength > BLOCKSIZE {
69                         return "", 0, OversizeBlockError
70                 }
71                 bufsize = int(expectedLength)
72         } else {
73                 bufsize = BLOCKSIZE
74         }
75
76         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
77         defer t.Close()
78
79         return this.putReplicas(hash, t, expectedLength)
80 }
81
82 // Put a block given the block hash and a byte buffer.  The desired number of
83 // replicas is given in KeepClient.Want_replicas.  Returns the number of
84 // replicas that were written and if there was an error.  Note this will return
85 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
86 func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
87         t := streamer.AsyncStreamFromSlice(buf)
88         defer t.Close()
89
90         return this.putReplicas(hash, t, int64(len(buf)))
91 }
92
93 // Put a block given a buffer.  The hash will be computed.  The desired number
94 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
95 // replicas that were written and if there was an error.  Note this will return
96 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
97 func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
98         hash := fmt.Sprintf("%x", md5.Sum(buffer))
99         return this.PutHB(hash, buffer)
100 }
101
102 // Put a block, given a Reader.  This will read the entire reader into a buffer
103 // to compute the hash.  The desired number of replicas is given in
104 // KeepClient.Want_replicas.  Returns the number of replicas that were written
105 // and if there was an error.  Note this will return InsufficientReplias
106 // whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
107 // hash and data size are available, PutHR() is more efficient.
108 func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
109         if buffer, err := ioutil.ReadAll(r); err != nil {
110                 return "", 0, err
111         } else {
112                 return this.PutB(buffer)
113         }
114 }
115
116 // Get a block given a hash.  Return a reader, the expected data length, the
117 // URL the block was fetched from, and if there was an error.  If the block
118 // checksum does not match, the final Read() on the reader returned by this
119 // method will return a BadChecksum error instead of EOF.
120 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
121         contentLength int64, url string, err error) {
122         return this.AuthorizedGet(hash, "", "")
123 }
124
125 // Get a block given a hash, with additional authorization provided by
126 // signature and timestamp.  Return a reader, the expected data length, the URL
127 // the block was fetched from, and if there was an error.  If the block
128 // checksum does not match, the final Read() on the reader returned by this
129 // method will return a BadChecksum error instead of EOF.
130 func (this KeepClient) AuthorizedGet(hash string,
131         signature string,
132         timestamp string) (reader io.ReadCloser,
133         contentLength int64, url string, err error) {
134
135         // Take the hash of locator and timestamp in order to identify this
136         // specific transaction in log statements.
137         requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
138
139         // Calculate the ordering for asking servers
140         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
141
142         for _, host := range sv {
143                 var req *http.Request
144                 var err error
145                 var url string
146                 if signature != "" {
147                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
148                                 signature, timestamp)
149                 } else {
150                         url = fmt.Sprintf("%s/%s", host, hash)
151                 }
152                 if req, err = http.NewRequest("GET", url, nil); err != nil {
153                         continue
154                 }
155
156                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
157
158                 log.Printf("[%v] Begin download %s", requestId, url)
159
160                 var resp *http.Response
161                 if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
162                         statusCode := -1
163                         var respbody []byte
164                         if resp != nil {
165                                 statusCode = resp.StatusCode
166                                 if resp.Body != nil {
167                                         respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
168                                 }
169                         }
170                         response := strings.TrimSpace(string(respbody))
171                         log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
172                                 requestId, url, statusCode, err, response)
173                         continue
174                 }
175
176                 if resp.StatusCode == http.StatusOK {
177                         log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
178                         return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
179                 }
180         }
181
182         return nil, 0, "", BlockNotFound
183 }
184
185 // Determine if a block with the given hash is available and readable, but does
186 // not return the block contents.
187 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
188         return this.AuthorizedAsk(hash, "", "")
189 }
190
191 // Determine if a block with the given hash is available and readable with the
192 // given signature and timestamp, but does not return the block contents.
193 func (this KeepClient) AuthorizedAsk(hash string, signature string,
194         timestamp string) (contentLength int64, url string, err error) {
195         // Calculate the ordering for asking servers
196         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
197
198         for _, host := range sv {
199                 var req *http.Request
200                 var err error
201                 if signature != "" {
202                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
203                                 signature, timestamp)
204                 } else {
205                         url = fmt.Sprintf("%s/%s", host, hash)
206                 }
207
208                 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
209                         continue
210                 }
211
212                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
213
214                 var resp *http.Response
215                 if resp, err = this.Client.Do(req); err != nil {
216                         continue
217                 }
218
219                 if resp.StatusCode == http.StatusOK {
220                         return resp.ContentLength, url, nil
221                 }
222         }
223
224         return 0, "", BlockNotFound
225
226 }
227
228 // Atomically read the service_roots field.
229 func (this *KeepClient) ServiceRoots() map[string]string {
230         r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
231         return *r
232 }
233
234 // Atomically update the service_roots field.  Enables you to update
235 // service_roots without disrupting any GET or PUT operations that might
236 // already be in progress.
237 func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
238         roots := make(map[string]string)
239         for uuid, root := range new_roots {
240                 roots[uuid] = root
241         }
242         atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
243                 unsafe.Pointer(&roots))
244 }
245
246 type Locator struct {
247         Hash      string
248         Size      int
249         Signature string
250         Timestamp string
251 }
252
253 func MakeLocator2(hash string, hints string) (locator Locator) {
254         locator.Hash = hash
255         if hints != "" {
256                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
257                 for _, hint := range strings.Split(hints, "+") {
258                         if hint != "" {
259                                 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
260                                         fmt.Sscanf(hint, "%d", &locator.Size)
261                                 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
262                                         locator.Signature = m[1]
263                                         locator.Timestamp = m[2]
264                                 } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
265                                         // Any unknown hint that starts with an uppercase letter is
266                                         // presumed to be valid and ignored, to permit forward compatibility.
267                                 } else {
268                                         // Unknown format; not a valid locator.
269                                         return Locator{"", 0, "", ""}
270                                 }
271                         }
272                 }
273         }
274         return locator
275 }
276
277 func MakeLocator(path string) Locator {
278         pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
279         if err != nil {
280                 log.Print("Don't like regexp", err)
281         }
282
283         sm := pathpattern.FindStringSubmatch(path)
284         if sm == nil {
285                 log.Print("Failed match ", path)
286                 return Locator{"", 0, "", ""}
287         }
288
289         return MakeLocator2(sm[1], sm[2])
290 }