14399: Don't insert error text in the middle of an index response.
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/health"
25         "git.curoverse.com/arvados.git/sdk/go/httpserver"
26         "github.com/gorilla/mux"
27         "github.com/prometheus/client_golang/prometheus"
28 )
29
30 type router struct {
31         *mux.Router
32         limiter     httpserver.RequestCounter
33         cluster     *arvados.Cluster
34         remoteProxy remoteProxy
35         metrics     *nodeMetrics
36 }
37
38 // MakeRESTRouter returns a new router that forwards all Keep requests
39 // to the appropriate handlers.
40 func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
41         rtr := &router{
42                 Router:  mux.NewRouter(),
43                 cluster: cluster,
44                 metrics: &nodeMetrics{reg: reg},
45         }
46
47         rtr.HandleFunc(
48                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
49         rtr.HandleFunc(
50                 `/{hash:[0-9a-f]{32}}+{hints}`,
51                 rtr.handleGET).Methods("GET", "HEAD")
52
53         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
54         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55         // List all blocks stored here. Privileged client only.
56         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57         // List blocks stored here whose hash has the given prefix.
58         // Privileged client only.
59         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
60
61         // Internals/debugging info (runtime.MemStats)
62         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
63
64         // List volumes: path, device number, bytes used/avail.
65         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
66
67         // List mounts: UUID, readonly, tier, device ID, ...
68         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
71
72         // Replace the current pull queue.
73         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
74
75         // Replace the current trash queue.
76         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
77
78         // Untrash moves blocks from trash back into store
79         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
80
81         rtr.Handle("/_health/{check}", &health.Handler{
82                 Token:  theConfig.ManagementToken,
83                 Prefix: "/_health/",
84         }).Methods("GET")
85
86         // Any request which does not match any of these routes gets
87         // 400 Bad Request.
88         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
89
90         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
91         rtr.metrics.setupBufferPoolMetrics(bufs)
92         rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
93         rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
94         rtr.metrics.setupRequestMetrics(rtr.limiter)
95
96         instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
97                 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
98         return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
99 }
100
101 // BadRequestHandler is a HandleFunc to address bad requests.
102 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
103         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
104 }
105
106 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
107         ctx, cancel := contextForResponse(context.TODO(), resp)
108         defer cancel()
109
110         locator := req.URL.Path[1:]
111         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
112                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
113                 return
114         }
115
116         if theConfig.RequireSignatures {
117                 locator := req.URL.Path[1:] // strip leading slash
118                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
119                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
120                         return
121                 }
122         }
123
124         // TODO: Probe volumes to check whether the block _might_
125         // exist. Some volumes/types could support a quick existence
126         // check without causing other operations to suffer. If all
127         // volumes support that, and assure us the block definitely
128         // isn't here, we can return 404 now instead of waiting for a
129         // buffer.
130
131         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
132         if err != nil {
133                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
134                 return
135         }
136         defer bufs.Put(buf)
137
138         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
139         if err != nil {
140                 code := http.StatusInternalServerError
141                 if err, ok := err.(*KeepError); ok {
142                         code = err.HTTPCode
143                 }
144                 http.Error(resp, err.Error(), code)
145                 return
146         }
147
148         resp.Header().Set("Content-Length", strconv.Itoa(size))
149         resp.Header().Set("Content-Type", "application/octet-stream")
150         resp.Write(buf[:size])
151 }
152
153 // Return a new context that gets cancelled by resp's CloseNotifier.
154 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
155         ctx, cancel := context.WithCancel(parent)
156         if cn, ok := resp.(http.CloseNotifier); ok {
157                 go func(c <-chan bool) {
158                         select {
159                         case <-c:
160                                 theConfig.debugLogf("cancel context")
161                                 cancel()
162                         case <-ctx.Done():
163                         }
164                 }(cn.CloseNotify())
165         }
166         return ctx, cancel
167 }
168
169 // Get a buffer from the pool -- but give up and return a non-nil
170 // error if ctx ends before we get a buffer.
171 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
172         bufReady := make(chan []byte)
173         go func() {
174                 bufReady <- bufs.Get(bufSize)
175         }()
176         select {
177         case buf := <-bufReady:
178                 return buf, nil
179         case <-ctx.Done():
180                 go func() {
181                         // Even if closeNotifier happened first, we
182                         // need to keep waiting for our buf so we can
183                         // return it to the pool.
184                         bufs.Put(<-bufReady)
185                 }()
186                 return nil, ErrClientDisconnect
187         }
188 }
189
190 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
191         ctx, cancel := contextForResponse(context.TODO(), resp)
192         defer cancel()
193
194         hash := mux.Vars(req)["hash"]
195
196         // Detect as many error conditions as possible before reading
197         // the body: avoid transmitting data that will not end up
198         // being written anyway.
199
200         if req.ContentLength == -1 {
201                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
202                 return
203         }
204
205         if req.ContentLength > BlockSize {
206                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
207                 return
208         }
209
210         if len(KeepVM.AllWritable()) == 0 {
211                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
212                 return
213         }
214
215         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
216         if err != nil {
217                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
218                 return
219         }
220
221         _, err = io.ReadFull(req.Body, buf)
222         if err != nil {
223                 http.Error(resp, err.Error(), 500)
224                 bufs.Put(buf)
225                 return
226         }
227
228         replication, err := PutBlock(ctx, buf, hash)
229         bufs.Put(buf)
230
231         if err != nil {
232                 code := http.StatusInternalServerError
233                 if err, ok := err.(*KeepError); ok {
234                         code = err.HTTPCode
235                 }
236                 http.Error(resp, err.Error(), code)
237                 return
238         }
239
240         // Success; add a size hint, sign the locator if possible, and
241         // return it to the client.
242         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
243         apiToken := GetAPIToken(req)
244         if theConfig.blobSigningKey != nil && apiToken != "" {
245                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
246                 returnHash = SignLocator(returnHash, apiToken, expiry)
247         }
248         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
249         resp.Write([]byte(returnHash + "\n"))
250 }
251
252 // IndexHandler responds to "/index", "/index/{prefix}", and
253 // "/mounts/{uuid}/blocks" requests.
254 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
255         if !IsSystemAuth(GetAPIToken(req)) {
256                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
257                 return
258         }
259
260         prefix := mux.Vars(req)["prefix"]
261         if prefix == "" {
262                 req.ParseForm()
263                 prefix = req.Form.Get("prefix")
264         }
265
266         uuid := mux.Vars(req)["uuid"]
267
268         var vols []Volume
269         if uuid == "" {
270                 vols = KeepVM.AllReadable()
271         } else if v := KeepVM.Lookup(uuid, false); v == nil {
272                 http.Error(resp, "mount not found", http.StatusNotFound)
273                 return
274         } else {
275                 vols = []Volume{v}
276         }
277
278         for _, v := range vols {
279                 if err := v.IndexTo(prefix, resp); err != nil {
280                         // We can't send an error message to the
281                         // client because we might have already sent
282                         // headers and index content. All we can do is
283                         // log the error in our own logs, and (in
284                         // cases where headers haven't been sent yet)
285                         // set a 500 status.
286                         //
287                         // If headers have already been sent, the
288                         // client must notice the lack of trailing
289                         // newline as an indication that the response
290                         // is incomplete.
291                         log.Printf("index error from volume %s: %s", v, err)
292                         http.Error(resp, "", http.StatusInternalServerError)
293                         return
294                 }
295         }
296         // An empty line at EOF is the only way the client can be
297         // assured the entire index was received.
298         resp.Write([]byte{'\n'})
299 }
300
301 // MountsHandler responds to "GET /mounts" requests.
302 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
303         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
304         if err != nil {
305                 http.Error(resp, err.Error(), http.StatusInternalServerError)
306         }
307 }
308
309 // PoolStatus struct
310 type PoolStatus struct {
311         Alloc uint64 `json:"BytesAllocatedCumulative"`
312         Cap   int    `json:"BuffersMax"`
313         Len   int    `json:"BuffersInUse"`
314 }
315
316 type volumeStatusEnt struct {
317         Label         string
318         Status        *VolumeStatus `json:",omitempty"`
319         VolumeStats   *ioStats      `json:",omitempty"`
320         InternalStats interface{}   `json:",omitempty"`
321 }
322
323 // NodeStatus struct
324 type NodeStatus struct {
325         Volumes         []*volumeStatusEnt
326         BufferPool      PoolStatus
327         PullQueue       WorkQueueStatus
328         TrashQueue      WorkQueueStatus
329         RequestsCurrent int
330         RequestsMax     int
331         Version         string
332 }
333
334 var st NodeStatus
335 var stLock sync.Mutex
336
337 // DebugHandler addresses /debug.json requests.
338 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
339         type debugStats struct {
340                 MemStats runtime.MemStats
341         }
342         var ds debugStats
343         runtime.ReadMemStats(&ds.MemStats)
344         err := json.NewEncoder(resp).Encode(&ds)
345         if err != nil {
346                 http.Error(resp, err.Error(), 500)
347         }
348 }
349
350 // StatusHandler addresses /status.json requests.
351 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
352         stLock.Lock()
353         rtr.readNodeStatus(&st)
354         jstat, err := json.Marshal(&st)
355         stLock.Unlock()
356         if err == nil {
357                 resp.Write(jstat)
358         } else {
359                 log.Printf("json.Marshal: %s", err)
360                 log.Printf("NodeStatus = %v", &st)
361                 http.Error(resp, err.Error(), 500)
362         }
363 }
364
365 // populate the given NodeStatus struct with current values.
366 func (rtr *router) readNodeStatus(st *NodeStatus) {
367         st.Version = version
368         vols := KeepVM.AllReadable()
369         if cap(st.Volumes) < len(vols) {
370                 st.Volumes = make([]*volumeStatusEnt, len(vols))
371         }
372         st.Volumes = st.Volumes[:0]
373         for _, vol := range vols {
374                 var internalStats interface{}
375                 if vol, ok := vol.(InternalStatser); ok {
376                         internalStats = vol.InternalStats()
377                 }
378                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
379                         Label:         vol.String(),
380                         Status:        vol.Status(),
381                         InternalStats: internalStats,
382                         //VolumeStats: KeepVM.VolumeStats(vol),
383                 })
384         }
385         st.BufferPool.Alloc = bufs.Alloc()
386         st.BufferPool.Cap = bufs.Cap()
387         st.BufferPool.Len = bufs.Len()
388         st.PullQueue = getWorkQueueStatus(pullq)
389         st.TrashQueue = getWorkQueueStatus(trashq)
390         if rtr.limiter != nil {
391                 st.RequestsCurrent = rtr.limiter.Current()
392                 st.RequestsMax = rtr.limiter.Max()
393         }
394 }
395
396 // return a WorkQueueStatus for the given queue. If q is nil (which
397 // should never happen except in test suites), return a zero status
398 // value instead of crashing.
399 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
400         if q == nil {
401                 // This should only happen during tests.
402                 return WorkQueueStatus{}
403         }
404         return q.Status()
405 }
406
407 // DeleteHandler processes DELETE requests.
408 //
409 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
410 // from all connected volumes.
411 //
412 // Only the Data Manager, or an Arvados admin with scope "all", are
413 // allowed to issue DELETE requests.  If a DELETE request is not
414 // authenticated or is issued by a non-admin user, the server returns
415 // a PermissionError.
416 //
417 // Upon receiving a valid request from an authorized user,
418 // DeleteHandler deletes all copies of the specified block on local
419 // writable volumes.
420 //
421 // Response format:
422 //
423 // If the requested blocks was not found on any volume, the response
424 // code is HTTP 404 Not Found.
425 //
426 // Otherwise, the response code is 200 OK, with a response body
427 // consisting of the JSON message
428 //
429 //    {"copies_deleted":d,"copies_failed":f}
430 //
431 // where d and f are integers representing the number of blocks that
432 // were successfully and unsuccessfully deleted.
433 //
434 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
435         hash := mux.Vars(req)["hash"]
436
437         // Confirm that this user is an admin and has a token with unlimited scope.
438         var tok = GetAPIToken(req)
439         if tok == "" || !CanDelete(tok) {
440                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
441                 return
442         }
443
444         if !theConfig.EnableDelete {
445                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
446                 return
447         }
448
449         // Delete copies of this block from all available volumes.
450         // Report how many blocks were successfully deleted, and how
451         // many were found on writable volumes but not deleted.
452         var result struct {
453                 Deleted int `json:"copies_deleted"`
454                 Failed  int `json:"copies_failed"`
455         }
456         for _, vol := range KeepVM.AllWritable() {
457                 if err := vol.Trash(hash); err == nil {
458                         result.Deleted++
459                 } else if os.IsNotExist(err) {
460                         continue
461                 } else {
462                         result.Failed++
463                         log.Println("DeleteHandler:", err)
464                 }
465         }
466
467         var st int
468
469         if result.Deleted == 0 && result.Failed == 0 {
470                 st = http.StatusNotFound
471         } else {
472                 st = http.StatusOK
473         }
474
475         resp.WriteHeader(st)
476
477         if st == http.StatusOK {
478                 if body, err := json.Marshal(result); err == nil {
479                         resp.Write(body)
480                 } else {
481                         log.Printf("json.Marshal: %s (result = %v)", err, result)
482                         http.Error(resp, err.Error(), 500)
483                 }
484         }
485 }
486
487 /* PullHandler processes "PUT /pull" requests for the data manager.
488    The request body is a JSON message containing a list of pull
489    requests in the following format:
490
491    [
492       {
493          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
494          "servers":[
495                         "keep0.qr1hi.arvadosapi.com:25107",
496                         "keep1.qr1hi.arvadosapi.com:25108"
497                  ]
498           },
499           {
500                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
501                  "servers":[
502                         "10.0.1.5:25107",
503                         "10.0.1.6:25107",
504                         "10.0.1.7:25108"
505                  ]
506           },
507           ...
508    ]
509
510    Each pull request in the list consists of a block locator string
511    and an ordered list of servers.  Keepstore should try to fetch the
512    block from each server in turn.
513
514    If the request has not been sent by the Data Manager, return 401
515    Unauthorized.
516
517    If the JSON unmarshalling fails, return 400 Bad Request.
518 */
519
520 // PullRequest consists of a block locator and an ordered list of servers
521 type PullRequest struct {
522         Locator string   `json:"locator"`
523         Servers []string `json:"servers"`
524
525         // Destination mount, or "" for "anywhere"
526         MountUUID string `json:"mount_uuid"`
527 }
528
529 // PullHandler processes "PUT /pull" requests for the data manager.
530 func PullHandler(resp http.ResponseWriter, req *http.Request) {
531         // Reject unauthorized requests.
532         if !IsSystemAuth(GetAPIToken(req)) {
533                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
534                 return
535         }
536
537         // Parse the request body.
538         var pr []PullRequest
539         r := json.NewDecoder(req.Body)
540         if err := r.Decode(&pr); err != nil {
541                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
542                 return
543         }
544
545         // We have a properly formatted pull list sent from the data
546         // manager.  Report success and send the list to the pull list
547         // manager for further handling.
548         resp.WriteHeader(http.StatusOK)
549         resp.Write([]byte(
550                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
551
552         plist := list.New()
553         for _, p := range pr {
554                 plist.PushBack(p)
555         }
556         pullq.ReplaceQueue(plist)
557 }
558
559 // TrashRequest consists of a block locator and its Mtime
560 type TrashRequest struct {
561         Locator    string `json:"locator"`
562         BlockMtime int64  `json:"block_mtime"`
563
564         // Target mount, or "" for "everywhere"
565         MountUUID string `json:"mount_uuid"`
566 }
567
568 // TrashHandler processes /trash requests.
569 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
570         // Reject unauthorized requests.
571         if !IsSystemAuth(GetAPIToken(req)) {
572                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
573                 return
574         }
575
576         // Parse the request body.
577         var trash []TrashRequest
578         r := json.NewDecoder(req.Body)
579         if err := r.Decode(&trash); err != nil {
580                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
581                 return
582         }
583
584         // We have a properly formatted trash list sent from the data
585         // manager.  Report success and send the list to the trash work
586         // queue for further handling.
587         resp.WriteHeader(http.StatusOK)
588         resp.Write([]byte(
589                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
590
591         tlist := list.New()
592         for _, t := range trash {
593                 tlist.PushBack(t)
594         }
595         trashq.ReplaceQueue(tlist)
596 }
597
598 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
599 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
600         // Reject unauthorized requests.
601         if !IsSystemAuth(GetAPIToken(req)) {
602                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
603                 return
604         }
605
606         hash := mux.Vars(req)["hash"]
607
608         if len(KeepVM.AllWritable()) == 0 {
609                 http.Error(resp, "No writable volumes", http.StatusNotFound)
610                 return
611         }
612
613         var untrashedOn, failedOn []string
614         var numNotFound int
615         for _, vol := range KeepVM.AllWritable() {
616                 err := vol.Untrash(hash)
617
618                 if os.IsNotExist(err) {
619                         numNotFound++
620                 } else if err != nil {
621                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
622                         failedOn = append(failedOn, vol.String())
623                 } else {
624                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
625                         untrashedOn = append(untrashedOn, vol.String())
626                 }
627         }
628
629         if numNotFound == len(KeepVM.AllWritable()) {
630                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
631                 return
632         }
633
634         if len(failedOn) == len(KeepVM.AllWritable()) {
635                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
636         } else {
637                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
638                 if len(failedOn) > 0 {
639                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
640                 }
641                 resp.Write([]byte(respBody))
642         }
643 }
644
645 // GetBlock and PutBlock implement lower-level code for handling
646 // blocks by rooting through volumes connected to the local machine.
647 // Once the handler has determined that system policy permits the
648 // request, it calls these methods to perform the actual operation.
649 //
650 // TODO(twp): this code would probably be better located in the
651 // VolumeManager interface. As an abstraction, the VolumeManager
652 // should be the only part of the code that cares about which volume a
653 // block is stored on, so it should be responsible for figuring out
654 // which volume to check for fetching blocks, storing blocks, etc.
655
656 // GetBlock fetches the block identified by "hash" into the provided
657 // buf, and returns the data size.
658 //
659 // If the block cannot be found on any volume, returns NotFoundError.
660 //
661 // If the block found does not have the correct MD5 hash, returns
662 // DiskHashError.
663 //
664 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
665         // Attempt to read the requested hash from a keep volume.
666         errorToCaller := NotFoundError
667
668         for _, vol := range KeepVM.AllReadable() {
669                 size, err := vol.Get(ctx, hash, buf)
670                 select {
671                 case <-ctx.Done():
672                         return 0, ErrClientDisconnect
673                 default:
674                 }
675                 if err != nil {
676                         // IsNotExist is an expected error and may be
677                         // ignored. All other errors are logged. In
678                         // any case we continue trying to read other
679                         // volumes. If all volumes report IsNotExist,
680                         // we return a NotFoundError.
681                         if !os.IsNotExist(err) {
682                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
683                         }
684                         // If some volume returns a transient error, return it to the caller
685                         // instead of "Not found" so it can retry.
686                         if err == VolumeBusyError {
687                                 errorToCaller = err.(*KeepError)
688                         }
689                         continue
690                 }
691                 // Check the file checksum.
692                 //
693                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
694                 if filehash != hash {
695                         // TODO: Try harder to tell a sysadmin about
696                         // this.
697                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
698                                 vol, hash, filehash)
699                         errorToCaller = DiskHashError
700                         continue
701                 }
702                 if errorToCaller == DiskHashError {
703                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
704                                 vol, hash)
705                 }
706                 return size, nil
707         }
708         return 0, errorToCaller
709 }
710
711 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
712 //
713 // PutBlock(ctx, block, hash)
714 //   Stores the BLOCK (identified by the content id HASH) in Keep.
715 //
716 //   The MD5 checksum of the block must be identical to the content id HASH.
717 //   If not, an error is returned.
718 //
719 //   PutBlock stores the BLOCK on the first Keep volume with free space.
720 //   A failure code is returned to the user only if all volumes fail.
721 //
722 //   On success, PutBlock returns nil.
723 //   On failure, it returns a KeepError with one of the following codes:
724 //
725 //   500 Collision
726 //          A different block with the same hash already exists on this
727 //          Keep server.
728 //   422 MD5Fail
729 //          The MD5 hash of the BLOCK does not match the argument HASH.
730 //   503 Full
731 //          There was not enough space left in any Keep volume to store
732 //          the object.
733 //   500 Fail
734 //          The object could not be stored for some other reason (e.g.
735 //          all writes failed). The text of the error message should
736 //          provide as much detail as possible.
737 //
738 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
739         // Check that BLOCK's checksum matches HASH.
740         blockhash := fmt.Sprintf("%x", md5.Sum(block))
741         if blockhash != hash {
742                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
743                 return 0, RequestHashError
744         }
745
746         // If we already have this data, it's intact on disk, and we
747         // can update its timestamp, return success. If we have
748         // different data with the same hash, return failure.
749         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
750                 return n, err
751         } else if ctx.Err() != nil {
752                 return 0, ErrClientDisconnect
753         }
754
755         // Choose a Keep volume to write to.
756         // If this volume fails, try all of the volumes in order.
757         if vol := KeepVM.NextWritable(); vol != nil {
758                 if err := vol.Put(ctx, hash, block); err == nil {
759                         return vol.Replication(), nil // success!
760                 }
761                 if ctx.Err() != nil {
762                         return 0, ErrClientDisconnect
763                 }
764         }
765
766         writables := KeepVM.AllWritable()
767         if len(writables) == 0 {
768                 log.Print("No writable volumes.")
769                 return 0, FullError
770         }
771
772         allFull := true
773         for _, vol := range writables {
774                 err := vol.Put(ctx, hash, block)
775                 if ctx.Err() != nil {
776                         return 0, ErrClientDisconnect
777                 }
778                 if err == nil {
779                         return vol.Replication(), nil // success!
780                 }
781                 if err != FullError {
782                         // The volume is not full but the
783                         // write did not succeed.  Report the
784                         // error and continue trying.
785                         allFull = false
786                         log.Printf("%s: Write(%s): %s", vol, hash, err)
787                 }
788         }
789
790         if allFull {
791                 log.Print("All volumes are full.")
792                 return 0, FullError
793         }
794         // Already logged the non-full errors.
795         return 0, GenericError
796 }
797
798 // CompareAndTouch returns the current replication level if one of the
799 // volumes already has the given content and it successfully updates
800 // the relevant block's modification time in order to protect it from
801 // premature garbage collection. Otherwise, it returns a non-nil
802 // error.
803 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
804         var bestErr error = NotFoundError
805         for _, vol := range KeepVM.AllWritable() {
806                 err := vol.Compare(ctx, hash, buf)
807                 if ctx.Err() != nil {
808                         return 0, ctx.Err()
809                 } else if err == CollisionError {
810                         // Stop if we have a block with same hash but
811                         // different content. (It will be impossible
812                         // to tell which one is wanted if we have
813                         // both, so there's no point writing it even
814                         // on a different volume.)
815                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
816                         return 0, err
817                 } else if os.IsNotExist(err) {
818                         // Block does not exist. This is the only
819                         // "normal" error: we don't log anything.
820                         continue
821                 } else if err != nil {
822                         // Couldn't open file, data is corrupt on
823                         // disk, etc.: log this abnormal condition,
824                         // and try the next volume.
825                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
826                         continue
827                 }
828                 if err := vol.Touch(hash); err != nil {
829                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
830                         bestErr = err
831                         continue
832                 }
833                 // Compare and Touch both worked --> done.
834                 return vol.Replication(), nil
835         }
836         return 0, bestErr
837 }
838
839 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
840
841 // IsValidLocator returns true if the specified string is a valid Keep locator.
842 //   When Keep is extended to support hash types other than MD5,
843 //   this should be updated to cover those as well.
844 //
845 func IsValidLocator(loc string) bool {
846         return validLocatorRe.MatchString(loc)
847 }
848
849 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
850
851 // GetAPIToken returns the OAuth2 token from the Authorization
852 // header of a HTTP request, or an empty string if no matching
853 // token is found.
854 func GetAPIToken(req *http.Request) string {
855         if auth, ok := req.Header["Authorization"]; ok {
856                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
857                         return match[2]
858                 }
859         }
860         return ""
861 }
862
863 // IsExpired returns true if the given Unix timestamp (expressed as a
864 // hexadecimal string) is in the past, or if timestampHex cannot be
865 // parsed as a hexadecimal string.
866 func IsExpired(timestampHex string) bool {
867         ts, err := strconv.ParseInt(timestampHex, 16, 0)
868         if err != nil {
869                 log.Printf("IsExpired: %s", err)
870                 return true
871         }
872         return time.Unix(ts, 0).Before(time.Now())
873 }
874
875 // CanDelete returns true if the user identified by apiToken is
876 // allowed to delete blocks.
877 func CanDelete(apiToken string) bool {
878         if apiToken == "" {
879                 return false
880         }
881         // Blocks may be deleted only when Keep has been configured with a
882         // data manager.
883         if IsSystemAuth(apiToken) {
884                 return true
885         }
886         // TODO(twp): look up apiToken with the API server
887         // return true if is_admin is true and if the token
888         // has unlimited scope
889         return false
890 }
891
892 // IsSystemAuth returns true if the given token is allowed to perform
893 // system level actions like deleting data.
894 func IsSystemAuth(token string) bool {
895         return token != "" && token == theConfig.systemAuthToken
896 }