15521: Include request ID etc. in PutBlock log messages.
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "log"
15         "net/http"
16         "os"
17         "regexp"
18         "runtime"
19         "strconv"
20         "strings"
21         "sync"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
26         "git.curoverse.com/arvados.git/sdk/go/health"
27         "git.curoverse.com/arvados.git/sdk/go/httpserver"
28         "github.com/gorilla/mux"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31 )
32
33 type router struct {
34         *mux.Router
35         cluster     *arvados.Cluster
36         logger      logrus.FieldLogger
37         remoteProxy remoteProxy
38         metrics     *nodeMetrics
39         volmgr      *RRVolumeManager
40         pullq       *WorkQueue
41         trashq      *WorkQueue
42 }
43
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47         rtr := &router{
48                 Router:  mux.NewRouter(),
49                 cluster: cluster,
50                 logger:  ctxlog.FromContext(ctx),
51                 metrics: &nodeMetrics{reg: reg},
52                 volmgr:  volmgr,
53                 pullq:   pullq,
54                 trashq:  trashq,
55         }
56
57         rtr.HandleFunc(
58                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59         rtr.HandleFunc(
60                 `/{hash:[0-9a-f]{32}}+{hints}`,
61                 rtr.handleGET).Methods("GET", "HEAD")
62
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65         // List all blocks stored here. Privileged client only.
66         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67         // List blocks stored here whose hash has the given prefix.
68         // Privileged client only.
69         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
70
71         // Internals/debugging info (runtime.MemStats)
72         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
73
74         // List volumes: path, device number, bytes used/avail.
75         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
76
77         // List mounts: UUID, readonly, tier, device ID, ...
78         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
79         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
80         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
81
82         // Replace the current pull queue.
83         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
84
85         // Replace the current trash queue.
86         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
87
88         // Untrash moves blocks from trash back into store
89         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
90
91         rtr.Handle("/_health/{check}", &health.Handler{
92                 Token:  cluster.ManagementToken,
93                 Prefix: "/_health/",
94         }).Methods("GET")
95
96         // Any request which does not match any of these routes gets
97         // 400 Bad Request.
98         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
99
100         rtr.metrics.setupBufferPoolMetrics(bufs)
101         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
102         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
103
104         return rtr
105 }
106
107 // BadRequestHandler is a HandleFunc to address bad requests.
108 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
109         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
110 }
111
112 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
113         ctx, cancel := contextForResponse(context.TODO(), resp)
114         defer cancel()
115
116         locator := req.URL.Path[1:]
117         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
119                 return
120         }
121
122         if rtr.cluster.Collections.BlobSigning {
123                 locator := req.URL.Path[1:] // strip leading slash
124                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
126                         return
127                 }
128         }
129
130         // TODO: Probe volumes to check whether the block _might_
131         // exist. Some volumes/types could support a quick existence
132         // check without causing other operations to suffer. If all
133         // volumes support that, and assure us the block definitely
134         // isn't here, we can return 404 now instead of waiting for a
135         // buffer.
136
137         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
138         if err != nil {
139                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
140                 return
141         }
142         defer bufs.Put(buf)
143
144         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
145         if err != nil {
146                 code := http.StatusInternalServerError
147                 if err, ok := err.(*KeepError); ok {
148                         code = err.HTTPCode
149                 }
150                 http.Error(resp, err.Error(), code)
151                 return
152         }
153
154         resp.Header().Set("Content-Length", strconv.Itoa(size))
155         resp.Header().Set("Content-Type", "application/octet-stream")
156         resp.Write(buf[:size])
157 }
158
159 // Return a new context that gets cancelled by resp's CloseNotifier.
160 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
161         ctx, cancel := context.WithCancel(parent)
162         if cn, ok := resp.(http.CloseNotifier); ok {
163                 go func(c <-chan bool) {
164                         select {
165                         case <-c:
166                                 cancel()
167                         case <-ctx.Done():
168                         }
169                 }(cn.CloseNotify())
170         }
171         return ctx, cancel
172 }
173
174 // Get a buffer from the pool -- but give up and return a non-nil
175 // error if ctx ends before we get a buffer.
176 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
177         bufReady := make(chan []byte)
178         go func() {
179                 bufReady <- bufs.Get(bufSize)
180         }()
181         select {
182         case buf := <-bufReady:
183                 return buf, nil
184         case <-ctx.Done():
185                 go func() {
186                         // Even if closeNotifier happened first, we
187                         // need to keep waiting for our buf so we can
188                         // return it to the pool.
189                         bufs.Put(<-bufReady)
190                 }()
191                 return nil, ErrClientDisconnect
192         }
193 }
194
195 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
196         ctx, cancel := contextForResponse(context.TODO(), resp)
197         defer cancel()
198
199         hash := mux.Vars(req)["hash"]
200
201         // Detect as many error conditions as possible before reading
202         // the body: avoid transmitting data that will not end up
203         // being written anyway.
204
205         if req.ContentLength == -1 {
206                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
207                 return
208         }
209
210         if req.ContentLength > BlockSize {
211                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
212                 return
213         }
214
215         if len(rtr.volmgr.AllWritable()) == 0 {
216                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
217                 return
218         }
219
220         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
221         if err != nil {
222                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
223                 return
224         }
225
226         _, err = io.ReadFull(req.Body, buf)
227         if err != nil {
228                 http.Error(resp, err.Error(), 500)
229                 bufs.Put(buf)
230                 return
231         }
232
233         replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
234         bufs.Put(buf)
235
236         if err != nil {
237                 code := http.StatusInternalServerError
238                 if err, ok := err.(*KeepError); ok {
239                         code = err.HTTPCode
240                 }
241                 http.Error(resp, err.Error(), code)
242                 return
243         }
244
245         // Success; add a size hint, sign the locator if possible, and
246         // return it to the client.
247         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
248         apiToken := GetAPIToken(req)
249         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
250                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
251                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
252         }
253         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
254         resp.Write([]byte(returnHash + "\n"))
255 }
256
257 // IndexHandler responds to "/index", "/index/{prefix}", and
258 // "/mounts/{uuid}/blocks" requests.
259 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
260         if !rtr.isSystemAuth(GetAPIToken(req)) {
261                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
262                 return
263         }
264
265         prefix := mux.Vars(req)["prefix"]
266         if prefix == "" {
267                 req.ParseForm()
268                 prefix = req.Form.Get("prefix")
269         }
270
271         uuid := mux.Vars(req)["uuid"]
272
273         var vols []*VolumeMount
274         if uuid == "" {
275                 vols = rtr.volmgr.AllReadable()
276         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
277                 http.Error(resp, "mount not found", http.StatusNotFound)
278                 return
279         } else {
280                 vols = []*VolumeMount{mnt}
281         }
282
283         for _, v := range vols {
284                 if err := v.IndexTo(prefix, resp); err != nil {
285                         // We can't send an error status/message to
286                         // the client because IndexTo() might have
287                         // already written body content. All we can do
288                         // is log the error in our own logs.
289                         //
290                         // The client must notice the lack of trailing
291                         // newline as an indication that the response
292                         // is incomplete.
293                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
294                         return
295                 }
296         }
297         // An empty line at EOF is the only way the client can be
298         // assured the entire index was received.
299         resp.Write([]byte{'\n'})
300 }
301
302 // MountsHandler responds to "GET /mounts" requests.
303 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
304         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
305         if err != nil {
306                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
307         }
308 }
309
310 // PoolStatus struct
311 type PoolStatus struct {
312         Alloc uint64 `json:"BytesAllocatedCumulative"`
313         Cap   int    `json:"BuffersMax"`
314         Len   int    `json:"BuffersInUse"`
315 }
316
317 type volumeStatusEnt struct {
318         Label         string
319         Status        *VolumeStatus `json:",omitempty"`
320         VolumeStats   *ioStats      `json:",omitempty"`
321         InternalStats interface{}   `json:",omitempty"`
322 }
323
324 // NodeStatus struct
325 type NodeStatus struct {
326         Volumes         []*volumeStatusEnt
327         BufferPool      PoolStatus
328         PullQueue       WorkQueueStatus
329         TrashQueue      WorkQueueStatus
330         RequestsCurrent int
331         RequestsMax     int
332         Version         string
333 }
334
335 var st NodeStatus
336 var stLock sync.Mutex
337
338 // DebugHandler addresses /debug.json requests.
339 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
340         type debugStats struct {
341                 MemStats runtime.MemStats
342         }
343         var ds debugStats
344         runtime.ReadMemStats(&ds.MemStats)
345         err := json.NewEncoder(resp).Encode(&ds)
346         if err != nil {
347                 http.Error(resp, err.Error(), 500)
348         }
349 }
350
351 // StatusHandler addresses /status.json requests.
352 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
353         stLock.Lock()
354         rtr.readNodeStatus(&st)
355         jstat, err := json.Marshal(&st)
356         stLock.Unlock()
357         if err == nil {
358                 resp.Write(jstat)
359         } else {
360                 log.Printf("json.Marshal: %s", err)
361                 log.Printf("NodeStatus = %v", &st)
362                 http.Error(resp, err.Error(), 500)
363         }
364 }
365
366 // populate the given NodeStatus struct with current values.
367 func (rtr *router) readNodeStatus(st *NodeStatus) {
368         st.Version = version
369         vols := rtr.volmgr.AllReadable()
370         if cap(st.Volumes) < len(vols) {
371                 st.Volumes = make([]*volumeStatusEnt, len(vols))
372         }
373         st.Volumes = st.Volumes[:0]
374         for _, vol := range vols {
375                 var internalStats interface{}
376                 if vol, ok := vol.Volume.(InternalStatser); ok {
377                         internalStats = vol.InternalStats()
378                 }
379                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
380                         Label:         vol.String(),
381                         Status:        vol.Status(),
382                         InternalStats: internalStats,
383                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
384                 })
385         }
386         st.BufferPool.Alloc = bufs.Alloc()
387         st.BufferPool.Cap = bufs.Cap()
388         st.BufferPool.Len = bufs.Len()
389         st.PullQueue = getWorkQueueStatus(rtr.pullq)
390         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
391 }
392
393 // return a WorkQueueStatus for the given queue. If q is nil (which
394 // should never happen except in test suites), return a zero status
395 // value instead of crashing.
396 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
397         if q == nil {
398                 // This should only happen during tests.
399                 return WorkQueueStatus{}
400         }
401         return q.Status()
402 }
403
404 // handleDELETE processes DELETE requests.
405 //
406 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
407 // from all connected volumes.
408 //
409 // Only the Data Manager, or an Arvados admin with scope "all", are
410 // allowed to issue DELETE requests.  If a DELETE request is not
411 // authenticated or is issued by a non-admin user, the server returns
412 // a PermissionError.
413 //
414 // Upon receiving a valid request from an authorized user,
415 // handleDELETE deletes all copies of the specified block on local
416 // writable volumes.
417 //
418 // Response format:
419 //
420 // If the requested blocks was not found on any volume, the response
421 // code is HTTP 404 Not Found.
422 //
423 // Otherwise, the response code is 200 OK, with a response body
424 // consisting of the JSON message
425 //
426 //    {"copies_deleted":d,"copies_failed":f}
427 //
428 // where d and f are integers representing the number of blocks that
429 // were successfully and unsuccessfully deleted.
430 //
431 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
432         hash := mux.Vars(req)["hash"]
433
434         // Confirm that this user is an admin and has a token with unlimited scope.
435         var tok = GetAPIToken(req)
436         if tok == "" || !rtr.canDelete(tok) {
437                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
438                 return
439         }
440
441         if !rtr.cluster.Collections.BlobTrash {
442                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
443                 return
444         }
445
446         // Delete copies of this block from all available volumes.
447         // Report how many blocks were successfully deleted, and how
448         // many were found on writable volumes but not deleted.
449         var result struct {
450                 Deleted int `json:"copies_deleted"`
451                 Failed  int `json:"copies_failed"`
452         }
453         for _, vol := range rtr.volmgr.AllWritable() {
454                 if err := vol.Trash(hash); err == nil {
455                         result.Deleted++
456                 } else if os.IsNotExist(err) {
457                         continue
458                 } else {
459                         result.Failed++
460                         log.Println("DeleteHandler:", err)
461                 }
462         }
463
464         var st int
465
466         if result.Deleted == 0 && result.Failed == 0 {
467                 st = http.StatusNotFound
468         } else {
469                 st = http.StatusOK
470         }
471
472         resp.WriteHeader(st)
473
474         if st == http.StatusOK {
475                 if body, err := json.Marshal(result); err == nil {
476                         resp.Write(body)
477                 } else {
478                         log.Printf("json.Marshal: %s (result = %v)", err, result)
479                         http.Error(resp, err.Error(), 500)
480                 }
481         }
482 }
483
484 /* PullHandler processes "PUT /pull" requests for the data manager.
485    The request body is a JSON message containing a list of pull
486    requests in the following format:
487
488    [
489       {
490          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
491          "servers":[
492                         "keep0.qr1hi.arvadosapi.com:25107",
493                         "keep1.qr1hi.arvadosapi.com:25108"
494                  ]
495           },
496           {
497                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
498                  "servers":[
499                         "10.0.1.5:25107",
500                         "10.0.1.6:25107",
501                         "10.0.1.7:25108"
502                  ]
503           },
504           ...
505    ]
506
507    Each pull request in the list consists of a block locator string
508    and an ordered list of servers.  Keepstore should try to fetch the
509    block from each server in turn.
510
511    If the request has not been sent by the Data Manager, return 401
512    Unauthorized.
513
514    If the JSON unmarshalling fails, return 400 Bad Request.
515 */
516
517 // PullRequest consists of a block locator and an ordered list of servers
518 type PullRequest struct {
519         Locator string   `json:"locator"`
520         Servers []string `json:"servers"`
521
522         // Destination mount, or "" for "anywhere"
523         MountUUID string `json:"mount_uuid"`
524 }
525
526 // PullHandler processes "PUT /pull" requests for the data manager.
527 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
528         // Reject unauthorized requests.
529         if !rtr.isSystemAuth(GetAPIToken(req)) {
530                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
531                 return
532         }
533
534         // Parse the request body.
535         var pr []PullRequest
536         r := json.NewDecoder(req.Body)
537         if err := r.Decode(&pr); err != nil {
538                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
539                 return
540         }
541
542         // We have a properly formatted pull list sent from the data
543         // manager.  Report success and send the list to the pull list
544         // manager for further handling.
545         resp.WriteHeader(http.StatusOK)
546         resp.Write([]byte(
547                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
548
549         plist := list.New()
550         for _, p := range pr {
551                 plist.PushBack(p)
552         }
553         rtr.pullq.ReplaceQueue(plist)
554 }
555
556 // TrashRequest consists of a block locator and its Mtime
557 type TrashRequest struct {
558         Locator    string `json:"locator"`
559         BlockMtime int64  `json:"block_mtime"`
560
561         // Target mount, or "" for "everywhere"
562         MountUUID string `json:"mount_uuid"`
563 }
564
565 // TrashHandler processes /trash requests.
566 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
567         // Reject unauthorized requests.
568         if !rtr.isSystemAuth(GetAPIToken(req)) {
569                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
570                 return
571         }
572
573         // Parse the request body.
574         var trash []TrashRequest
575         r := json.NewDecoder(req.Body)
576         if err := r.Decode(&trash); err != nil {
577                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
578                 return
579         }
580
581         // We have a properly formatted trash list sent from the data
582         // manager.  Report success and send the list to the trash work
583         // queue for further handling.
584         resp.WriteHeader(http.StatusOK)
585         resp.Write([]byte(
586                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
587
588         tlist := list.New()
589         for _, t := range trash {
590                 tlist.PushBack(t)
591         }
592         rtr.trashq.ReplaceQueue(tlist)
593 }
594
595 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
596 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
597         // Reject unauthorized requests.
598         if !rtr.isSystemAuth(GetAPIToken(req)) {
599                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
600                 return
601         }
602
603         hash := mux.Vars(req)["hash"]
604
605         if len(rtr.volmgr.AllWritable()) == 0 {
606                 http.Error(resp, "No writable volumes", http.StatusNotFound)
607                 return
608         }
609
610         var untrashedOn, failedOn []string
611         var numNotFound int
612         for _, vol := range rtr.volmgr.AllWritable() {
613                 err := vol.Untrash(hash)
614
615                 if os.IsNotExist(err) {
616                         numNotFound++
617                 } else if err != nil {
618                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
619                         failedOn = append(failedOn, vol.String())
620                 } else {
621                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
622                         untrashedOn = append(untrashedOn, vol.String())
623                 }
624         }
625
626         if numNotFound == len(rtr.volmgr.AllWritable()) {
627                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
628                 return
629         }
630
631         if len(failedOn) == len(rtr.volmgr.AllWritable()) {
632                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
633         } else {
634                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
635                 if len(failedOn) > 0 {
636                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
637                 }
638                 resp.Write([]byte(respBody))
639         }
640 }
641
642 // GetBlock and PutBlock implement lower-level code for handling
643 // blocks by rooting through volumes connected to the local machine.
644 // Once the handler has determined that system policy permits the
645 // request, it calls these methods to perform the actual operation.
646 //
647 // TODO(twp): this code would probably be better located in the
648 // VolumeManager interface. As an abstraction, the VolumeManager
649 // should be the only part of the code that cares about which volume a
650 // block is stored on, so it should be responsible for figuring out
651 // which volume to check for fetching blocks, storing blocks, etc.
652
653 // GetBlock fetches the block identified by "hash" into the provided
654 // buf, and returns the data size.
655 //
656 // If the block cannot be found on any volume, returns NotFoundError.
657 //
658 // If the block found does not have the correct MD5 hash, returns
659 // DiskHashError.
660 //
661 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
662         // Attempt to read the requested hash from a keep volume.
663         errorToCaller := NotFoundError
664
665         for _, vol := range volmgr.AllReadable() {
666                 size, err := vol.Get(ctx, hash, buf)
667                 select {
668                 case <-ctx.Done():
669                         return 0, ErrClientDisconnect
670                 default:
671                 }
672                 if err != nil {
673                         // IsNotExist is an expected error and may be
674                         // ignored. All other errors are logged. In
675                         // any case we continue trying to read other
676                         // volumes. If all volumes report IsNotExist,
677                         // we return a NotFoundError.
678                         if !os.IsNotExist(err) {
679                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
680                         }
681                         // If some volume returns a transient error, return it to the caller
682                         // instead of "Not found" so it can retry.
683                         if err == VolumeBusyError {
684                                 errorToCaller = err.(*KeepError)
685                         }
686                         continue
687                 }
688                 // Check the file checksum.
689                 //
690                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
691                 if filehash != hash {
692                         // TODO: Try harder to tell a sysadmin about
693                         // this.
694                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
695                                 vol, hash, filehash)
696                         errorToCaller = DiskHashError
697                         continue
698                 }
699                 if errorToCaller == DiskHashError {
700                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
701                                 vol, hash)
702                 }
703                 return size, nil
704         }
705         return 0, errorToCaller
706 }
707
708 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
709 //
710 // PutBlock(ctx, block, hash)
711 //   Stores the BLOCK (identified by the content id HASH) in Keep.
712 //
713 //   The MD5 checksum of the block must be identical to the content id HASH.
714 //   If not, an error is returned.
715 //
716 //   PutBlock stores the BLOCK on the first Keep volume with free space.
717 //   A failure code is returned to the user only if all volumes fail.
718 //
719 //   On success, PutBlock returns nil.
720 //   On failure, it returns a KeepError with one of the following codes:
721 //
722 //   500 Collision
723 //          A different block with the same hash already exists on this
724 //          Keep server.
725 //   422 MD5Fail
726 //          The MD5 hash of the BLOCK does not match the argument HASH.
727 //   503 Full
728 //          There was not enough space left in any Keep volume to store
729 //          the object.
730 //   500 Fail
731 //          The object could not be stored for some other reason (e.g.
732 //          all writes failed). The text of the error message should
733 //          provide as much detail as possible.
734 //
735 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
736         log := ctxlog.FromContext(ctx)
737
738         // Check that BLOCK's checksum matches HASH.
739         blockhash := fmt.Sprintf("%x", md5.Sum(block))
740         if blockhash != hash {
741                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
742                 return 0, RequestHashError
743         }
744
745         // If we already have this data, it's intact on disk, and we
746         // can update its timestamp, return success. If we have
747         // different data with the same hash, return failure.
748         if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
749                 return n, err
750         } else if ctx.Err() != nil {
751                 return 0, ErrClientDisconnect
752         }
753
754         // Choose a Keep volume to write to.
755         // If this volume fails, try all of the volumes in order.
756         if mnt := volmgr.NextWritable(); mnt != nil {
757                 if err := mnt.Put(ctx, hash, block); err == nil {
758                         return mnt.Replication, nil // success!
759                 }
760                 if ctx.Err() != nil {
761                         return 0, ErrClientDisconnect
762                 }
763         }
764
765         writables := volmgr.AllWritable()
766         if len(writables) == 0 {
767                 log.Error("no writable volumes")
768                 return 0, FullError
769         }
770
771         allFull := true
772         for _, vol := range writables {
773                 err := vol.Put(ctx, hash, block)
774                 if ctx.Err() != nil {
775                         return 0, ErrClientDisconnect
776                 }
777                 if err == nil {
778                         return vol.Replication, nil // success!
779                 }
780                 if err != FullError {
781                         // The volume is not full but the
782                         // write did not succeed.  Report the
783                         // error and continue trying.
784                         allFull = false
785                         log.Errorf("%s: Write(%s): %s", vol, hash, err)
786                 }
787         }
788
789         if allFull {
790                 log.Error("all volumes are full")
791                 return 0, FullError
792         }
793         // Already logged the non-full errors.
794         return 0, GenericError
795 }
796
797 // CompareAndTouch returns the current replication level if one of the
798 // volumes already has the given content and it successfully updates
799 // the relevant block's modification time in order to protect it from
800 // premature garbage collection. Otherwise, it returns a non-nil
801 // error.
802 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
803         var bestErr error = NotFoundError
804         for _, mnt := range volmgr.AllWritable() {
805                 err := mnt.Compare(ctx, hash, buf)
806                 if ctx.Err() != nil {
807                         return 0, ctx.Err()
808                 } else if err == CollisionError {
809                         // Stop if we have a block with same hash but
810                         // different content. (It will be impossible
811                         // to tell which one is wanted if we have
812                         // both, so there's no point writing it even
813                         // on a different volume.)
814                         log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
815                         return 0, err
816                 } else if os.IsNotExist(err) {
817                         // Block does not exist. This is the only
818                         // "normal" error: we don't log anything.
819                         continue
820                 } else if err != nil {
821                         // Couldn't open file, data is corrupt on
822                         // disk, etc.: log this abnormal condition,
823                         // and try the next volume.
824                         log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
825                         continue
826                 }
827                 if err := mnt.Touch(hash); err != nil {
828                         log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
829                         bestErr = err
830                         continue
831                 }
832                 // Compare and Touch both worked --> done.
833                 return mnt.Replication, nil
834         }
835         return 0, bestErr
836 }
837
838 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
839
840 // IsValidLocator returns true if the specified string is a valid Keep locator.
841 //   When Keep is extended to support hash types other than MD5,
842 //   this should be updated to cover those as well.
843 //
844 func IsValidLocator(loc string) bool {
845         return validLocatorRe.MatchString(loc)
846 }
847
848 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
849
850 // GetAPIToken returns the OAuth2 token from the Authorization
851 // header of a HTTP request, or an empty string if no matching
852 // token is found.
853 func GetAPIToken(req *http.Request) string {
854         if auth, ok := req.Header["Authorization"]; ok {
855                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
856                         return match[2]
857                 }
858         }
859         return ""
860 }
861
862 // IsExpired returns true if the given Unix timestamp (expressed as a
863 // hexadecimal string) is in the past, or if timestampHex cannot be
864 // parsed as a hexadecimal string.
865 func IsExpired(timestampHex string) bool {
866         ts, err := strconv.ParseInt(timestampHex, 16, 0)
867         if err != nil {
868                 log.Printf("IsExpired: %s", err)
869                 return true
870         }
871         return time.Unix(ts, 0).Before(time.Now())
872 }
873
874 // canDelete returns true if the user identified by apiToken is
875 // allowed to delete blocks.
876 func (rtr *router) canDelete(apiToken string) bool {
877         if apiToken == "" {
878                 return false
879         }
880         // Blocks may be deleted only when Keep has been configured with a
881         // data manager.
882         if rtr.isSystemAuth(apiToken) {
883                 return true
884         }
885         // TODO(twp): look up apiToken with the API server
886         // return true if is_admin is true and if the token
887         // has unlimited scope
888         return false
889 }
890
891 // isSystemAuth returns true if the given token is allowed to perform
892 // system level actions like deleting data.
893 func (rtr *router) isSystemAuth(token string) bool {
894         return token != "" && token == rtr.cluster.SystemRootToken
895 }