Merge branch 'patch-1' of https://github.com/mr-c/arvados into mr-c-patch-1
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "git.arvados.org/arvados.git/sdk/go/health"
26         "git.arvados.org/arvados.git/sdk/go/httpserver"
27         "github.com/gorilla/mux"
28         "github.com/prometheus/client_golang/prometheus"
29         "github.com/sirupsen/logrus"
30 )
31
32 type router struct {
33         *mux.Router
34         cluster     *arvados.Cluster
35         logger      logrus.FieldLogger
36         remoteProxy remoteProxy
37         metrics     *nodeMetrics
38         volmgr      *RRVolumeManager
39         pullq       *WorkQueue
40         trashq      *WorkQueue
41 }
42
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
46         rtr := &router{
47                 Router:  mux.NewRouter(),
48                 cluster: cluster,
49                 logger:  ctxlog.FromContext(ctx),
50                 metrics: &nodeMetrics{reg: reg},
51                 volmgr:  volmgr,
52                 pullq:   pullq,
53                 trashq:  trashq,
54         }
55
56         rtr.HandleFunc(
57                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}+{hints}`,
60                 rtr.handleGET).Methods("GET", "HEAD")
61
62         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64         // List all blocks stored here. Privileged client only.
65         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66         // List blocks stored here whose hash has the given prefix.
67         // Privileged client only.
68         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69         // Update timestamp on existing block. Privileged client only.
70         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
71
72         // Internals/debugging info (runtime.MemStats)
73         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
74
75         // List volumes: path, device number, bytes used/avail.
76         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
77
78         // List mounts: UUID, readonly, tier, device ID, ...
79         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
80         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
82
83         // Replace the current pull queue.
84         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
85
86         // Replace the current trash queue.
87         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
88
89         // Untrash moves blocks from trash back into store
90         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
91
92         rtr.Handle("/_health/{check}", &health.Handler{
93                 Token:  cluster.ManagementToken,
94                 Prefix: "/_health/",
95         }).Methods("GET")
96
97         // Any request which does not match any of these routes gets
98         // 400 Bad Request.
99         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
100
101         rtr.metrics.setupBufferPoolMetrics(bufs)
102         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
104
105         return rtr
106 }
107
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
111 }
112
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
114         ctx, cancel := contextForResponse(context.TODO(), resp)
115         defer cancel()
116
117         locator := req.URL.Path[1:]
118         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
120                 return
121         }
122
123         if rtr.cluster.Collections.BlobSigning {
124                 locator := req.URL.Path[1:] // strip leading slash
125                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
126                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
127                         return
128                 }
129         }
130
131         // TODO: Probe volumes to check whether the block _might_
132         // exist. Some volumes/types could support a quick existence
133         // check without causing other operations to suffer. If all
134         // volumes support that, and assure us the block definitely
135         // isn't here, we can return 404 now instead of waiting for a
136         // buffer.
137
138         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
139         if err != nil {
140                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
141                 return
142         }
143         defer bufs.Put(buf)
144
145         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146         if err != nil {
147                 code := http.StatusInternalServerError
148                 if err, ok := err.(*KeepError); ok {
149                         code = err.HTTPCode
150                 }
151                 http.Error(resp, err.Error(), code)
152                 return
153         }
154
155         resp.Header().Set("Content-Length", strconv.Itoa(size))
156         resp.Header().Set("Content-Type", "application/octet-stream")
157         resp.Write(buf[:size])
158 }
159
160 // Return a new context that gets cancelled by resp's CloseNotifier.
161 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
162         ctx, cancel := context.WithCancel(parent)
163         if cn, ok := resp.(http.CloseNotifier); ok {
164                 go func(c <-chan bool) {
165                         select {
166                         case <-c:
167                                 cancel()
168                         case <-ctx.Done():
169                         }
170                 }(cn.CloseNotify())
171         }
172         return ctx, cancel
173 }
174
175 // Get a buffer from the pool -- but give up and return a non-nil
176 // error if ctx ends before we get a buffer.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
178         bufReady := make(chan []byte)
179         go func() {
180                 bufReady <- bufs.Get(bufSize)
181         }()
182         select {
183         case buf := <-bufReady:
184                 return buf, nil
185         case <-ctx.Done():
186                 go func() {
187                         // Even if closeNotifier happened first, we
188                         // need to keep waiting for our buf so we can
189                         // return it to the pool.
190                         bufs.Put(<-bufReady)
191                 }()
192                 return nil, ErrClientDisconnect
193         }
194 }
195
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197         if !rtr.isSystemAuth(GetAPIToken(req)) {
198                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
199                 return
200         }
201         hash := mux.Vars(req)["hash"]
202         vols := rtr.volmgr.AllWritable()
203         if len(vols) == 0 {
204                 http.Error(resp, "no volumes", http.StatusNotFound)
205                 return
206         }
207         var err error
208         for _, mnt := range vols {
209                 err = mnt.Touch(hash)
210                 if err == nil {
211                         break
212                 }
213         }
214         switch {
215         case err == nil:
216                 return
217         case os.IsNotExist(err):
218                 http.Error(resp, err.Error(), http.StatusNotFound)
219         default:
220                 http.Error(resp, err.Error(), http.StatusInternalServerError)
221         }
222 }
223
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225         ctx, cancel := contextForResponse(context.TODO(), resp)
226         defer cancel()
227
228         hash := mux.Vars(req)["hash"]
229
230         // Detect as many error conditions as possible before reading
231         // the body: avoid transmitting data that will not end up
232         // being written anyway.
233
234         if req.ContentLength == -1 {
235                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
236                 return
237         }
238
239         if req.ContentLength > BlockSize {
240                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
241                 return
242         }
243
244         if len(rtr.volmgr.AllWritable()) == 0 {
245                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
246                 return
247         }
248
249         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
250         if err != nil {
251                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
252                 return
253         }
254
255         _, err = io.ReadFull(req.Body, buf)
256         if err != nil {
257                 http.Error(resp, err.Error(), 500)
258                 bufs.Put(buf)
259                 return
260         }
261
262         replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
263         bufs.Put(buf)
264
265         if err != nil {
266                 code := http.StatusInternalServerError
267                 if err, ok := err.(*KeepError); ok {
268                         code = err.HTTPCode
269                 }
270                 http.Error(resp, err.Error(), code)
271                 return
272         }
273
274         // Success; add a size hint, sign the locator if possible, and
275         // return it to the client.
276         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
277         apiToken := GetAPIToken(req)
278         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
279                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
280                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
281         }
282         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
283         resp.Write([]byte(returnHash + "\n"))
284 }
285
286 // IndexHandler responds to "/index", "/index/{prefix}", and
287 // "/mounts/{uuid}/blocks" requests.
288 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
289         if !rtr.isSystemAuth(GetAPIToken(req)) {
290                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
291                 return
292         }
293
294         prefix := mux.Vars(req)["prefix"]
295         if prefix == "" {
296                 req.ParseForm()
297                 prefix = req.Form.Get("prefix")
298         }
299
300         uuid := mux.Vars(req)["uuid"]
301
302         var vols []*VolumeMount
303         if uuid == "" {
304                 vols = rtr.volmgr.AllReadable()
305         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
306                 http.Error(resp, "mount not found", http.StatusNotFound)
307                 return
308         } else {
309                 vols = []*VolumeMount{mnt}
310         }
311
312         for _, v := range vols {
313                 if err := v.IndexTo(prefix, resp); err != nil {
314                         // We can't send an error status/message to
315                         // the client because IndexTo() might have
316                         // already written body content. All we can do
317                         // is log the error in our own logs.
318                         //
319                         // The client must notice the lack of trailing
320                         // newline as an indication that the response
321                         // is incomplete.
322                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
323                         return
324                 }
325         }
326         // An empty line at EOF is the only way the client can be
327         // assured the entire index was received.
328         resp.Write([]byte{'\n'})
329 }
330
331 // MountsHandler responds to "GET /mounts" requests.
332 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
333         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
334         if err != nil {
335                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
336         }
337 }
338
339 // PoolStatus struct
340 type PoolStatus struct {
341         Alloc uint64 `json:"BytesAllocatedCumulative"`
342         Cap   int    `json:"BuffersMax"`
343         Len   int    `json:"BuffersInUse"`
344 }
345
346 type volumeStatusEnt struct {
347         Label         string
348         Status        *VolumeStatus `json:",omitempty"`
349         VolumeStats   *ioStats      `json:",omitempty"`
350         InternalStats interface{}   `json:",omitempty"`
351 }
352
353 // NodeStatus struct
354 type NodeStatus struct {
355         Volumes         []*volumeStatusEnt
356         BufferPool      PoolStatus
357         PullQueue       WorkQueueStatus
358         TrashQueue      WorkQueueStatus
359         RequestsCurrent int
360         RequestsMax     int
361         Version         string
362 }
363
364 var st NodeStatus
365 var stLock sync.Mutex
366
367 // DebugHandler addresses /debug.json requests.
368 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
369         type debugStats struct {
370                 MemStats runtime.MemStats
371         }
372         var ds debugStats
373         runtime.ReadMemStats(&ds.MemStats)
374         data, err := json.Marshal(&ds)
375         if err != nil {
376                 http.Error(resp, err.Error(), http.StatusInternalServerError)
377                 return
378         }
379         resp.Write(data)
380 }
381
382 // StatusHandler addresses /status.json requests.
383 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
384         stLock.Lock()
385         rtr.readNodeStatus(&st)
386         data, err := json.Marshal(&st)
387         stLock.Unlock()
388         if err != nil {
389                 http.Error(resp, err.Error(), http.StatusInternalServerError)
390                 return
391         }
392         resp.Write(data)
393 }
394
395 // populate the given NodeStatus struct with current values.
396 func (rtr *router) readNodeStatus(st *NodeStatus) {
397         st.Version = version
398         vols := rtr.volmgr.AllReadable()
399         if cap(st.Volumes) < len(vols) {
400                 st.Volumes = make([]*volumeStatusEnt, len(vols))
401         }
402         st.Volumes = st.Volumes[:0]
403         for _, vol := range vols {
404                 var internalStats interface{}
405                 if vol, ok := vol.Volume.(InternalStatser); ok {
406                         internalStats = vol.InternalStats()
407                 }
408                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
409                         Label:         vol.String(),
410                         Status:        vol.Status(),
411                         InternalStats: internalStats,
412                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
413                 })
414         }
415         st.BufferPool.Alloc = bufs.Alloc()
416         st.BufferPool.Cap = bufs.Cap()
417         st.BufferPool.Len = bufs.Len()
418         st.PullQueue = getWorkQueueStatus(rtr.pullq)
419         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
420 }
421
422 // return a WorkQueueStatus for the given queue. If q is nil (which
423 // should never happen except in test suites), return a zero status
424 // value instead of crashing.
425 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
426         if q == nil {
427                 // This should only happen during tests.
428                 return WorkQueueStatus{}
429         }
430         return q.Status()
431 }
432
433 // handleDELETE processes DELETE requests.
434 //
435 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
436 // from all connected volumes.
437 //
438 // Only the Data Manager, or an Arvados admin with scope "all", are
439 // allowed to issue DELETE requests.  If a DELETE request is not
440 // authenticated or is issued by a non-admin user, the server returns
441 // a PermissionError.
442 //
443 // Upon receiving a valid request from an authorized user,
444 // handleDELETE deletes all copies of the specified block on local
445 // writable volumes.
446 //
447 // Response format:
448 //
449 // If the requested blocks was not found on any volume, the response
450 // code is HTTP 404 Not Found.
451 //
452 // Otherwise, the response code is 200 OK, with a response body
453 // consisting of the JSON message
454 //
455 //    {"copies_deleted":d,"copies_failed":f}
456 //
457 // where d and f are integers representing the number of blocks that
458 // were successfully and unsuccessfully deleted.
459 //
460 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
461         hash := mux.Vars(req)["hash"]
462
463         // Confirm that this user is an admin and has a token with unlimited scope.
464         var tok = GetAPIToken(req)
465         if tok == "" || !rtr.canDelete(tok) {
466                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
467                 return
468         }
469
470         if !rtr.cluster.Collections.BlobTrash {
471                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
472                 return
473         }
474
475         // Delete copies of this block from all available volumes.
476         // Report how many blocks were successfully deleted, and how
477         // many were found on writable volumes but not deleted.
478         var result struct {
479                 Deleted int `json:"copies_deleted"`
480                 Failed  int `json:"copies_failed"`
481         }
482         for _, vol := range rtr.volmgr.AllWritable() {
483                 if err := vol.Trash(hash); err == nil {
484                         result.Deleted++
485                 } else if os.IsNotExist(err) {
486                         continue
487                 } else {
488                         result.Failed++
489                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
490                 }
491         }
492         if result.Deleted == 0 && result.Failed == 0 {
493                 resp.WriteHeader(http.StatusNotFound)
494                 return
495         }
496         body, err := json.Marshal(result)
497         if err != nil {
498                 http.Error(resp, err.Error(), http.StatusInternalServerError)
499                 return
500         }
501         resp.Write(body)
502 }
503
504 /* PullHandler processes "PUT /pull" requests for the data manager.
505    The request body is a JSON message containing a list of pull
506    requests in the following format:
507
508    [
509       {
510          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
511          "servers":[
512                         "keep0.qr1hi.arvadosapi.com:25107",
513                         "keep1.qr1hi.arvadosapi.com:25108"
514                  ]
515           },
516           {
517                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
518                  "servers":[
519                         "10.0.1.5:25107",
520                         "10.0.1.6:25107",
521                         "10.0.1.7:25108"
522                  ]
523           },
524           ...
525    ]
526
527    Each pull request in the list consists of a block locator string
528    and an ordered list of servers.  Keepstore should try to fetch the
529    block from each server in turn.
530
531    If the request has not been sent by the Data Manager, return 401
532    Unauthorized.
533
534    If the JSON unmarshalling fails, return 400 Bad Request.
535 */
536
537 // PullRequest consists of a block locator and an ordered list of servers
538 type PullRequest struct {
539         Locator string   `json:"locator"`
540         Servers []string `json:"servers"`
541
542         // Destination mount, or "" for "anywhere"
543         MountUUID string `json:"mount_uuid"`
544 }
545
546 // PullHandler processes "PUT /pull" requests for the data manager.
547 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
548         // Reject unauthorized requests.
549         if !rtr.isSystemAuth(GetAPIToken(req)) {
550                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
551                 return
552         }
553
554         // Parse the request body.
555         var pr []PullRequest
556         r := json.NewDecoder(req.Body)
557         if err := r.Decode(&pr); err != nil {
558                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
559                 return
560         }
561
562         // We have a properly formatted pull list sent from the data
563         // manager.  Report success and send the list to the pull list
564         // manager for further handling.
565         resp.WriteHeader(http.StatusOK)
566         resp.Write([]byte(
567                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
568
569         plist := list.New()
570         for _, p := range pr {
571                 plist.PushBack(p)
572         }
573         rtr.pullq.ReplaceQueue(plist)
574 }
575
576 // TrashRequest consists of a block locator and its Mtime
577 type TrashRequest struct {
578         Locator    string `json:"locator"`
579         BlockMtime int64  `json:"block_mtime"`
580
581         // Target mount, or "" for "everywhere"
582         MountUUID string `json:"mount_uuid"`
583 }
584
585 // TrashHandler processes /trash requests.
586 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
587         // Reject unauthorized requests.
588         if !rtr.isSystemAuth(GetAPIToken(req)) {
589                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
590                 return
591         }
592
593         // Parse the request body.
594         var trash []TrashRequest
595         r := json.NewDecoder(req.Body)
596         if err := r.Decode(&trash); err != nil {
597                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
598                 return
599         }
600
601         // We have a properly formatted trash list sent from the data
602         // manager.  Report success and send the list to the trash work
603         // queue for further handling.
604         resp.WriteHeader(http.StatusOK)
605         resp.Write([]byte(
606                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
607
608         tlist := list.New()
609         for _, t := range trash {
610                 tlist.PushBack(t)
611         }
612         rtr.trashq.ReplaceQueue(tlist)
613 }
614
615 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
616 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
617         // Reject unauthorized requests.
618         if !rtr.isSystemAuth(GetAPIToken(req)) {
619                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
620                 return
621         }
622
623         log := ctxlog.FromContext(req.Context())
624         hash := mux.Vars(req)["hash"]
625
626         if len(rtr.volmgr.AllWritable()) == 0 {
627                 http.Error(resp, "No writable volumes", http.StatusNotFound)
628                 return
629         }
630
631         var untrashedOn, failedOn []string
632         var numNotFound int
633         for _, vol := range rtr.volmgr.AllWritable() {
634                 err := vol.Untrash(hash)
635
636                 if os.IsNotExist(err) {
637                         numNotFound++
638                 } else if err != nil {
639                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
640                         failedOn = append(failedOn, vol.String())
641                 } else {
642                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
643                         untrashedOn = append(untrashedOn, vol.String())
644                 }
645         }
646
647         if numNotFound == len(rtr.volmgr.AllWritable()) {
648                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
649         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
650                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
651         } else {
652                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
653                 if len(failedOn) > 0 {
654                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
655                         http.Error(resp, respBody, http.StatusInternalServerError)
656                 } else {
657                         fmt.Fprintln(resp, respBody)
658                 }
659         }
660 }
661
662 // GetBlock and PutBlock implement lower-level code for handling
663 // blocks by rooting through volumes connected to the local machine.
664 // Once the handler has determined that system policy permits the
665 // request, it calls these methods to perform the actual operation.
666 //
667 // TODO(twp): this code would probably be better located in the
668 // VolumeManager interface. As an abstraction, the VolumeManager
669 // should be the only part of the code that cares about which volume a
670 // block is stored on, so it should be responsible for figuring out
671 // which volume to check for fetching blocks, storing blocks, etc.
672
673 // GetBlock fetches the block identified by "hash" into the provided
674 // buf, and returns the data size.
675 //
676 // If the block cannot be found on any volume, returns NotFoundError.
677 //
678 // If the block found does not have the correct MD5 hash, returns
679 // DiskHashError.
680 //
681 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
682         log := ctxlog.FromContext(ctx)
683
684         // Attempt to read the requested hash from a keep volume.
685         errorToCaller := NotFoundError
686
687         for _, vol := range volmgr.AllReadable() {
688                 size, err := vol.Get(ctx, hash, buf)
689                 select {
690                 case <-ctx.Done():
691                         return 0, ErrClientDisconnect
692                 default:
693                 }
694                 if err != nil {
695                         // IsNotExist is an expected error and may be
696                         // ignored. All other errors are logged. In
697                         // any case we continue trying to read other
698                         // volumes. If all volumes report IsNotExist,
699                         // we return a NotFoundError.
700                         if !os.IsNotExist(err) {
701                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
702                         }
703                         // If some volume returns a transient error, return it to the caller
704                         // instead of "Not found" so it can retry.
705                         if err == VolumeBusyError {
706                                 errorToCaller = err.(*KeepError)
707                         }
708                         continue
709                 }
710                 // Check the file checksum.
711                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
712                 if filehash != hash {
713                         // TODO: Try harder to tell a sysadmin about
714                         // this.
715                         log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
716                         errorToCaller = DiskHashError
717                         continue
718                 }
719                 if errorToCaller == DiskHashError {
720                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
721                 }
722                 return size, nil
723         }
724         return 0, errorToCaller
725 }
726
727 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
728 //
729 // PutBlock(ctx, block, hash)
730 //   Stores the BLOCK (identified by the content id HASH) in Keep.
731 //
732 //   The MD5 checksum of the block must be identical to the content id HASH.
733 //   If not, an error is returned.
734 //
735 //   PutBlock stores the BLOCK on the first Keep volume with free space.
736 //   A failure code is returned to the user only if all volumes fail.
737 //
738 //   On success, PutBlock returns nil.
739 //   On failure, it returns a KeepError with one of the following codes:
740 //
741 //   500 Collision
742 //          A different block with the same hash already exists on this
743 //          Keep server.
744 //   422 MD5Fail
745 //          The MD5 hash of the BLOCK does not match the argument HASH.
746 //   503 Full
747 //          There was not enough space left in any Keep volume to store
748 //          the object.
749 //   500 Fail
750 //          The object could not be stored for some other reason (e.g.
751 //          all writes failed). The text of the error message should
752 //          provide as much detail as possible.
753 //
754 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
755         log := ctxlog.FromContext(ctx)
756
757         // Check that BLOCK's checksum matches HASH.
758         blockhash := fmt.Sprintf("%x", md5.Sum(block))
759         if blockhash != hash {
760                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
761                 return 0, RequestHashError
762         }
763
764         // If we already have this data, it's intact on disk, and we
765         // can update its timestamp, return success. If we have
766         // different data with the same hash, return failure.
767         if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
768                 return n, err
769         } else if ctx.Err() != nil {
770                 return 0, ErrClientDisconnect
771         }
772
773         // Choose a Keep volume to write to.
774         // If this volume fails, try all of the volumes in order.
775         if mnt := volmgr.NextWritable(); mnt != nil {
776                 if err := mnt.Put(ctx, hash, block); err != nil {
777                         log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
778                 } else {
779                         return mnt.Replication, nil // success!
780                 }
781         }
782         if ctx.Err() != nil {
783                 return 0, ErrClientDisconnect
784         }
785
786         writables := volmgr.AllWritable()
787         if len(writables) == 0 {
788                 log.Error("no writable volumes")
789                 return 0, FullError
790         }
791
792         allFull := true
793         for _, vol := range writables {
794                 err := vol.Put(ctx, hash, block)
795                 if ctx.Err() != nil {
796                         return 0, ErrClientDisconnect
797                 }
798                 switch err {
799                 case nil:
800                         return vol.Replication, nil // success!
801                 case FullError:
802                         continue
803                 default:
804                         // The volume is not full but the
805                         // write did not succeed.  Report the
806                         // error and continue trying.
807                         allFull = false
808                         log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
809                 }
810         }
811
812         if allFull {
813                 log.Error("all volumes are full")
814                 return 0, FullError
815         }
816         // Already logged the non-full errors.
817         return 0, GenericError
818 }
819
820 // CompareAndTouch returns the current replication level if one of the
821 // volumes already has the given content and it successfully updates
822 // the relevant block's modification time in order to protect it from
823 // premature garbage collection. Otherwise, it returns a non-nil
824 // error.
825 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
826         log := ctxlog.FromContext(ctx)
827         var bestErr error = NotFoundError
828         for _, mnt := range volmgr.AllWritable() {
829                 err := mnt.Compare(ctx, hash, buf)
830                 if ctx.Err() != nil {
831                         return 0, ctx.Err()
832                 } else if err == CollisionError {
833                         // Stop if we have a block with same hash but
834                         // different content. (It will be impossible
835                         // to tell which one is wanted if we have
836                         // both, so there's no point writing it even
837                         // on a different volume.)
838                         log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
839                         return 0, err
840                 } else if os.IsNotExist(err) {
841                         // Block does not exist. This is the only
842                         // "normal" error: we don't log anything.
843                         continue
844                 } else if err != nil {
845                         // Couldn't open file, data is corrupt on
846                         // disk, etc.: log this abnormal condition,
847                         // and try the next volume.
848                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
849                         continue
850                 }
851                 if err := mnt.Touch(hash); err != nil {
852                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
853                         bestErr = err
854                         continue
855                 }
856                 // Compare and Touch both worked --> done.
857                 return mnt.Replication, nil
858         }
859         return 0, bestErr
860 }
861
862 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
863
864 // IsValidLocator returns true if the specified string is a valid Keep locator.
865 //   When Keep is extended to support hash types other than MD5,
866 //   this should be updated to cover those as well.
867 //
868 func IsValidLocator(loc string) bool {
869         return validLocatorRe.MatchString(loc)
870 }
871
872 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
873
874 // GetAPIToken returns the OAuth2 token from the Authorization
875 // header of a HTTP request, or an empty string if no matching
876 // token is found.
877 func GetAPIToken(req *http.Request) string {
878         if auth, ok := req.Header["Authorization"]; ok {
879                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
880                         return match[2]
881                 }
882         }
883         return ""
884 }
885
886 // canDelete returns true if the user identified by apiToken is
887 // allowed to delete blocks.
888 func (rtr *router) canDelete(apiToken string) bool {
889         if apiToken == "" {
890                 return false
891         }
892         // Blocks may be deleted only when Keep has been configured with a
893         // data manager.
894         if rtr.isSystemAuth(apiToken) {
895                 return true
896         }
897         // TODO(twp): look up apiToken with the API server
898         // return true if is_admin is true and if the token
899         // has unlimited scope
900         return false
901 }
902
903 // isSystemAuth returns true if the given token is allowed to perform
904 // system level actions like deleting data.
905 func (rtr *router) isSystemAuth(token string) bool {
906         return token != "" && token == rtr.cluster.SystemRootToken
907 }