Merge branch 'main' into 19385-cwl-fast-pack
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "git.arvados.org/arvados.git/sdk/go/health"
28         "git.arvados.org/arvados.git/sdk/go/httpserver"
29         "github.com/gorilla/mux"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32 )
33
34 type router struct {
35         *mux.Router
36         cluster     *arvados.Cluster
37         logger      logrus.FieldLogger
38         remoteProxy remoteProxy
39         metrics     *nodeMetrics
40         volmgr      *RRVolumeManager
41         pullq       *WorkQueue
42         trashq      *WorkQueue
43 }
44
45 // MakeRESTRouter returns a new router that forwards all Keep requests
46 // to the appropriate handlers.
47 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
48         rtr := &router{
49                 Router:  mux.NewRouter(),
50                 cluster: cluster,
51                 logger:  ctxlog.FromContext(ctx),
52                 metrics: &nodeMetrics{reg: reg},
53                 volmgr:  volmgr,
54                 pullq:   pullq,
55                 trashq:  trashq,
56         }
57
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
60         rtr.HandleFunc(
61                 `/{hash:[0-9a-f]{32}}+{hints}`,
62                 rtr.handleGET).Methods("GET", "HEAD")
63
64         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
65         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
66         // List all blocks stored here. Privileged client only.
67         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
68         // List blocks stored here whose hash has the given prefix.
69         // Privileged client only.
70         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71         // Update timestamp on existing block. Privileged client only.
72         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
73
74         // Internals/debugging info (runtime.MemStats)
75         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
76
77         // List volumes: path, device number, bytes used/avail.
78         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
79
80         // List mounts: UUID, readonly, tier, device ID, ...
81         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
82         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
83         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
84
85         // Replace the current pull queue.
86         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
87
88         // Replace the current trash queue.
89         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
90
91         // Untrash moves blocks from trash back into store
92         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
93
94         rtr.Handle("/_health/{check}", &health.Handler{
95                 Token:  cluster.ManagementToken,
96                 Prefix: "/_health/",
97         }).Methods("GET")
98
99         // Any request which does not match any of these routes gets
100         // 400 Bad Request.
101         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
102
103         rtr.metrics.setupBufferPoolMetrics(bufs)
104         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
105         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
106
107         return rtr
108 }
109
110 // BadRequestHandler is a HandleFunc to address bad requests.
111 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
112         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
113 }
114
115 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
116         locator := req.URL.Path[1:]
117         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118                 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
119                 return
120         }
121
122         if rtr.cluster.Collections.BlobSigning {
123                 locator := req.URL.Path[1:] // strip leading slash
124                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
126                         return
127                 }
128         }
129
130         // TODO: Probe volumes to check whether the block _might_
131         // exist. Some volumes/types could support a quick existence
132         // check without causing other operations to suffer. If all
133         // volumes support that, and assure us the block definitely
134         // isn't here, we can return 404 now instead of waiting for a
135         // buffer.
136
137         buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
138         if err != nil {
139                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
140                 return
141         }
142         defer bufs.Put(buf)
143
144         size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
145         if err != nil {
146                 code := http.StatusInternalServerError
147                 if err, ok := err.(*KeepError); ok {
148                         code = err.HTTPCode
149                 }
150                 http.Error(resp, err.Error(), code)
151                 return
152         }
153
154         resp.Header().Set("Content-Length", strconv.Itoa(size))
155         resp.Header().Set("Content-Type", "application/octet-stream")
156         resp.Write(buf[:size])
157 }
158
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162         bufReady := make(chan []byte)
163         go func() {
164                 bufReady <- bufs.Get(bufSize)
165         }()
166         select {
167         case buf := <-bufReady:
168                 return buf, nil
169         case <-ctx.Done():
170                 go func() {
171                         // Even if closeNotifier happened first, we
172                         // need to keep waiting for our buf so we can
173                         // return it to the pool.
174                         bufs.Put(<-bufReady)
175                 }()
176                 return nil, ErrClientDisconnect
177         }
178 }
179
180 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
181         if !rtr.isSystemAuth(GetAPIToken(req)) {
182                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
183                 return
184         }
185         hash := mux.Vars(req)["hash"]
186         vols := rtr.volmgr.AllWritable()
187         if len(vols) == 0 {
188                 http.Error(resp, "no volumes", http.StatusNotFound)
189                 return
190         }
191         var err error
192         for _, mnt := range vols {
193                 err = mnt.Touch(hash)
194                 if err == nil {
195                         break
196                 }
197         }
198         switch {
199         case err == nil:
200                 return
201         case os.IsNotExist(err):
202                 http.Error(resp, err.Error(), http.StatusNotFound)
203         default:
204                 http.Error(resp, err.Error(), http.StatusInternalServerError)
205         }
206 }
207
208 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
209         hash := mux.Vars(req)["hash"]
210
211         // Detect as many error conditions as possible before reading
212         // the body: avoid transmitting data that will not end up
213         // being written anyway.
214
215         if req.ContentLength == -1 {
216                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
217                 return
218         }
219
220         if req.ContentLength > BlockSize {
221                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
222                 return
223         }
224
225         if len(rtr.volmgr.AllWritable()) == 0 {
226                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
227                 return
228         }
229
230         var wantStorageClasses []string
231         if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
232                 wantStorageClasses = strings.Split(hdr, ",")
233                 for i, sc := range wantStorageClasses {
234                         wantStorageClasses[i] = strings.TrimSpace(sc)
235                 }
236         } else {
237                 // none specified -- use configured default
238                 for class, cfg := range rtr.cluster.StorageClasses {
239                         if cfg.Default {
240                                 wantStorageClasses = append(wantStorageClasses, class)
241                         }
242                 }
243         }
244
245         buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
246         if err != nil {
247                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
248                 return
249         }
250
251         _, err = io.ReadFull(req.Body, buf)
252         if err != nil {
253                 http.Error(resp, err.Error(), 500)
254                 bufs.Put(buf)
255                 return
256         }
257
258         result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
259         bufs.Put(buf)
260
261         if err != nil {
262                 code := http.StatusInternalServerError
263                 if err, ok := err.(*KeepError); ok {
264                         code = err.HTTPCode
265                 }
266                 http.Error(resp, err.Error(), code)
267                 return
268         }
269
270         // Success; add a size hint, sign the locator if possible, and
271         // return it to the client.
272         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
273         apiToken := GetAPIToken(req)
274         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
275                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
276                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
277         }
278         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
279         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
280         resp.Write([]byte(returnHash + "\n"))
281 }
282
283 // IndexHandler responds to "/index", "/index/{prefix}", and
284 // "/mounts/{uuid}/blocks" requests.
285 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
286         if !rtr.isSystemAuth(GetAPIToken(req)) {
287                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
288                 return
289         }
290
291         prefix := mux.Vars(req)["prefix"]
292         if prefix == "" {
293                 req.ParseForm()
294                 prefix = req.Form.Get("prefix")
295         }
296
297         uuid := mux.Vars(req)["uuid"]
298
299         var vols []*VolumeMount
300         if uuid == "" {
301                 vols = rtr.volmgr.AllReadable()
302         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
303                 http.Error(resp, "mount not found", http.StatusNotFound)
304                 return
305         } else {
306                 vols = []*VolumeMount{mnt}
307         }
308
309         for _, v := range vols {
310                 if err := v.IndexTo(prefix, resp); err != nil {
311                         // We can't send an error status/message to
312                         // the client because IndexTo() might have
313                         // already written body content. All we can do
314                         // is log the error in our own logs.
315                         //
316                         // The client must notice the lack of trailing
317                         // newline as an indication that the response
318                         // is incomplete.
319                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
320                         return
321                 }
322         }
323         // An empty line at EOF is the only way the client can be
324         // assured the entire index was received.
325         resp.Write([]byte{'\n'})
326 }
327
328 // MountsHandler responds to "GET /mounts" requests.
329 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
330         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
331         if err != nil {
332                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
333         }
334 }
335
336 // PoolStatus struct
337 type PoolStatus struct {
338         Alloc uint64 `json:"BytesAllocatedCumulative"`
339         Cap   int    `json:"BuffersMax"`
340         Len   int    `json:"BuffersInUse"`
341 }
342
343 type volumeStatusEnt struct {
344         Label         string
345         Status        *VolumeStatus `json:",omitempty"`
346         VolumeStats   *ioStats      `json:",omitempty"`
347         InternalStats interface{}   `json:",omitempty"`
348 }
349
350 // NodeStatus struct
351 type NodeStatus struct {
352         Volumes         []*volumeStatusEnt
353         BufferPool      PoolStatus
354         PullQueue       WorkQueueStatus
355         TrashQueue      WorkQueueStatus
356         RequestsCurrent int
357         RequestsMax     int
358         Version         string
359 }
360
361 var st NodeStatus
362 var stLock sync.Mutex
363
364 // DebugHandler addresses /debug.json requests.
365 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
366         type debugStats struct {
367                 MemStats runtime.MemStats
368         }
369         var ds debugStats
370         runtime.ReadMemStats(&ds.MemStats)
371         data, err := json.Marshal(&ds)
372         if err != nil {
373                 http.Error(resp, err.Error(), http.StatusInternalServerError)
374                 return
375         }
376         resp.Write(data)
377 }
378
379 // StatusHandler addresses /status.json requests.
380 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
381         stLock.Lock()
382         rtr.readNodeStatus(&st)
383         data, err := json.Marshal(&st)
384         stLock.Unlock()
385         if err != nil {
386                 http.Error(resp, err.Error(), http.StatusInternalServerError)
387                 return
388         }
389         resp.Write(data)
390 }
391
392 // populate the given NodeStatus struct with current values.
393 func (rtr *router) readNodeStatus(st *NodeStatus) {
394         st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
395         vols := rtr.volmgr.AllReadable()
396         if cap(st.Volumes) < len(vols) {
397                 st.Volumes = make([]*volumeStatusEnt, len(vols))
398         }
399         st.Volumes = st.Volumes[:0]
400         for _, vol := range vols {
401                 var internalStats interface{}
402                 if vol, ok := vol.Volume.(InternalStatser); ok {
403                         internalStats = vol.InternalStats()
404                 }
405                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
406                         Label:         vol.String(),
407                         Status:        vol.Status(),
408                         InternalStats: internalStats,
409                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
410                 })
411         }
412         st.BufferPool.Alloc = bufs.Alloc()
413         st.BufferPool.Cap = bufs.Cap()
414         st.BufferPool.Len = bufs.Len()
415         st.PullQueue = getWorkQueueStatus(rtr.pullq)
416         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
417 }
418
419 // return a WorkQueueStatus for the given queue. If q is nil (which
420 // should never happen except in test suites), return a zero status
421 // value instead of crashing.
422 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
423         if q == nil {
424                 // This should only happen during tests.
425                 return WorkQueueStatus{}
426         }
427         return q.Status()
428 }
429
430 // handleDELETE processes DELETE requests.
431 //
432 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
433 // from all connected volumes.
434 //
435 // Only the Data Manager, or an Arvados admin with scope "all", are
436 // allowed to issue DELETE requests.  If a DELETE request is not
437 // authenticated or is issued by a non-admin user, the server returns
438 // a PermissionError.
439 //
440 // Upon receiving a valid request from an authorized user,
441 // handleDELETE deletes all copies of the specified block on local
442 // writable volumes.
443 //
444 // Response format:
445 //
446 // If the requested blocks was not found on any volume, the response
447 // code is HTTP 404 Not Found.
448 //
449 // Otherwise, the response code is 200 OK, with a response body
450 // consisting of the JSON message
451 //
452 //    {"copies_deleted":d,"copies_failed":f}
453 //
454 // where d and f are integers representing the number of blocks that
455 // were successfully and unsuccessfully deleted.
456 //
457 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
458         hash := mux.Vars(req)["hash"]
459
460         // Confirm that this user is an admin and has a token with unlimited scope.
461         var tok = GetAPIToken(req)
462         if tok == "" || !rtr.canDelete(tok) {
463                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
464                 return
465         }
466
467         if !rtr.cluster.Collections.BlobTrash {
468                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
469                 return
470         }
471
472         // Delete copies of this block from all available volumes.
473         // Report how many blocks were successfully deleted, and how
474         // many were found on writable volumes but not deleted.
475         var result struct {
476                 Deleted int `json:"copies_deleted"`
477                 Failed  int `json:"copies_failed"`
478         }
479         for _, vol := range rtr.volmgr.AllWritable() {
480                 if err := vol.Trash(hash); err == nil {
481                         result.Deleted++
482                 } else if os.IsNotExist(err) {
483                         continue
484                 } else {
485                         result.Failed++
486                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
487                 }
488         }
489         if result.Deleted == 0 && result.Failed == 0 {
490                 resp.WriteHeader(http.StatusNotFound)
491                 return
492         }
493         body, err := json.Marshal(result)
494         if err != nil {
495                 http.Error(resp, err.Error(), http.StatusInternalServerError)
496                 return
497         }
498         resp.Write(body)
499 }
500
501 /* PullHandler processes "PUT /pull" requests for the data manager.
502    The request body is a JSON message containing a list of pull
503    requests in the following format:
504
505    [
506       {
507          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
508          "servers":[
509                         "keep0.qr1hi.arvadosapi.com:25107",
510                         "keep1.qr1hi.arvadosapi.com:25108"
511                  ]
512           },
513           {
514                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
515                  "servers":[
516                         "10.0.1.5:25107",
517                         "10.0.1.6:25107",
518                         "10.0.1.7:25108"
519                  ]
520           },
521           ...
522    ]
523
524    Each pull request in the list consists of a block locator string
525    and an ordered list of servers.  Keepstore should try to fetch the
526    block from each server in turn.
527
528    If the request has not been sent by the Data Manager, return 401
529    Unauthorized.
530
531    If the JSON unmarshalling fails, return 400 Bad Request.
532 */
533
534 // PullRequest consists of a block locator and an ordered list of servers
535 type PullRequest struct {
536         Locator string   `json:"locator"`
537         Servers []string `json:"servers"`
538
539         // Destination mount, or "" for "anywhere"
540         MountUUID string `json:"mount_uuid"`
541 }
542
543 // PullHandler processes "PUT /pull" requests for the data manager.
544 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
545         // Reject unauthorized requests.
546         if !rtr.isSystemAuth(GetAPIToken(req)) {
547                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
548                 return
549         }
550
551         // Parse the request body.
552         var pr []PullRequest
553         r := json.NewDecoder(req.Body)
554         if err := r.Decode(&pr); err != nil {
555                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
556                 return
557         }
558
559         // We have a properly formatted pull list sent from the data
560         // manager.  Report success and send the list to the pull list
561         // manager for further handling.
562         resp.WriteHeader(http.StatusOK)
563         resp.Write([]byte(
564                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
565
566         plist := list.New()
567         for _, p := range pr {
568                 plist.PushBack(p)
569         }
570         rtr.pullq.ReplaceQueue(plist)
571 }
572
573 // TrashRequest consists of a block locator and its Mtime
574 type TrashRequest struct {
575         Locator    string `json:"locator"`
576         BlockMtime int64  `json:"block_mtime"`
577
578         // Target mount, or "" for "everywhere"
579         MountUUID string `json:"mount_uuid"`
580 }
581
582 // TrashHandler processes /trash requests.
583 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
584         // Reject unauthorized requests.
585         if !rtr.isSystemAuth(GetAPIToken(req)) {
586                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
587                 return
588         }
589
590         // Parse the request body.
591         var trash []TrashRequest
592         r := json.NewDecoder(req.Body)
593         if err := r.Decode(&trash); err != nil {
594                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
595                 return
596         }
597
598         // We have a properly formatted trash list sent from the data
599         // manager.  Report success and send the list to the trash work
600         // queue for further handling.
601         resp.WriteHeader(http.StatusOK)
602         resp.Write([]byte(
603                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
604
605         tlist := list.New()
606         for _, t := range trash {
607                 tlist.PushBack(t)
608         }
609         rtr.trashq.ReplaceQueue(tlist)
610 }
611
612 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
613 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
614         // Reject unauthorized requests.
615         if !rtr.isSystemAuth(GetAPIToken(req)) {
616                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
617                 return
618         }
619
620         log := ctxlog.FromContext(req.Context())
621         hash := mux.Vars(req)["hash"]
622
623         if len(rtr.volmgr.AllWritable()) == 0 {
624                 http.Error(resp, "No writable volumes", http.StatusNotFound)
625                 return
626         }
627
628         var untrashedOn, failedOn []string
629         var numNotFound int
630         for _, vol := range rtr.volmgr.AllWritable() {
631                 err := vol.Untrash(hash)
632
633                 if os.IsNotExist(err) {
634                         numNotFound++
635                 } else if err != nil {
636                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
637                         failedOn = append(failedOn, vol.String())
638                 } else {
639                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
640                         untrashedOn = append(untrashedOn, vol.String())
641                 }
642         }
643
644         if numNotFound == len(rtr.volmgr.AllWritable()) {
645                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
646         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
647                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
648         } else {
649                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
650                 if len(failedOn) > 0 {
651                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
652                         http.Error(resp, respBody, http.StatusInternalServerError)
653                 } else {
654                         fmt.Fprintln(resp, respBody)
655                 }
656         }
657 }
658
659 // GetBlock and PutBlock implement lower-level code for handling
660 // blocks by rooting through volumes connected to the local machine.
661 // Once the handler has determined that system policy permits the
662 // request, it calls these methods to perform the actual operation.
663 //
664 // TODO(twp): this code would probably be better located in the
665 // VolumeManager interface. As an abstraction, the VolumeManager
666 // should be the only part of the code that cares about which volume a
667 // block is stored on, so it should be responsible for figuring out
668 // which volume to check for fetching blocks, storing blocks, etc.
669
670 // GetBlock fetches the block identified by "hash" into the provided
671 // buf, and returns the data size.
672 //
673 // If the block cannot be found on any volume, returns NotFoundError.
674 //
675 // If the block found does not have the correct MD5 hash, returns
676 // DiskHashError.
677 //
678 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
679         log := ctxlog.FromContext(ctx)
680
681         // Attempt to read the requested hash from a keep volume.
682         errorToCaller := NotFoundError
683
684         for _, vol := range volmgr.AllReadable() {
685                 size, err := vol.Get(ctx, hash, buf)
686                 select {
687                 case <-ctx.Done():
688                         return 0, ErrClientDisconnect
689                 default:
690                 }
691                 if err != nil {
692                         // IsNotExist is an expected error and may be
693                         // ignored. All other errors are logged. In
694                         // any case we continue trying to read other
695                         // volumes. If all volumes report IsNotExist,
696                         // we return a NotFoundError.
697                         if !os.IsNotExist(err) {
698                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
699                         }
700                         // If some volume returns a transient error, return it to the caller
701                         // instead of "Not found" so it can retry.
702                         if err == VolumeBusyError {
703                                 errorToCaller = err.(*KeepError)
704                         }
705                         continue
706                 }
707                 // Check the file checksum.
708                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
709                 if filehash != hash {
710                         // TODO: Try harder to tell a sysadmin about
711                         // this.
712                         log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
713                         errorToCaller = DiskHashError
714                         continue
715                 }
716                 if errorToCaller == DiskHashError {
717                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
718                 }
719                 return size, nil
720         }
721         return 0, errorToCaller
722 }
723
724 type putProgress struct {
725         classNeeded      map[string]bool
726         classTodo        map[string]bool
727         mountUsed        map[*VolumeMount]bool
728         totalReplication int
729         classDone        map[string]int
730 }
731
732 // Number of distinct replicas stored. "2" can mean the block was
733 // stored on 2 different volumes with replication 1, or on 1 volume
734 // with replication 2.
735 func (pr putProgress) TotalReplication() string {
736         return strconv.Itoa(pr.totalReplication)
737 }
738
739 // Number of replicas satisfying each storage class, formatted like
740 // "default=2; special=1".
741 func (pr putProgress) ClassReplication() string {
742         s := ""
743         for k, v := range pr.classDone {
744                 if len(s) > 0 {
745                         s += ", "
746                 }
747                 s += k + "=" + strconv.Itoa(v)
748         }
749         return s
750 }
751
752 func (pr *putProgress) Add(mnt *VolumeMount) {
753         if pr.mountUsed[mnt] {
754                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
755                 return
756         }
757         pr.mountUsed[mnt] = true
758         pr.totalReplication += mnt.Replication
759         for class := range mnt.StorageClasses {
760                 pr.classDone[class] += mnt.Replication
761                 delete(pr.classTodo, class)
762         }
763 }
764
765 func (pr *putProgress) Sub(mnt *VolumeMount) {
766         if !pr.mountUsed[mnt] {
767                 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
768                 return
769         }
770         pr.mountUsed[mnt] = false
771         pr.totalReplication -= mnt.Replication
772         for class := range mnt.StorageClasses {
773                 pr.classDone[class] -= mnt.Replication
774                 if pr.classNeeded[class] {
775                         pr.classTodo[class] = true
776                 }
777         }
778 }
779
780 func (pr *putProgress) Done() bool {
781         return len(pr.classTodo) == 0 && pr.totalReplication > 0
782 }
783
784 func (pr *putProgress) Want(mnt *VolumeMount) bool {
785         if pr.Done() || pr.mountUsed[mnt] {
786                 return false
787         }
788         if len(pr.classTodo) == 0 {
789                 // none specified == "any"
790                 return true
791         }
792         for class := range mnt.StorageClasses {
793                 if pr.classTodo[class] {
794                         return true
795                 }
796         }
797         return false
798 }
799
800 func (pr *putProgress) Copy() *putProgress {
801         cp := putProgress{
802                 classNeeded:      pr.classNeeded,
803                 classTodo:        make(map[string]bool, len(pr.classTodo)),
804                 classDone:        make(map[string]int, len(pr.classDone)),
805                 mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
806                 totalReplication: pr.totalReplication,
807         }
808         for k, v := range pr.classTodo {
809                 cp.classTodo[k] = v
810         }
811         for k, v := range pr.classDone {
812                 cp.classDone[k] = v
813         }
814         for k, v := range pr.mountUsed {
815                 cp.mountUsed[k] = v
816         }
817         return &cp
818 }
819
820 func newPutProgress(classes []string) putProgress {
821         pr := putProgress{
822                 classNeeded: make(map[string]bool, len(classes)),
823                 classTodo:   make(map[string]bool, len(classes)),
824                 classDone:   map[string]int{},
825                 mountUsed:   map[*VolumeMount]bool{},
826         }
827         for _, c := range classes {
828                 if c != "" {
829                         pr.classNeeded[c] = true
830                         pr.classTodo[c] = true
831                 }
832         }
833         return pr
834 }
835
836 // PutBlock stores the given block on one or more volumes.
837 //
838 // The MD5 checksum of the block must match the given hash.
839 //
840 // The block is written to each writable volume (ordered by priority
841 // and then UUID, see volume.go) until at least one replica has been
842 // stored in each of the requested storage classes.
843 //
844 // The returned error, if any, is a KeepError with one of the
845 // following codes:
846 //
847 // 500 Collision
848 //        A different block with the same hash already exists on this
849 //        Keep server.
850 // 422 MD5Fail
851 //        The MD5 hash of the BLOCK does not match the argument HASH.
852 // 503 Full
853 //        There was not enough space left in any Keep volume to store
854 //        the object.
855 // 500 Fail
856 //        The object could not be stored for some other reason (e.g.
857 //        all writes failed). The text of the error message should
858 //        provide as much detail as possible.
859 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
860         log := ctxlog.FromContext(ctx)
861
862         // Check that BLOCK's checksum matches HASH.
863         blockhash := fmt.Sprintf("%x", md5.Sum(block))
864         if blockhash != hash {
865                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
866                 return putProgress{}, RequestHashError
867         }
868
869         result := newPutProgress(wantStorageClasses)
870
871         // If we already have this data, it's intact on disk, and we
872         // can update its timestamp, return success. If we have
873         // different data with the same hash, return failure.
874         if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
875                 return result, err
876         }
877         if ctx.Err() != nil {
878                 return result, ErrClientDisconnect
879         }
880
881         writables := volmgr.NextWritable()
882         if len(writables) == 0 {
883                 log.Error("no writable volumes")
884                 return result, FullError
885         }
886
887         var wg sync.WaitGroup
888         var mtx sync.Mutex
889         cond := sync.Cond{L: &mtx}
890         // pending predicts what result will be if all pending writes
891         // succeed.
892         pending := result.Copy()
893         var allFull atomic.Value
894         allFull.Store(true)
895
896         // We hold the lock for the duration of the "each volume" loop
897         // below, except when it is released during cond.Wait().
898         mtx.Lock()
899
900         for _, mnt := range writables {
901                 // Wait until our decision to use this mount does not
902                 // depend on the outcome of pending writes.
903                 for result.Want(mnt) && !pending.Want(mnt) {
904                         cond.Wait()
905                 }
906                 if !result.Want(mnt) {
907                         continue
908                 }
909                 mnt := mnt
910                 pending.Add(mnt)
911                 wg.Add(1)
912                 go func() {
913                         log.Debugf("PutBlock: start write to %s", mnt.UUID)
914                         defer wg.Done()
915                         err := mnt.Put(ctx, hash, block)
916
917                         mtx.Lock()
918                         if err != nil {
919                                 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
920                                 pending.Sub(mnt)
921                         } else {
922                                 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
923                                 result.Add(mnt)
924                         }
925                         cond.Broadcast()
926                         mtx.Unlock()
927
928                         if err != nil && err != FullError && ctx.Err() == nil {
929                                 // The volume is not full but the
930                                 // write did not succeed.  Report the
931                                 // error and continue trying.
932                                 allFull.Store(false)
933                                 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
934                         }
935                 }()
936         }
937         mtx.Unlock()
938         wg.Wait()
939         if ctx.Err() != nil {
940                 return result, ErrClientDisconnect
941         }
942         if result.Done() {
943                 return result, nil
944         }
945
946         if result.totalReplication > 0 {
947                 // Some, but not all, of the storage classes were
948                 // satisfied. This qualifies as success.
949                 return result, nil
950         } else if allFull.Load().(bool) {
951                 log.Error("all volumes with qualifying storage classes are full")
952                 return putProgress{}, FullError
953         } else {
954                 // Already logged the non-full errors.
955                 return putProgress{}, GenericError
956         }
957 }
958
959 // CompareAndTouch looks for volumes where the given content already
960 // exists and its modification time can be updated (i.e., it is
961 // protected from garbage collection), and updates result accordingly.
962 // It returns when the result is Done() or all volumes have been
963 // checked.
964 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
965         log := ctxlog.FromContext(ctx)
966         for _, mnt := range volmgr.AllWritable() {
967                 if !result.Want(mnt) {
968                         continue
969                 }
970                 err := mnt.Compare(ctx, hash, buf)
971                 if ctx.Err() != nil {
972                         return nil
973                 } else if err == CollisionError {
974                         // Stop if we have a block with same hash but
975                         // different content. (It will be impossible
976                         // to tell which one is wanted if we have
977                         // both, so there's no point writing it even
978                         // on a different volume.)
979                         log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
980                         return CollisionError
981                 } else if os.IsNotExist(err) {
982                         // Block does not exist. This is the only
983                         // "normal" error: we don't log anything.
984                         continue
985                 } else if err != nil {
986                         // Couldn't open file, data is corrupt on
987                         // disk, etc.: log this abnormal condition,
988                         // and try the next volume.
989                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
990                         continue
991                 }
992                 if err := mnt.Touch(hash); err != nil {
993                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
994                         continue
995                 }
996                 // Compare and Touch both worked --> done.
997                 result.Add(mnt)
998                 if result.Done() {
999                         return nil
1000                 }
1001         }
1002         return nil
1003 }
1004
1005 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1006
1007 // IsValidLocator returns true if the specified string is a valid Keep locator.
1008 //   When Keep is extended to support hash types other than MD5,
1009 //   this should be updated to cover those as well.
1010 //
1011 func IsValidLocator(loc string) bool {
1012         return validLocatorRe.MatchString(loc)
1013 }
1014
1015 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1016
1017 // GetAPIToken returns the OAuth2 token from the Authorization
1018 // header of a HTTP request, or an empty string if no matching
1019 // token is found.
1020 func GetAPIToken(req *http.Request) string {
1021         if auth, ok := req.Header["Authorization"]; ok {
1022                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1023                         return match[2]
1024                 }
1025         }
1026         return ""
1027 }
1028
1029 // canDelete returns true if the user identified by apiToken is
1030 // allowed to delete blocks.
1031 func (rtr *router) canDelete(apiToken string) bool {
1032         if apiToken == "" {
1033                 return false
1034         }
1035         // Blocks may be deleted only when Keep has been configured with a
1036         // data manager.
1037         if rtr.isSystemAuth(apiToken) {
1038                 return true
1039         }
1040         // TODO(twp): look up apiToken with the API server
1041         // return true if is_admin is true and if the token
1042         // has unlimited scope
1043         return false
1044 }
1045
1046 // isSystemAuth returns true if the given token is allowed to perform
1047 // system level actions like deleting data.
1048 func (rtr *router) isSystemAuth(token string) bool {
1049         return token != "" && token == rtr.cluster.SystemRootToken
1050 }