Merge branch 'master' into 14965-arv-mount-py-three
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
25         "git.curoverse.com/arvados.git/sdk/go/health"
26         "git.curoverse.com/arvados.git/sdk/go/httpserver"
27         "github.com/gorilla/mux"
28         "github.com/prometheus/client_golang/prometheus"
29 )
30
31 type router struct {
32         *mux.Router
33         limiter     httpserver.RequestCounter
34         cluster     *arvados.Cluster
35         remoteProxy remoteProxy
36         metrics     *nodeMetrics
37 }
38
39 // MakeRESTRouter returns a new router that forwards all Keep requests
40 // to the appropriate handlers.
41 func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
42         rtr := &router{
43                 Router:  mux.NewRouter(),
44                 cluster: cluster,
45                 metrics: &nodeMetrics{reg: reg},
46         }
47
48         rtr.HandleFunc(
49                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
50         rtr.HandleFunc(
51                 `/{hash:[0-9a-f]{32}}+{hints}`,
52                 rtr.handleGET).Methods("GET", "HEAD")
53
54         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
55         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
56         // List all blocks stored here. Privileged client only.
57         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
58         // List blocks stored here whose hash has the given prefix.
59         // Privileged client only.
60         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61
62         // Internals/debugging info (runtime.MemStats)
63         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64
65         // List volumes: path, device number, bytes used/avail.
66         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67
68         // List mounts: UUID, readonly, tier, device ID, ...
69         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
70         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
71         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72
73         // Replace the current pull queue.
74         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75
76         // Replace the current trash queue.
77         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78
79         // Untrash moves blocks from trash back into store
80         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
81
82         rtr.Handle("/_health/{check}", &health.Handler{
83                 Token:  theConfig.ManagementToken,
84                 Prefix: "/_health/",
85         }).Methods("GET")
86
87         // Any request which does not match any of these routes gets
88         // 400 Bad Request.
89         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
90
91         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
92         rtr.metrics.setupBufferPoolMetrics(bufs)
93         rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
94         rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
95         rtr.metrics.setupRequestMetrics(rtr.limiter)
96
97         instrumented := httpserver.Instrument(rtr.metrics.reg, log,
98                 httpserver.HandlerWithContext(
99                         ctxlog.Context(context.Background(), log),
100                         httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
101         return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
102 }
103
104 // BadRequestHandler is a HandleFunc to address bad requests.
105 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
106         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
107 }
108
109 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
110         ctx, cancel := contextForResponse(context.TODO(), resp)
111         defer cancel()
112
113         locator := req.URL.Path[1:]
114         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
115                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
116                 return
117         }
118
119         if theConfig.RequireSignatures {
120                 locator := req.URL.Path[1:] // strip leading slash
121                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
122                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
123                         return
124                 }
125         }
126
127         // TODO: Probe volumes to check whether the block _might_
128         // exist. Some volumes/types could support a quick existence
129         // check without causing other operations to suffer. If all
130         // volumes support that, and assure us the block definitely
131         // isn't here, we can return 404 now instead of waiting for a
132         // buffer.
133
134         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
135         if err != nil {
136                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
137                 return
138         }
139         defer bufs.Put(buf)
140
141         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
142         if err != nil {
143                 code := http.StatusInternalServerError
144                 if err, ok := err.(*KeepError); ok {
145                         code = err.HTTPCode
146                 }
147                 http.Error(resp, err.Error(), code)
148                 return
149         }
150
151         resp.Header().Set("Content-Length", strconv.Itoa(size))
152         resp.Header().Set("Content-Type", "application/octet-stream")
153         resp.Write(buf[:size])
154 }
155
156 // Return a new context that gets cancelled by resp's CloseNotifier.
157 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
158         ctx, cancel := context.WithCancel(parent)
159         if cn, ok := resp.(http.CloseNotifier); ok {
160                 go func(c <-chan bool) {
161                         select {
162                         case <-c:
163                                 theConfig.debugLogf("cancel context")
164                                 cancel()
165                         case <-ctx.Done():
166                         }
167                 }(cn.CloseNotify())
168         }
169         return ctx, cancel
170 }
171
172 // Get a buffer from the pool -- but give up and return a non-nil
173 // error if ctx ends before we get a buffer.
174 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
175         bufReady := make(chan []byte)
176         go func() {
177                 bufReady <- bufs.Get(bufSize)
178         }()
179         select {
180         case buf := <-bufReady:
181                 return buf, nil
182         case <-ctx.Done():
183                 go func() {
184                         // Even if closeNotifier happened first, we
185                         // need to keep waiting for our buf so we can
186                         // return it to the pool.
187                         bufs.Put(<-bufReady)
188                 }()
189                 return nil, ErrClientDisconnect
190         }
191 }
192
193 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
194         ctx, cancel := contextForResponse(context.TODO(), resp)
195         defer cancel()
196
197         hash := mux.Vars(req)["hash"]
198
199         // Detect as many error conditions as possible before reading
200         // the body: avoid transmitting data that will not end up
201         // being written anyway.
202
203         if req.ContentLength == -1 {
204                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
205                 return
206         }
207
208         if req.ContentLength > BlockSize {
209                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
210                 return
211         }
212
213         if len(KeepVM.AllWritable()) == 0 {
214                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
215                 return
216         }
217
218         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
219         if err != nil {
220                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
221                 return
222         }
223
224         _, err = io.ReadFull(req.Body, buf)
225         if err != nil {
226                 http.Error(resp, err.Error(), 500)
227                 bufs.Put(buf)
228                 return
229         }
230
231         replication, err := PutBlock(ctx, buf, hash)
232         bufs.Put(buf)
233
234         if err != nil {
235                 code := http.StatusInternalServerError
236                 if err, ok := err.(*KeepError); ok {
237                         code = err.HTTPCode
238                 }
239                 http.Error(resp, err.Error(), code)
240                 return
241         }
242
243         // Success; add a size hint, sign the locator if possible, and
244         // return it to the client.
245         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
246         apiToken := GetAPIToken(req)
247         if theConfig.blobSigningKey != nil && apiToken != "" {
248                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
249                 returnHash = SignLocator(returnHash, apiToken, expiry)
250         }
251         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
252         resp.Write([]byte(returnHash + "\n"))
253 }
254
255 // IndexHandler responds to "/index", "/index/{prefix}", and
256 // "/mounts/{uuid}/blocks" requests.
257 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
258         if !IsSystemAuth(GetAPIToken(req)) {
259                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
260                 return
261         }
262
263         prefix := mux.Vars(req)["prefix"]
264         if prefix == "" {
265                 req.ParseForm()
266                 prefix = req.Form.Get("prefix")
267         }
268
269         uuid := mux.Vars(req)["uuid"]
270
271         var vols []Volume
272         if uuid == "" {
273                 vols = KeepVM.AllReadable()
274         } else if v := KeepVM.Lookup(uuid, false); v == nil {
275                 http.Error(resp, "mount not found", http.StatusNotFound)
276                 return
277         } else {
278                 vols = []Volume{v}
279         }
280
281         for _, v := range vols {
282                 if err := v.IndexTo(prefix, resp); err != nil {
283                         // We can't send an error message to the
284                         // client because we might have already sent
285                         // headers and index content. All we can do is
286                         // log the error in our own logs, and (in
287                         // cases where headers haven't been sent yet)
288                         // set a 500 status.
289                         //
290                         // If headers have already been sent, the
291                         // client must notice the lack of trailing
292                         // newline as an indication that the response
293                         // is incomplete.
294                         log.Printf("index error from volume %s: %s", v, err)
295                         http.Error(resp, "", http.StatusInternalServerError)
296                         return
297                 }
298         }
299         // An empty line at EOF is the only way the client can be
300         // assured the entire index was received.
301         resp.Write([]byte{'\n'})
302 }
303
304 // MountsHandler responds to "GET /mounts" requests.
305 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
306         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
307         if err != nil {
308                 http.Error(resp, err.Error(), http.StatusInternalServerError)
309         }
310 }
311
312 // PoolStatus struct
313 type PoolStatus struct {
314         Alloc uint64 `json:"BytesAllocatedCumulative"`
315         Cap   int    `json:"BuffersMax"`
316         Len   int    `json:"BuffersInUse"`
317 }
318
319 type volumeStatusEnt struct {
320         Label         string
321         Status        *VolumeStatus `json:",omitempty"`
322         VolumeStats   *ioStats      `json:",omitempty"`
323         InternalStats interface{}   `json:",omitempty"`
324 }
325
326 // NodeStatus struct
327 type NodeStatus struct {
328         Volumes         []*volumeStatusEnt
329         BufferPool      PoolStatus
330         PullQueue       WorkQueueStatus
331         TrashQueue      WorkQueueStatus
332         RequestsCurrent int
333         RequestsMax     int
334         Version         string
335 }
336
337 var st NodeStatus
338 var stLock sync.Mutex
339
340 // DebugHandler addresses /debug.json requests.
341 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
342         type debugStats struct {
343                 MemStats runtime.MemStats
344         }
345         var ds debugStats
346         runtime.ReadMemStats(&ds.MemStats)
347         err := json.NewEncoder(resp).Encode(&ds)
348         if err != nil {
349                 http.Error(resp, err.Error(), 500)
350         }
351 }
352
353 // StatusHandler addresses /status.json requests.
354 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
355         stLock.Lock()
356         rtr.readNodeStatus(&st)
357         jstat, err := json.Marshal(&st)
358         stLock.Unlock()
359         if err == nil {
360                 resp.Write(jstat)
361         } else {
362                 log.Printf("json.Marshal: %s", err)
363                 log.Printf("NodeStatus = %v", &st)
364                 http.Error(resp, err.Error(), 500)
365         }
366 }
367
368 // populate the given NodeStatus struct with current values.
369 func (rtr *router) readNodeStatus(st *NodeStatus) {
370         st.Version = version
371         vols := KeepVM.AllReadable()
372         if cap(st.Volumes) < len(vols) {
373                 st.Volumes = make([]*volumeStatusEnt, len(vols))
374         }
375         st.Volumes = st.Volumes[:0]
376         for _, vol := range vols {
377                 var internalStats interface{}
378                 if vol, ok := vol.(InternalStatser); ok {
379                         internalStats = vol.InternalStats()
380                 }
381                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
382                         Label:         vol.String(),
383                         Status:        vol.Status(),
384                         InternalStats: internalStats,
385                         //VolumeStats: KeepVM.VolumeStats(vol),
386                 })
387         }
388         st.BufferPool.Alloc = bufs.Alloc()
389         st.BufferPool.Cap = bufs.Cap()
390         st.BufferPool.Len = bufs.Len()
391         st.PullQueue = getWorkQueueStatus(pullq)
392         st.TrashQueue = getWorkQueueStatus(trashq)
393         if rtr.limiter != nil {
394                 st.RequestsCurrent = rtr.limiter.Current()
395                 st.RequestsMax = rtr.limiter.Max()
396         }
397 }
398
399 // return a WorkQueueStatus for the given queue. If q is nil (which
400 // should never happen except in test suites), return a zero status
401 // value instead of crashing.
402 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
403         if q == nil {
404                 // This should only happen during tests.
405                 return WorkQueueStatus{}
406         }
407         return q.Status()
408 }
409
410 // DeleteHandler processes DELETE requests.
411 //
412 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
413 // from all connected volumes.
414 //
415 // Only the Data Manager, or an Arvados admin with scope "all", are
416 // allowed to issue DELETE requests.  If a DELETE request is not
417 // authenticated or is issued by a non-admin user, the server returns
418 // a PermissionError.
419 //
420 // Upon receiving a valid request from an authorized user,
421 // DeleteHandler deletes all copies of the specified block on local
422 // writable volumes.
423 //
424 // Response format:
425 //
426 // If the requested blocks was not found on any volume, the response
427 // code is HTTP 404 Not Found.
428 //
429 // Otherwise, the response code is 200 OK, with a response body
430 // consisting of the JSON message
431 //
432 //    {"copies_deleted":d,"copies_failed":f}
433 //
434 // where d and f are integers representing the number of blocks that
435 // were successfully and unsuccessfully deleted.
436 //
437 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
438         hash := mux.Vars(req)["hash"]
439
440         // Confirm that this user is an admin and has a token with unlimited scope.
441         var tok = GetAPIToken(req)
442         if tok == "" || !CanDelete(tok) {
443                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
444                 return
445         }
446
447         if !theConfig.EnableDelete {
448                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
449                 return
450         }
451
452         // Delete copies of this block from all available volumes.
453         // Report how many blocks were successfully deleted, and how
454         // many were found on writable volumes but not deleted.
455         var result struct {
456                 Deleted int `json:"copies_deleted"`
457                 Failed  int `json:"copies_failed"`
458         }
459         for _, vol := range KeepVM.AllWritable() {
460                 if err := vol.Trash(hash); err == nil {
461                         result.Deleted++
462                 } else if os.IsNotExist(err) {
463                         continue
464                 } else {
465                         result.Failed++
466                         log.Println("DeleteHandler:", err)
467                 }
468         }
469
470         var st int
471
472         if result.Deleted == 0 && result.Failed == 0 {
473                 st = http.StatusNotFound
474         } else {
475                 st = http.StatusOK
476         }
477
478         resp.WriteHeader(st)
479
480         if st == http.StatusOK {
481                 if body, err := json.Marshal(result); err == nil {
482                         resp.Write(body)
483                 } else {
484                         log.Printf("json.Marshal: %s (result = %v)", err, result)
485                         http.Error(resp, err.Error(), 500)
486                 }
487         }
488 }
489
490 /* PullHandler processes "PUT /pull" requests for the data manager.
491    The request body is a JSON message containing a list of pull
492    requests in the following format:
493
494    [
495       {
496          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
497          "servers":[
498                         "keep0.qr1hi.arvadosapi.com:25107",
499                         "keep1.qr1hi.arvadosapi.com:25108"
500                  ]
501           },
502           {
503                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
504                  "servers":[
505                         "10.0.1.5:25107",
506                         "10.0.1.6:25107",
507                         "10.0.1.7:25108"
508                  ]
509           },
510           ...
511    ]
512
513    Each pull request in the list consists of a block locator string
514    and an ordered list of servers.  Keepstore should try to fetch the
515    block from each server in turn.
516
517    If the request has not been sent by the Data Manager, return 401
518    Unauthorized.
519
520    If the JSON unmarshalling fails, return 400 Bad Request.
521 */
522
523 // PullRequest consists of a block locator and an ordered list of servers
524 type PullRequest struct {
525         Locator string   `json:"locator"`
526         Servers []string `json:"servers"`
527
528         // Destination mount, or "" for "anywhere"
529         MountUUID string `json:"mount_uuid"`
530 }
531
532 // PullHandler processes "PUT /pull" requests for the data manager.
533 func PullHandler(resp http.ResponseWriter, req *http.Request) {
534         // Reject unauthorized requests.
535         if !IsSystemAuth(GetAPIToken(req)) {
536                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
537                 return
538         }
539
540         // Parse the request body.
541         var pr []PullRequest
542         r := json.NewDecoder(req.Body)
543         if err := r.Decode(&pr); err != nil {
544                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
545                 return
546         }
547
548         // We have a properly formatted pull list sent from the data
549         // manager.  Report success and send the list to the pull list
550         // manager for further handling.
551         resp.WriteHeader(http.StatusOK)
552         resp.Write([]byte(
553                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
554
555         plist := list.New()
556         for _, p := range pr {
557                 plist.PushBack(p)
558         }
559         pullq.ReplaceQueue(plist)
560 }
561
562 // TrashRequest consists of a block locator and its Mtime
563 type TrashRequest struct {
564         Locator    string `json:"locator"`
565         BlockMtime int64  `json:"block_mtime"`
566
567         // Target mount, or "" for "everywhere"
568         MountUUID string `json:"mount_uuid"`
569 }
570
571 // TrashHandler processes /trash requests.
572 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
573         // Reject unauthorized requests.
574         if !IsSystemAuth(GetAPIToken(req)) {
575                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
576                 return
577         }
578
579         // Parse the request body.
580         var trash []TrashRequest
581         r := json.NewDecoder(req.Body)
582         if err := r.Decode(&trash); err != nil {
583                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
584                 return
585         }
586
587         // We have a properly formatted trash list sent from the data
588         // manager.  Report success and send the list to the trash work
589         // queue for further handling.
590         resp.WriteHeader(http.StatusOK)
591         resp.Write([]byte(
592                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
593
594         tlist := list.New()
595         for _, t := range trash {
596                 tlist.PushBack(t)
597         }
598         trashq.ReplaceQueue(tlist)
599 }
600
601 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
602 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
603         // Reject unauthorized requests.
604         if !IsSystemAuth(GetAPIToken(req)) {
605                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
606                 return
607         }
608
609         hash := mux.Vars(req)["hash"]
610
611         if len(KeepVM.AllWritable()) == 0 {
612                 http.Error(resp, "No writable volumes", http.StatusNotFound)
613                 return
614         }
615
616         var untrashedOn, failedOn []string
617         var numNotFound int
618         for _, vol := range KeepVM.AllWritable() {
619                 err := vol.Untrash(hash)
620
621                 if os.IsNotExist(err) {
622                         numNotFound++
623                 } else if err != nil {
624                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
625                         failedOn = append(failedOn, vol.String())
626                 } else {
627                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
628                         untrashedOn = append(untrashedOn, vol.String())
629                 }
630         }
631
632         if numNotFound == len(KeepVM.AllWritable()) {
633                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
634                 return
635         }
636
637         if len(failedOn) == len(KeepVM.AllWritable()) {
638                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
639         } else {
640                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
641                 if len(failedOn) > 0 {
642                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
643                 }
644                 resp.Write([]byte(respBody))
645         }
646 }
647
648 // GetBlock and PutBlock implement lower-level code for handling
649 // blocks by rooting through volumes connected to the local machine.
650 // Once the handler has determined that system policy permits the
651 // request, it calls these methods to perform the actual operation.
652 //
653 // TODO(twp): this code would probably be better located in the
654 // VolumeManager interface. As an abstraction, the VolumeManager
655 // should be the only part of the code that cares about which volume a
656 // block is stored on, so it should be responsible for figuring out
657 // which volume to check for fetching blocks, storing blocks, etc.
658
659 // GetBlock fetches the block identified by "hash" into the provided
660 // buf, and returns the data size.
661 //
662 // If the block cannot be found on any volume, returns NotFoundError.
663 //
664 // If the block found does not have the correct MD5 hash, returns
665 // DiskHashError.
666 //
667 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
668         // Attempt to read the requested hash from a keep volume.
669         errorToCaller := NotFoundError
670
671         for _, vol := range KeepVM.AllReadable() {
672                 size, err := vol.Get(ctx, hash, buf)
673                 select {
674                 case <-ctx.Done():
675                         return 0, ErrClientDisconnect
676                 default:
677                 }
678                 if err != nil {
679                         // IsNotExist is an expected error and may be
680                         // ignored. All other errors are logged. In
681                         // any case we continue trying to read other
682                         // volumes. If all volumes report IsNotExist,
683                         // we return a NotFoundError.
684                         if !os.IsNotExist(err) {
685                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
686                         }
687                         // If some volume returns a transient error, return it to the caller
688                         // instead of "Not found" so it can retry.
689                         if err == VolumeBusyError {
690                                 errorToCaller = err.(*KeepError)
691                         }
692                         continue
693                 }
694                 // Check the file checksum.
695                 //
696                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
697                 if filehash != hash {
698                         // TODO: Try harder to tell a sysadmin about
699                         // this.
700                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
701                                 vol, hash, filehash)
702                         errorToCaller = DiskHashError
703                         continue
704                 }
705                 if errorToCaller == DiskHashError {
706                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
707                                 vol, hash)
708                 }
709                 return size, nil
710         }
711         return 0, errorToCaller
712 }
713
714 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
715 //
716 // PutBlock(ctx, block, hash)
717 //   Stores the BLOCK (identified by the content id HASH) in Keep.
718 //
719 //   The MD5 checksum of the block must be identical to the content id HASH.
720 //   If not, an error is returned.
721 //
722 //   PutBlock stores the BLOCK on the first Keep volume with free space.
723 //   A failure code is returned to the user only if all volumes fail.
724 //
725 //   On success, PutBlock returns nil.
726 //   On failure, it returns a KeepError with one of the following codes:
727 //
728 //   500 Collision
729 //          A different block with the same hash already exists on this
730 //          Keep server.
731 //   422 MD5Fail
732 //          The MD5 hash of the BLOCK does not match the argument HASH.
733 //   503 Full
734 //          There was not enough space left in any Keep volume to store
735 //          the object.
736 //   500 Fail
737 //          The object could not be stored for some other reason (e.g.
738 //          all writes failed). The text of the error message should
739 //          provide as much detail as possible.
740 //
741 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
742         // Check that BLOCK's checksum matches HASH.
743         blockhash := fmt.Sprintf("%x", md5.Sum(block))
744         if blockhash != hash {
745                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
746                 return 0, RequestHashError
747         }
748
749         // If we already have this data, it's intact on disk, and we
750         // can update its timestamp, return success. If we have
751         // different data with the same hash, return failure.
752         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
753                 return n, err
754         } else if ctx.Err() != nil {
755                 return 0, ErrClientDisconnect
756         }
757
758         // Choose a Keep volume to write to.
759         // If this volume fails, try all of the volumes in order.
760         if vol := KeepVM.NextWritable(); vol != nil {
761                 if err := vol.Put(ctx, hash, block); err == nil {
762                         return vol.Replication(), nil // success!
763                 }
764                 if ctx.Err() != nil {
765                         return 0, ErrClientDisconnect
766                 }
767         }
768
769         writables := KeepVM.AllWritable()
770         if len(writables) == 0 {
771                 log.Print("No writable volumes.")
772                 return 0, FullError
773         }
774
775         allFull := true
776         for _, vol := range writables {
777                 err := vol.Put(ctx, hash, block)
778                 if ctx.Err() != nil {
779                         return 0, ErrClientDisconnect
780                 }
781                 if err == nil {
782                         return vol.Replication(), nil // success!
783                 }
784                 if err != FullError {
785                         // The volume is not full but the
786                         // write did not succeed.  Report the
787                         // error and continue trying.
788                         allFull = false
789                         log.Printf("%s: Write(%s): %s", vol, hash, err)
790                 }
791         }
792
793         if allFull {
794                 log.Print("All volumes are full.")
795                 return 0, FullError
796         }
797         // Already logged the non-full errors.
798         return 0, GenericError
799 }
800
801 // CompareAndTouch returns the current replication level if one of the
802 // volumes already has the given content and it successfully updates
803 // the relevant block's modification time in order to protect it from
804 // premature garbage collection. Otherwise, it returns a non-nil
805 // error.
806 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
807         var bestErr error = NotFoundError
808         for _, vol := range KeepVM.AllWritable() {
809                 err := vol.Compare(ctx, hash, buf)
810                 if ctx.Err() != nil {
811                         return 0, ctx.Err()
812                 } else if err == CollisionError {
813                         // Stop if we have a block with same hash but
814                         // different content. (It will be impossible
815                         // to tell which one is wanted if we have
816                         // both, so there's no point writing it even
817                         // on a different volume.)
818                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
819                         return 0, err
820                 } else if os.IsNotExist(err) {
821                         // Block does not exist. This is the only
822                         // "normal" error: we don't log anything.
823                         continue
824                 } else if err != nil {
825                         // Couldn't open file, data is corrupt on
826                         // disk, etc.: log this abnormal condition,
827                         // and try the next volume.
828                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
829                         continue
830                 }
831                 if err := vol.Touch(hash); err != nil {
832                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
833                         bestErr = err
834                         continue
835                 }
836                 // Compare and Touch both worked --> done.
837                 return vol.Replication(), nil
838         }
839         return 0, bestErr
840 }
841
842 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
843
844 // IsValidLocator returns true if the specified string is a valid Keep locator.
845 //   When Keep is extended to support hash types other than MD5,
846 //   this should be updated to cover those as well.
847 //
848 func IsValidLocator(loc string) bool {
849         return validLocatorRe.MatchString(loc)
850 }
851
852 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
853
854 // GetAPIToken returns the OAuth2 token from the Authorization
855 // header of a HTTP request, or an empty string if no matching
856 // token is found.
857 func GetAPIToken(req *http.Request) string {
858         if auth, ok := req.Header["Authorization"]; ok {
859                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
860                         return match[2]
861                 }
862         }
863         return ""
864 }
865
866 // IsExpired returns true if the given Unix timestamp (expressed as a
867 // hexadecimal string) is in the past, or if timestampHex cannot be
868 // parsed as a hexadecimal string.
869 func IsExpired(timestampHex string) bool {
870         ts, err := strconv.ParseInt(timestampHex, 16, 0)
871         if err != nil {
872                 log.Printf("IsExpired: %s", err)
873                 return true
874         }
875         return time.Unix(ts, 0).Before(time.Now())
876 }
877
878 // CanDelete returns true if the user identified by apiToken is
879 // allowed to delete blocks.
880 func CanDelete(apiToken string) bool {
881         if apiToken == "" {
882                 return false
883         }
884         // Blocks may be deleted only when Keep has been configured with a
885         // data manager.
886         if IsSystemAuth(apiToken) {
887                 return true
888         }
889         // TODO(twp): look up apiToken with the API server
890         // return true if is_admin is true and if the token
891         // has unlimited scope
892         return false
893 }
894
895 // IsSystemAuth returns true if the given token is allowed to perform
896 // system level actions like deleting data.
897 func IsSystemAuth(token string) bool {
898         return token != "" && token == theConfig.systemAuthToken
899 }