7167: update the progress message to say "est. time remaining" instead of "ETA" which...
[arvados.git] / sdk / go / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "bytes"
6         "crypto/md5"
7         "crypto/tls"
8         "errors"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/streamer"
12         "io"
13         "io/ioutil"
14         "log"
15         "net/http"
16         "regexp"
17         "strconv"
18         "strings"
19         "sync"
20 )
21
22 // A Keep "block" is 64MB.
23 const BLOCKSIZE = 64 * 1024 * 1024
24
25 var BlockNotFound = errors.New("Block not found")
26 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
27 var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
28 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
29 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
30 var InvalidLocatorError = errors.New("Invalid locator")
31
32 // ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server
33 var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
34
35 // ErrIncompleteIndex is returned when the Index response does not end with a new empty line
36 var ErrIncompleteIndex = errors.New("Got incomplete index")
37
38 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
39 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
40
41 // Information about Arvados and Keep servers.
42 type KeepClient struct {
43         Arvados            *arvadosclient.ArvadosClient
44         Want_replicas      int
45         Using_proxy        bool
46         localRoots         *map[string]string
47         writableLocalRoots *map[string]string
48         gatewayRoots       *map[string]string
49         lock               sync.RWMutex
50         Client             *http.Client
51
52         // set to 1 if all writable services are of disk type, otherwise 0
53         replicasPerService int
54 }
55
56 // MakeKeepClient creates a new KeepClient by contacting the API server to discover Keep servers.
57 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
58         kc := New(arv)
59         return kc, kc.DiscoverKeepServers()
60 }
61
62 // New func creates a new KeepClient struct.
63 // This func does not discover keep servers. It is the caller's responsibility.
64 func New(arv *arvadosclient.ArvadosClient) *KeepClient {
65         kc := &KeepClient{
66                 Arvados:       arv,
67                 Want_replicas: 2,
68                 Using_proxy:   false,
69                 Client: &http.Client{Transport: &http.Transport{
70                         TLSClientConfig: &tls.Config{InsecureSkipVerify: arv.ApiInsecure}}},
71         }
72         return kc
73 }
74
75 // Put a block given the block hash, a reader, and the number of bytes
76 // to read from the reader (which must be between 0 and BLOCKSIZE).
77 //
78 // Returns the locator for the written block, the number of replicas
79 // written, and an error.
80 //
81 // Returns an InsufficientReplicas error if 0 <= replicas <
82 // kc.Wants_replicas.
83 func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
84         // Buffer for reads from 'r'
85         var bufsize int
86         if dataBytes > 0 {
87                 if dataBytes > BLOCKSIZE {
88                         return "", 0, OversizeBlockError
89                 }
90                 bufsize = int(dataBytes)
91         } else {
92                 bufsize = BLOCKSIZE
93         }
94
95         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
96         defer t.Close()
97
98         return kc.putReplicas(hash, t, dataBytes)
99 }
100
101 // PutHB writes a block to Keep. The hash of the bytes is given in
102 // hash, and the data is given in buf.
103 //
104 // Return values are the same as for PutHR.
105 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
106         t := streamer.AsyncStreamFromSlice(buf)
107         defer t.Close()
108         return kc.putReplicas(hash, t, int64(len(buf)))
109 }
110
111 // PutB writes a block to Keep. It computes the hash itself.
112 //
113 // Return values are the same as for PutHR.
114 func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
115         hash := fmt.Sprintf("%x", md5.Sum(buffer))
116         return kc.PutHB(hash, buffer)
117 }
118
119 // PutR writes a block to Keep. It first reads all data from r into a buffer
120 // in order to compute the hash.
121 //
122 // Return values are the same as for PutHR.
123 //
124 // If the block hash and data size are known, PutHR is more efficient.
125 func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
126         if buffer, err := ioutil.ReadAll(r); err != nil {
127                 return "", 0, err
128         } else {
129                 return kc.PutB(buffer)
130         }
131 }
132
133 // Get() retrieves a block, given a locator. Returns a reader, the
134 // expected data length, the URL the block is being fetched from, and
135 // an error.
136 //
137 // If the block checksum does not match, the final Read() on the
138 // reader returned by this method will return a BadChecksum error
139 // instead of EOF.
140 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
141         var errs []string
142         for _, host := range kc.getSortedRoots(locator) {
143                 url := host + "/" + locator
144                 req, err := http.NewRequest("GET", url, nil)
145                 if err != nil {
146                         errs = append(errs, fmt.Sprintf("%s: %v", url, err))
147                         continue
148                 }
149                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
150                 resp, err := kc.Client.Do(req)
151                 if err != nil {
152                         errs = append(errs, fmt.Sprintf("%s: %v", url, err))
153                         continue
154                 } else if resp.StatusCode != http.StatusOK {
155                         respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
156                         resp.Body.Close()
157                         errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
158                                 url, resp.StatusCode, bytes.TrimSpace(respbody)))
159                         continue
160                 }
161                 return HashCheckingReader{
162                         Reader: resp.Body,
163                         Hash:   md5.New(),
164                         Check:  locator[0:32],
165                 }, resp.ContentLength, url, nil
166         }
167         log.Printf("DEBUG: GET %s failed: %v", locator, errs)
168         return nil, 0, "", BlockNotFound
169 }
170
171 // Ask() verifies that a block with the given hash is available and
172 // readable, according to at least one Keep service. Unlike Get, it
173 // does not retrieve the data or verify that the data content matches
174 // the hash specified by the locator.
175 //
176 // Returns the data size (content length) reported by the Keep service
177 // and the URI reporting the data size.
178 func (kc *KeepClient) Ask(locator string) (int64, string, error) {
179         for _, host := range kc.getSortedRoots(locator) {
180                 url := host + "/" + locator
181                 req, err := http.NewRequest("HEAD", url, nil)
182                 if err != nil {
183                         continue
184                 }
185                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
186                 if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK {
187                         return resp.ContentLength, url, nil
188                 }
189         }
190         return 0, "", BlockNotFound
191 }
192
193 // GetIndex retrieves a list of blocks stored on the given server whose hashes
194 // begin with the given prefix. The returned reader will return an error (other
195 // than EOF) if the complete index cannot be retrieved.
196 //
197 // This is meant to be used only by system components and admin tools.
198 // It will return an error unless the client is using a "data manager token"
199 // recognized by the Keep services.
200 func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
201         url := kc.LocalRoots()[keepServiceUUID]
202         if url == "" {
203                 return nil, ErrNoSuchKeepServer
204         }
205
206         url += "/index"
207         if prefix != "" {
208                 url += "/" + prefix
209         }
210
211         req, err := http.NewRequest("GET", url, nil)
212         if err != nil {
213                 return nil, err
214         }
215
216         req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
217         resp, err := kc.Client.Do(req)
218         if err != nil {
219                 return nil, err
220         }
221
222         defer resp.Body.Close()
223
224         if resp.StatusCode != http.StatusOK {
225                 return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode)
226         }
227
228         var respBody []byte
229         respBody, err = ioutil.ReadAll(resp.Body)
230         if err != nil {
231                 return nil, err
232         }
233
234         // Got index; verify that it is complete
235         // The response should be "\n" if no locators matched the prefix
236         // Else, it should be a list of locators followed by a blank line
237         if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) {
238                 return nil, ErrIncompleteIndex
239         }
240
241         // Got complete index; strip the trailing newline and send
242         return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
243 }
244
245 // LocalRoots() returns the map of local (i.e., disk and proxy) Keep
246 // services: uuid -> baseURI.
247 func (kc *KeepClient) LocalRoots() map[string]string {
248         kc.lock.RLock()
249         defer kc.lock.RUnlock()
250         return *kc.localRoots
251 }
252
253 // GatewayRoots() returns the map of Keep remote gateway services:
254 // uuid -> baseURI.
255 func (kc *KeepClient) GatewayRoots() map[string]string {
256         kc.lock.RLock()
257         defer kc.lock.RUnlock()
258         return *kc.gatewayRoots
259 }
260
261 // WritableLocalRoots() returns the map of writable local Keep services:
262 // uuid -> baseURI.
263 func (kc *KeepClient) WritableLocalRoots() map[string]string {
264         kc.lock.RLock()
265         defer kc.lock.RUnlock()
266         return *kc.writableLocalRoots
267 }
268
269 // SetServiceRoots updates the localRoots and gatewayRoots maps,
270 // without risk of disrupting operations that are already in progress.
271 //
272 // The KeepClient makes its own copy of the supplied maps, so the
273 // caller can reuse/modify them after SetServiceRoots returns, but
274 // they should not be modified by any other goroutine while
275 // SetServiceRoots is running.
276 func (kc *KeepClient) SetServiceRoots(newLocals, newWritableLocals map[string]string, newGateways map[string]string) {
277         locals := make(map[string]string)
278         for uuid, root := range newLocals {
279                 locals[uuid] = root
280         }
281
282         writables := make(map[string]string)
283         for uuid, root := range newWritableLocals {
284                 writables[uuid] = root
285         }
286
287         gateways := make(map[string]string)
288         for uuid, root := range newGateways {
289                 gateways[uuid] = root
290         }
291
292         kc.lock.Lock()
293         defer kc.lock.Unlock()
294         kc.localRoots = &locals
295         kc.writableLocalRoots = &writables
296         kc.gatewayRoots = &gateways
297 }
298
299 // getSortedRoots returns a list of base URIs of Keep services, in the
300 // order they should be attempted in order to retrieve content for the
301 // given locator.
302 func (kc *KeepClient) getSortedRoots(locator string) []string {
303         var found []string
304         for _, hint := range strings.Split(locator, "+") {
305                 if len(hint) < 7 || hint[0:2] != "K@" {
306                         // Not a service hint.
307                         continue
308                 }
309                 if len(hint) == 7 {
310                         // +K@abcde means fetch from proxy at
311                         // keep.abcde.arvadosapi.com
312                         found = append(found, "https://keep."+hint[2:]+".arvadosapi.com")
313                 } else if len(hint) == 29 {
314                         // +K@abcde-abcde-abcdeabcdeabcde means fetch
315                         // from gateway with given uuid
316                         if gwURI, ok := kc.GatewayRoots()[hint[2:]]; ok {
317                                 found = append(found, gwURI)
318                         }
319                         // else this hint is no use to us; carry on.
320                 }
321         }
322         // After trying all usable service hints, fall back to local roots.
323         found = append(found, NewRootSorter(kc.LocalRoots(), locator[0:32]).GetSortedRoots()...)
324         return found
325 }
326
327 type Locator struct {
328         Hash  string
329         Size  int      // -1 if data size is not known
330         Hints []string // Including the size hint, if any
331 }
332
333 func (loc *Locator) String() string {
334         s := loc.Hash
335         if len(loc.Hints) > 0 {
336                 s = s + "+" + strings.Join(loc.Hints, "+")
337         }
338         return s
339 }
340
341 var locatorMatcher = regexp.MustCompile("^([0-9a-f]{32})([+](.*))?$")
342
343 func MakeLocator(path string) (*Locator, error) {
344         sm := locatorMatcher.FindStringSubmatch(path)
345         if sm == nil {
346                 return nil, InvalidLocatorError
347         }
348         loc := Locator{Hash: sm[1], Size: -1}
349         if sm[2] != "" {
350                 loc.Hints = strings.Split(sm[3], "+")
351         } else {
352                 loc.Hints = []string{}
353         }
354         if len(loc.Hints) > 0 {
355                 if size, err := strconv.Atoi(loc.Hints[0]); err == nil {
356                         loc.Size = size
357                 }
358         }
359         return &loc, nil
360 }