18874: Merge commit '6f8dcb2b13f3058db656908fb26b09e23b527f08' into 18874-merge-wb2
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "git.arvados.org/arvados.git/sdk/go/health"
28         "git.arvados.org/arvados.git/sdk/go/httpserver"
29         "github.com/gorilla/mux"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32 )
33
34 type router struct {
35         *mux.Router
36         cluster     *arvados.Cluster
37         logger      logrus.FieldLogger
38         remoteProxy remoteProxy
39         metrics     *nodeMetrics
40         volmgr      *RRVolumeManager
41         pullq       *WorkQueue
42         trashq      *WorkQueue
43 }
44
45 // MakeRESTRouter returns a new router that forwards all Keep requests
46 // to the appropriate handlers.
47 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
48         rtr := &router{
49                 Router:  mux.NewRouter(),
50                 cluster: cluster,
51                 logger:  ctxlog.FromContext(ctx),
52                 metrics: &nodeMetrics{reg: reg},
53                 volmgr:  volmgr,
54                 pullq:   pullq,
55                 trashq:  trashq,
56         }
57
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
60         rtr.HandleFunc(
61                 `/{hash:[0-9a-f]{32}}+{hints}`,
62                 rtr.handleGET).Methods("GET", "HEAD")
63
64         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
65         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
66         // List all blocks stored here. Privileged client only.
67         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
68         // List blocks stored here whose hash has the given prefix.
69         // Privileged client only.
70         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71         // Update timestamp on existing block. Privileged client only.
72         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
73
74         // Internals/debugging info (runtime.MemStats)
75         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
76
77         // List volumes: path, device number, bytes used/avail.
78         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
79
80         // List mounts: UUID, readonly, tier, device ID, ...
81         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
82         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
83         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
84
85         // Replace the current pull queue.
86         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
87
88         // Replace the current trash queue.
89         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
90
91         // Untrash moves blocks from trash back into store
92         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
93
94         rtr.Handle("/_health/{check}", &health.Handler{
95                 Token:  cluster.ManagementToken,
96                 Prefix: "/_health/",
97         }).Methods("GET")
98
99         // Any request which does not match any of these routes gets
100         // 400 Bad Request.
101         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
102
103         rtr.metrics.setupBufferPoolMetrics(bufs)
104         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
105         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
106
107         return rtr
108 }
109
110 // BadRequestHandler is a HandleFunc to address bad requests.
111 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
112         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
113 }
114
115 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
116         locator := req.URL.Path[1:]
117         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118                 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
119                 return
120         }
121
122         if rtr.cluster.Collections.BlobSigning {
123                 locator := req.URL.Path[1:] // strip leading slash
124                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
126                         return
127                 }
128         }
129
130         // TODO: Probe volumes to check whether the block _might_
131         // exist. Some volumes/types could support a quick existence
132         // check without causing other operations to suffer. If all
133         // volumes support that, and assure us the block definitely
134         // isn't here, we can return 404 now instead of waiting for a
135         // buffer.
136
137         buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
138         if err != nil {
139                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
140                 return
141         }
142         defer bufs.Put(buf)
143
144         size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
145         if err != nil {
146                 code := http.StatusInternalServerError
147                 if err, ok := err.(*KeepError); ok {
148                         code = err.HTTPCode
149                 }
150                 http.Error(resp, err.Error(), code)
151                 return
152         }
153
154         resp.Header().Set("Content-Length", strconv.Itoa(size))
155         resp.Header().Set("Content-Type", "application/octet-stream")
156         resp.Write(buf[:size])
157 }
158
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162         bufReady := make(chan []byte)
163         go func() {
164                 bufReady <- bufs.Get(bufSize)
165         }()
166         select {
167         case buf := <-bufReady:
168                 return buf, nil
169         case <-ctx.Done():
170                 go func() {
171                         // Even if closeNotifier happened first, we
172                         // need to keep waiting for our buf so we can
173                         // return it to the pool.
174                         bufs.Put(<-bufReady)
175                 }()
176                 return nil, ErrClientDisconnect
177         }
178 }
179
180 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
181         if !rtr.isSystemAuth(GetAPIToken(req)) {
182                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
183                 return
184         }
185         hash := mux.Vars(req)["hash"]
186         vols := rtr.volmgr.AllWritable()
187         if len(vols) == 0 {
188                 http.Error(resp, "no volumes", http.StatusNotFound)
189                 return
190         }
191         var err error
192         for _, mnt := range vols {
193                 err = mnt.Touch(hash)
194                 if err == nil {
195                         break
196                 }
197         }
198         switch {
199         case err == nil:
200                 return
201         case os.IsNotExist(err):
202                 http.Error(resp, err.Error(), http.StatusNotFound)
203         default:
204                 http.Error(resp, err.Error(), http.StatusInternalServerError)
205         }
206 }
207
208 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
209         hash := mux.Vars(req)["hash"]
210
211         // Detect as many error conditions as possible before reading
212         // the body: avoid transmitting data that will not end up
213         // being written anyway.
214
215         if req.ContentLength == -1 {
216                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
217                 return
218         }
219
220         if req.ContentLength > BlockSize {
221                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
222                 return
223         }
224
225         if len(rtr.volmgr.AllWritable()) == 0 {
226                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
227                 return
228         }
229
230         var wantStorageClasses []string
231         if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
232                 wantStorageClasses = strings.Split(hdr, ",")
233                 for i, sc := range wantStorageClasses {
234                         wantStorageClasses[i] = strings.TrimSpace(sc)
235                 }
236         } else {
237                 // none specified -- use configured default
238                 for class, cfg := range rtr.cluster.StorageClasses {
239                         if cfg.Default {
240                                 wantStorageClasses = append(wantStorageClasses, class)
241                         }
242                 }
243         }
244
245         buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
246         if err != nil {
247                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
248                 return
249         }
250
251         _, err = io.ReadFull(req.Body, buf)
252         if err != nil {
253                 http.Error(resp, err.Error(), 500)
254                 bufs.Put(buf)
255                 return
256         }
257
258         result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
259         bufs.Put(buf)
260
261         if err != nil {
262                 code := http.StatusInternalServerError
263                 if err, ok := err.(*KeepError); ok {
264                         code = err.HTTPCode
265                 }
266                 http.Error(resp, err.Error(), code)
267                 return
268         }
269
270         // Success; add a size hint, sign the locator if possible, and
271         // return it to the client.
272         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
273         apiToken := GetAPIToken(req)
274         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
275                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
276                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
277         }
278         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
279         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
280         resp.Write([]byte(returnHash + "\n"))
281 }
282
283 // IndexHandler responds to "/index", "/index/{prefix}", and
284 // "/mounts/{uuid}/blocks" requests.
285 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
286         if !rtr.isSystemAuth(GetAPIToken(req)) {
287                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
288                 return
289         }
290
291         prefix := mux.Vars(req)["prefix"]
292         if prefix == "" {
293                 req.ParseForm()
294                 prefix = req.Form.Get("prefix")
295         }
296
297         uuid := mux.Vars(req)["uuid"]
298
299         var vols []*VolumeMount
300         if uuid == "" {
301                 vols = rtr.volmgr.AllReadable()
302         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
303                 http.Error(resp, "mount not found", http.StatusNotFound)
304                 return
305         } else {
306                 vols = []*VolumeMount{mnt}
307         }
308
309         for _, v := range vols {
310                 if err := v.IndexTo(prefix, resp); err != nil {
311                         // We can't send an error status/message to
312                         // the client because IndexTo() might have
313                         // already written body content. All we can do
314                         // is log the error in our own logs.
315                         //
316                         // The client must notice the lack of trailing
317                         // newline as an indication that the response
318                         // is incomplete.
319                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
320                         return
321                 }
322         }
323         // An empty line at EOF is the only way the client can be
324         // assured the entire index was received.
325         resp.Write([]byte{'\n'})
326 }
327
328 // MountsHandler responds to "GET /mounts" requests.
329 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
330         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
331         if err != nil {
332                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
333         }
334 }
335
336 // PoolStatus struct
337 type PoolStatus struct {
338         Alloc uint64 `json:"BytesAllocatedCumulative"`
339         Cap   int    `json:"BuffersMax"`
340         Len   int    `json:"BuffersInUse"`
341 }
342
343 type volumeStatusEnt struct {
344         Label         string
345         Status        *VolumeStatus `json:",omitempty"`
346         VolumeStats   *ioStats      `json:",omitempty"`
347         InternalStats interface{}   `json:",omitempty"`
348 }
349
350 // NodeStatus struct
351 type NodeStatus struct {
352         Volumes         []*volumeStatusEnt
353         BufferPool      PoolStatus
354         PullQueue       WorkQueueStatus
355         TrashQueue      WorkQueueStatus
356         RequestsCurrent int
357         RequestsMax     int
358         Version         string
359 }
360
361 var st NodeStatus
362 var stLock sync.Mutex
363
364 // DebugHandler addresses /debug.json requests.
365 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
366         type debugStats struct {
367                 MemStats runtime.MemStats
368         }
369         var ds debugStats
370         runtime.ReadMemStats(&ds.MemStats)
371         data, err := json.Marshal(&ds)
372         if err != nil {
373                 http.Error(resp, err.Error(), http.StatusInternalServerError)
374                 return
375         }
376         resp.Write(data)
377 }
378
379 // StatusHandler addresses /status.json requests.
380 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
381         stLock.Lock()
382         rtr.readNodeStatus(&st)
383         data, err := json.Marshal(&st)
384         stLock.Unlock()
385         if err != nil {
386                 http.Error(resp, err.Error(), http.StatusInternalServerError)
387                 return
388         }
389         resp.Write(data)
390 }
391
392 // populate the given NodeStatus struct with current values.
393 func (rtr *router) readNodeStatus(st *NodeStatus) {
394         st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
395         vols := rtr.volmgr.AllReadable()
396         if cap(st.Volumes) < len(vols) {
397                 st.Volumes = make([]*volumeStatusEnt, len(vols))
398         }
399         st.Volumes = st.Volumes[:0]
400         for _, vol := range vols {
401                 var internalStats interface{}
402                 if vol, ok := vol.Volume.(InternalStatser); ok {
403                         internalStats = vol.InternalStats()
404                 }
405                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
406                         Label:         vol.String(),
407                         Status:        vol.Status(),
408                         InternalStats: internalStats,
409                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
410                 })
411         }
412         st.BufferPool.Alloc = bufs.Alloc()
413         st.BufferPool.Cap = bufs.Cap()
414         st.BufferPool.Len = bufs.Len()
415         st.PullQueue = getWorkQueueStatus(rtr.pullq)
416         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
417 }
418
419 // return a WorkQueueStatus for the given queue. If q is nil (which
420 // should never happen except in test suites), return a zero status
421 // value instead of crashing.
422 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
423         if q == nil {
424                 // This should only happen during tests.
425                 return WorkQueueStatus{}
426         }
427         return q.Status()
428 }
429
430 // handleDELETE processes DELETE requests.
431 //
432 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
433 // from all connected volumes.
434 //
435 // Only the Data Manager, or an Arvados admin with scope "all", are
436 // allowed to issue DELETE requests.  If a DELETE request is not
437 // authenticated or is issued by a non-admin user, the server returns
438 // a PermissionError.
439 //
440 // Upon receiving a valid request from an authorized user,
441 // handleDELETE deletes all copies of the specified block on local
442 // writable volumes.
443 //
444 // Response format:
445 //
446 // If the requested blocks was not found on any volume, the response
447 // code is HTTP 404 Not Found.
448 //
449 // Otherwise, the response code is 200 OK, with a response body
450 // consisting of the JSON message
451 //
452 //      {"copies_deleted":d,"copies_failed":f}
453 //
454 // where d and f are integers representing the number of blocks that
455 // were successfully and unsuccessfully deleted.
456 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
457         hash := mux.Vars(req)["hash"]
458
459         // Confirm that this user is an admin and has a token with unlimited scope.
460         var tok = GetAPIToken(req)
461         if tok == "" || !rtr.canDelete(tok) {
462                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
463                 return
464         }
465
466         if !rtr.cluster.Collections.BlobTrash {
467                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
468                 return
469         }
470
471         // Delete copies of this block from all available volumes.
472         // Report how many blocks were successfully deleted, and how
473         // many were found on writable volumes but not deleted.
474         var result struct {
475                 Deleted int `json:"copies_deleted"`
476                 Failed  int `json:"copies_failed"`
477         }
478         for _, vol := range rtr.volmgr.AllWritable() {
479                 if err := vol.Trash(hash); err == nil {
480                         result.Deleted++
481                 } else if os.IsNotExist(err) {
482                         continue
483                 } else {
484                         result.Failed++
485                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
486                 }
487         }
488         if result.Deleted == 0 && result.Failed == 0 {
489                 resp.WriteHeader(http.StatusNotFound)
490                 return
491         }
492         body, err := json.Marshal(result)
493         if err != nil {
494                 http.Error(resp, err.Error(), http.StatusInternalServerError)
495                 return
496         }
497         resp.Write(body)
498 }
499
500 /* PullHandler processes "PUT /pull" requests for the data manager.
501    The request body is a JSON message containing a list of pull
502    requests in the following format:
503
504    [
505       {
506          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
507          "servers":[
508                         "keep0.qr1hi.arvadosapi.com:25107",
509                         "keep1.qr1hi.arvadosapi.com:25108"
510                  ]
511           },
512           {
513                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
514                  "servers":[
515                         "10.0.1.5:25107",
516                         "10.0.1.6:25107",
517                         "10.0.1.7:25108"
518                  ]
519           },
520           ...
521    ]
522
523    Each pull request in the list consists of a block locator string
524    and an ordered list of servers.  Keepstore should try to fetch the
525    block from each server in turn.
526
527    If the request has not been sent by the Data Manager, return 401
528    Unauthorized.
529
530    If the JSON unmarshalling fails, return 400 Bad Request.
531 */
532
533 // PullRequest consists of a block locator and an ordered list of servers
534 type PullRequest struct {
535         Locator string   `json:"locator"`
536         Servers []string `json:"servers"`
537
538         // Destination mount, or "" for "anywhere"
539         MountUUID string `json:"mount_uuid"`
540 }
541
542 // PullHandler processes "PUT /pull" requests for the data manager.
543 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
544         // Reject unauthorized requests.
545         if !rtr.isSystemAuth(GetAPIToken(req)) {
546                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
547                 return
548         }
549
550         // Parse the request body.
551         var pr []PullRequest
552         r := json.NewDecoder(req.Body)
553         if err := r.Decode(&pr); err != nil {
554                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
555                 return
556         }
557
558         // We have a properly formatted pull list sent from the data
559         // manager.  Report success and send the list to the pull list
560         // manager for further handling.
561         resp.WriteHeader(http.StatusOK)
562         resp.Write([]byte(
563                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
564
565         plist := list.New()
566         for _, p := range pr {
567                 plist.PushBack(p)
568         }
569         rtr.pullq.ReplaceQueue(plist)
570 }
571
572 // TrashRequest consists of a block locator and its Mtime
573 type TrashRequest struct {
574         Locator    string `json:"locator"`
575         BlockMtime int64  `json:"block_mtime"`
576
577         // Target mount, or "" for "everywhere"
578         MountUUID string `json:"mount_uuid"`
579 }
580
581 // TrashHandler processes /trash requests.
582 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
583         // Reject unauthorized requests.
584         if !rtr.isSystemAuth(GetAPIToken(req)) {
585                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
586                 return
587         }
588
589         // Parse the request body.
590         var trash []TrashRequest
591         r := json.NewDecoder(req.Body)
592         if err := r.Decode(&trash); err != nil {
593                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
594                 return
595         }
596
597         // We have a properly formatted trash list sent from the data
598         // manager.  Report success and send the list to the trash work
599         // queue for further handling.
600         resp.WriteHeader(http.StatusOK)
601         resp.Write([]byte(
602                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
603
604         tlist := list.New()
605         for _, t := range trash {
606                 tlist.PushBack(t)
607         }
608         rtr.trashq.ReplaceQueue(tlist)
609 }
610
611 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
612 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
613         // Reject unauthorized requests.
614         if !rtr.isSystemAuth(GetAPIToken(req)) {
615                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
616                 return
617         }
618
619         log := ctxlog.FromContext(req.Context())
620         hash := mux.Vars(req)["hash"]
621
622         if len(rtr.volmgr.AllWritable()) == 0 {
623                 http.Error(resp, "No writable volumes", http.StatusNotFound)
624                 return
625         }
626
627         var untrashedOn, failedOn []string
628         var numNotFound int
629         for _, vol := range rtr.volmgr.AllWritable() {
630                 err := vol.Untrash(hash)
631
632                 if os.IsNotExist(err) {
633                         numNotFound++
634                 } else if err != nil {
635                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
636                         failedOn = append(failedOn, vol.String())
637                 } else {
638                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
639                         untrashedOn = append(untrashedOn, vol.String())
640                 }
641         }
642
643         if numNotFound == len(rtr.volmgr.AllWritable()) {
644                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
645         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
646                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
647         } else {
648                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
649                 if len(failedOn) > 0 {
650                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
651                         http.Error(resp, respBody, http.StatusInternalServerError)
652                 } else {
653                         fmt.Fprintln(resp, respBody)
654                 }
655         }
656 }
657
658 // GetBlock and PutBlock implement lower-level code for handling
659 // blocks by rooting through volumes connected to the local machine.
660 // Once the handler has determined that system policy permits the
661 // request, it calls these methods to perform the actual operation.
662 //
663 // TODO(twp): this code would probably be better located in the
664 // VolumeManager interface. As an abstraction, the VolumeManager
665 // should be the only part of the code that cares about which volume a
666 // block is stored on, so it should be responsible for figuring out
667 // which volume to check for fetching blocks, storing blocks, etc.
668
669 // GetBlock fetches the block identified by "hash" into the provided
670 // buf, and returns the data size.
671 //
672 // If the block cannot be found on any volume, returns NotFoundError.
673 //
674 // If the block found does not have the correct MD5 hash, returns
675 // DiskHashError.
676 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
677         log := ctxlog.FromContext(ctx)
678
679         // Attempt to read the requested hash from a keep volume.
680         errorToCaller := NotFoundError
681
682         for _, vol := range volmgr.AllReadable() {
683                 size, err := vol.Get(ctx, hash, buf)
684                 select {
685                 case <-ctx.Done():
686                         return 0, ErrClientDisconnect
687                 default:
688                 }
689                 if err != nil {
690                         // IsNotExist is an expected error and may be
691                         // ignored. All other errors are logged. In
692                         // any case we continue trying to read other
693                         // volumes. If all volumes report IsNotExist,
694                         // we return a NotFoundError.
695                         if !os.IsNotExist(err) {
696                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
697                         }
698                         // If some volume returns a transient error, return it to the caller
699                         // instead of "Not found" so it can retry.
700                         if err == VolumeBusyError {
701                                 errorToCaller = err.(*KeepError)
702                         }
703                         continue
704                 }
705                 // Check the file checksum.
706                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
707                 if filehash != hash {
708                         // TODO: Try harder to tell a sysadmin about
709                         // this.
710                         log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
711                         errorToCaller = DiskHashError
712                         continue
713                 }
714                 if errorToCaller == DiskHashError {
715                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
716                 }
717                 return size, nil
718         }
719         return 0, errorToCaller
720 }
721
722 type putProgress struct {
723         classNeeded      map[string]bool
724         classTodo        map[string]bool
725         mountUsed        map[*VolumeMount]bool
726         totalReplication int
727         classDone        map[string]int
728 }
729
730 // Number of distinct replicas stored. "2" can mean the block was
731 // stored on 2 different volumes with replication 1, or on 1 volume
732 // with replication 2.
733 func (pr putProgress) TotalReplication() string {
734         return strconv.Itoa(pr.totalReplication)
735 }
736
737 // Number of replicas satisfying each storage class, formatted like
738 // "default=2; special=1".
739 func (pr putProgress) ClassReplication() string {
740         s := ""
741         for k, v := range pr.classDone {
742                 if len(s) > 0 {
743                         s += ", "
744                 }
745                 s += k + "=" + strconv.Itoa(v)
746         }
747         return s
748 }
749
750 func (pr *putProgress) Add(mnt *VolumeMount) {
751         if pr.mountUsed[mnt] {
752                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
753                 return
754         }
755         pr.mountUsed[mnt] = true
756         pr.totalReplication += mnt.Replication
757         for class := range mnt.StorageClasses {
758                 pr.classDone[class] += mnt.Replication
759                 delete(pr.classTodo, class)
760         }
761 }
762
763 func (pr *putProgress) Sub(mnt *VolumeMount) {
764         if !pr.mountUsed[mnt] {
765                 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
766                 return
767         }
768         pr.mountUsed[mnt] = false
769         pr.totalReplication -= mnt.Replication
770         for class := range mnt.StorageClasses {
771                 pr.classDone[class] -= mnt.Replication
772                 if pr.classNeeded[class] {
773                         pr.classTodo[class] = true
774                 }
775         }
776 }
777
778 func (pr *putProgress) Done() bool {
779         return len(pr.classTodo) == 0 && pr.totalReplication > 0
780 }
781
782 func (pr *putProgress) Want(mnt *VolumeMount) bool {
783         if pr.Done() || pr.mountUsed[mnt] {
784                 return false
785         }
786         if len(pr.classTodo) == 0 {
787                 // none specified == "any"
788                 return true
789         }
790         for class := range mnt.StorageClasses {
791                 if pr.classTodo[class] {
792                         return true
793                 }
794         }
795         return false
796 }
797
798 func (pr *putProgress) Copy() *putProgress {
799         cp := putProgress{
800                 classNeeded:      pr.classNeeded,
801                 classTodo:        make(map[string]bool, len(pr.classTodo)),
802                 classDone:        make(map[string]int, len(pr.classDone)),
803                 mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
804                 totalReplication: pr.totalReplication,
805         }
806         for k, v := range pr.classTodo {
807                 cp.classTodo[k] = v
808         }
809         for k, v := range pr.classDone {
810                 cp.classDone[k] = v
811         }
812         for k, v := range pr.mountUsed {
813                 cp.mountUsed[k] = v
814         }
815         return &cp
816 }
817
818 func newPutProgress(classes []string) putProgress {
819         pr := putProgress{
820                 classNeeded: make(map[string]bool, len(classes)),
821                 classTodo:   make(map[string]bool, len(classes)),
822                 classDone:   map[string]int{},
823                 mountUsed:   map[*VolumeMount]bool{},
824         }
825         for _, c := range classes {
826                 if c != "" {
827                         pr.classNeeded[c] = true
828                         pr.classTodo[c] = true
829                 }
830         }
831         return pr
832 }
833
834 // PutBlock stores the given block on one or more volumes.
835 //
836 // The MD5 checksum of the block must match the given hash.
837 //
838 // The block is written to each writable volume (ordered by priority
839 // and then UUID, see volume.go) until at least one replica has been
840 // stored in each of the requested storage classes.
841 //
842 // The returned error, if any, is a KeepError with one of the
843 // following codes:
844 //
845 // 500 Collision
846 //
847 //      A different block with the same hash already exists on this
848 //      Keep server.
849 //
850 // 422 MD5Fail
851 //
852 //      The MD5 hash of the BLOCK does not match the argument HASH.
853 //
854 // 503 Full
855 //
856 //      There was not enough space left in any Keep volume to store
857 //      the object.
858 //
859 // 500 Fail
860 //
861 //      The object could not be stored for some other reason (e.g.
862 //      all writes failed). The text of the error message should
863 //      provide as much detail as possible.
864 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
865         log := ctxlog.FromContext(ctx)
866
867         // Check that BLOCK's checksum matches HASH.
868         blockhash := fmt.Sprintf("%x", md5.Sum(block))
869         if blockhash != hash {
870                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
871                 return putProgress{}, RequestHashError
872         }
873
874         result := newPutProgress(wantStorageClasses)
875
876         // If we already have this data, it's intact on disk, and we
877         // can update its timestamp, return success. If we have
878         // different data with the same hash, return failure.
879         if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
880                 return result, err
881         }
882         if ctx.Err() != nil {
883                 return result, ErrClientDisconnect
884         }
885
886         writables := volmgr.NextWritable()
887         if len(writables) == 0 {
888                 log.Error("no writable volumes")
889                 return result, FullError
890         }
891
892         var wg sync.WaitGroup
893         var mtx sync.Mutex
894         cond := sync.Cond{L: &mtx}
895         // pending predicts what result will be if all pending writes
896         // succeed.
897         pending := result.Copy()
898         var allFull atomic.Value
899         allFull.Store(true)
900
901         // We hold the lock for the duration of the "each volume" loop
902         // below, except when it is released during cond.Wait().
903         mtx.Lock()
904
905         for _, mnt := range writables {
906                 // Wait until our decision to use this mount does not
907                 // depend on the outcome of pending writes.
908                 for result.Want(mnt) && !pending.Want(mnt) {
909                         cond.Wait()
910                 }
911                 if !result.Want(mnt) {
912                         continue
913                 }
914                 mnt := mnt
915                 pending.Add(mnt)
916                 wg.Add(1)
917                 go func() {
918                         log.Debugf("PutBlock: start write to %s", mnt.UUID)
919                         defer wg.Done()
920                         err := mnt.Put(ctx, hash, block)
921
922                         mtx.Lock()
923                         if err != nil {
924                                 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
925                                 pending.Sub(mnt)
926                         } else {
927                                 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
928                                 result.Add(mnt)
929                         }
930                         cond.Broadcast()
931                         mtx.Unlock()
932
933                         if err != nil && err != FullError && ctx.Err() == nil {
934                                 // The volume is not full but the
935                                 // write did not succeed.  Report the
936                                 // error and continue trying.
937                                 allFull.Store(false)
938                                 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
939                         }
940                 }()
941         }
942         mtx.Unlock()
943         wg.Wait()
944         if ctx.Err() != nil {
945                 return result, ErrClientDisconnect
946         }
947         if result.Done() {
948                 return result, nil
949         }
950
951         if result.totalReplication > 0 {
952                 // Some, but not all, of the storage classes were
953                 // satisfied. This qualifies as success.
954                 return result, nil
955         } else if allFull.Load().(bool) {
956                 log.Error("all volumes with qualifying storage classes are full")
957                 return putProgress{}, FullError
958         } else {
959                 // Already logged the non-full errors.
960                 return putProgress{}, GenericError
961         }
962 }
963
964 // CompareAndTouch looks for volumes where the given content already
965 // exists and its modification time can be updated (i.e., it is
966 // protected from garbage collection), and updates result accordingly.
967 // It returns when the result is Done() or all volumes have been
968 // checked.
969 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
970         log := ctxlog.FromContext(ctx)
971         for _, mnt := range volmgr.AllWritable() {
972                 if !result.Want(mnt) {
973                         continue
974                 }
975                 err := mnt.Compare(ctx, hash, buf)
976                 if ctx.Err() != nil {
977                         return nil
978                 } else if err == CollisionError {
979                         // Stop if we have a block with same hash but
980                         // different content. (It will be impossible
981                         // to tell which one is wanted if we have
982                         // both, so there's no point writing it even
983                         // on a different volume.)
984                         log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
985                         return CollisionError
986                 } else if os.IsNotExist(err) {
987                         // Block does not exist. This is the only
988                         // "normal" error: we don't log anything.
989                         continue
990                 } else if err != nil {
991                         // Couldn't open file, data is corrupt on
992                         // disk, etc.: log this abnormal condition,
993                         // and try the next volume.
994                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
995                         continue
996                 }
997                 if err := mnt.Touch(hash); err != nil {
998                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
999                         continue
1000                 }
1001                 // Compare and Touch both worked --> done.
1002                 result.Add(mnt)
1003                 if result.Done() {
1004                         return nil
1005                 }
1006         }
1007         return nil
1008 }
1009
1010 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1011
1012 // IsValidLocator returns true if the specified string is a valid Keep
1013 // locator.  When Keep is extended to support hash types other than
1014 // MD5, this should be updated to cover those as well.
1015 func IsValidLocator(loc string) bool {
1016         return validLocatorRe.MatchString(loc)
1017 }
1018
1019 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1020
1021 // GetAPIToken returns the OAuth2 token from the Authorization
1022 // header of a HTTP request, or an empty string if no matching
1023 // token is found.
1024 func GetAPIToken(req *http.Request) string {
1025         if auth, ok := req.Header["Authorization"]; ok {
1026                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1027                         return match[2]
1028                 }
1029         }
1030         return ""
1031 }
1032
1033 // canDelete returns true if the user identified by apiToken is
1034 // allowed to delete blocks.
1035 func (rtr *router) canDelete(apiToken string) bool {
1036         if apiToken == "" {
1037                 return false
1038         }
1039         // Blocks may be deleted only when Keep has been configured with a
1040         // data manager.
1041         if rtr.isSystemAuth(apiToken) {
1042                 return true
1043         }
1044         // TODO(twp): look up apiToken with the API server
1045         // return true if is_admin is true and if the token
1046         // has unlimited scope
1047         return false
1048 }
1049
1050 // isSystemAuth returns true if the given token is allowed to perform
1051 // system level actions like deleting data.
1052 func (rtr *router) isSystemAuth(token string) bool {
1053         return token != "" && token == rtr.cluster.SystemRootToken
1054 }