17755: Merge branch 'main' into 17755-add-singularity-to-compute-image
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "git.arvados.org/arvados.git/sdk/go/health"
26         "git.arvados.org/arvados.git/sdk/go/httpserver"
27         "github.com/gorilla/mux"
28         "github.com/prometheus/client_golang/prometheus"
29         "github.com/sirupsen/logrus"
30 )
31
32 type router struct {
33         *mux.Router
34         cluster     *arvados.Cluster
35         logger      logrus.FieldLogger
36         remoteProxy remoteProxy
37         metrics     *nodeMetrics
38         volmgr      *RRVolumeManager
39         pullq       *WorkQueue
40         trashq      *WorkQueue
41 }
42
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
46         rtr := &router{
47                 Router:  mux.NewRouter(),
48                 cluster: cluster,
49                 logger:  ctxlog.FromContext(ctx),
50                 metrics: &nodeMetrics{reg: reg},
51                 volmgr:  volmgr,
52                 pullq:   pullq,
53                 trashq:  trashq,
54         }
55
56         rtr.HandleFunc(
57                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}+{hints}`,
60                 rtr.handleGET).Methods("GET", "HEAD")
61
62         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64         // List all blocks stored here. Privileged client only.
65         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66         // List blocks stored here whose hash has the given prefix.
67         // Privileged client only.
68         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69         // Update timestamp on existing block. Privileged client only.
70         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
71
72         // Internals/debugging info (runtime.MemStats)
73         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
74
75         // List volumes: path, device number, bytes used/avail.
76         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
77
78         // List mounts: UUID, readonly, tier, device ID, ...
79         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
80         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
82
83         // Replace the current pull queue.
84         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
85
86         // Replace the current trash queue.
87         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
88
89         // Untrash moves blocks from trash back into store
90         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
91
92         rtr.Handle("/_health/{check}", &health.Handler{
93                 Token:  cluster.ManagementToken,
94                 Prefix: "/_health/",
95         }).Methods("GET")
96
97         // Any request which does not match any of these routes gets
98         // 400 Bad Request.
99         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
100
101         rtr.metrics.setupBufferPoolMetrics(bufs)
102         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
104
105         return rtr
106 }
107
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
111 }
112
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
114         ctx, cancel := contextForResponse(context.TODO(), resp)
115         defer cancel()
116
117         locator := req.URL.Path[1:]
118         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
120                 return
121         }
122
123         if rtr.cluster.Collections.BlobSigning {
124                 locator := req.URL.Path[1:] // strip leading slash
125                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
126                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
127                         return
128                 }
129         }
130
131         // TODO: Probe volumes to check whether the block _might_
132         // exist. Some volumes/types could support a quick existence
133         // check without causing other operations to suffer. If all
134         // volumes support that, and assure us the block definitely
135         // isn't here, we can return 404 now instead of waiting for a
136         // buffer.
137
138         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
139         if err != nil {
140                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
141                 return
142         }
143         defer bufs.Put(buf)
144
145         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146         if err != nil {
147                 code := http.StatusInternalServerError
148                 if err, ok := err.(*KeepError); ok {
149                         code = err.HTTPCode
150                 }
151                 http.Error(resp, err.Error(), code)
152                 return
153         }
154
155         resp.Header().Set("Content-Length", strconv.Itoa(size))
156         resp.Header().Set("Content-Type", "application/octet-stream")
157         resp.Write(buf[:size])
158 }
159
160 // Return a new context that gets cancelled by resp's CloseNotifier.
161 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
162         ctx, cancel := context.WithCancel(parent)
163         if cn, ok := resp.(http.CloseNotifier); ok {
164                 go func(c <-chan bool) {
165                         select {
166                         case <-c:
167                                 cancel()
168                         case <-ctx.Done():
169                         }
170                 }(cn.CloseNotify())
171         }
172         return ctx, cancel
173 }
174
175 // Get a buffer from the pool -- but give up and return a non-nil
176 // error if ctx ends before we get a buffer.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
178         bufReady := make(chan []byte)
179         go func() {
180                 bufReady <- bufs.Get(bufSize)
181         }()
182         select {
183         case buf := <-bufReady:
184                 return buf, nil
185         case <-ctx.Done():
186                 go func() {
187                         // Even if closeNotifier happened first, we
188                         // need to keep waiting for our buf so we can
189                         // return it to the pool.
190                         bufs.Put(<-bufReady)
191                 }()
192                 return nil, ErrClientDisconnect
193         }
194 }
195
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197         if !rtr.isSystemAuth(GetAPIToken(req)) {
198                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
199                 return
200         }
201         hash := mux.Vars(req)["hash"]
202         vols := rtr.volmgr.AllWritable()
203         if len(vols) == 0 {
204                 http.Error(resp, "no volumes", http.StatusNotFound)
205                 return
206         }
207         var err error
208         for _, mnt := range vols {
209                 err = mnt.Touch(hash)
210                 if err == nil {
211                         break
212                 }
213         }
214         switch {
215         case err == nil:
216                 return
217         case os.IsNotExist(err):
218                 http.Error(resp, err.Error(), http.StatusNotFound)
219         default:
220                 http.Error(resp, err.Error(), http.StatusInternalServerError)
221         }
222 }
223
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225         ctx, cancel := contextForResponse(context.TODO(), resp)
226         defer cancel()
227
228         hash := mux.Vars(req)["hash"]
229
230         // Detect as many error conditions as possible before reading
231         // the body: avoid transmitting data that will not end up
232         // being written anyway.
233
234         if req.ContentLength == -1 {
235                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
236                 return
237         }
238
239         if req.ContentLength > BlockSize {
240                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
241                 return
242         }
243
244         if len(rtr.volmgr.AllWritable()) == 0 {
245                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
246                 return
247         }
248
249         var wantStorageClasses []string
250         if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
251                 wantStorageClasses = strings.Split(hdr, ",")
252                 for i, sc := range wantStorageClasses {
253                         wantStorageClasses[i] = strings.TrimSpace(sc)
254                 }
255         } else {
256                 // none specified -- use configured default
257                 for class, cfg := range rtr.cluster.StorageClasses {
258                         if cfg.Default {
259                                 wantStorageClasses = append(wantStorageClasses, class)
260                         }
261                 }
262         }
263
264         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
265         if err != nil {
266                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
267                 return
268         }
269
270         _, err = io.ReadFull(req.Body, buf)
271         if err != nil {
272                 http.Error(resp, err.Error(), 500)
273                 bufs.Put(buf)
274                 return
275         }
276
277         result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
278         bufs.Put(buf)
279
280         if err != nil {
281                 code := http.StatusInternalServerError
282                 if err, ok := err.(*KeepError); ok {
283                         code = err.HTTPCode
284                 }
285                 http.Error(resp, err.Error(), code)
286                 return
287         }
288
289         // Success; add a size hint, sign the locator if possible, and
290         // return it to the client.
291         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
292         apiToken := GetAPIToken(req)
293         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
294                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
295                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
296         }
297         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
298         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
299         resp.Write([]byte(returnHash + "\n"))
300 }
301
302 // IndexHandler responds to "/index", "/index/{prefix}", and
303 // "/mounts/{uuid}/blocks" requests.
304 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
305         if !rtr.isSystemAuth(GetAPIToken(req)) {
306                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
307                 return
308         }
309
310         prefix := mux.Vars(req)["prefix"]
311         if prefix == "" {
312                 req.ParseForm()
313                 prefix = req.Form.Get("prefix")
314         }
315
316         uuid := mux.Vars(req)["uuid"]
317
318         var vols []*VolumeMount
319         if uuid == "" {
320                 vols = rtr.volmgr.AllReadable()
321         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
322                 http.Error(resp, "mount not found", http.StatusNotFound)
323                 return
324         } else {
325                 vols = []*VolumeMount{mnt}
326         }
327
328         for _, v := range vols {
329                 if err := v.IndexTo(prefix, resp); err != nil {
330                         // We can't send an error status/message to
331                         // the client because IndexTo() might have
332                         // already written body content. All we can do
333                         // is log the error in our own logs.
334                         //
335                         // The client must notice the lack of trailing
336                         // newline as an indication that the response
337                         // is incomplete.
338                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
339                         return
340                 }
341         }
342         // An empty line at EOF is the only way the client can be
343         // assured the entire index was received.
344         resp.Write([]byte{'\n'})
345 }
346
347 // MountsHandler responds to "GET /mounts" requests.
348 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
349         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
350         if err != nil {
351                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
352         }
353 }
354
355 // PoolStatus struct
356 type PoolStatus struct {
357         Alloc uint64 `json:"BytesAllocatedCumulative"`
358         Cap   int    `json:"BuffersMax"`
359         Len   int    `json:"BuffersInUse"`
360 }
361
362 type volumeStatusEnt struct {
363         Label         string
364         Status        *VolumeStatus `json:",omitempty"`
365         VolumeStats   *ioStats      `json:",omitempty"`
366         InternalStats interface{}   `json:",omitempty"`
367 }
368
369 // NodeStatus struct
370 type NodeStatus struct {
371         Volumes         []*volumeStatusEnt
372         BufferPool      PoolStatus
373         PullQueue       WorkQueueStatus
374         TrashQueue      WorkQueueStatus
375         RequestsCurrent int
376         RequestsMax     int
377         Version         string
378 }
379
380 var st NodeStatus
381 var stLock sync.Mutex
382
383 // DebugHandler addresses /debug.json requests.
384 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
385         type debugStats struct {
386                 MemStats runtime.MemStats
387         }
388         var ds debugStats
389         runtime.ReadMemStats(&ds.MemStats)
390         data, err := json.Marshal(&ds)
391         if err != nil {
392                 http.Error(resp, err.Error(), http.StatusInternalServerError)
393                 return
394         }
395         resp.Write(data)
396 }
397
398 // StatusHandler addresses /status.json requests.
399 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
400         stLock.Lock()
401         rtr.readNodeStatus(&st)
402         data, err := json.Marshal(&st)
403         stLock.Unlock()
404         if err != nil {
405                 http.Error(resp, err.Error(), http.StatusInternalServerError)
406                 return
407         }
408         resp.Write(data)
409 }
410
411 // populate the given NodeStatus struct with current values.
412 func (rtr *router) readNodeStatus(st *NodeStatus) {
413         st.Version = version
414         vols := rtr.volmgr.AllReadable()
415         if cap(st.Volumes) < len(vols) {
416                 st.Volumes = make([]*volumeStatusEnt, len(vols))
417         }
418         st.Volumes = st.Volumes[:0]
419         for _, vol := range vols {
420                 var internalStats interface{}
421                 if vol, ok := vol.Volume.(InternalStatser); ok {
422                         internalStats = vol.InternalStats()
423                 }
424                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
425                         Label:         vol.String(),
426                         Status:        vol.Status(),
427                         InternalStats: internalStats,
428                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
429                 })
430         }
431         st.BufferPool.Alloc = bufs.Alloc()
432         st.BufferPool.Cap = bufs.Cap()
433         st.BufferPool.Len = bufs.Len()
434         st.PullQueue = getWorkQueueStatus(rtr.pullq)
435         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
436 }
437
438 // return a WorkQueueStatus for the given queue. If q is nil (which
439 // should never happen except in test suites), return a zero status
440 // value instead of crashing.
441 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
442         if q == nil {
443                 // This should only happen during tests.
444                 return WorkQueueStatus{}
445         }
446         return q.Status()
447 }
448
449 // handleDELETE processes DELETE requests.
450 //
451 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
452 // from all connected volumes.
453 //
454 // Only the Data Manager, or an Arvados admin with scope "all", are
455 // allowed to issue DELETE requests.  If a DELETE request is not
456 // authenticated or is issued by a non-admin user, the server returns
457 // a PermissionError.
458 //
459 // Upon receiving a valid request from an authorized user,
460 // handleDELETE deletes all copies of the specified block on local
461 // writable volumes.
462 //
463 // Response format:
464 //
465 // If the requested blocks was not found on any volume, the response
466 // code is HTTP 404 Not Found.
467 //
468 // Otherwise, the response code is 200 OK, with a response body
469 // consisting of the JSON message
470 //
471 //    {"copies_deleted":d,"copies_failed":f}
472 //
473 // where d and f are integers representing the number of blocks that
474 // were successfully and unsuccessfully deleted.
475 //
476 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
477         hash := mux.Vars(req)["hash"]
478
479         // Confirm that this user is an admin and has a token with unlimited scope.
480         var tok = GetAPIToken(req)
481         if tok == "" || !rtr.canDelete(tok) {
482                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
483                 return
484         }
485
486         if !rtr.cluster.Collections.BlobTrash {
487                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
488                 return
489         }
490
491         // Delete copies of this block from all available volumes.
492         // Report how many blocks were successfully deleted, and how
493         // many were found on writable volumes but not deleted.
494         var result struct {
495                 Deleted int `json:"copies_deleted"`
496                 Failed  int `json:"copies_failed"`
497         }
498         for _, vol := range rtr.volmgr.AllWritable() {
499                 if err := vol.Trash(hash); err == nil {
500                         result.Deleted++
501                 } else if os.IsNotExist(err) {
502                         continue
503                 } else {
504                         result.Failed++
505                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
506                 }
507         }
508         if result.Deleted == 0 && result.Failed == 0 {
509                 resp.WriteHeader(http.StatusNotFound)
510                 return
511         }
512         body, err := json.Marshal(result)
513         if err != nil {
514                 http.Error(resp, err.Error(), http.StatusInternalServerError)
515                 return
516         }
517         resp.Write(body)
518 }
519
520 /* PullHandler processes "PUT /pull" requests for the data manager.
521    The request body is a JSON message containing a list of pull
522    requests in the following format:
523
524    [
525       {
526          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
527          "servers":[
528                         "keep0.qr1hi.arvadosapi.com:25107",
529                         "keep1.qr1hi.arvadosapi.com:25108"
530                  ]
531           },
532           {
533                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
534                  "servers":[
535                         "10.0.1.5:25107",
536                         "10.0.1.6:25107",
537                         "10.0.1.7:25108"
538                  ]
539           },
540           ...
541    ]
542
543    Each pull request in the list consists of a block locator string
544    and an ordered list of servers.  Keepstore should try to fetch the
545    block from each server in turn.
546
547    If the request has not been sent by the Data Manager, return 401
548    Unauthorized.
549
550    If the JSON unmarshalling fails, return 400 Bad Request.
551 */
552
553 // PullRequest consists of a block locator and an ordered list of servers
554 type PullRequest struct {
555         Locator string   `json:"locator"`
556         Servers []string `json:"servers"`
557
558         // Destination mount, or "" for "anywhere"
559         MountUUID string `json:"mount_uuid"`
560 }
561
562 // PullHandler processes "PUT /pull" requests for the data manager.
563 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
564         // Reject unauthorized requests.
565         if !rtr.isSystemAuth(GetAPIToken(req)) {
566                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
567                 return
568         }
569
570         // Parse the request body.
571         var pr []PullRequest
572         r := json.NewDecoder(req.Body)
573         if err := r.Decode(&pr); err != nil {
574                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
575                 return
576         }
577
578         // We have a properly formatted pull list sent from the data
579         // manager.  Report success and send the list to the pull list
580         // manager for further handling.
581         resp.WriteHeader(http.StatusOK)
582         resp.Write([]byte(
583                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
584
585         plist := list.New()
586         for _, p := range pr {
587                 plist.PushBack(p)
588         }
589         rtr.pullq.ReplaceQueue(plist)
590 }
591
592 // TrashRequest consists of a block locator and its Mtime
593 type TrashRequest struct {
594         Locator    string `json:"locator"`
595         BlockMtime int64  `json:"block_mtime"`
596
597         // Target mount, or "" for "everywhere"
598         MountUUID string `json:"mount_uuid"`
599 }
600
601 // TrashHandler processes /trash requests.
602 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
603         // Reject unauthorized requests.
604         if !rtr.isSystemAuth(GetAPIToken(req)) {
605                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
606                 return
607         }
608
609         // Parse the request body.
610         var trash []TrashRequest
611         r := json.NewDecoder(req.Body)
612         if err := r.Decode(&trash); err != nil {
613                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
614                 return
615         }
616
617         // We have a properly formatted trash list sent from the data
618         // manager.  Report success and send the list to the trash work
619         // queue for further handling.
620         resp.WriteHeader(http.StatusOK)
621         resp.Write([]byte(
622                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
623
624         tlist := list.New()
625         for _, t := range trash {
626                 tlist.PushBack(t)
627         }
628         rtr.trashq.ReplaceQueue(tlist)
629 }
630
631 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
632 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
633         // Reject unauthorized requests.
634         if !rtr.isSystemAuth(GetAPIToken(req)) {
635                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
636                 return
637         }
638
639         log := ctxlog.FromContext(req.Context())
640         hash := mux.Vars(req)["hash"]
641
642         if len(rtr.volmgr.AllWritable()) == 0 {
643                 http.Error(resp, "No writable volumes", http.StatusNotFound)
644                 return
645         }
646
647         var untrashedOn, failedOn []string
648         var numNotFound int
649         for _, vol := range rtr.volmgr.AllWritable() {
650                 err := vol.Untrash(hash)
651
652                 if os.IsNotExist(err) {
653                         numNotFound++
654                 } else if err != nil {
655                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
656                         failedOn = append(failedOn, vol.String())
657                 } else {
658                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
659                         untrashedOn = append(untrashedOn, vol.String())
660                 }
661         }
662
663         if numNotFound == len(rtr.volmgr.AllWritable()) {
664                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
665         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
666                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
667         } else {
668                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
669                 if len(failedOn) > 0 {
670                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
671                         http.Error(resp, respBody, http.StatusInternalServerError)
672                 } else {
673                         fmt.Fprintln(resp, respBody)
674                 }
675         }
676 }
677
678 // GetBlock and PutBlock implement lower-level code for handling
679 // blocks by rooting through volumes connected to the local machine.
680 // Once the handler has determined that system policy permits the
681 // request, it calls these methods to perform the actual operation.
682 //
683 // TODO(twp): this code would probably be better located in the
684 // VolumeManager interface. As an abstraction, the VolumeManager
685 // should be the only part of the code that cares about which volume a
686 // block is stored on, so it should be responsible for figuring out
687 // which volume to check for fetching blocks, storing blocks, etc.
688
689 // GetBlock fetches the block identified by "hash" into the provided
690 // buf, and returns the data size.
691 //
692 // If the block cannot be found on any volume, returns NotFoundError.
693 //
694 // If the block found does not have the correct MD5 hash, returns
695 // DiskHashError.
696 //
697 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
698         log := ctxlog.FromContext(ctx)
699
700         // Attempt to read the requested hash from a keep volume.
701         errorToCaller := NotFoundError
702
703         for _, vol := range volmgr.AllReadable() {
704                 size, err := vol.Get(ctx, hash, buf)
705                 select {
706                 case <-ctx.Done():
707                         return 0, ErrClientDisconnect
708                 default:
709                 }
710                 if err != nil {
711                         // IsNotExist is an expected error and may be
712                         // ignored. All other errors are logged. In
713                         // any case we continue trying to read other
714                         // volumes. If all volumes report IsNotExist,
715                         // we return a NotFoundError.
716                         if !os.IsNotExist(err) {
717                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
718                         }
719                         // If some volume returns a transient error, return it to the caller
720                         // instead of "Not found" so it can retry.
721                         if err == VolumeBusyError {
722                                 errorToCaller = err.(*KeepError)
723                         }
724                         continue
725                 }
726                 // Check the file checksum.
727                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
728                 if filehash != hash {
729                         // TODO: Try harder to tell a sysadmin about
730                         // this.
731                         log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
732                         errorToCaller = DiskHashError
733                         continue
734                 }
735                 if errorToCaller == DiskHashError {
736                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
737                 }
738                 return size, nil
739         }
740         return 0, errorToCaller
741 }
742
743 type putProgress struct {
744         classTodo        map[string]bool
745         mountUsed        map[*VolumeMount]bool
746         totalReplication int
747         classDone        map[string]int
748 }
749
750 // Number of distinct replicas stored. "2" can mean the block was
751 // stored on 2 different volumes with replication 1, or on 1 volume
752 // with replication 2.
753 func (pr putProgress) TotalReplication() string {
754         return strconv.Itoa(pr.totalReplication)
755 }
756
757 // Number of replicas satisfying each storage class, formatted like
758 // "default=2; special=1".
759 func (pr putProgress) ClassReplication() string {
760         s := ""
761         for k, v := range pr.classDone {
762                 if len(s) > 0 {
763                         s += ", "
764                 }
765                 s += k + "=" + strconv.Itoa(v)
766         }
767         return s
768 }
769
770 func (pr *putProgress) Add(mnt *VolumeMount) {
771         if pr.mountUsed[mnt] {
772                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
773                 return
774         }
775         pr.mountUsed[mnt] = true
776         pr.totalReplication += mnt.Replication
777         for class := range mnt.StorageClasses {
778                 pr.classDone[class] += mnt.Replication
779                 delete(pr.classTodo, class)
780         }
781 }
782
783 func (pr *putProgress) Done() bool {
784         return len(pr.classTodo) == 0 && pr.totalReplication > 0
785 }
786
787 func (pr *putProgress) Want(mnt *VolumeMount) bool {
788         if pr.Done() || pr.mountUsed[mnt] {
789                 return false
790         }
791         if len(pr.classTodo) == 0 {
792                 // none specified == "any"
793                 return true
794         }
795         for class := range mnt.StorageClasses {
796                 if pr.classTodo[class] {
797                         return true
798                 }
799         }
800         return false
801 }
802
803 func newPutResult(classes []string) putProgress {
804         pr := putProgress{
805                 classTodo: make(map[string]bool, len(classes)),
806                 classDone: map[string]int{},
807                 mountUsed: map[*VolumeMount]bool{},
808         }
809         for _, c := range classes {
810                 if c != "" {
811                         pr.classTodo[c] = true
812                 }
813         }
814         return pr
815 }
816
817 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
818 //
819 // PutBlock(ctx, block, hash)
820 //   Stores the BLOCK (identified by the content id HASH) in Keep.
821 //
822 //   The MD5 checksum of the block must be identical to the content id HASH.
823 //   If not, an error is returned.
824 //
825 //   PutBlock stores the BLOCK on the first Keep volume with free space.
826 //   A failure code is returned to the user only if all volumes fail.
827 //
828 //   On success, PutBlock returns nil.
829 //   On failure, it returns a KeepError with one of the following codes:
830 //
831 //   500 Collision
832 //          A different block with the same hash already exists on this
833 //          Keep server.
834 //   422 MD5Fail
835 //          The MD5 hash of the BLOCK does not match the argument HASH.
836 //   503 Full
837 //          There was not enough space left in any Keep volume to store
838 //          the object.
839 //   500 Fail
840 //          The object could not be stored for some other reason (e.g.
841 //          all writes failed). The text of the error message should
842 //          provide as much detail as possible.
843 //
844 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
845         log := ctxlog.FromContext(ctx)
846
847         // Check that BLOCK's checksum matches HASH.
848         blockhash := fmt.Sprintf("%x", md5.Sum(block))
849         if blockhash != hash {
850                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
851                 return putProgress{}, RequestHashError
852         }
853
854         result := newPutResult(wantStorageClasses)
855
856         // If we already have this data, it's intact on disk, and we
857         // can update its timestamp, return success. If we have
858         // different data with the same hash, return failure.
859         if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
860                 return result, err
861         }
862         if ctx.Err() != nil {
863                 return result, ErrClientDisconnect
864         }
865
866         // Choose a Keep volume to write to.
867         // If this volume fails, try all of the volumes in order.
868         if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
869                 // fall through to "try all volumes" below
870         } else if err := mnt.Put(ctx, hash, block); err != nil {
871                 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
872         } else {
873                 result.Add(mnt)
874                 if result.Done() {
875                         return result, nil
876                 }
877         }
878         if ctx.Err() != nil {
879                 return putProgress{}, ErrClientDisconnect
880         }
881
882         writables := volmgr.AllWritable()
883         if len(writables) == 0 {
884                 log.Error("no writable volumes")
885                 return putProgress{}, FullError
886         }
887
888         allFull := true
889         for _, mnt := range writables {
890                 if !result.Want(mnt) {
891                         continue
892                 }
893                 err := mnt.Put(ctx, hash, block)
894                 if ctx.Err() != nil {
895                         return result, ErrClientDisconnect
896                 }
897                 switch err {
898                 case nil:
899                         result.Add(mnt)
900                         if result.Done() {
901                                 return result, nil
902                         }
903                         continue
904                 case FullError:
905                         continue
906                 default:
907                         // The volume is not full but the
908                         // write did not succeed.  Report the
909                         // error and continue trying.
910                         allFull = false
911                         log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
912                 }
913         }
914
915         if result.totalReplication > 0 {
916                 // Some, but not all, of the storage classes were
917                 // satisfied. This qualifies as success.
918                 return result, nil
919         } else if allFull {
920                 log.Error("all volumes with qualifying storage classes are full")
921                 return putProgress{}, FullError
922         } else {
923                 // Already logged the non-full errors.
924                 return putProgress{}, GenericError
925         }
926 }
927
928 // CompareAndTouch looks for volumes where the given content already
929 // exists and its modification time can be updated (i.e., it is
930 // protected from garbage collection), and updates result accordingly.
931 // It returns when the result is Done() or all volumes have been
932 // checked.
933 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
934         log := ctxlog.FromContext(ctx)
935         for _, mnt := range volmgr.AllWritable() {
936                 if !result.Want(mnt) {
937                         continue
938                 }
939                 err := mnt.Compare(ctx, hash, buf)
940                 if ctx.Err() != nil {
941                         return nil
942                 } else if err == CollisionError {
943                         // Stop if we have a block with same hash but
944                         // different content. (It will be impossible
945                         // to tell which one is wanted if we have
946                         // both, so there's no point writing it even
947                         // on a different volume.)
948                         log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
949                         return CollisionError
950                 } else if os.IsNotExist(err) {
951                         // Block does not exist. This is the only
952                         // "normal" error: we don't log anything.
953                         continue
954                 } else if err != nil {
955                         // Couldn't open file, data is corrupt on
956                         // disk, etc.: log this abnormal condition,
957                         // and try the next volume.
958                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
959                         continue
960                 }
961                 if err := mnt.Touch(hash); err != nil {
962                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
963                         continue
964                 }
965                 // Compare and Touch both worked --> done.
966                 result.Add(mnt)
967                 if result.Done() {
968                         return nil
969                 }
970         }
971         return nil
972 }
973
974 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
975
976 // IsValidLocator returns true if the specified string is a valid Keep locator.
977 //   When Keep is extended to support hash types other than MD5,
978 //   this should be updated to cover those as well.
979 //
980 func IsValidLocator(loc string) bool {
981         return validLocatorRe.MatchString(loc)
982 }
983
984 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
985
986 // GetAPIToken returns the OAuth2 token from the Authorization
987 // header of a HTTP request, or an empty string if no matching
988 // token is found.
989 func GetAPIToken(req *http.Request) string {
990         if auth, ok := req.Header["Authorization"]; ok {
991                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
992                         return match[2]
993                 }
994         }
995         return ""
996 }
997
998 // canDelete returns true if the user identified by apiToken is
999 // allowed to delete blocks.
1000 func (rtr *router) canDelete(apiToken string) bool {
1001         if apiToken == "" {
1002                 return false
1003         }
1004         // Blocks may be deleted only when Keep has been configured with a
1005         // data manager.
1006         if rtr.isSystemAuth(apiToken) {
1007                 return true
1008         }
1009         // TODO(twp): look up apiToken with the API server
1010         // return true if is_admin is true and if the token
1011         // has unlimited scope
1012         return false
1013 }
1014
1015 // isSystemAuth returns true if the given token is allowed to perform
1016 // system level actions like deleting data.
1017 func (rtr *router) isSystemAuth(token string) bool {
1018         return token != "" && token == rtr.cluster.SystemRootToken
1019 }