21217: Strip leading/trailing space chars from incoming tokens.
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "git.arvados.org/arvados.git/sdk/go/health"
28         "git.arvados.org/arvados.git/sdk/go/httpserver"
29         "github.com/gorilla/mux"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32 )
33
34 type router struct {
35         *mux.Router
36         cluster     *arvados.Cluster
37         logger      logrus.FieldLogger
38         remoteProxy remoteProxy
39         metrics     *nodeMetrics
40         volmgr      *RRVolumeManager
41         pullq       *WorkQueue
42         trashq      *WorkQueue
43 }
44
45 // MakeRESTRouter returns a new router that forwards all Keep requests
46 // to the appropriate handlers.
47 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
48         rtr := &router{
49                 Router:  mux.NewRouter(),
50                 cluster: cluster,
51                 logger:  ctxlog.FromContext(ctx),
52                 metrics: &nodeMetrics{reg: reg},
53                 volmgr:  volmgr,
54                 pullq:   pullq,
55                 trashq:  trashq,
56         }
57
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
60         rtr.HandleFunc(
61                 `/{hash:[0-9a-f]{32}}+{hints}`,
62                 rtr.handleGET).Methods("GET", "HEAD")
63
64         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
65         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
66         // List all blocks stored here. Privileged client only.
67         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
68         // List blocks stored here whose hash has the given prefix.
69         // Privileged client only.
70         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71         // Update timestamp on existing block. Privileged client only.
72         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
73
74         // Internals/debugging info (runtime.MemStats)
75         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
76
77         // List volumes: path, device number, bytes used/avail.
78         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
79
80         // List mounts: UUID, readonly, tier, device ID, ...
81         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
82         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
83         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
84
85         // Replace the current pull queue.
86         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
87
88         // Replace the current trash queue.
89         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
90
91         // Untrash moves blocks from trash back into store
92         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
93
94         rtr.Handle("/_health/{check}", &health.Handler{
95                 Token:  cluster.ManagementToken,
96                 Prefix: "/_health/",
97         }).Methods("GET")
98
99         // Any request which does not match any of these routes gets
100         // 400 Bad Request.
101         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
102
103         rtr.metrics.setupBufferPoolMetrics(bufs)
104         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
105         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
106
107         return rtr
108 }
109
110 // BadRequestHandler is a HandleFunc to address bad requests.
111 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
112         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
113 }
114
115 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
116         locator := req.URL.Path[1:]
117         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118                 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
119                 return
120         }
121
122         if rtr.cluster.Collections.BlobSigning {
123                 locator := req.URL.Path[1:] // strip leading slash
124                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
126                         return
127                 }
128         }
129
130         // TODO: Probe volumes to check whether the block _might_
131         // exist. Some volumes/types could support a quick existence
132         // check without causing other operations to suffer. If all
133         // volumes support that, and assure us the block definitely
134         // isn't here, we can return 404 now instead of waiting for a
135         // buffer.
136
137         buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
138         if err != nil {
139                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
140                 return
141         }
142         defer bufs.Put(buf)
143
144         size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
145         if err != nil {
146                 code := http.StatusInternalServerError
147                 if err, ok := err.(*KeepError); ok {
148                         code = err.HTTPCode
149                 }
150                 http.Error(resp, err.Error(), code)
151                 return
152         }
153
154         resp.Header().Set("Content-Length", strconv.Itoa(size))
155         resp.Header().Set("Content-Type", "application/octet-stream")
156         resp.Write(buf[:size])
157 }
158
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162         bufReady := make(chan []byte)
163         go func() {
164                 bufReady <- bufs.Get(bufSize)
165         }()
166         select {
167         case buf := <-bufReady:
168                 return buf, nil
169         case <-ctx.Done():
170                 go func() {
171                         // Even if closeNotifier happened first, we
172                         // need to keep waiting for our buf so we can
173                         // return it to the pool.
174                         bufs.Put(<-bufReady)
175                 }()
176                 return nil, ErrClientDisconnect
177         }
178 }
179
180 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
181         if !rtr.isSystemAuth(GetAPIToken(req)) {
182                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
183                 return
184         }
185         hash := mux.Vars(req)["hash"]
186         vols := rtr.volmgr.AllWritable()
187         if len(vols) == 0 {
188                 http.Error(resp, "no volumes", http.StatusNotFound)
189                 return
190         }
191         var err error
192         for _, mnt := range vols {
193                 err = mnt.Touch(hash)
194                 if err == nil {
195                         break
196                 }
197         }
198         switch {
199         case err == nil:
200                 return
201         case os.IsNotExist(err):
202                 http.Error(resp, err.Error(), http.StatusNotFound)
203         default:
204                 http.Error(resp, err.Error(), http.StatusInternalServerError)
205         }
206 }
207
208 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
209         hash := mux.Vars(req)["hash"]
210
211         // Detect as many error conditions as possible before reading
212         // the body: avoid transmitting data that will not end up
213         // being written anyway.
214
215         if req.ContentLength == -1 {
216                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
217                 return
218         }
219
220         if req.ContentLength > BlockSize {
221                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
222                 return
223         }
224
225         if len(rtr.volmgr.AllWritable()) == 0 {
226                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
227                 return
228         }
229
230         var wantStorageClasses []string
231         if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
232                 wantStorageClasses = strings.Split(hdr, ",")
233                 for i, sc := range wantStorageClasses {
234                         wantStorageClasses[i] = strings.TrimSpace(sc)
235                 }
236         } else {
237                 // none specified -- use configured default
238                 for class, cfg := range rtr.cluster.StorageClasses {
239                         if cfg.Default {
240                                 wantStorageClasses = append(wantStorageClasses, class)
241                         }
242                 }
243         }
244
245         buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
246         if err != nil {
247                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
248                 return
249         }
250
251         _, err = io.ReadFull(req.Body, buf)
252         if err != nil {
253                 http.Error(resp, err.Error(), 500)
254                 bufs.Put(buf)
255                 return
256         }
257
258         result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
259         bufs.Put(buf)
260
261         if err != nil {
262                 code := http.StatusInternalServerError
263                 if err, ok := err.(*KeepError); ok {
264                         code = err.HTTPCode
265                 }
266                 http.Error(resp, err.Error(), code)
267                 return
268         }
269
270         // Success; add a size hint, sign the locator if possible, and
271         // return it to the client.
272         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
273         apiToken := GetAPIToken(req)
274         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
275                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
276                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
277         }
278         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
279         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
280         resp.Write([]byte(returnHash + "\n"))
281 }
282
283 // IndexHandler responds to "/index", "/index/{prefix}", and
284 // "/mounts/{uuid}/blocks" requests.
285 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
286         if !rtr.isSystemAuth(GetAPIToken(req)) {
287                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
288                 return
289         }
290
291         prefix := mux.Vars(req)["prefix"]
292         if prefix == "" {
293                 req.ParseForm()
294                 prefix = req.Form.Get("prefix")
295         }
296
297         uuid := mux.Vars(req)["uuid"]
298
299         var vols []*VolumeMount
300         if uuid == "" {
301                 vols = rtr.volmgr.AllReadable()
302         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
303                 http.Error(resp, "mount not found", http.StatusNotFound)
304                 return
305         } else {
306                 vols = []*VolumeMount{mnt}
307         }
308
309         for _, v := range vols {
310                 if err := v.IndexTo(prefix, resp); err != nil {
311                         // We can't send an error status/message to
312                         // the client because IndexTo() might have
313                         // already written body content. All we can do
314                         // is log the error in our own logs.
315                         //
316                         // The client must notice the lack of trailing
317                         // newline as an indication that the response
318                         // is incomplete.
319                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
320                         return
321                 }
322         }
323         // An empty line at EOF is the only way the client can be
324         // assured the entire index was received.
325         resp.Write([]byte{'\n'})
326 }
327
328 // MountsHandler responds to "GET /mounts" requests.
329 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
330         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
331         if err != nil {
332                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
333         }
334 }
335
336 // PoolStatus struct
337 type PoolStatus struct {
338         Alloc uint64 `json:"BytesAllocatedCumulative"`
339         Cap   int    `json:"BuffersMax"`
340         Len   int    `json:"BuffersInUse"`
341 }
342
343 type volumeStatusEnt struct {
344         Label         string
345         Status        *VolumeStatus `json:",omitempty"`
346         VolumeStats   *ioStats      `json:",omitempty"`
347         InternalStats interface{}   `json:",omitempty"`
348 }
349
350 // NodeStatus struct
351 type NodeStatus struct {
352         Volumes         []*volumeStatusEnt
353         BufferPool      PoolStatus
354         PullQueue       WorkQueueStatus
355         TrashQueue      WorkQueueStatus
356         RequestsCurrent int
357         RequestsMax     int
358         Version         string
359 }
360
361 var st NodeStatus
362 var stLock sync.Mutex
363
364 // DebugHandler addresses /debug.json requests.
365 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
366         type debugStats struct {
367                 MemStats runtime.MemStats
368         }
369         var ds debugStats
370         runtime.ReadMemStats(&ds.MemStats)
371         data, err := json.Marshal(&ds)
372         if err != nil {
373                 http.Error(resp, err.Error(), http.StatusInternalServerError)
374                 return
375         }
376         resp.Write(data)
377 }
378
379 // StatusHandler addresses /status.json requests.
380 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
381         stLock.Lock()
382         rtr.readNodeStatus(&st)
383         data, err := json.Marshal(&st)
384         stLock.Unlock()
385         if err != nil {
386                 http.Error(resp, err.Error(), http.StatusInternalServerError)
387                 return
388         }
389         resp.Write(data)
390 }
391
392 // populate the given NodeStatus struct with current values.
393 func (rtr *router) readNodeStatus(st *NodeStatus) {
394         st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
395         vols := rtr.volmgr.AllReadable()
396         if cap(st.Volumes) < len(vols) {
397                 st.Volumes = make([]*volumeStatusEnt, len(vols))
398         }
399         st.Volumes = st.Volumes[:0]
400         for _, vol := range vols {
401                 var internalStats interface{}
402                 if vol, ok := vol.Volume.(InternalStatser); ok {
403                         internalStats = vol.InternalStats()
404                 }
405                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
406                         Label:         vol.String(),
407                         Status:        vol.Status(),
408                         InternalStats: internalStats,
409                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
410                 })
411         }
412         st.BufferPool.Alloc = bufs.Alloc()
413         st.BufferPool.Cap = bufs.Cap()
414         st.BufferPool.Len = bufs.Len()
415         st.PullQueue = getWorkQueueStatus(rtr.pullq)
416         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
417 }
418
419 // return a WorkQueueStatus for the given queue. If q is nil (which
420 // should never happen except in test suites), return a zero status
421 // value instead of crashing.
422 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
423         if q == nil {
424                 // This should only happen during tests.
425                 return WorkQueueStatus{}
426         }
427         return q.Status()
428 }
429
430 // handleDELETE processes DELETE requests.
431 //
432 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
433 // from all connected volumes.
434 //
435 // Only the Data Manager, or an Arvados admin with scope "all", are
436 // allowed to issue DELETE requests.  If a DELETE request is not
437 // authenticated or is issued by a non-admin user, the server returns
438 // a PermissionError.
439 //
440 // Upon receiving a valid request from an authorized user,
441 // handleDELETE deletes all copies of the specified block on local
442 // writable volumes.
443 //
444 // Response format:
445 //
446 // If the requested blocks was not found on any volume, the response
447 // code is HTTP 404 Not Found.
448 //
449 // Otherwise, the response code is 200 OK, with a response body
450 // consisting of the JSON message
451 //
452 //      {"copies_deleted":d,"copies_failed":f}
453 //
454 // where d and f are integers representing the number of blocks that
455 // were successfully and unsuccessfully deleted.
456 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
457         hash := mux.Vars(req)["hash"]
458
459         // Confirm that this user is an admin and has a token with unlimited scope.
460         var tok = GetAPIToken(req)
461         if tok == "" || !rtr.canDelete(tok) {
462                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
463                 return
464         }
465
466         if !rtr.cluster.Collections.BlobTrash {
467                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
468                 return
469         }
470
471         // Delete copies of this block from all available volumes.
472         // Report how many blocks were successfully deleted, and how
473         // many were found on writable volumes but not deleted.
474         var result struct {
475                 Deleted int `json:"copies_deleted"`
476                 Failed  int `json:"copies_failed"`
477         }
478         for _, vol := range rtr.volmgr.Mounts() {
479                 if !vol.KeepMount.AllowTrash {
480                         continue
481                 } else if err := vol.Trash(hash); err == nil {
482                         result.Deleted++
483                 } else if os.IsNotExist(err) {
484                         continue
485                 } else {
486                         result.Failed++
487                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
488                 }
489         }
490         if result.Deleted == 0 && result.Failed == 0 {
491                 resp.WriteHeader(http.StatusNotFound)
492                 return
493         }
494         body, err := json.Marshal(result)
495         if err != nil {
496                 http.Error(resp, err.Error(), http.StatusInternalServerError)
497                 return
498         }
499         resp.Write(body)
500 }
501
502 /* PullHandler processes "PUT /pull" requests for the data manager.
503    The request body is a JSON message containing a list of pull
504    requests in the following format:
505
506    [
507       {
508          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
509          "servers":[
510                         "keep0.qr1hi.arvadosapi.com:25107",
511                         "keep1.qr1hi.arvadosapi.com:25108"
512                  ]
513           },
514           {
515                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
516                  "servers":[
517                         "10.0.1.5:25107",
518                         "10.0.1.6:25107",
519                         "10.0.1.7:25108"
520                  ]
521           },
522           ...
523    ]
524
525    Each pull request in the list consists of a block locator string
526    and an ordered list of servers.  Keepstore should try to fetch the
527    block from each server in turn.
528
529    If the request has not been sent by the Data Manager, return 401
530    Unauthorized.
531
532    If the JSON unmarshalling fails, return 400 Bad Request.
533 */
534
535 // PullRequest consists of a block locator and an ordered list of servers
536 type PullRequest struct {
537         Locator string   `json:"locator"`
538         Servers []string `json:"servers"`
539
540         // Destination mount, or "" for "anywhere"
541         MountUUID string `json:"mount_uuid"`
542 }
543
544 // PullHandler processes "PUT /pull" requests for the data manager.
545 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
546         // Reject unauthorized requests.
547         if !rtr.isSystemAuth(GetAPIToken(req)) {
548                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
549                 return
550         }
551
552         // Parse the request body.
553         var pr []PullRequest
554         r := json.NewDecoder(req.Body)
555         if err := r.Decode(&pr); err != nil {
556                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
557                 return
558         }
559
560         // We have a properly formatted pull list sent from the data
561         // manager.  Report success and send the list to the pull list
562         // manager for further handling.
563         resp.WriteHeader(http.StatusOK)
564         resp.Write([]byte(
565                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
566
567         plist := list.New()
568         for _, p := range pr {
569                 plist.PushBack(p)
570         }
571         rtr.pullq.ReplaceQueue(plist)
572 }
573
574 // TrashRequest consists of a block locator and its Mtime
575 type TrashRequest struct {
576         Locator    string `json:"locator"`
577         BlockMtime int64  `json:"block_mtime"`
578
579         // Target mount, or "" for "everywhere"
580         MountUUID string `json:"mount_uuid"`
581 }
582
583 // TrashHandler processes /trash requests.
584 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
585         // Reject unauthorized requests.
586         if !rtr.isSystemAuth(GetAPIToken(req)) {
587                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
588                 return
589         }
590
591         // Parse the request body.
592         var trash []TrashRequest
593         r := json.NewDecoder(req.Body)
594         if err := r.Decode(&trash); err != nil {
595                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
596                 return
597         }
598
599         // We have a properly formatted trash list sent from the data
600         // manager.  Report success and send the list to the trash work
601         // queue for further handling.
602         resp.WriteHeader(http.StatusOK)
603         resp.Write([]byte(
604                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
605
606         tlist := list.New()
607         for _, t := range trash {
608                 tlist.PushBack(t)
609         }
610         rtr.trashq.ReplaceQueue(tlist)
611 }
612
613 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
614 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
615         // Reject unauthorized requests.
616         if !rtr.isSystemAuth(GetAPIToken(req)) {
617                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
618                 return
619         }
620
621         log := ctxlog.FromContext(req.Context())
622         hash := mux.Vars(req)["hash"]
623
624         if len(rtr.volmgr.AllWritable()) == 0 {
625                 http.Error(resp, "No writable volumes", http.StatusNotFound)
626                 return
627         }
628
629         var untrashedOn, failedOn []string
630         var numNotFound int
631         for _, vol := range rtr.volmgr.AllWritable() {
632                 err := vol.Untrash(hash)
633
634                 if os.IsNotExist(err) {
635                         numNotFound++
636                 } else if err != nil {
637                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
638                         failedOn = append(failedOn, vol.String())
639                 } else {
640                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
641                         untrashedOn = append(untrashedOn, vol.String())
642                 }
643         }
644
645         if numNotFound == len(rtr.volmgr.AllWritable()) {
646                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
647         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
648                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
649         } else {
650                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
651                 if len(failedOn) > 0 {
652                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
653                         http.Error(resp, respBody, http.StatusInternalServerError)
654                 } else {
655                         fmt.Fprintln(resp, respBody)
656                 }
657         }
658 }
659
660 // GetBlock and PutBlock implement lower-level code for handling
661 // blocks by rooting through volumes connected to the local machine.
662 // Once the handler has determined that system policy permits the
663 // request, it calls these methods to perform the actual operation.
664 //
665 // TODO(twp): this code would probably be better located in the
666 // VolumeManager interface. As an abstraction, the VolumeManager
667 // should be the only part of the code that cares about which volume a
668 // block is stored on, so it should be responsible for figuring out
669 // which volume to check for fetching blocks, storing blocks, etc.
670
671 // GetBlock fetches the block identified by "hash" into the provided
672 // buf, and returns the data size.
673 //
674 // If the block cannot be found on any volume, returns NotFoundError.
675 //
676 // If the block found does not have the correct MD5 hash, returns
677 // DiskHashError.
678 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
679         log := ctxlog.FromContext(ctx)
680
681         // Attempt to read the requested hash from a keep volume.
682         errorToCaller := NotFoundError
683
684         for _, vol := range volmgr.AllReadable() {
685                 size, err := vol.Get(ctx, hash, buf)
686                 select {
687                 case <-ctx.Done():
688                         return 0, ErrClientDisconnect
689                 default:
690                 }
691                 if err != nil {
692                         // IsNotExist is an expected error and may be
693                         // ignored. All other errors are logged. In
694                         // any case we continue trying to read other
695                         // volumes. If all volumes report IsNotExist,
696                         // we return a NotFoundError.
697                         if !os.IsNotExist(err) {
698                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
699                         }
700                         // If some volume returns a transient error, return it to the caller
701                         // instead of "Not found" so it can retry.
702                         if err == VolumeBusyError {
703                                 errorToCaller = err.(*KeepError)
704                         }
705                         continue
706                 }
707                 // Check the file checksum.
708                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
709                 if filehash != hash {
710                         // TODO: Try harder to tell a sysadmin about
711                         // this.
712                         log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
713                         errorToCaller = DiskHashError
714                         continue
715                 }
716                 if errorToCaller == DiskHashError {
717                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
718                 }
719                 return size, nil
720         }
721         return 0, errorToCaller
722 }
723
724 type putProgress struct {
725         classNeeded      map[string]bool
726         classTodo        map[string]bool
727         mountUsed        map[*VolumeMount]bool
728         totalReplication int
729         classDone        map[string]int
730 }
731
732 // Number of distinct replicas stored. "2" can mean the block was
733 // stored on 2 different volumes with replication 1, or on 1 volume
734 // with replication 2.
735 func (pr putProgress) TotalReplication() string {
736         return strconv.Itoa(pr.totalReplication)
737 }
738
739 // Number of replicas satisfying each storage class, formatted like
740 // "default=2; special=1".
741 func (pr putProgress) ClassReplication() string {
742         s := ""
743         for k, v := range pr.classDone {
744                 if len(s) > 0 {
745                         s += ", "
746                 }
747                 s += k + "=" + strconv.Itoa(v)
748         }
749         return s
750 }
751
752 func (pr *putProgress) Add(mnt *VolumeMount) {
753         if pr.mountUsed[mnt] {
754                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
755                 return
756         }
757         pr.mountUsed[mnt] = true
758         pr.totalReplication += mnt.Replication
759         for class := range mnt.StorageClasses {
760                 pr.classDone[class] += mnt.Replication
761                 delete(pr.classTodo, class)
762         }
763 }
764
765 func (pr *putProgress) Sub(mnt *VolumeMount) {
766         if !pr.mountUsed[mnt] {
767                 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
768                 return
769         }
770         pr.mountUsed[mnt] = false
771         pr.totalReplication -= mnt.Replication
772         for class := range mnt.StorageClasses {
773                 pr.classDone[class] -= mnt.Replication
774                 if pr.classNeeded[class] {
775                         pr.classTodo[class] = true
776                 }
777         }
778 }
779
780 func (pr *putProgress) Done() bool {
781         return len(pr.classTodo) == 0 && pr.totalReplication > 0
782 }
783
784 func (pr *putProgress) Want(mnt *VolumeMount) bool {
785         if pr.Done() || pr.mountUsed[mnt] {
786                 return false
787         }
788         if len(pr.classTodo) == 0 {
789                 // none specified == "any"
790                 return true
791         }
792         for class := range mnt.StorageClasses {
793                 if pr.classTodo[class] {
794                         return true
795                 }
796         }
797         return false
798 }
799
800 func (pr *putProgress) Copy() *putProgress {
801         cp := putProgress{
802                 classNeeded:      pr.classNeeded,
803                 classTodo:        make(map[string]bool, len(pr.classTodo)),
804                 classDone:        make(map[string]int, len(pr.classDone)),
805                 mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
806                 totalReplication: pr.totalReplication,
807         }
808         for k, v := range pr.classTodo {
809                 cp.classTodo[k] = v
810         }
811         for k, v := range pr.classDone {
812                 cp.classDone[k] = v
813         }
814         for k, v := range pr.mountUsed {
815                 cp.mountUsed[k] = v
816         }
817         return &cp
818 }
819
820 func newPutProgress(classes []string) putProgress {
821         pr := putProgress{
822                 classNeeded: make(map[string]bool, len(classes)),
823                 classTodo:   make(map[string]bool, len(classes)),
824                 classDone:   map[string]int{},
825                 mountUsed:   map[*VolumeMount]bool{},
826         }
827         for _, c := range classes {
828                 if c != "" {
829                         pr.classNeeded[c] = true
830                         pr.classTodo[c] = true
831                 }
832         }
833         return pr
834 }
835
836 // PutBlock stores the given block on one or more volumes.
837 //
838 // The MD5 checksum of the block must match the given hash.
839 //
840 // The block is written to each writable volume (ordered by priority
841 // and then UUID, see volume.go) until at least one replica has been
842 // stored in each of the requested storage classes.
843 //
844 // The returned error, if any, is a KeepError with one of the
845 // following codes:
846 //
847 // 500 Collision
848 //
849 //      A different block with the same hash already exists on this
850 //      Keep server.
851 //
852 // 422 MD5Fail
853 //
854 //      The MD5 hash of the BLOCK does not match the argument HASH.
855 //
856 // 503 Full
857 //
858 //      There was not enough space left in any Keep volume to store
859 //      the object.
860 //
861 // 500 Fail
862 //
863 //      The object could not be stored for some other reason (e.g.
864 //      all writes failed). The text of the error message should
865 //      provide as much detail as possible.
866 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
867         log := ctxlog.FromContext(ctx)
868
869         // Check that BLOCK's checksum matches HASH.
870         blockhash := fmt.Sprintf("%x", md5.Sum(block))
871         if blockhash != hash {
872                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
873                 return putProgress{}, RequestHashError
874         }
875
876         result := newPutProgress(wantStorageClasses)
877
878         // If we already have this data, it's intact on disk, and we
879         // can update its timestamp, return success. If we have
880         // different data with the same hash, return failure.
881         if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
882                 return result, err
883         }
884         if ctx.Err() != nil {
885                 return result, ErrClientDisconnect
886         }
887
888         writables := volmgr.NextWritable()
889         if len(writables) == 0 {
890                 log.Error("no writable volumes")
891                 return result, FullError
892         }
893
894         var wg sync.WaitGroup
895         var mtx sync.Mutex
896         cond := sync.Cond{L: &mtx}
897         // pending predicts what result will be if all pending writes
898         // succeed.
899         pending := result.Copy()
900         var allFull atomic.Value
901         allFull.Store(true)
902
903         // We hold the lock for the duration of the "each volume" loop
904         // below, except when it is released during cond.Wait().
905         mtx.Lock()
906
907         for _, mnt := range writables {
908                 // Wait until our decision to use this mount does not
909                 // depend on the outcome of pending writes.
910                 for result.Want(mnt) && !pending.Want(mnt) {
911                         cond.Wait()
912                 }
913                 if !result.Want(mnt) {
914                         continue
915                 }
916                 mnt := mnt
917                 pending.Add(mnt)
918                 wg.Add(1)
919                 go func() {
920                         log.Debugf("PutBlock: start write to %s", mnt.UUID)
921                         defer wg.Done()
922                         err := mnt.Put(ctx, hash, block)
923
924                         mtx.Lock()
925                         if err != nil {
926                                 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
927                                 pending.Sub(mnt)
928                         } else {
929                                 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
930                                 result.Add(mnt)
931                         }
932                         cond.Broadcast()
933                         mtx.Unlock()
934
935                         if err != nil && err != FullError && ctx.Err() == nil {
936                                 // The volume is not full but the
937                                 // write did not succeed.  Report the
938                                 // error and continue trying.
939                                 allFull.Store(false)
940                                 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
941                         }
942                 }()
943         }
944         mtx.Unlock()
945         wg.Wait()
946         if ctx.Err() != nil {
947                 return result, ErrClientDisconnect
948         }
949         if result.Done() {
950                 return result, nil
951         }
952
953         if result.totalReplication > 0 {
954                 // Some, but not all, of the storage classes were
955                 // satisfied. This qualifies as success.
956                 return result, nil
957         } else if allFull.Load().(bool) {
958                 log.Error("all volumes with qualifying storage classes are full")
959                 return putProgress{}, FullError
960         } else {
961                 // Already logged the non-full errors.
962                 return putProgress{}, GenericError
963         }
964 }
965
966 // CompareAndTouch looks for volumes where the given content already
967 // exists and its modification time can be updated (i.e., it is
968 // protected from garbage collection), and updates result accordingly.
969 // It returns when the result is Done() or all volumes have been
970 // checked.
971 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
972         log := ctxlog.FromContext(ctx)
973         for _, mnt := range volmgr.AllWritable() {
974                 if !result.Want(mnt) {
975                         continue
976                 }
977                 err := mnt.Compare(ctx, hash, buf)
978                 if ctx.Err() != nil {
979                         return nil
980                 } else if err == CollisionError {
981                         // Stop if we have a block with same hash but
982                         // different content. (It will be impossible
983                         // to tell which one is wanted if we have
984                         // both, so there's no point writing it even
985                         // on a different volume.)
986                         log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
987                         return CollisionError
988                 } else if os.IsNotExist(err) {
989                         // Block does not exist. This is the only
990                         // "normal" error: we don't log anything.
991                         continue
992                 } else if err != nil {
993                         // Couldn't open file, data is corrupt on
994                         // disk, etc.: log this abnormal condition,
995                         // and try the next volume.
996                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
997                         continue
998                 }
999                 if err := mnt.Touch(hash); err != nil {
1000                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
1001                         continue
1002                 }
1003                 // Compare and Touch both worked --> done.
1004                 result.Add(mnt)
1005                 if result.Done() {
1006                         return nil
1007                 }
1008         }
1009         return nil
1010 }
1011
1012 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1013
1014 // IsValidLocator returns true if the specified string is a valid Keep
1015 // locator.  When Keep is extended to support hash types other than
1016 // MD5, this should be updated to cover those as well.
1017 func IsValidLocator(loc string) bool {
1018         return validLocatorRe.MatchString(loc)
1019 }
1020
1021 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1022
1023 // GetAPIToken returns the OAuth2 token from the Authorization
1024 // header of a HTTP request, or an empty string if no matching
1025 // token is found.
1026 func GetAPIToken(req *http.Request) string {
1027         if auth, ok := req.Header["Authorization"]; ok {
1028                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1029                         return match[2]
1030                 }
1031         }
1032         return ""
1033 }
1034
1035 // canDelete returns true if the user identified by apiToken is
1036 // allowed to delete blocks.
1037 func (rtr *router) canDelete(apiToken string) bool {
1038         if apiToken == "" {
1039                 return false
1040         }
1041         // Blocks may be deleted only when Keep has been configured with a
1042         // data manager.
1043         if rtr.isSystemAuth(apiToken) {
1044                 return true
1045         }
1046         // TODO(twp): look up apiToken with the API server
1047         // return true if is_admin is true and if the token
1048         // has unlimited scope
1049         return false
1050 }
1051
1052 // isSystemAuth returns true if the given token is allowed to perform
1053 // system level actions like deleting data.
1054 func (rtr *router) isSystemAuth(token string) bool {
1055         return token != "" && token == rtr.cluster.SystemRootToken
1056 }