13382: Report storage class(es) in headers after successful write.
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "git.arvados.org/arvados.git/sdk/go/health"
26         "git.arvados.org/arvados.git/sdk/go/httpserver"
27         "github.com/gorilla/mux"
28         "github.com/prometheus/client_golang/prometheus"
29         "github.com/sirupsen/logrus"
30 )
31
32 type router struct {
33         *mux.Router
34         cluster     *arvados.Cluster
35         logger      logrus.FieldLogger
36         remoteProxy remoteProxy
37         metrics     *nodeMetrics
38         volmgr      *RRVolumeManager
39         pullq       *WorkQueue
40         trashq      *WorkQueue
41 }
42
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
46         rtr := &router{
47                 Router:  mux.NewRouter(),
48                 cluster: cluster,
49                 logger:  ctxlog.FromContext(ctx),
50                 metrics: &nodeMetrics{reg: reg},
51                 volmgr:  volmgr,
52                 pullq:   pullq,
53                 trashq:  trashq,
54         }
55
56         rtr.HandleFunc(
57                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}+{hints}`,
60                 rtr.handleGET).Methods("GET", "HEAD")
61
62         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64         // List all blocks stored here. Privileged client only.
65         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66         // List blocks stored here whose hash has the given prefix.
67         // Privileged client only.
68         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69         // Update timestamp on existing block. Privileged client only.
70         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
71
72         // Internals/debugging info (runtime.MemStats)
73         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
74
75         // List volumes: path, device number, bytes used/avail.
76         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
77
78         // List mounts: UUID, readonly, tier, device ID, ...
79         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
80         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
82
83         // Replace the current pull queue.
84         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
85
86         // Replace the current trash queue.
87         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
88
89         // Untrash moves blocks from trash back into store
90         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
91
92         rtr.Handle("/_health/{check}", &health.Handler{
93                 Token:  cluster.ManagementToken,
94                 Prefix: "/_health/",
95         }).Methods("GET")
96
97         // Any request which does not match any of these routes gets
98         // 400 Bad Request.
99         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
100
101         rtr.metrics.setupBufferPoolMetrics(bufs)
102         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
104
105         return rtr
106 }
107
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
111 }
112
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
114         ctx, cancel := contextForResponse(context.TODO(), resp)
115         defer cancel()
116
117         locator := req.URL.Path[1:]
118         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
120                 return
121         }
122
123         if rtr.cluster.Collections.BlobSigning {
124                 locator := req.URL.Path[1:] // strip leading slash
125                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
126                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
127                         return
128                 }
129         }
130
131         // TODO: Probe volumes to check whether the block _might_
132         // exist. Some volumes/types could support a quick existence
133         // check without causing other operations to suffer. If all
134         // volumes support that, and assure us the block definitely
135         // isn't here, we can return 404 now instead of waiting for a
136         // buffer.
137
138         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
139         if err != nil {
140                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
141                 return
142         }
143         defer bufs.Put(buf)
144
145         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146         if err != nil {
147                 code := http.StatusInternalServerError
148                 if err, ok := err.(*KeepError); ok {
149                         code = err.HTTPCode
150                 }
151                 http.Error(resp, err.Error(), code)
152                 return
153         }
154
155         resp.Header().Set("Content-Length", strconv.Itoa(size))
156         resp.Header().Set("Content-Type", "application/octet-stream")
157         resp.Write(buf[:size])
158 }
159
160 // Return a new context that gets cancelled by resp's CloseNotifier.
161 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
162         ctx, cancel := context.WithCancel(parent)
163         if cn, ok := resp.(http.CloseNotifier); ok {
164                 go func(c <-chan bool) {
165                         select {
166                         case <-c:
167                                 cancel()
168                         case <-ctx.Done():
169                         }
170                 }(cn.CloseNotify())
171         }
172         return ctx, cancel
173 }
174
175 // Get a buffer from the pool -- but give up and return a non-nil
176 // error if ctx ends before we get a buffer.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
178         bufReady := make(chan []byte)
179         go func() {
180                 bufReady <- bufs.Get(bufSize)
181         }()
182         select {
183         case buf := <-bufReady:
184                 return buf, nil
185         case <-ctx.Done():
186                 go func() {
187                         // Even if closeNotifier happened first, we
188                         // need to keep waiting for our buf so we can
189                         // return it to the pool.
190                         bufs.Put(<-bufReady)
191                 }()
192                 return nil, ErrClientDisconnect
193         }
194 }
195
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197         if !rtr.isSystemAuth(GetAPIToken(req)) {
198                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
199                 return
200         }
201         hash := mux.Vars(req)["hash"]
202         vols := rtr.volmgr.AllWritable()
203         if len(vols) == 0 {
204                 http.Error(resp, "no volumes", http.StatusNotFound)
205                 return
206         }
207         var err error
208         for _, mnt := range vols {
209                 err = mnt.Touch(hash)
210                 if err == nil {
211                         break
212                 }
213         }
214         switch {
215         case err == nil:
216                 return
217         case os.IsNotExist(err):
218                 http.Error(resp, err.Error(), http.StatusNotFound)
219         default:
220                 http.Error(resp, err.Error(), http.StatusInternalServerError)
221         }
222 }
223
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225         ctx, cancel := contextForResponse(context.TODO(), resp)
226         defer cancel()
227
228         hash := mux.Vars(req)["hash"]
229
230         // Detect as many error conditions as possible before reading
231         // the body: avoid transmitting data that will not end up
232         // being written anyway.
233
234         if req.ContentLength == -1 {
235                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
236                 return
237         }
238
239         if req.ContentLength > BlockSize {
240                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
241                 return
242         }
243
244         if len(rtr.volmgr.AllWritable()) == 0 {
245                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
246                 return
247         }
248
249         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
250         if err != nil {
251                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
252                 return
253         }
254
255         _, err = io.ReadFull(req.Body, buf)
256         if err != nil {
257                 http.Error(resp, err.Error(), 500)
258                 bufs.Put(buf)
259                 return
260         }
261
262         result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
263         bufs.Put(buf)
264
265         if err != nil {
266                 code := http.StatusInternalServerError
267                 if err, ok := err.(*KeepError); ok {
268                         code = err.HTTPCode
269                 }
270                 http.Error(resp, err.Error(), code)
271                 return
272         }
273
274         // Success; add a size hint, sign the locator if possible, and
275         // return it to the client.
276         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
277         apiToken := GetAPIToken(req)
278         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
279                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
280                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
281         }
282         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
283         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
284         resp.Write([]byte(returnHash + "\n"))
285 }
286
287 // IndexHandler responds to "/index", "/index/{prefix}", and
288 // "/mounts/{uuid}/blocks" requests.
289 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
290         if !rtr.isSystemAuth(GetAPIToken(req)) {
291                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
292                 return
293         }
294
295         prefix := mux.Vars(req)["prefix"]
296         if prefix == "" {
297                 req.ParseForm()
298                 prefix = req.Form.Get("prefix")
299         }
300
301         uuid := mux.Vars(req)["uuid"]
302
303         var vols []*VolumeMount
304         if uuid == "" {
305                 vols = rtr.volmgr.AllReadable()
306         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
307                 http.Error(resp, "mount not found", http.StatusNotFound)
308                 return
309         } else {
310                 vols = []*VolumeMount{mnt}
311         }
312
313         for _, v := range vols {
314                 if err := v.IndexTo(prefix, resp); err != nil {
315                         // We can't send an error status/message to
316                         // the client because IndexTo() might have
317                         // already written body content. All we can do
318                         // is log the error in our own logs.
319                         //
320                         // The client must notice the lack of trailing
321                         // newline as an indication that the response
322                         // is incomplete.
323                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
324                         return
325                 }
326         }
327         // An empty line at EOF is the only way the client can be
328         // assured the entire index was received.
329         resp.Write([]byte{'\n'})
330 }
331
332 // MountsHandler responds to "GET /mounts" requests.
333 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
334         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
335         if err != nil {
336                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
337         }
338 }
339
340 // PoolStatus struct
341 type PoolStatus struct {
342         Alloc uint64 `json:"BytesAllocatedCumulative"`
343         Cap   int    `json:"BuffersMax"`
344         Len   int    `json:"BuffersInUse"`
345 }
346
347 type volumeStatusEnt struct {
348         Label         string
349         Status        *VolumeStatus `json:",omitempty"`
350         VolumeStats   *ioStats      `json:",omitempty"`
351         InternalStats interface{}   `json:",omitempty"`
352 }
353
354 // NodeStatus struct
355 type NodeStatus struct {
356         Volumes         []*volumeStatusEnt
357         BufferPool      PoolStatus
358         PullQueue       WorkQueueStatus
359         TrashQueue      WorkQueueStatus
360         RequestsCurrent int
361         RequestsMax     int
362         Version         string
363 }
364
365 var st NodeStatus
366 var stLock sync.Mutex
367
368 // DebugHandler addresses /debug.json requests.
369 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
370         type debugStats struct {
371                 MemStats runtime.MemStats
372         }
373         var ds debugStats
374         runtime.ReadMemStats(&ds.MemStats)
375         data, err := json.Marshal(&ds)
376         if err != nil {
377                 http.Error(resp, err.Error(), http.StatusInternalServerError)
378                 return
379         }
380         resp.Write(data)
381 }
382
383 // StatusHandler addresses /status.json requests.
384 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
385         stLock.Lock()
386         rtr.readNodeStatus(&st)
387         data, err := json.Marshal(&st)
388         stLock.Unlock()
389         if err != nil {
390                 http.Error(resp, err.Error(), http.StatusInternalServerError)
391                 return
392         }
393         resp.Write(data)
394 }
395
396 // populate the given NodeStatus struct with current values.
397 func (rtr *router) readNodeStatus(st *NodeStatus) {
398         st.Version = version
399         vols := rtr.volmgr.AllReadable()
400         if cap(st.Volumes) < len(vols) {
401                 st.Volumes = make([]*volumeStatusEnt, len(vols))
402         }
403         st.Volumes = st.Volumes[:0]
404         for _, vol := range vols {
405                 var internalStats interface{}
406                 if vol, ok := vol.Volume.(InternalStatser); ok {
407                         internalStats = vol.InternalStats()
408                 }
409                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
410                         Label:         vol.String(),
411                         Status:        vol.Status(),
412                         InternalStats: internalStats,
413                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
414                 })
415         }
416         st.BufferPool.Alloc = bufs.Alloc()
417         st.BufferPool.Cap = bufs.Cap()
418         st.BufferPool.Len = bufs.Len()
419         st.PullQueue = getWorkQueueStatus(rtr.pullq)
420         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
421 }
422
423 // return a WorkQueueStatus for the given queue. If q is nil (which
424 // should never happen except in test suites), return a zero status
425 // value instead of crashing.
426 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
427         if q == nil {
428                 // This should only happen during tests.
429                 return WorkQueueStatus{}
430         }
431         return q.Status()
432 }
433
434 // handleDELETE processes DELETE requests.
435 //
436 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
437 // from all connected volumes.
438 //
439 // Only the Data Manager, or an Arvados admin with scope "all", are
440 // allowed to issue DELETE requests.  If a DELETE request is not
441 // authenticated or is issued by a non-admin user, the server returns
442 // a PermissionError.
443 //
444 // Upon receiving a valid request from an authorized user,
445 // handleDELETE deletes all copies of the specified block on local
446 // writable volumes.
447 //
448 // Response format:
449 //
450 // If the requested blocks was not found on any volume, the response
451 // code is HTTP 404 Not Found.
452 //
453 // Otherwise, the response code is 200 OK, with a response body
454 // consisting of the JSON message
455 //
456 //    {"copies_deleted":d,"copies_failed":f}
457 //
458 // where d and f are integers representing the number of blocks that
459 // were successfully and unsuccessfully deleted.
460 //
461 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
462         hash := mux.Vars(req)["hash"]
463
464         // Confirm that this user is an admin and has a token with unlimited scope.
465         var tok = GetAPIToken(req)
466         if tok == "" || !rtr.canDelete(tok) {
467                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
468                 return
469         }
470
471         if !rtr.cluster.Collections.BlobTrash {
472                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
473                 return
474         }
475
476         // Delete copies of this block from all available volumes.
477         // Report how many blocks were successfully deleted, and how
478         // many were found on writable volumes but not deleted.
479         var result struct {
480                 Deleted int `json:"copies_deleted"`
481                 Failed  int `json:"copies_failed"`
482         }
483         for _, vol := range rtr.volmgr.AllWritable() {
484                 if err := vol.Trash(hash); err == nil {
485                         result.Deleted++
486                 } else if os.IsNotExist(err) {
487                         continue
488                 } else {
489                         result.Failed++
490                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
491                 }
492         }
493         if result.Deleted == 0 && result.Failed == 0 {
494                 resp.WriteHeader(http.StatusNotFound)
495                 return
496         }
497         body, err := json.Marshal(result)
498         if err != nil {
499                 http.Error(resp, err.Error(), http.StatusInternalServerError)
500                 return
501         }
502         resp.Write(body)
503 }
504
505 /* PullHandler processes "PUT /pull" requests for the data manager.
506    The request body is a JSON message containing a list of pull
507    requests in the following format:
508
509    [
510       {
511          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
512          "servers":[
513                         "keep0.qr1hi.arvadosapi.com:25107",
514                         "keep1.qr1hi.arvadosapi.com:25108"
515                  ]
516           },
517           {
518                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
519                  "servers":[
520                         "10.0.1.5:25107",
521                         "10.0.1.6:25107",
522                         "10.0.1.7:25108"
523                  ]
524           },
525           ...
526    ]
527
528    Each pull request in the list consists of a block locator string
529    and an ordered list of servers.  Keepstore should try to fetch the
530    block from each server in turn.
531
532    If the request has not been sent by the Data Manager, return 401
533    Unauthorized.
534
535    If the JSON unmarshalling fails, return 400 Bad Request.
536 */
537
538 // PullRequest consists of a block locator and an ordered list of servers
539 type PullRequest struct {
540         Locator string   `json:"locator"`
541         Servers []string `json:"servers"`
542
543         // Destination mount, or "" for "anywhere"
544         MountUUID string `json:"mount_uuid"`
545 }
546
547 // PullHandler processes "PUT /pull" requests for the data manager.
548 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
549         // Reject unauthorized requests.
550         if !rtr.isSystemAuth(GetAPIToken(req)) {
551                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
552                 return
553         }
554
555         // Parse the request body.
556         var pr []PullRequest
557         r := json.NewDecoder(req.Body)
558         if err := r.Decode(&pr); err != nil {
559                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
560                 return
561         }
562
563         // We have a properly formatted pull list sent from the data
564         // manager.  Report success and send the list to the pull list
565         // manager for further handling.
566         resp.WriteHeader(http.StatusOK)
567         resp.Write([]byte(
568                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
569
570         plist := list.New()
571         for _, p := range pr {
572                 plist.PushBack(p)
573         }
574         rtr.pullq.ReplaceQueue(plist)
575 }
576
577 // TrashRequest consists of a block locator and its Mtime
578 type TrashRequest struct {
579         Locator    string `json:"locator"`
580         BlockMtime int64  `json:"block_mtime"`
581
582         // Target mount, or "" for "everywhere"
583         MountUUID string `json:"mount_uuid"`
584 }
585
586 // TrashHandler processes /trash requests.
587 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
588         // Reject unauthorized requests.
589         if !rtr.isSystemAuth(GetAPIToken(req)) {
590                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
591                 return
592         }
593
594         // Parse the request body.
595         var trash []TrashRequest
596         r := json.NewDecoder(req.Body)
597         if err := r.Decode(&trash); err != nil {
598                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
599                 return
600         }
601
602         // We have a properly formatted trash list sent from the data
603         // manager.  Report success and send the list to the trash work
604         // queue for further handling.
605         resp.WriteHeader(http.StatusOK)
606         resp.Write([]byte(
607                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
608
609         tlist := list.New()
610         for _, t := range trash {
611                 tlist.PushBack(t)
612         }
613         rtr.trashq.ReplaceQueue(tlist)
614 }
615
616 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
617 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
618         // Reject unauthorized requests.
619         if !rtr.isSystemAuth(GetAPIToken(req)) {
620                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
621                 return
622         }
623
624         log := ctxlog.FromContext(req.Context())
625         hash := mux.Vars(req)["hash"]
626
627         if len(rtr.volmgr.AllWritable()) == 0 {
628                 http.Error(resp, "No writable volumes", http.StatusNotFound)
629                 return
630         }
631
632         var untrashedOn, failedOn []string
633         var numNotFound int
634         for _, vol := range rtr.volmgr.AllWritable() {
635                 err := vol.Untrash(hash)
636
637                 if os.IsNotExist(err) {
638                         numNotFound++
639                 } else if err != nil {
640                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
641                         failedOn = append(failedOn, vol.String())
642                 } else {
643                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
644                         untrashedOn = append(untrashedOn, vol.String())
645                 }
646         }
647
648         if numNotFound == len(rtr.volmgr.AllWritable()) {
649                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
650         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
651                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
652         } else {
653                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
654                 if len(failedOn) > 0 {
655                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
656                         http.Error(resp, respBody, http.StatusInternalServerError)
657                 } else {
658                         fmt.Fprintln(resp, respBody)
659                 }
660         }
661 }
662
663 // GetBlock and PutBlock implement lower-level code for handling
664 // blocks by rooting through volumes connected to the local machine.
665 // Once the handler has determined that system policy permits the
666 // request, it calls these methods to perform the actual operation.
667 //
668 // TODO(twp): this code would probably be better located in the
669 // VolumeManager interface. As an abstraction, the VolumeManager
670 // should be the only part of the code that cares about which volume a
671 // block is stored on, so it should be responsible for figuring out
672 // which volume to check for fetching blocks, storing blocks, etc.
673
674 // GetBlock fetches the block identified by "hash" into the provided
675 // buf, and returns the data size.
676 //
677 // If the block cannot be found on any volume, returns NotFoundError.
678 //
679 // If the block found does not have the correct MD5 hash, returns
680 // DiskHashError.
681 //
682 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
683         log := ctxlog.FromContext(ctx)
684
685         // Attempt to read the requested hash from a keep volume.
686         errorToCaller := NotFoundError
687
688         for _, vol := range volmgr.AllReadable() {
689                 size, err := vol.Get(ctx, hash, buf)
690                 select {
691                 case <-ctx.Done():
692                         return 0, ErrClientDisconnect
693                 default:
694                 }
695                 if err != nil {
696                         // IsNotExist is an expected error and may be
697                         // ignored. All other errors are logged. In
698                         // any case we continue trying to read other
699                         // volumes. If all volumes report IsNotExist,
700                         // we return a NotFoundError.
701                         if !os.IsNotExist(err) {
702                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
703                         }
704                         // If some volume returns a transient error, return it to the caller
705                         // instead of "Not found" so it can retry.
706                         if err == VolumeBusyError {
707                                 errorToCaller = err.(*KeepError)
708                         }
709                         continue
710                 }
711                 // Check the file checksum.
712                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
713                 if filehash != hash {
714                         // TODO: Try harder to tell a sysadmin about
715                         // this.
716                         log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
717                         errorToCaller = DiskHashError
718                         continue
719                 }
720                 if errorToCaller == DiskHashError {
721                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
722                 }
723                 return size, nil
724         }
725         return 0, errorToCaller
726 }
727
728 type putResult struct {
729         totalReplication int
730         classReplication map[string]int
731 }
732
733 // Number of distinct replicas stored. "2" can mean the block was
734 // stored on 2 different volumes with replication 1, or on 1 volume
735 // with replication 2.
736 func (pr putResult) TotalReplication() string {
737         return strconv.Itoa(pr.totalReplication)
738 }
739
740 // Number of replicas satisfying each storage class, formatted like
741 // "default=2; special=1".
742 func (pr putResult) ClassReplication() string {
743         s := ""
744         for k, v := range pr.classReplication {
745                 if len(s) > 0 {
746                         s += ", "
747                 }
748                 s += k + "=" + strconv.Itoa(v)
749         }
750         return s
751 }
752
753 func newPutResult(mnt *VolumeMount) putResult {
754         result := putResult{
755                 totalReplication: mnt.Replication,
756                 classReplication: map[string]int{},
757         }
758         for class := range mnt.StorageClasses {
759                 result.classReplication[class] += mnt.Replication
760         }
761         return result
762 }
763
764 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
765 //
766 // PutBlock(ctx, block, hash)
767 //   Stores the BLOCK (identified by the content id HASH) in Keep.
768 //
769 //   The MD5 checksum of the block must be identical to the content id HASH.
770 //   If not, an error is returned.
771 //
772 //   PutBlock stores the BLOCK on the first Keep volume with free space.
773 //   A failure code is returned to the user only if all volumes fail.
774 //
775 //   On success, PutBlock returns nil.
776 //   On failure, it returns a KeepError with one of the following codes:
777 //
778 //   500 Collision
779 //          A different block with the same hash already exists on this
780 //          Keep server.
781 //   422 MD5Fail
782 //          The MD5 hash of the BLOCK does not match the argument HASH.
783 //   503 Full
784 //          There was not enough space left in any Keep volume to store
785 //          the object.
786 //   500 Fail
787 //          The object could not be stored for some other reason (e.g.
788 //          all writes failed). The text of the error message should
789 //          provide as much detail as possible.
790 //
791 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
792         log := ctxlog.FromContext(ctx)
793
794         // Check that BLOCK's checksum matches HASH.
795         blockhash := fmt.Sprintf("%x", md5.Sum(block))
796         if blockhash != hash {
797                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
798                 return putResult{}, RequestHashError
799         }
800
801         // If we already have this data, it's intact on disk, and we
802         // can update its timestamp, return success. If we have
803         // different data with the same hash, return failure.
804         if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
805                 return result, err
806         } else if ctx.Err() != nil {
807                 return putResult{}, ErrClientDisconnect
808         }
809
810         // Choose a Keep volume to write to.
811         // If this volume fails, try all of the volumes in order.
812         if mnt := volmgr.NextWritable(); mnt != nil {
813                 if err := mnt.Put(ctx, hash, block); err != nil {
814                         log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
815                 } else {
816                         return newPutResult(mnt), nil // success!
817                 }
818         }
819         if ctx.Err() != nil {
820                 return putResult{}, ErrClientDisconnect
821         }
822
823         writables := volmgr.AllWritable()
824         if len(writables) == 0 {
825                 log.Error("no writable volumes")
826                 return putResult{}, FullError
827         }
828
829         allFull := true
830         for _, mnt := range writables {
831                 err := mnt.Put(ctx, hash, block)
832                 if ctx.Err() != nil {
833                         return putResult{}, ErrClientDisconnect
834                 }
835                 switch err {
836                 case nil:
837                         return newPutResult(mnt), nil // success!
838                 case FullError:
839                         continue
840                 default:
841                         // The volume is not full but the
842                         // write did not succeed.  Report the
843                         // error and continue trying.
844                         allFull = false
845                         log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
846                 }
847         }
848
849         if allFull {
850                 log.Error("all volumes are full")
851                 return putResult{}, FullError
852         }
853         // Already logged the non-full errors.
854         return putResult{}, GenericError
855 }
856
857 // CompareAndTouch returns the current replication level if one of the
858 // volumes already has the given content and it successfully updates
859 // the relevant block's modification time in order to protect it from
860 // premature garbage collection. Otherwise, it returns a non-nil
861 // error.
862 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
863         log := ctxlog.FromContext(ctx)
864         var bestErr error = NotFoundError
865         for _, mnt := range volmgr.AllWritable() {
866                 err := mnt.Compare(ctx, hash, buf)
867                 if ctx.Err() != nil {
868                         return putResult{}, ctx.Err()
869                 } else if err == CollisionError {
870                         // Stop if we have a block with same hash but
871                         // different content. (It will be impossible
872                         // to tell which one is wanted if we have
873                         // both, so there's no point writing it even
874                         // on a different volume.)
875                         log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
876                         return putResult{}, err
877                 } else if os.IsNotExist(err) {
878                         // Block does not exist. This is the only
879                         // "normal" error: we don't log anything.
880                         continue
881                 } else if err != nil {
882                         // Couldn't open file, data is corrupt on
883                         // disk, etc.: log this abnormal condition,
884                         // and try the next volume.
885                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
886                         continue
887                 }
888                 if err := mnt.Touch(hash); err != nil {
889                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
890                         bestErr = err
891                         continue
892                 }
893                 // Compare and Touch both worked --> done.
894                 return newPutResult(mnt), nil
895         }
896         return putResult{}, bestErr
897 }
898
899 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
900
901 // IsValidLocator returns true if the specified string is a valid Keep locator.
902 //   When Keep is extended to support hash types other than MD5,
903 //   this should be updated to cover those as well.
904 //
905 func IsValidLocator(loc string) bool {
906         return validLocatorRe.MatchString(loc)
907 }
908
909 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
910
911 // GetAPIToken returns the OAuth2 token from the Authorization
912 // header of a HTTP request, or an empty string if no matching
913 // token is found.
914 func GetAPIToken(req *http.Request) string {
915         if auth, ok := req.Header["Authorization"]; ok {
916                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
917                         return match[2]
918                 }
919         }
920         return ""
921 }
922
923 // canDelete returns true if the user identified by apiToken is
924 // allowed to delete blocks.
925 func (rtr *router) canDelete(apiToken string) bool {
926         if apiToken == "" {
927                 return false
928         }
929         // Blocks may be deleted only when Keep has been configured with a
930         // data manager.
931         if rtr.isSystemAuth(apiToken) {
932                 return true
933         }
934         // TODO(twp): look up apiToken with the API server
935         // return true if is_admin is true and if the token
936         // has unlimited scope
937         return false
938 }
939
940 // isSystemAuth returns true if the given token is allowed to perform
941 // system level actions like deleting data.
942 func (rtr *router) isSystemAuth(token string) bool {
943         return token != "" && token == rtr.cluster.SystemRootToken
944 }