Merge branch '14018-acr-set-container-properties' into main
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/ctxlog"
26         "git.arvados.org/arvados.git/sdk/go/health"
27         "git.arvados.org/arvados.git/sdk/go/httpserver"
28         "github.com/gorilla/mux"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31 )
32
33 type router struct {
34         *mux.Router
35         cluster     *arvados.Cluster
36         logger      logrus.FieldLogger
37         remoteProxy remoteProxy
38         metrics     *nodeMetrics
39         volmgr      *RRVolumeManager
40         pullq       *WorkQueue
41         trashq      *WorkQueue
42 }
43
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47         rtr := &router{
48                 Router:  mux.NewRouter(),
49                 cluster: cluster,
50                 logger:  ctxlog.FromContext(ctx),
51                 metrics: &nodeMetrics{reg: reg},
52                 volmgr:  volmgr,
53                 pullq:   pullq,
54                 trashq:  trashq,
55         }
56
57         rtr.HandleFunc(
58                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59         rtr.HandleFunc(
60                 `/{hash:[0-9a-f]{32}}+{hints}`,
61                 rtr.handleGET).Methods("GET", "HEAD")
62
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65         // List all blocks stored here. Privileged client only.
66         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67         // List blocks stored here whose hash has the given prefix.
68         // Privileged client only.
69         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
70         // Update timestamp on existing block. Privileged client only.
71         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
72
73         // Internals/debugging info (runtime.MemStats)
74         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
75
76         // List volumes: path, device number, bytes used/avail.
77         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
78
79         // List mounts: UUID, readonly, tier, device ID, ...
80         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
81         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
82         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
83
84         // Replace the current pull queue.
85         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
86
87         // Replace the current trash queue.
88         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
89
90         // Untrash moves blocks from trash back into store
91         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
92
93         rtr.Handle("/_health/{check}", &health.Handler{
94                 Token:  cluster.ManagementToken,
95                 Prefix: "/_health/",
96         }).Methods("GET")
97
98         // Any request which does not match any of these routes gets
99         // 400 Bad Request.
100         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
101
102         rtr.metrics.setupBufferPoolMetrics(bufs)
103         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
104         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
105
106         return rtr
107 }
108
109 // BadRequestHandler is a HandleFunc to address bad requests.
110 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
111         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
112 }
113
114 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
115         ctx, cancel := contextForResponse(context.TODO(), resp)
116         defer cancel()
117
118         locator := req.URL.Path[1:]
119         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
120                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
121                 return
122         }
123
124         if rtr.cluster.Collections.BlobSigning {
125                 locator := req.URL.Path[1:] // strip leading slash
126                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
127                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
128                         return
129                 }
130         }
131
132         // TODO: Probe volumes to check whether the block _might_
133         // exist. Some volumes/types could support a quick existence
134         // check without causing other operations to suffer. If all
135         // volumes support that, and assure us the block definitely
136         // isn't here, we can return 404 now instead of waiting for a
137         // buffer.
138
139         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
140         if err != nil {
141                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
142                 return
143         }
144         defer bufs.Put(buf)
145
146         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
147         if err != nil {
148                 code := http.StatusInternalServerError
149                 if err, ok := err.(*KeepError); ok {
150                         code = err.HTTPCode
151                 }
152                 http.Error(resp, err.Error(), code)
153                 return
154         }
155
156         resp.Header().Set("Content-Length", strconv.Itoa(size))
157         resp.Header().Set("Content-Type", "application/octet-stream")
158         resp.Write(buf[:size])
159 }
160
161 // Return a new context that gets cancelled by resp's CloseNotifier.
162 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
163         ctx, cancel := context.WithCancel(parent)
164         if cn, ok := resp.(http.CloseNotifier); ok {
165                 go func(c <-chan bool) {
166                         select {
167                         case <-c:
168                                 cancel()
169                         case <-ctx.Done():
170                         }
171                 }(cn.CloseNotify())
172         }
173         return ctx, cancel
174 }
175
176 // Get a buffer from the pool -- but give up and return a non-nil
177 // error if ctx ends before we get a buffer.
178 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
179         bufReady := make(chan []byte)
180         go func() {
181                 bufReady <- bufs.Get(bufSize)
182         }()
183         select {
184         case buf := <-bufReady:
185                 return buf, nil
186         case <-ctx.Done():
187                 go func() {
188                         // Even if closeNotifier happened first, we
189                         // need to keep waiting for our buf so we can
190                         // return it to the pool.
191                         bufs.Put(<-bufReady)
192                 }()
193                 return nil, ErrClientDisconnect
194         }
195 }
196
197 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
198         if !rtr.isSystemAuth(GetAPIToken(req)) {
199                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
200                 return
201         }
202         hash := mux.Vars(req)["hash"]
203         vols := rtr.volmgr.AllWritable()
204         if len(vols) == 0 {
205                 http.Error(resp, "no volumes", http.StatusNotFound)
206                 return
207         }
208         var err error
209         for _, mnt := range vols {
210                 err = mnt.Touch(hash)
211                 if err == nil {
212                         break
213                 }
214         }
215         switch {
216         case err == nil:
217                 return
218         case os.IsNotExist(err):
219                 http.Error(resp, err.Error(), http.StatusNotFound)
220         default:
221                 http.Error(resp, err.Error(), http.StatusInternalServerError)
222         }
223 }
224
225 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
226         ctx, cancel := contextForResponse(context.TODO(), resp)
227         defer cancel()
228
229         hash := mux.Vars(req)["hash"]
230
231         // Detect as many error conditions as possible before reading
232         // the body: avoid transmitting data that will not end up
233         // being written anyway.
234
235         if req.ContentLength == -1 {
236                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
237                 return
238         }
239
240         if req.ContentLength > BlockSize {
241                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
242                 return
243         }
244
245         if len(rtr.volmgr.AllWritable()) == 0 {
246                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
247                 return
248         }
249
250         var wantStorageClasses []string
251         if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
252                 wantStorageClasses = strings.Split(hdr, ",")
253                 for i, sc := range wantStorageClasses {
254                         wantStorageClasses[i] = strings.TrimSpace(sc)
255                 }
256         } else {
257                 // none specified -- use configured default
258                 for class, cfg := range rtr.cluster.StorageClasses {
259                         if cfg.Default {
260                                 wantStorageClasses = append(wantStorageClasses, class)
261                         }
262                 }
263         }
264
265         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
266         if err != nil {
267                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
268                 return
269         }
270
271         _, err = io.ReadFull(req.Body, buf)
272         if err != nil {
273                 http.Error(resp, err.Error(), 500)
274                 bufs.Put(buf)
275                 return
276         }
277
278         result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
279         bufs.Put(buf)
280
281         if err != nil {
282                 code := http.StatusInternalServerError
283                 if err, ok := err.(*KeepError); ok {
284                         code = err.HTTPCode
285                 }
286                 http.Error(resp, err.Error(), code)
287                 return
288         }
289
290         // Success; add a size hint, sign the locator if possible, and
291         // return it to the client.
292         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
293         apiToken := GetAPIToken(req)
294         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
295                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
296                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
297         }
298         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
299         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
300         resp.Write([]byte(returnHash + "\n"))
301 }
302
303 // IndexHandler responds to "/index", "/index/{prefix}", and
304 // "/mounts/{uuid}/blocks" requests.
305 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
306         if !rtr.isSystemAuth(GetAPIToken(req)) {
307                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
308                 return
309         }
310
311         prefix := mux.Vars(req)["prefix"]
312         if prefix == "" {
313                 req.ParseForm()
314                 prefix = req.Form.Get("prefix")
315         }
316
317         uuid := mux.Vars(req)["uuid"]
318
319         var vols []*VolumeMount
320         if uuid == "" {
321                 vols = rtr.volmgr.AllReadable()
322         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
323                 http.Error(resp, "mount not found", http.StatusNotFound)
324                 return
325         } else {
326                 vols = []*VolumeMount{mnt}
327         }
328
329         for _, v := range vols {
330                 if err := v.IndexTo(prefix, resp); err != nil {
331                         // We can't send an error status/message to
332                         // the client because IndexTo() might have
333                         // already written body content. All we can do
334                         // is log the error in our own logs.
335                         //
336                         // The client must notice the lack of trailing
337                         // newline as an indication that the response
338                         // is incomplete.
339                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
340                         return
341                 }
342         }
343         // An empty line at EOF is the only way the client can be
344         // assured the entire index was received.
345         resp.Write([]byte{'\n'})
346 }
347
348 // MountsHandler responds to "GET /mounts" requests.
349 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
350         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
351         if err != nil {
352                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
353         }
354 }
355
356 // PoolStatus struct
357 type PoolStatus struct {
358         Alloc uint64 `json:"BytesAllocatedCumulative"`
359         Cap   int    `json:"BuffersMax"`
360         Len   int    `json:"BuffersInUse"`
361 }
362
363 type volumeStatusEnt struct {
364         Label         string
365         Status        *VolumeStatus `json:",omitempty"`
366         VolumeStats   *ioStats      `json:",omitempty"`
367         InternalStats interface{}   `json:",omitempty"`
368 }
369
370 // NodeStatus struct
371 type NodeStatus struct {
372         Volumes         []*volumeStatusEnt
373         BufferPool      PoolStatus
374         PullQueue       WorkQueueStatus
375         TrashQueue      WorkQueueStatus
376         RequestsCurrent int
377         RequestsMax     int
378         Version         string
379 }
380
381 var st NodeStatus
382 var stLock sync.Mutex
383
384 // DebugHandler addresses /debug.json requests.
385 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
386         type debugStats struct {
387                 MemStats runtime.MemStats
388         }
389         var ds debugStats
390         runtime.ReadMemStats(&ds.MemStats)
391         data, err := json.Marshal(&ds)
392         if err != nil {
393                 http.Error(resp, err.Error(), http.StatusInternalServerError)
394                 return
395         }
396         resp.Write(data)
397 }
398
399 // StatusHandler addresses /status.json requests.
400 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
401         stLock.Lock()
402         rtr.readNodeStatus(&st)
403         data, err := json.Marshal(&st)
404         stLock.Unlock()
405         if err != nil {
406                 http.Error(resp, err.Error(), http.StatusInternalServerError)
407                 return
408         }
409         resp.Write(data)
410 }
411
412 // populate the given NodeStatus struct with current values.
413 func (rtr *router) readNodeStatus(st *NodeStatus) {
414         st.Version = version
415         vols := rtr.volmgr.AllReadable()
416         if cap(st.Volumes) < len(vols) {
417                 st.Volumes = make([]*volumeStatusEnt, len(vols))
418         }
419         st.Volumes = st.Volumes[:0]
420         for _, vol := range vols {
421                 var internalStats interface{}
422                 if vol, ok := vol.Volume.(InternalStatser); ok {
423                         internalStats = vol.InternalStats()
424                 }
425                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
426                         Label:         vol.String(),
427                         Status:        vol.Status(),
428                         InternalStats: internalStats,
429                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
430                 })
431         }
432         st.BufferPool.Alloc = bufs.Alloc()
433         st.BufferPool.Cap = bufs.Cap()
434         st.BufferPool.Len = bufs.Len()
435         st.PullQueue = getWorkQueueStatus(rtr.pullq)
436         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
437 }
438
439 // return a WorkQueueStatus for the given queue. If q is nil (which
440 // should never happen except in test suites), return a zero status
441 // value instead of crashing.
442 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
443         if q == nil {
444                 // This should only happen during tests.
445                 return WorkQueueStatus{}
446         }
447         return q.Status()
448 }
449
450 // handleDELETE processes DELETE requests.
451 //
452 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
453 // from all connected volumes.
454 //
455 // Only the Data Manager, or an Arvados admin with scope "all", are
456 // allowed to issue DELETE requests.  If a DELETE request is not
457 // authenticated or is issued by a non-admin user, the server returns
458 // a PermissionError.
459 //
460 // Upon receiving a valid request from an authorized user,
461 // handleDELETE deletes all copies of the specified block on local
462 // writable volumes.
463 //
464 // Response format:
465 //
466 // If the requested blocks was not found on any volume, the response
467 // code is HTTP 404 Not Found.
468 //
469 // Otherwise, the response code is 200 OK, with a response body
470 // consisting of the JSON message
471 //
472 //    {"copies_deleted":d,"copies_failed":f}
473 //
474 // where d and f are integers representing the number of blocks that
475 // were successfully and unsuccessfully deleted.
476 //
477 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
478         hash := mux.Vars(req)["hash"]
479
480         // Confirm that this user is an admin and has a token with unlimited scope.
481         var tok = GetAPIToken(req)
482         if tok == "" || !rtr.canDelete(tok) {
483                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
484                 return
485         }
486
487         if !rtr.cluster.Collections.BlobTrash {
488                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
489                 return
490         }
491
492         // Delete copies of this block from all available volumes.
493         // Report how many blocks were successfully deleted, and how
494         // many were found on writable volumes but not deleted.
495         var result struct {
496                 Deleted int `json:"copies_deleted"`
497                 Failed  int `json:"copies_failed"`
498         }
499         for _, vol := range rtr.volmgr.AllWritable() {
500                 if err := vol.Trash(hash); err == nil {
501                         result.Deleted++
502                 } else if os.IsNotExist(err) {
503                         continue
504                 } else {
505                         result.Failed++
506                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
507                 }
508         }
509         if result.Deleted == 0 && result.Failed == 0 {
510                 resp.WriteHeader(http.StatusNotFound)
511                 return
512         }
513         body, err := json.Marshal(result)
514         if err != nil {
515                 http.Error(resp, err.Error(), http.StatusInternalServerError)
516                 return
517         }
518         resp.Write(body)
519 }
520
521 /* PullHandler processes "PUT /pull" requests for the data manager.
522    The request body is a JSON message containing a list of pull
523    requests in the following format:
524
525    [
526       {
527          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
528          "servers":[
529                         "keep0.qr1hi.arvadosapi.com:25107",
530                         "keep1.qr1hi.arvadosapi.com:25108"
531                  ]
532           },
533           {
534                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
535                  "servers":[
536                         "10.0.1.5:25107",
537                         "10.0.1.6:25107",
538                         "10.0.1.7:25108"
539                  ]
540           },
541           ...
542    ]
543
544    Each pull request in the list consists of a block locator string
545    and an ordered list of servers.  Keepstore should try to fetch the
546    block from each server in turn.
547
548    If the request has not been sent by the Data Manager, return 401
549    Unauthorized.
550
551    If the JSON unmarshalling fails, return 400 Bad Request.
552 */
553
554 // PullRequest consists of a block locator and an ordered list of servers
555 type PullRequest struct {
556         Locator string   `json:"locator"`
557         Servers []string `json:"servers"`
558
559         // Destination mount, or "" for "anywhere"
560         MountUUID string `json:"mount_uuid"`
561 }
562
563 // PullHandler processes "PUT /pull" requests for the data manager.
564 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
565         // Reject unauthorized requests.
566         if !rtr.isSystemAuth(GetAPIToken(req)) {
567                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
568                 return
569         }
570
571         // Parse the request body.
572         var pr []PullRequest
573         r := json.NewDecoder(req.Body)
574         if err := r.Decode(&pr); err != nil {
575                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
576                 return
577         }
578
579         // We have a properly formatted pull list sent from the data
580         // manager.  Report success and send the list to the pull list
581         // manager for further handling.
582         resp.WriteHeader(http.StatusOK)
583         resp.Write([]byte(
584                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
585
586         plist := list.New()
587         for _, p := range pr {
588                 plist.PushBack(p)
589         }
590         rtr.pullq.ReplaceQueue(plist)
591 }
592
593 // TrashRequest consists of a block locator and its Mtime
594 type TrashRequest struct {
595         Locator    string `json:"locator"`
596         BlockMtime int64  `json:"block_mtime"`
597
598         // Target mount, or "" for "everywhere"
599         MountUUID string `json:"mount_uuid"`
600 }
601
602 // TrashHandler processes /trash requests.
603 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
604         // Reject unauthorized requests.
605         if !rtr.isSystemAuth(GetAPIToken(req)) {
606                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
607                 return
608         }
609
610         // Parse the request body.
611         var trash []TrashRequest
612         r := json.NewDecoder(req.Body)
613         if err := r.Decode(&trash); err != nil {
614                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
615                 return
616         }
617
618         // We have a properly formatted trash list sent from the data
619         // manager.  Report success and send the list to the trash work
620         // queue for further handling.
621         resp.WriteHeader(http.StatusOK)
622         resp.Write([]byte(
623                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
624
625         tlist := list.New()
626         for _, t := range trash {
627                 tlist.PushBack(t)
628         }
629         rtr.trashq.ReplaceQueue(tlist)
630 }
631
632 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
633 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
634         // Reject unauthorized requests.
635         if !rtr.isSystemAuth(GetAPIToken(req)) {
636                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
637                 return
638         }
639
640         log := ctxlog.FromContext(req.Context())
641         hash := mux.Vars(req)["hash"]
642
643         if len(rtr.volmgr.AllWritable()) == 0 {
644                 http.Error(resp, "No writable volumes", http.StatusNotFound)
645                 return
646         }
647
648         var untrashedOn, failedOn []string
649         var numNotFound int
650         for _, vol := range rtr.volmgr.AllWritable() {
651                 err := vol.Untrash(hash)
652
653                 if os.IsNotExist(err) {
654                         numNotFound++
655                 } else if err != nil {
656                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
657                         failedOn = append(failedOn, vol.String())
658                 } else {
659                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
660                         untrashedOn = append(untrashedOn, vol.String())
661                 }
662         }
663
664         if numNotFound == len(rtr.volmgr.AllWritable()) {
665                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
666         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
667                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
668         } else {
669                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
670                 if len(failedOn) > 0 {
671                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
672                         http.Error(resp, respBody, http.StatusInternalServerError)
673                 } else {
674                         fmt.Fprintln(resp, respBody)
675                 }
676         }
677 }
678
679 // GetBlock and PutBlock implement lower-level code for handling
680 // blocks by rooting through volumes connected to the local machine.
681 // Once the handler has determined that system policy permits the
682 // request, it calls these methods to perform the actual operation.
683 //
684 // TODO(twp): this code would probably be better located in the
685 // VolumeManager interface. As an abstraction, the VolumeManager
686 // should be the only part of the code that cares about which volume a
687 // block is stored on, so it should be responsible for figuring out
688 // which volume to check for fetching blocks, storing blocks, etc.
689
690 // GetBlock fetches the block identified by "hash" into the provided
691 // buf, and returns the data size.
692 //
693 // If the block cannot be found on any volume, returns NotFoundError.
694 //
695 // If the block found does not have the correct MD5 hash, returns
696 // DiskHashError.
697 //
698 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
699         log := ctxlog.FromContext(ctx)
700
701         // Attempt to read the requested hash from a keep volume.
702         errorToCaller := NotFoundError
703
704         for _, vol := range volmgr.AllReadable() {
705                 size, err := vol.Get(ctx, hash, buf)
706                 select {
707                 case <-ctx.Done():
708                         return 0, ErrClientDisconnect
709                 default:
710                 }
711                 if err != nil {
712                         // IsNotExist is an expected error and may be
713                         // ignored. All other errors are logged. In
714                         // any case we continue trying to read other
715                         // volumes. If all volumes report IsNotExist,
716                         // we return a NotFoundError.
717                         if !os.IsNotExist(err) {
718                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
719                         }
720                         // If some volume returns a transient error, return it to the caller
721                         // instead of "Not found" so it can retry.
722                         if err == VolumeBusyError {
723                                 errorToCaller = err.(*KeepError)
724                         }
725                         continue
726                 }
727                 // Check the file checksum.
728                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
729                 if filehash != hash {
730                         // TODO: Try harder to tell a sysadmin about
731                         // this.
732                         log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
733                         errorToCaller = DiskHashError
734                         continue
735                 }
736                 if errorToCaller == DiskHashError {
737                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
738                 }
739                 return size, nil
740         }
741         return 0, errorToCaller
742 }
743
744 type putProgress struct {
745         classNeeded      map[string]bool
746         classTodo        map[string]bool
747         mountUsed        map[*VolumeMount]bool
748         totalReplication int
749         classDone        map[string]int
750 }
751
752 // Number of distinct replicas stored. "2" can mean the block was
753 // stored on 2 different volumes with replication 1, or on 1 volume
754 // with replication 2.
755 func (pr putProgress) TotalReplication() string {
756         return strconv.Itoa(pr.totalReplication)
757 }
758
759 // Number of replicas satisfying each storage class, formatted like
760 // "default=2; special=1".
761 func (pr putProgress) ClassReplication() string {
762         s := ""
763         for k, v := range pr.classDone {
764                 if len(s) > 0 {
765                         s += ", "
766                 }
767                 s += k + "=" + strconv.Itoa(v)
768         }
769         return s
770 }
771
772 func (pr *putProgress) Add(mnt *VolumeMount) {
773         if pr.mountUsed[mnt] {
774                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
775                 return
776         }
777         pr.mountUsed[mnt] = true
778         pr.totalReplication += mnt.Replication
779         for class := range mnt.StorageClasses {
780                 pr.classDone[class] += mnt.Replication
781                 delete(pr.classTodo, class)
782         }
783 }
784
785 func (pr *putProgress) Sub(mnt *VolumeMount) {
786         if !pr.mountUsed[mnt] {
787                 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
788                 return
789         }
790         pr.mountUsed[mnt] = false
791         pr.totalReplication -= mnt.Replication
792         for class := range mnt.StorageClasses {
793                 pr.classDone[class] -= mnt.Replication
794                 if pr.classNeeded[class] {
795                         pr.classTodo[class] = true
796                 }
797         }
798 }
799
800 func (pr *putProgress) Done() bool {
801         return len(pr.classTodo) == 0 && pr.totalReplication > 0
802 }
803
804 func (pr *putProgress) Want(mnt *VolumeMount) bool {
805         if pr.Done() || pr.mountUsed[mnt] {
806                 return false
807         }
808         if len(pr.classTodo) == 0 {
809                 // none specified == "any"
810                 return true
811         }
812         for class := range mnt.StorageClasses {
813                 if pr.classTodo[class] {
814                         return true
815                 }
816         }
817         return false
818 }
819
820 func (pr *putProgress) Copy() *putProgress {
821         cp := putProgress{
822                 classNeeded:      pr.classNeeded,
823                 classTodo:        make(map[string]bool, len(pr.classTodo)),
824                 classDone:        make(map[string]int, len(pr.classDone)),
825                 mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
826                 totalReplication: pr.totalReplication,
827         }
828         for k, v := range pr.classTodo {
829                 cp.classTodo[k] = v
830         }
831         for k, v := range pr.classDone {
832                 cp.classDone[k] = v
833         }
834         for k, v := range pr.mountUsed {
835                 cp.mountUsed[k] = v
836         }
837         return &cp
838 }
839
840 func newPutProgress(classes []string) putProgress {
841         pr := putProgress{
842                 classNeeded: make(map[string]bool, len(classes)),
843                 classTodo:   make(map[string]bool, len(classes)),
844                 classDone:   map[string]int{},
845                 mountUsed:   map[*VolumeMount]bool{},
846         }
847         for _, c := range classes {
848                 if c != "" {
849                         pr.classNeeded[c] = true
850                         pr.classTodo[c] = true
851                 }
852         }
853         return pr
854 }
855
856 // PutBlock stores the given block on one or more volumes.
857 //
858 // The MD5 checksum of the block must match the given hash.
859 //
860 // The block is written to each writable volume (ordered by priority
861 // and then UUID, see volume.go) until at least one replica has been
862 // stored in each of the requested storage classes.
863 //
864 // The returned error, if any, is a KeepError with one of the
865 // following codes:
866 //
867 // 500 Collision
868 //        A different block with the same hash already exists on this
869 //        Keep server.
870 // 422 MD5Fail
871 //        The MD5 hash of the BLOCK does not match the argument HASH.
872 // 503 Full
873 //        There was not enough space left in any Keep volume to store
874 //        the object.
875 // 500 Fail
876 //        The object could not be stored for some other reason (e.g.
877 //        all writes failed). The text of the error message should
878 //        provide as much detail as possible.
879 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
880         log := ctxlog.FromContext(ctx)
881
882         // Check that BLOCK's checksum matches HASH.
883         blockhash := fmt.Sprintf("%x", md5.Sum(block))
884         if blockhash != hash {
885                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
886                 return putProgress{}, RequestHashError
887         }
888
889         result := newPutProgress(wantStorageClasses)
890
891         // If we already have this data, it's intact on disk, and we
892         // can update its timestamp, return success. If we have
893         // different data with the same hash, return failure.
894         if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
895                 return result, err
896         }
897         if ctx.Err() != nil {
898                 return result, ErrClientDisconnect
899         }
900
901         writables := volmgr.NextWritable()
902         if len(writables) == 0 {
903                 log.Error("no writable volumes")
904                 return result, FullError
905         }
906
907         var wg sync.WaitGroup
908         var mtx sync.Mutex
909         cond := sync.Cond{L: &mtx}
910         // pending predicts what result will be if all pending writes
911         // succeed.
912         pending := result.Copy()
913         var allFull atomic.Value
914         allFull.Store(true)
915
916         // We hold the lock for the duration of the "each volume" loop
917         // below, except when it is released during cond.Wait().
918         mtx.Lock()
919
920         for _, mnt := range writables {
921                 // Wait until our decision to use this mount does not
922                 // depend on the outcome of pending writes.
923                 for result.Want(mnt) && !pending.Want(mnt) {
924                         cond.Wait()
925                 }
926                 if !result.Want(mnt) {
927                         continue
928                 }
929                 mnt := mnt
930                 pending.Add(mnt)
931                 wg.Add(1)
932                 go func() {
933                         log.Debugf("PutBlock: start write to %s", mnt.UUID)
934                         defer wg.Done()
935                         err := mnt.Put(ctx, hash, block)
936
937                         mtx.Lock()
938                         if err != nil {
939                                 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
940                                 pending.Sub(mnt)
941                         } else {
942                                 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
943                                 result.Add(mnt)
944                         }
945                         cond.Broadcast()
946                         mtx.Unlock()
947
948                         if err != nil && err != FullError && ctx.Err() == nil {
949                                 // The volume is not full but the
950                                 // write did not succeed.  Report the
951                                 // error and continue trying.
952                                 allFull.Store(false)
953                                 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
954                         }
955                 }()
956         }
957         mtx.Unlock()
958         wg.Wait()
959         if ctx.Err() != nil {
960                 return result, ErrClientDisconnect
961         }
962         if result.Done() {
963                 return result, nil
964         }
965
966         if result.totalReplication > 0 {
967                 // Some, but not all, of the storage classes were
968                 // satisfied. This qualifies as success.
969                 return result, nil
970         } else if allFull.Load().(bool) {
971                 log.Error("all volumes with qualifying storage classes are full")
972                 return putProgress{}, FullError
973         } else {
974                 // Already logged the non-full errors.
975                 return putProgress{}, GenericError
976         }
977 }
978
979 // CompareAndTouch looks for volumes where the given content already
980 // exists and its modification time can be updated (i.e., it is
981 // protected from garbage collection), and updates result accordingly.
982 // It returns when the result is Done() or all volumes have been
983 // checked.
984 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
985         log := ctxlog.FromContext(ctx)
986         for _, mnt := range volmgr.AllWritable() {
987                 if !result.Want(mnt) {
988                         continue
989                 }
990                 err := mnt.Compare(ctx, hash, buf)
991                 if ctx.Err() != nil {
992                         return nil
993                 } else if err == CollisionError {
994                         // Stop if we have a block with same hash but
995                         // different content. (It will be impossible
996                         // to tell which one is wanted if we have
997                         // both, so there's no point writing it even
998                         // on a different volume.)
999                         log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
1000                         return CollisionError
1001                 } else if os.IsNotExist(err) {
1002                         // Block does not exist. This is the only
1003                         // "normal" error: we don't log anything.
1004                         continue
1005                 } else if err != nil {
1006                         // Couldn't open file, data is corrupt on
1007                         // disk, etc.: log this abnormal condition,
1008                         // and try the next volume.
1009                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
1010                         continue
1011                 }
1012                 if err := mnt.Touch(hash); err != nil {
1013                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
1014                         continue
1015                 }
1016                 // Compare and Touch both worked --> done.
1017                 result.Add(mnt)
1018                 if result.Done() {
1019                         return nil
1020                 }
1021         }
1022         return nil
1023 }
1024
1025 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1026
1027 // IsValidLocator returns true if the specified string is a valid Keep locator.
1028 //   When Keep is extended to support hash types other than MD5,
1029 //   this should be updated to cover those as well.
1030 //
1031 func IsValidLocator(loc string) bool {
1032         return validLocatorRe.MatchString(loc)
1033 }
1034
1035 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1036
1037 // GetAPIToken returns the OAuth2 token from the Authorization
1038 // header of a HTTP request, or an empty string if no matching
1039 // token is found.
1040 func GetAPIToken(req *http.Request) string {
1041         if auth, ok := req.Header["Authorization"]; ok {
1042                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1043                         return match[2]
1044                 }
1045         }
1046         return ""
1047 }
1048
1049 // canDelete returns true if the user identified by apiToken is
1050 // allowed to delete blocks.
1051 func (rtr *router) canDelete(apiToken string) bool {
1052         if apiToken == "" {
1053                 return false
1054         }
1055         // Blocks may be deleted only when Keep has been configured with a
1056         // data manager.
1057         if rtr.isSystemAuth(apiToken) {
1058                 return true
1059         }
1060         // TODO(twp): look up apiToken with the API server
1061         // return true if is_admin is true and if the token
1062         // has unlimited scope
1063         return false
1064 }
1065
1066 // isSystemAuth returns true if the given token is allowed to perform
1067 // system level actions like deleting data.
1068 func (rtr *router) isSystemAuth(token string) bool {
1069         return token != "" && token == rtr.cluster.SystemRootToken
1070 }