16159: Merge branch 'master' into 16159-token-expiration-on-logout
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "git.arvados.org/arvados.git/sdk/go/health"
26         "git.arvados.org/arvados.git/sdk/go/httpserver"
27         "github.com/gorilla/mux"
28         "github.com/prometheus/client_golang/prometheus"
29         "github.com/sirupsen/logrus"
30 )
31
32 type router struct {
33         *mux.Router
34         cluster     *arvados.Cluster
35         logger      logrus.FieldLogger
36         remoteProxy remoteProxy
37         metrics     *nodeMetrics
38         volmgr      *RRVolumeManager
39         pullq       *WorkQueue
40         trashq      *WorkQueue
41 }
42
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
46         rtr := &router{
47                 Router:  mux.NewRouter(),
48                 cluster: cluster,
49                 logger:  ctxlog.FromContext(ctx),
50                 metrics: &nodeMetrics{reg: reg},
51                 volmgr:  volmgr,
52                 pullq:   pullq,
53                 trashq:  trashq,
54         }
55
56         rtr.HandleFunc(
57                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}+{hints}`,
60                 rtr.handleGET).Methods("GET", "HEAD")
61
62         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64         // List all blocks stored here. Privileged client only.
65         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66         // List blocks stored here whose hash has the given prefix.
67         // Privileged client only.
68         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69         // Update timestamp on existing block. Privileged client only.
70         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
71
72         // Internals/debugging info (runtime.MemStats)
73         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
74
75         // List volumes: path, device number, bytes used/avail.
76         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
77
78         // List mounts: UUID, readonly, tier, device ID, ...
79         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
80         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
82
83         // Replace the current pull queue.
84         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
85
86         // Replace the current trash queue.
87         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
88
89         // Untrash moves blocks from trash back into store
90         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
91
92         rtr.Handle("/_health/{check}", &health.Handler{
93                 Token:  cluster.ManagementToken,
94                 Prefix: "/_health/",
95         }).Methods("GET")
96
97         // Any request which does not match any of these routes gets
98         // 400 Bad Request.
99         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
100
101         rtr.metrics.setupBufferPoolMetrics(bufs)
102         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
104
105         return rtr
106 }
107
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
111 }
112
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
114         ctx, cancel := contextForResponse(context.TODO(), resp)
115         defer cancel()
116
117         locator := req.URL.Path[1:]
118         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
120                 return
121         }
122
123         if rtr.cluster.Collections.BlobSigning {
124                 locator := req.URL.Path[1:] // strip leading slash
125                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
126                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
127                         return
128                 }
129         }
130
131         // TODO: Probe volumes to check whether the block _might_
132         // exist. Some volumes/types could support a quick existence
133         // check without causing other operations to suffer. If all
134         // volumes support that, and assure us the block definitely
135         // isn't here, we can return 404 now instead of waiting for a
136         // buffer.
137
138         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
139         if err != nil {
140                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
141                 return
142         }
143         defer bufs.Put(buf)
144
145         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146         if err != nil {
147                 code := http.StatusInternalServerError
148                 if err, ok := err.(*KeepError); ok {
149                         code = err.HTTPCode
150                 }
151                 http.Error(resp, err.Error(), code)
152                 return
153         }
154
155         resp.Header().Set("Content-Length", strconv.Itoa(size))
156         resp.Header().Set("Content-Type", "application/octet-stream")
157         resp.Write(buf[:size])
158 }
159
160 // Return a new context that gets cancelled by resp's CloseNotifier.
161 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
162         ctx, cancel := context.WithCancel(parent)
163         if cn, ok := resp.(http.CloseNotifier); ok {
164                 go func(c <-chan bool) {
165                         select {
166                         case <-c:
167                                 cancel()
168                         case <-ctx.Done():
169                         }
170                 }(cn.CloseNotify())
171         }
172         return ctx, cancel
173 }
174
175 // Get a buffer from the pool -- but give up and return a non-nil
176 // error if ctx ends before we get a buffer.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
178         bufReady := make(chan []byte)
179         go func() {
180                 bufReady <- bufs.Get(bufSize)
181         }()
182         select {
183         case buf := <-bufReady:
184                 return buf, nil
185         case <-ctx.Done():
186                 go func() {
187                         // Even if closeNotifier happened first, we
188                         // need to keep waiting for our buf so we can
189                         // return it to the pool.
190                         bufs.Put(<-bufReady)
191                 }()
192                 return nil, ErrClientDisconnect
193         }
194 }
195
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197         if !rtr.isSystemAuth(GetAPIToken(req)) {
198                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
199                 return
200         }
201         hash := mux.Vars(req)["hash"]
202         vols := rtr.volmgr.AllWritable()
203         if len(vols) == 0 {
204                 http.Error(resp, "no volumes", http.StatusNotFound)
205                 return
206         }
207         var err error
208         for _, mnt := range vols {
209                 err = mnt.Touch(hash)
210                 if err == nil {
211                         break
212                 }
213         }
214         switch {
215         case err == nil:
216                 return
217         case os.IsNotExist(err):
218                 http.Error(resp, err.Error(), http.StatusNotFound)
219         default:
220                 http.Error(resp, err.Error(), http.StatusInternalServerError)
221         }
222 }
223
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225         ctx, cancel := contextForResponse(context.TODO(), resp)
226         defer cancel()
227
228         hash := mux.Vars(req)["hash"]
229
230         // Detect as many error conditions as possible before reading
231         // the body: avoid transmitting data that will not end up
232         // being written anyway.
233
234         if req.ContentLength == -1 {
235                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
236                 return
237         }
238
239         if req.ContentLength > BlockSize {
240                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
241                 return
242         }
243
244         if len(rtr.volmgr.AllWritable()) == 0 {
245                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
246                 return
247         }
248
249         var wantStorageClasses []string
250         if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
251                 wantStorageClasses = strings.Split(hdr, ",")
252                 for i, sc := range wantStorageClasses {
253                         wantStorageClasses[i] = strings.TrimSpace(sc)
254                 }
255         }
256
257         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
258         if err != nil {
259                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
260                 return
261         }
262
263         _, err = io.ReadFull(req.Body, buf)
264         if err != nil {
265                 http.Error(resp, err.Error(), 500)
266                 bufs.Put(buf)
267                 return
268         }
269
270         result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
271         bufs.Put(buf)
272
273         if err != nil {
274                 code := http.StatusInternalServerError
275                 if err, ok := err.(*KeepError); ok {
276                         code = err.HTTPCode
277                 }
278                 http.Error(resp, err.Error(), code)
279                 return
280         }
281
282         // Success; add a size hint, sign the locator if possible, and
283         // return it to the client.
284         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
285         apiToken := GetAPIToken(req)
286         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
287                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
288                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
289         }
290         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
291         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
292         resp.Write([]byte(returnHash + "\n"))
293 }
294
295 // IndexHandler responds to "/index", "/index/{prefix}", and
296 // "/mounts/{uuid}/blocks" requests.
297 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
298         if !rtr.isSystemAuth(GetAPIToken(req)) {
299                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
300                 return
301         }
302
303         prefix := mux.Vars(req)["prefix"]
304         if prefix == "" {
305                 req.ParseForm()
306                 prefix = req.Form.Get("prefix")
307         }
308
309         uuid := mux.Vars(req)["uuid"]
310
311         var vols []*VolumeMount
312         if uuid == "" {
313                 vols = rtr.volmgr.AllReadable()
314         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
315                 http.Error(resp, "mount not found", http.StatusNotFound)
316                 return
317         } else {
318                 vols = []*VolumeMount{mnt}
319         }
320
321         for _, v := range vols {
322                 if err := v.IndexTo(prefix, resp); err != nil {
323                         // We can't send an error status/message to
324                         // the client because IndexTo() might have
325                         // already written body content. All we can do
326                         // is log the error in our own logs.
327                         //
328                         // The client must notice the lack of trailing
329                         // newline as an indication that the response
330                         // is incomplete.
331                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
332                         return
333                 }
334         }
335         // An empty line at EOF is the only way the client can be
336         // assured the entire index was received.
337         resp.Write([]byte{'\n'})
338 }
339
340 // MountsHandler responds to "GET /mounts" requests.
341 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
342         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
343         if err != nil {
344                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
345         }
346 }
347
348 // PoolStatus struct
349 type PoolStatus struct {
350         Alloc uint64 `json:"BytesAllocatedCumulative"`
351         Cap   int    `json:"BuffersMax"`
352         Len   int    `json:"BuffersInUse"`
353 }
354
355 type volumeStatusEnt struct {
356         Label         string
357         Status        *VolumeStatus `json:",omitempty"`
358         VolumeStats   *ioStats      `json:",omitempty"`
359         InternalStats interface{}   `json:",omitempty"`
360 }
361
362 // NodeStatus struct
363 type NodeStatus struct {
364         Volumes         []*volumeStatusEnt
365         BufferPool      PoolStatus
366         PullQueue       WorkQueueStatus
367         TrashQueue      WorkQueueStatus
368         RequestsCurrent int
369         RequestsMax     int
370         Version         string
371 }
372
373 var st NodeStatus
374 var stLock sync.Mutex
375
376 // DebugHandler addresses /debug.json requests.
377 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
378         type debugStats struct {
379                 MemStats runtime.MemStats
380         }
381         var ds debugStats
382         runtime.ReadMemStats(&ds.MemStats)
383         data, err := json.Marshal(&ds)
384         if err != nil {
385                 http.Error(resp, err.Error(), http.StatusInternalServerError)
386                 return
387         }
388         resp.Write(data)
389 }
390
391 // StatusHandler addresses /status.json requests.
392 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
393         stLock.Lock()
394         rtr.readNodeStatus(&st)
395         data, err := json.Marshal(&st)
396         stLock.Unlock()
397         if err != nil {
398                 http.Error(resp, err.Error(), http.StatusInternalServerError)
399                 return
400         }
401         resp.Write(data)
402 }
403
404 // populate the given NodeStatus struct with current values.
405 func (rtr *router) readNodeStatus(st *NodeStatus) {
406         st.Version = version
407         vols := rtr.volmgr.AllReadable()
408         if cap(st.Volumes) < len(vols) {
409                 st.Volumes = make([]*volumeStatusEnt, len(vols))
410         }
411         st.Volumes = st.Volumes[:0]
412         for _, vol := range vols {
413                 var internalStats interface{}
414                 if vol, ok := vol.Volume.(InternalStatser); ok {
415                         internalStats = vol.InternalStats()
416                 }
417                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
418                         Label:         vol.String(),
419                         Status:        vol.Status(),
420                         InternalStats: internalStats,
421                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
422                 })
423         }
424         st.BufferPool.Alloc = bufs.Alloc()
425         st.BufferPool.Cap = bufs.Cap()
426         st.BufferPool.Len = bufs.Len()
427         st.PullQueue = getWorkQueueStatus(rtr.pullq)
428         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
429 }
430
431 // return a WorkQueueStatus for the given queue. If q is nil (which
432 // should never happen except in test suites), return a zero status
433 // value instead of crashing.
434 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
435         if q == nil {
436                 // This should only happen during tests.
437                 return WorkQueueStatus{}
438         }
439         return q.Status()
440 }
441
442 // handleDELETE processes DELETE requests.
443 //
444 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
445 // from all connected volumes.
446 //
447 // Only the Data Manager, or an Arvados admin with scope "all", are
448 // allowed to issue DELETE requests.  If a DELETE request is not
449 // authenticated or is issued by a non-admin user, the server returns
450 // a PermissionError.
451 //
452 // Upon receiving a valid request from an authorized user,
453 // handleDELETE deletes all copies of the specified block on local
454 // writable volumes.
455 //
456 // Response format:
457 //
458 // If the requested blocks was not found on any volume, the response
459 // code is HTTP 404 Not Found.
460 //
461 // Otherwise, the response code is 200 OK, with a response body
462 // consisting of the JSON message
463 //
464 //    {"copies_deleted":d,"copies_failed":f}
465 //
466 // where d and f are integers representing the number of blocks that
467 // were successfully and unsuccessfully deleted.
468 //
469 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
470         hash := mux.Vars(req)["hash"]
471
472         // Confirm that this user is an admin and has a token with unlimited scope.
473         var tok = GetAPIToken(req)
474         if tok == "" || !rtr.canDelete(tok) {
475                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
476                 return
477         }
478
479         if !rtr.cluster.Collections.BlobTrash {
480                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
481                 return
482         }
483
484         // Delete copies of this block from all available volumes.
485         // Report how many blocks were successfully deleted, and how
486         // many were found on writable volumes but not deleted.
487         var result struct {
488                 Deleted int `json:"copies_deleted"`
489                 Failed  int `json:"copies_failed"`
490         }
491         for _, vol := range rtr.volmgr.AllWritable() {
492                 if err := vol.Trash(hash); err == nil {
493                         result.Deleted++
494                 } else if os.IsNotExist(err) {
495                         continue
496                 } else {
497                         result.Failed++
498                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
499                 }
500         }
501         if result.Deleted == 0 && result.Failed == 0 {
502                 resp.WriteHeader(http.StatusNotFound)
503                 return
504         }
505         body, err := json.Marshal(result)
506         if err != nil {
507                 http.Error(resp, err.Error(), http.StatusInternalServerError)
508                 return
509         }
510         resp.Write(body)
511 }
512
513 /* PullHandler processes "PUT /pull" requests for the data manager.
514    The request body is a JSON message containing a list of pull
515    requests in the following format:
516
517    [
518       {
519          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
520          "servers":[
521                         "keep0.qr1hi.arvadosapi.com:25107",
522                         "keep1.qr1hi.arvadosapi.com:25108"
523                  ]
524           },
525           {
526                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
527                  "servers":[
528                         "10.0.1.5:25107",
529                         "10.0.1.6:25107",
530                         "10.0.1.7:25108"
531                  ]
532           },
533           ...
534    ]
535
536    Each pull request in the list consists of a block locator string
537    and an ordered list of servers.  Keepstore should try to fetch the
538    block from each server in turn.
539
540    If the request has not been sent by the Data Manager, return 401
541    Unauthorized.
542
543    If the JSON unmarshalling fails, return 400 Bad Request.
544 */
545
546 // PullRequest consists of a block locator and an ordered list of servers
547 type PullRequest struct {
548         Locator string   `json:"locator"`
549         Servers []string `json:"servers"`
550
551         // Destination mount, or "" for "anywhere"
552         MountUUID string `json:"mount_uuid"`
553 }
554
555 // PullHandler processes "PUT /pull" requests for the data manager.
556 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
557         // Reject unauthorized requests.
558         if !rtr.isSystemAuth(GetAPIToken(req)) {
559                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
560                 return
561         }
562
563         // Parse the request body.
564         var pr []PullRequest
565         r := json.NewDecoder(req.Body)
566         if err := r.Decode(&pr); err != nil {
567                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
568                 return
569         }
570
571         // We have a properly formatted pull list sent from the data
572         // manager.  Report success and send the list to the pull list
573         // manager for further handling.
574         resp.WriteHeader(http.StatusOK)
575         resp.Write([]byte(
576                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
577
578         plist := list.New()
579         for _, p := range pr {
580                 plist.PushBack(p)
581         }
582         rtr.pullq.ReplaceQueue(plist)
583 }
584
585 // TrashRequest consists of a block locator and its Mtime
586 type TrashRequest struct {
587         Locator    string `json:"locator"`
588         BlockMtime int64  `json:"block_mtime"`
589
590         // Target mount, or "" for "everywhere"
591         MountUUID string `json:"mount_uuid"`
592 }
593
594 // TrashHandler processes /trash requests.
595 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
596         // Reject unauthorized requests.
597         if !rtr.isSystemAuth(GetAPIToken(req)) {
598                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
599                 return
600         }
601
602         // Parse the request body.
603         var trash []TrashRequest
604         r := json.NewDecoder(req.Body)
605         if err := r.Decode(&trash); err != nil {
606                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
607                 return
608         }
609
610         // We have a properly formatted trash list sent from the data
611         // manager.  Report success and send the list to the trash work
612         // queue for further handling.
613         resp.WriteHeader(http.StatusOK)
614         resp.Write([]byte(
615                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
616
617         tlist := list.New()
618         for _, t := range trash {
619                 tlist.PushBack(t)
620         }
621         rtr.trashq.ReplaceQueue(tlist)
622 }
623
624 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
625 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
626         // Reject unauthorized requests.
627         if !rtr.isSystemAuth(GetAPIToken(req)) {
628                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
629                 return
630         }
631
632         log := ctxlog.FromContext(req.Context())
633         hash := mux.Vars(req)["hash"]
634
635         if len(rtr.volmgr.AllWritable()) == 0 {
636                 http.Error(resp, "No writable volumes", http.StatusNotFound)
637                 return
638         }
639
640         var untrashedOn, failedOn []string
641         var numNotFound int
642         for _, vol := range rtr.volmgr.AllWritable() {
643                 err := vol.Untrash(hash)
644
645                 if os.IsNotExist(err) {
646                         numNotFound++
647                 } else if err != nil {
648                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
649                         failedOn = append(failedOn, vol.String())
650                 } else {
651                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
652                         untrashedOn = append(untrashedOn, vol.String())
653                 }
654         }
655
656         if numNotFound == len(rtr.volmgr.AllWritable()) {
657                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
658         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
659                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
660         } else {
661                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
662                 if len(failedOn) > 0 {
663                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
664                         http.Error(resp, respBody, http.StatusInternalServerError)
665                 } else {
666                         fmt.Fprintln(resp, respBody)
667                 }
668         }
669 }
670
671 // GetBlock and PutBlock implement lower-level code for handling
672 // blocks by rooting through volumes connected to the local machine.
673 // Once the handler has determined that system policy permits the
674 // request, it calls these methods to perform the actual operation.
675 //
676 // TODO(twp): this code would probably be better located in the
677 // VolumeManager interface. As an abstraction, the VolumeManager
678 // should be the only part of the code that cares about which volume a
679 // block is stored on, so it should be responsible for figuring out
680 // which volume to check for fetching blocks, storing blocks, etc.
681
682 // GetBlock fetches the block identified by "hash" into the provided
683 // buf, and returns the data size.
684 //
685 // If the block cannot be found on any volume, returns NotFoundError.
686 //
687 // If the block found does not have the correct MD5 hash, returns
688 // DiskHashError.
689 //
690 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
691         log := ctxlog.FromContext(ctx)
692
693         // Attempt to read the requested hash from a keep volume.
694         errorToCaller := NotFoundError
695
696         for _, vol := range volmgr.AllReadable() {
697                 size, err := vol.Get(ctx, hash, buf)
698                 select {
699                 case <-ctx.Done():
700                         return 0, ErrClientDisconnect
701                 default:
702                 }
703                 if err != nil {
704                         // IsNotExist is an expected error and may be
705                         // ignored. All other errors are logged. In
706                         // any case we continue trying to read other
707                         // volumes. If all volumes report IsNotExist,
708                         // we return a NotFoundError.
709                         if !os.IsNotExist(err) {
710                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
711                         }
712                         // If some volume returns a transient error, return it to the caller
713                         // instead of "Not found" so it can retry.
714                         if err == VolumeBusyError {
715                                 errorToCaller = err.(*KeepError)
716                         }
717                         continue
718                 }
719                 // Check the file checksum.
720                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
721                 if filehash != hash {
722                         // TODO: Try harder to tell a sysadmin about
723                         // this.
724                         log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
725                         errorToCaller = DiskHashError
726                         continue
727                 }
728                 if errorToCaller == DiskHashError {
729                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
730                 }
731                 return size, nil
732         }
733         return 0, errorToCaller
734 }
735
736 type putProgress struct {
737         classTodo        map[string]bool
738         mountUsed        map[*VolumeMount]bool
739         totalReplication int
740         classDone        map[string]int
741 }
742
743 // Number of distinct replicas stored. "2" can mean the block was
744 // stored on 2 different volumes with replication 1, or on 1 volume
745 // with replication 2.
746 func (pr putProgress) TotalReplication() string {
747         return strconv.Itoa(pr.totalReplication)
748 }
749
750 // Number of replicas satisfying each storage class, formatted like
751 // "default=2; special=1".
752 func (pr putProgress) ClassReplication() string {
753         s := ""
754         for k, v := range pr.classDone {
755                 if len(s) > 0 {
756                         s += ", "
757                 }
758                 s += k + "=" + strconv.Itoa(v)
759         }
760         return s
761 }
762
763 func (pr *putProgress) Add(mnt *VolumeMount) {
764         if pr.mountUsed[mnt] {
765                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
766                 return
767         }
768         pr.mountUsed[mnt] = true
769         pr.totalReplication += mnt.Replication
770         for class := range mnt.StorageClasses {
771                 pr.classDone[class] += mnt.Replication
772                 delete(pr.classTodo, class)
773         }
774 }
775
776 func (pr *putProgress) Done() bool {
777         return len(pr.classTodo) == 0 && pr.totalReplication > 0
778 }
779
780 func (pr *putProgress) Want(mnt *VolumeMount) bool {
781         if pr.Done() || pr.mountUsed[mnt] {
782                 return false
783         }
784         if len(pr.classTodo) == 0 {
785                 // none specified == "any"
786                 return true
787         }
788         for class := range mnt.StorageClasses {
789                 if pr.classTodo[class] {
790                         return true
791                 }
792         }
793         return false
794 }
795
796 func newPutResult(classes []string) putProgress {
797         pr := putProgress{
798                 classTodo: make(map[string]bool, len(classes)),
799                 classDone: map[string]int{},
800                 mountUsed: map[*VolumeMount]bool{},
801         }
802         for _, c := range classes {
803                 if c != "" {
804                         pr.classTodo[c] = true
805                 }
806         }
807         return pr
808 }
809
810 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
811 //
812 // PutBlock(ctx, block, hash)
813 //   Stores the BLOCK (identified by the content id HASH) in Keep.
814 //
815 //   The MD5 checksum of the block must be identical to the content id HASH.
816 //   If not, an error is returned.
817 //
818 //   PutBlock stores the BLOCK on the first Keep volume with free space.
819 //   A failure code is returned to the user only if all volumes fail.
820 //
821 //   On success, PutBlock returns nil.
822 //   On failure, it returns a KeepError with one of the following codes:
823 //
824 //   500 Collision
825 //          A different block with the same hash already exists on this
826 //          Keep server.
827 //   422 MD5Fail
828 //          The MD5 hash of the BLOCK does not match the argument HASH.
829 //   503 Full
830 //          There was not enough space left in any Keep volume to store
831 //          the object.
832 //   500 Fail
833 //          The object could not be stored for some other reason (e.g.
834 //          all writes failed). The text of the error message should
835 //          provide as much detail as possible.
836 //
837 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
838         log := ctxlog.FromContext(ctx)
839
840         // Check that BLOCK's checksum matches HASH.
841         blockhash := fmt.Sprintf("%x", md5.Sum(block))
842         if blockhash != hash {
843                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
844                 return putProgress{}, RequestHashError
845         }
846
847         result := newPutResult(wantStorageClasses)
848
849         // If we already have this data, it's intact on disk, and we
850         // can update its timestamp, return success. If we have
851         // different data with the same hash, return failure.
852         if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
853                 return result, err
854         }
855         if ctx.Err() != nil {
856                 return result, ErrClientDisconnect
857         }
858
859         // Choose a Keep volume to write to.
860         // If this volume fails, try all of the volumes in order.
861         if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
862                 // fall through to "try all volumes" below
863         } else if err := mnt.Put(ctx, hash, block); err != nil {
864                 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
865         } else {
866                 result.Add(mnt)
867                 if result.Done() {
868                         return result, nil
869                 }
870         }
871         if ctx.Err() != nil {
872                 return putProgress{}, ErrClientDisconnect
873         }
874
875         writables := volmgr.AllWritable()
876         if len(writables) == 0 {
877                 log.Error("no writable volumes")
878                 return putProgress{}, FullError
879         }
880
881         allFull := true
882         for _, mnt := range writables {
883                 if !result.Want(mnt) {
884                         continue
885                 }
886                 err := mnt.Put(ctx, hash, block)
887                 if ctx.Err() != nil {
888                         return result, ErrClientDisconnect
889                 }
890                 switch err {
891                 case nil:
892                         result.Add(mnt)
893                         if result.Done() {
894                                 return result, nil
895                         }
896                         continue
897                 case FullError:
898                         continue
899                 default:
900                         // The volume is not full but the
901                         // write did not succeed.  Report the
902                         // error and continue trying.
903                         allFull = false
904                         log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
905                 }
906         }
907
908         if result.totalReplication > 0 {
909                 // Some, but not all, of the storage classes were
910                 // satisfied. This qualifies as success.
911                 return result, nil
912         } else if allFull {
913                 log.Error("all volumes with qualifying storage classes are full")
914                 return putProgress{}, FullError
915         } else {
916                 // Already logged the non-full errors.
917                 return putProgress{}, GenericError
918         }
919 }
920
921 // CompareAndTouch looks for volumes where the given content already
922 // exists and its modification time can be updated (i.e., it is
923 // protected from garbage collection), and updates result accordingly.
924 // It returns when the result is Done() or all volumes have been
925 // checked.
926 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
927         log := ctxlog.FromContext(ctx)
928         for _, mnt := range volmgr.AllWritable() {
929                 if !result.Want(mnt) {
930                         continue
931                 }
932                 err := mnt.Compare(ctx, hash, buf)
933                 if ctx.Err() != nil {
934                         return nil
935                 } else if err == CollisionError {
936                         // Stop if we have a block with same hash but
937                         // different content. (It will be impossible
938                         // to tell which one is wanted if we have
939                         // both, so there's no point writing it even
940                         // on a different volume.)
941                         log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
942                         return CollisionError
943                 } else if os.IsNotExist(err) {
944                         // Block does not exist. This is the only
945                         // "normal" error: we don't log anything.
946                         continue
947                 } else if err != nil {
948                         // Couldn't open file, data is corrupt on
949                         // disk, etc.: log this abnormal condition,
950                         // and try the next volume.
951                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
952                         continue
953                 }
954                 if err := mnt.Touch(hash); err != nil {
955                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
956                         continue
957                 }
958                 // Compare and Touch both worked --> done.
959                 result.Add(mnt)
960                 if result.Done() {
961                         return nil
962                 }
963         }
964         return nil
965 }
966
967 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
968
969 // IsValidLocator returns true if the specified string is a valid Keep locator.
970 //   When Keep is extended to support hash types other than MD5,
971 //   this should be updated to cover those as well.
972 //
973 func IsValidLocator(loc string) bool {
974         return validLocatorRe.MatchString(loc)
975 }
976
977 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
978
979 // GetAPIToken returns the OAuth2 token from the Authorization
980 // header of a HTTP request, or an empty string if no matching
981 // token is found.
982 func GetAPIToken(req *http.Request) string {
983         if auth, ok := req.Header["Authorization"]; ok {
984                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
985                         return match[2]
986                 }
987         }
988         return ""
989 }
990
991 // canDelete returns true if the user identified by apiToken is
992 // allowed to delete blocks.
993 func (rtr *router) canDelete(apiToken string) bool {
994         if apiToken == "" {
995                 return false
996         }
997         // Blocks may be deleted only when Keep has been configured with a
998         // data manager.
999         if rtr.isSystemAuth(apiToken) {
1000                 return true
1001         }
1002         // TODO(twp): look up apiToken with the API server
1003         // return true if is_admin is true and if the token
1004         // has unlimited scope
1005         return false
1006 }
1007
1008 // isSystemAuth returns true if the given token is allowed to perform
1009 // system level actions like deleting data.
1010 func (rtr *router) isSystemAuth(token string) bool {
1011         return token != "" && token == rtr.cluster.SystemRootToken
1012 }