16347: Merge branch 'main'
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/ctxlog"
26         "git.arvados.org/arvados.git/sdk/go/health"
27         "git.arvados.org/arvados.git/sdk/go/httpserver"
28         "github.com/gorilla/mux"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31 )
32
33 type router struct {
34         *mux.Router
35         cluster     *arvados.Cluster
36         logger      logrus.FieldLogger
37         remoteProxy remoteProxy
38         metrics     *nodeMetrics
39         volmgr      *RRVolumeManager
40         pullq       *WorkQueue
41         trashq      *WorkQueue
42 }
43
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47         rtr := &router{
48                 Router:  mux.NewRouter(),
49                 cluster: cluster,
50                 logger:  ctxlog.FromContext(ctx),
51                 metrics: &nodeMetrics{reg: reg},
52                 volmgr:  volmgr,
53                 pullq:   pullq,
54                 trashq:  trashq,
55         }
56
57         rtr.HandleFunc(
58                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59         rtr.HandleFunc(
60                 `/{hash:[0-9a-f]{32}}+{hints}`,
61                 rtr.handleGET).Methods("GET", "HEAD")
62
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65         // List all blocks stored here. Privileged client only.
66         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67         // List blocks stored here whose hash has the given prefix.
68         // Privileged client only.
69         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
70         // Update timestamp on existing block. Privileged client only.
71         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
72
73         // Internals/debugging info (runtime.MemStats)
74         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
75
76         // List volumes: path, device number, bytes used/avail.
77         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
78
79         // List mounts: UUID, readonly, tier, device ID, ...
80         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
81         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
82         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
83
84         // Replace the current pull queue.
85         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
86
87         // Replace the current trash queue.
88         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
89
90         // Untrash moves blocks from trash back into store
91         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
92
93         rtr.Handle("/_health/{check}", &health.Handler{
94                 Token:  cluster.ManagementToken,
95                 Prefix: "/_health/",
96         }).Methods("GET")
97
98         // Any request which does not match any of these routes gets
99         // 400 Bad Request.
100         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
101
102         rtr.metrics.setupBufferPoolMetrics(bufs)
103         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
104         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
105
106         return rtr
107 }
108
109 // BadRequestHandler is a HandleFunc to address bad requests.
110 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
111         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
112 }
113
114 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
115         locator := req.URL.Path[1:]
116         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
117                 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
118                 return
119         }
120
121         if rtr.cluster.Collections.BlobSigning {
122                 locator := req.URL.Path[1:] // strip leading slash
123                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
124                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
125                         return
126                 }
127         }
128
129         // TODO: Probe volumes to check whether the block _might_
130         // exist. Some volumes/types could support a quick existence
131         // check without causing other operations to suffer. If all
132         // volumes support that, and assure us the block definitely
133         // isn't here, we can return 404 now instead of waiting for a
134         // buffer.
135
136         buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
137         if err != nil {
138                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
139                 return
140         }
141         defer bufs.Put(buf)
142
143         size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
144         if err != nil {
145                 code := http.StatusInternalServerError
146                 if err, ok := err.(*KeepError); ok {
147                         code = err.HTTPCode
148                 }
149                 http.Error(resp, err.Error(), code)
150                 return
151         }
152
153         resp.Header().Set("Content-Length", strconv.Itoa(size))
154         resp.Header().Set("Content-Type", "application/octet-stream")
155         resp.Write(buf[:size])
156 }
157
158 // Get a buffer from the pool -- but give up and return a non-nil
159 // error if ctx ends before we get a buffer.
160 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
161         bufReady := make(chan []byte)
162         go func() {
163                 bufReady <- bufs.Get(bufSize)
164         }()
165         select {
166         case buf := <-bufReady:
167                 return buf, nil
168         case <-ctx.Done():
169                 go func() {
170                         // Even if closeNotifier happened first, we
171                         // need to keep waiting for our buf so we can
172                         // return it to the pool.
173                         bufs.Put(<-bufReady)
174                 }()
175                 return nil, ErrClientDisconnect
176         }
177 }
178
179 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
180         if !rtr.isSystemAuth(GetAPIToken(req)) {
181                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
182                 return
183         }
184         hash := mux.Vars(req)["hash"]
185         vols := rtr.volmgr.AllWritable()
186         if len(vols) == 0 {
187                 http.Error(resp, "no volumes", http.StatusNotFound)
188                 return
189         }
190         var err error
191         for _, mnt := range vols {
192                 err = mnt.Touch(hash)
193                 if err == nil {
194                         break
195                 }
196         }
197         switch {
198         case err == nil:
199                 return
200         case os.IsNotExist(err):
201                 http.Error(resp, err.Error(), http.StatusNotFound)
202         default:
203                 http.Error(resp, err.Error(), http.StatusInternalServerError)
204         }
205 }
206
207 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
208         hash := mux.Vars(req)["hash"]
209
210         // Detect as many error conditions as possible before reading
211         // the body: avoid transmitting data that will not end up
212         // being written anyway.
213
214         if req.ContentLength == -1 {
215                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
216                 return
217         }
218
219         if req.ContentLength > BlockSize {
220                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
221                 return
222         }
223
224         if len(rtr.volmgr.AllWritable()) == 0 {
225                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
226                 return
227         }
228
229         var wantStorageClasses []string
230         if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
231                 wantStorageClasses = strings.Split(hdr, ",")
232                 for i, sc := range wantStorageClasses {
233                         wantStorageClasses[i] = strings.TrimSpace(sc)
234                 }
235         } else {
236                 // none specified -- use configured default
237                 for class, cfg := range rtr.cluster.StorageClasses {
238                         if cfg.Default {
239                                 wantStorageClasses = append(wantStorageClasses, class)
240                         }
241                 }
242         }
243
244         buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
245         if err != nil {
246                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
247                 return
248         }
249
250         _, err = io.ReadFull(req.Body, buf)
251         if err != nil {
252                 http.Error(resp, err.Error(), 500)
253                 bufs.Put(buf)
254                 return
255         }
256
257         result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
258         bufs.Put(buf)
259
260         if err != nil {
261                 code := http.StatusInternalServerError
262                 if err, ok := err.(*KeepError); ok {
263                         code = err.HTTPCode
264                 }
265                 http.Error(resp, err.Error(), code)
266                 return
267         }
268
269         // Success; add a size hint, sign the locator if possible, and
270         // return it to the client.
271         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
272         apiToken := GetAPIToken(req)
273         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
274                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
275                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
276         }
277         resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
278         resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
279         resp.Write([]byte(returnHash + "\n"))
280 }
281
282 // IndexHandler responds to "/index", "/index/{prefix}", and
283 // "/mounts/{uuid}/blocks" requests.
284 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
285         if !rtr.isSystemAuth(GetAPIToken(req)) {
286                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
287                 return
288         }
289
290         prefix := mux.Vars(req)["prefix"]
291         if prefix == "" {
292                 req.ParseForm()
293                 prefix = req.Form.Get("prefix")
294         }
295
296         uuid := mux.Vars(req)["uuid"]
297
298         var vols []*VolumeMount
299         if uuid == "" {
300                 vols = rtr.volmgr.AllReadable()
301         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
302                 http.Error(resp, "mount not found", http.StatusNotFound)
303                 return
304         } else {
305                 vols = []*VolumeMount{mnt}
306         }
307
308         for _, v := range vols {
309                 if err := v.IndexTo(prefix, resp); err != nil {
310                         // We can't send an error status/message to
311                         // the client because IndexTo() might have
312                         // already written body content. All we can do
313                         // is log the error in our own logs.
314                         //
315                         // The client must notice the lack of trailing
316                         // newline as an indication that the response
317                         // is incomplete.
318                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
319                         return
320                 }
321         }
322         // An empty line at EOF is the only way the client can be
323         // assured the entire index was received.
324         resp.Write([]byte{'\n'})
325 }
326
327 // MountsHandler responds to "GET /mounts" requests.
328 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
329         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
330         if err != nil {
331                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
332         }
333 }
334
335 // PoolStatus struct
336 type PoolStatus struct {
337         Alloc uint64 `json:"BytesAllocatedCumulative"`
338         Cap   int    `json:"BuffersMax"`
339         Len   int    `json:"BuffersInUse"`
340 }
341
342 type volumeStatusEnt struct {
343         Label         string
344         Status        *VolumeStatus `json:",omitempty"`
345         VolumeStats   *ioStats      `json:",omitempty"`
346         InternalStats interface{}   `json:",omitempty"`
347 }
348
349 // NodeStatus struct
350 type NodeStatus struct {
351         Volumes         []*volumeStatusEnt
352         BufferPool      PoolStatus
353         PullQueue       WorkQueueStatus
354         TrashQueue      WorkQueueStatus
355         RequestsCurrent int
356         RequestsMax     int
357         Version         string
358 }
359
360 var st NodeStatus
361 var stLock sync.Mutex
362
363 // DebugHandler addresses /debug.json requests.
364 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
365         type debugStats struct {
366                 MemStats runtime.MemStats
367         }
368         var ds debugStats
369         runtime.ReadMemStats(&ds.MemStats)
370         data, err := json.Marshal(&ds)
371         if err != nil {
372                 http.Error(resp, err.Error(), http.StatusInternalServerError)
373                 return
374         }
375         resp.Write(data)
376 }
377
378 // StatusHandler addresses /status.json requests.
379 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
380         stLock.Lock()
381         rtr.readNodeStatus(&st)
382         data, err := json.Marshal(&st)
383         stLock.Unlock()
384         if err != nil {
385                 http.Error(resp, err.Error(), http.StatusInternalServerError)
386                 return
387         }
388         resp.Write(data)
389 }
390
391 // populate the given NodeStatus struct with current values.
392 func (rtr *router) readNodeStatus(st *NodeStatus) {
393         st.Version = version
394         vols := rtr.volmgr.AllReadable()
395         if cap(st.Volumes) < len(vols) {
396                 st.Volumes = make([]*volumeStatusEnt, len(vols))
397         }
398         st.Volumes = st.Volumes[:0]
399         for _, vol := range vols {
400                 var internalStats interface{}
401                 if vol, ok := vol.Volume.(InternalStatser); ok {
402                         internalStats = vol.InternalStats()
403                 }
404                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
405                         Label:         vol.String(),
406                         Status:        vol.Status(),
407                         InternalStats: internalStats,
408                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
409                 })
410         }
411         st.BufferPool.Alloc = bufs.Alloc()
412         st.BufferPool.Cap = bufs.Cap()
413         st.BufferPool.Len = bufs.Len()
414         st.PullQueue = getWorkQueueStatus(rtr.pullq)
415         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
416 }
417
418 // return a WorkQueueStatus for the given queue. If q is nil (which
419 // should never happen except in test suites), return a zero status
420 // value instead of crashing.
421 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
422         if q == nil {
423                 // This should only happen during tests.
424                 return WorkQueueStatus{}
425         }
426         return q.Status()
427 }
428
429 // handleDELETE processes DELETE requests.
430 //
431 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
432 // from all connected volumes.
433 //
434 // Only the Data Manager, or an Arvados admin with scope "all", are
435 // allowed to issue DELETE requests.  If a DELETE request is not
436 // authenticated or is issued by a non-admin user, the server returns
437 // a PermissionError.
438 //
439 // Upon receiving a valid request from an authorized user,
440 // handleDELETE deletes all copies of the specified block on local
441 // writable volumes.
442 //
443 // Response format:
444 //
445 // If the requested blocks was not found on any volume, the response
446 // code is HTTP 404 Not Found.
447 //
448 // Otherwise, the response code is 200 OK, with a response body
449 // consisting of the JSON message
450 //
451 //    {"copies_deleted":d,"copies_failed":f}
452 //
453 // where d and f are integers representing the number of blocks that
454 // were successfully and unsuccessfully deleted.
455 //
456 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
457         hash := mux.Vars(req)["hash"]
458
459         // Confirm that this user is an admin and has a token with unlimited scope.
460         var tok = GetAPIToken(req)
461         if tok == "" || !rtr.canDelete(tok) {
462                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
463                 return
464         }
465
466         if !rtr.cluster.Collections.BlobTrash {
467                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
468                 return
469         }
470
471         // Delete copies of this block from all available volumes.
472         // Report how many blocks were successfully deleted, and how
473         // many were found on writable volumes but not deleted.
474         var result struct {
475                 Deleted int `json:"copies_deleted"`
476                 Failed  int `json:"copies_failed"`
477         }
478         for _, vol := range rtr.volmgr.AllWritable() {
479                 if err := vol.Trash(hash); err == nil {
480                         result.Deleted++
481                 } else if os.IsNotExist(err) {
482                         continue
483                 } else {
484                         result.Failed++
485                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
486                 }
487         }
488         if result.Deleted == 0 && result.Failed == 0 {
489                 resp.WriteHeader(http.StatusNotFound)
490                 return
491         }
492         body, err := json.Marshal(result)
493         if err != nil {
494                 http.Error(resp, err.Error(), http.StatusInternalServerError)
495                 return
496         }
497         resp.Write(body)
498 }
499
500 /* PullHandler processes "PUT /pull" requests for the data manager.
501    The request body is a JSON message containing a list of pull
502    requests in the following format:
503
504    [
505       {
506          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
507          "servers":[
508                         "keep0.qr1hi.arvadosapi.com:25107",
509                         "keep1.qr1hi.arvadosapi.com:25108"
510                  ]
511           },
512           {
513                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
514                  "servers":[
515                         "10.0.1.5:25107",
516                         "10.0.1.6:25107",
517                         "10.0.1.7:25108"
518                  ]
519           },
520           ...
521    ]
522
523    Each pull request in the list consists of a block locator string
524    and an ordered list of servers.  Keepstore should try to fetch the
525    block from each server in turn.
526
527    If the request has not been sent by the Data Manager, return 401
528    Unauthorized.
529
530    If the JSON unmarshalling fails, return 400 Bad Request.
531 */
532
533 // PullRequest consists of a block locator and an ordered list of servers
534 type PullRequest struct {
535         Locator string   `json:"locator"`
536         Servers []string `json:"servers"`
537
538         // Destination mount, or "" for "anywhere"
539         MountUUID string `json:"mount_uuid"`
540 }
541
542 // PullHandler processes "PUT /pull" requests for the data manager.
543 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
544         // Reject unauthorized requests.
545         if !rtr.isSystemAuth(GetAPIToken(req)) {
546                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
547                 return
548         }
549
550         // Parse the request body.
551         var pr []PullRequest
552         r := json.NewDecoder(req.Body)
553         if err := r.Decode(&pr); err != nil {
554                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
555                 return
556         }
557
558         // We have a properly formatted pull list sent from the data
559         // manager.  Report success and send the list to the pull list
560         // manager for further handling.
561         resp.WriteHeader(http.StatusOK)
562         resp.Write([]byte(
563                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
564
565         plist := list.New()
566         for _, p := range pr {
567                 plist.PushBack(p)
568         }
569         rtr.pullq.ReplaceQueue(plist)
570 }
571
572 // TrashRequest consists of a block locator and its Mtime
573 type TrashRequest struct {
574         Locator    string `json:"locator"`
575         BlockMtime int64  `json:"block_mtime"`
576
577         // Target mount, or "" for "everywhere"
578         MountUUID string `json:"mount_uuid"`
579 }
580
581 // TrashHandler processes /trash requests.
582 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
583         // Reject unauthorized requests.
584         if !rtr.isSystemAuth(GetAPIToken(req)) {
585                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
586                 return
587         }
588
589         // Parse the request body.
590         var trash []TrashRequest
591         r := json.NewDecoder(req.Body)
592         if err := r.Decode(&trash); err != nil {
593                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
594                 return
595         }
596
597         // We have a properly formatted trash list sent from the data
598         // manager.  Report success and send the list to the trash work
599         // queue for further handling.
600         resp.WriteHeader(http.StatusOK)
601         resp.Write([]byte(
602                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
603
604         tlist := list.New()
605         for _, t := range trash {
606                 tlist.PushBack(t)
607         }
608         rtr.trashq.ReplaceQueue(tlist)
609 }
610
611 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
612 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
613         // Reject unauthorized requests.
614         if !rtr.isSystemAuth(GetAPIToken(req)) {
615                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
616                 return
617         }
618
619         log := ctxlog.FromContext(req.Context())
620         hash := mux.Vars(req)["hash"]
621
622         if len(rtr.volmgr.AllWritable()) == 0 {
623                 http.Error(resp, "No writable volumes", http.StatusNotFound)
624                 return
625         }
626
627         var untrashedOn, failedOn []string
628         var numNotFound int
629         for _, vol := range rtr.volmgr.AllWritable() {
630                 err := vol.Untrash(hash)
631
632                 if os.IsNotExist(err) {
633                         numNotFound++
634                 } else if err != nil {
635                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
636                         failedOn = append(failedOn, vol.String())
637                 } else {
638                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
639                         untrashedOn = append(untrashedOn, vol.String())
640                 }
641         }
642
643         if numNotFound == len(rtr.volmgr.AllWritable()) {
644                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
645         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
646                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
647         } else {
648                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
649                 if len(failedOn) > 0 {
650                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
651                         http.Error(resp, respBody, http.StatusInternalServerError)
652                 } else {
653                         fmt.Fprintln(resp, respBody)
654                 }
655         }
656 }
657
658 // GetBlock and PutBlock implement lower-level code for handling
659 // blocks by rooting through volumes connected to the local machine.
660 // Once the handler has determined that system policy permits the
661 // request, it calls these methods to perform the actual operation.
662 //
663 // TODO(twp): this code would probably be better located in the
664 // VolumeManager interface. As an abstraction, the VolumeManager
665 // should be the only part of the code that cares about which volume a
666 // block is stored on, so it should be responsible for figuring out
667 // which volume to check for fetching blocks, storing blocks, etc.
668
669 // GetBlock fetches the block identified by "hash" into the provided
670 // buf, and returns the data size.
671 //
672 // If the block cannot be found on any volume, returns NotFoundError.
673 //
674 // If the block found does not have the correct MD5 hash, returns
675 // DiskHashError.
676 //
677 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
678         log := ctxlog.FromContext(ctx)
679
680         // Attempt to read the requested hash from a keep volume.
681         errorToCaller := NotFoundError
682
683         for _, vol := range volmgr.AllReadable() {
684                 size, err := vol.Get(ctx, hash, buf)
685                 select {
686                 case <-ctx.Done():
687                         return 0, ErrClientDisconnect
688                 default:
689                 }
690                 if err != nil {
691                         // IsNotExist is an expected error and may be
692                         // ignored. All other errors are logged. In
693                         // any case we continue trying to read other
694                         // volumes. If all volumes report IsNotExist,
695                         // we return a NotFoundError.
696                         if !os.IsNotExist(err) {
697                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
698                         }
699                         // If some volume returns a transient error, return it to the caller
700                         // instead of "Not found" so it can retry.
701                         if err == VolumeBusyError {
702                                 errorToCaller = err.(*KeepError)
703                         }
704                         continue
705                 }
706                 // Check the file checksum.
707                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
708                 if filehash != hash {
709                         // TODO: Try harder to tell a sysadmin about
710                         // this.
711                         log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
712                         errorToCaller = DiskHashError
713                         continue
714                 }
715                 if errorToCaller == DiskHashError {
716                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
717                 }
718                 return size, nil
719         }
720         return 0, errorToCaller
721 }
722
723 type putProgress struct {
724         classNeeded      map[string]bool
725         classTodo        map[string]bool
726         mountUsed        map[*VolumeMount]bool
727         totalReplication int
728         classDone        map[string]int
729 }
730
731 // Number of distinct replicas stored. "2" can mean the block was
732 // stored on 2 different volumes with replication 1, or on 1 volume
733 // with replication 2.
734 func (pr putProgress) TotalReplication() string {
735         return strconv.Itoa(pr.totalReplication)
736 }
737
738 // Number of replicas satisfying each storage class, formatted like
739 // "default=2; special=1".
740 func (pr putProgress) ClassReplication() string {
741         s := ""
742         for k, v := range pr.classDone {
743                 if len(s) > 0 {
744                         s += ", "
745                 }
746                 s += k + "=" + strconv.Itoa(v)
747         }
748         return s
749 }
750
751 func (pr *putProgress) Add(mnt *VolumeMount) {
752         if pr.mountUsed[mnt] {
753                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
754                 return
755         }
756         pr.mountUsed[mnt] = true
757         pr.totalReplication += mnt.Replication
758         for class := range mnt.StorageClasses {
759                 pr.classDone[class] += mnt.Replication
760                 delete(pr.classTodo, class)
761         }
762 }
763
764 func (pr *putProgress) Sub(mnt *VolumeMount) {
765         if !pr.mountUsed[mnt] {
766                 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
767                 return
768         }
769         pr.mountUsed[mnt] = false
770         pr.totalReplication -= mnt.Replication
771         for class := range mnt.StorageClasses {
772                 pr.classDone[class] -= mnt.Replication
773                 if pr.classNeeded[class] {
774                         pr.classTodo[class] = true
775                 }
776         }
777 }
778
779 func (pr *putProgress) Done() bool {
780         return len(pr.classTodo) == 0 && pr.totalReplication > 0
781 }
782
783 func (pr *putProgress) Want(mnt *VolumeMount) bool {
784         if pr.Done() || pr.mountUsed[mnt] {
785                 return false
786         }
787         if len(pr.classTodo) == 0 {
788                 // none specified == "any"
789                 return true
790         }
791         for class := range mnt.StorageClasses {
792                 if pr.classTodo[class] {
793                         return true
794                 }
795         }
796         return false
797 }
798
799 func (pr *putProgress) Copy() *putProgress {
800         cp := putProgress{
801                 classNeeded:      pr.classNeeded,
802                 classTodo:        make(map[string]bool, len(pr.classTodo)),
803                 classDone:        make(map[string]int, len(pr.classDone)),
804                 mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
805                 totalReplication: pr.totalReplication,
806         }
807         for k, v := range pr.classTodo {
808                 cp.classTodo[k] = v
809         }
810         for k, v := range pr.classDone {
811                 cp.classDone[k] = v
812         }
813         for k, v := range pr.mountUsed {
814                 cp.mountUsed[k] = v
815         }
816         return &cp
817 }
818
819 func newPutProgress(classes []string) putProgress {
820         pr := putProgress{
821                 classNeeded: make(map[string]bool, len(classes)),
822                 classTodo:   make(map[string]bool, len(classes)),
823                 classDone:   map[string]int{},
824                 mountUsed:   map[*VolumeMount]bool{},
825         }
826         for _, c := range classes {
827                 if c != "" {
828                         pr.classNeeded[c] = true
829                         pr.classTodo[c] = true
830                 }
831         }
832         return pr
833 }
834
835 // PutBlock stores the given block on one or more volumes.
836 //
837 // The MD5 checksum of the block must match the given hash.
838 //
839 // The block is written to each writable volume (ordered by priority
840 // and then UUID, see volume.go) until at least one replica has been
841 // stored in each of the requested storage classes.
842 //
843 // The returned error, if any, is a KeepError with one of the
844 // following codes:
845 //
846 // 500 Collision
847 //        A different block with the same hash already exists on this
848 //        Keep server.
849 // 422 MD5Fail
850 //        The MD5 hash of the BLOCK does not match the argument HASH.
851 // 503 Full
852 //        There was not enough space left in any Keep volume to store
853 //        the object.
854 // 500 Fail
855 //        The object could not be stored for some other reason (e.g.
856 //        all writes failed). The text of the error message should
857 //        provide as much detail as possible.
858 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
859         log := ctxlog.FromContext(ctx)
860
861         // Check that BLOCK's checksum matches HASH.
862         blockhash := fmt.Sprintf("%x", md5.Sum(block))
863         if blockhash != hash {
864                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
865                 return putProgress{}, RequestHashError
866         }
867
868         result := newPutProgress(wantStorageClasses)
869
870         // If we already have this data, it's intact on disk, and we
871         // can update its timestamp, return success. If we have
872         // different data with the same hash, return failure.
873         if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
874                 return result, err
875         }
876         if ctx.Err() != nil {
877                 return result, ErrClientDisconnect
878         }
879
880         writables := volmgr.NextWritable()
881         if len(writables) == 0 {
882                 log.Error("no writable volumes")
883                 return result, FullError
884         }
885
886         var wg sync.WaitGroup
887         var mtx sync.Mutex
888         cond := sync.Cond{L: &mtx}
889         // pending predicts what result will be if all pending writes
890         // succeed.
891         pending := result.Copy()
892         var allFull atomic.Value
893         allFull.Store(true)
894
895         // We hold the lock for the duration of the "each volume" loop
896         // below, except when it is released during cond.Wait().
897         mtx.Lock()
898
899         for _, mnt := range writables {
900                 // Wait until our decision to use this mount does not
901                 // depend on the outcome of pending writes.
902                 for result.Want(mnt) && !pending.Want(mnt) {
903                         cond.Wait()
904                 }
905                 if !result.Want(mnt) {
906                         continue
907                 }
908                 mnt := mnt
909                 pending.Add(mnt)
910                 wg.Add(1)
911                 go func() {
912                         log.Debugf("PutBlock: start write to %s", mnt.UUID)
913                         defer wg.Done()
914                         err := mnt.Put(ctx, hash, block)
915
916                         mtx.Lock()
917                         if err != nil {
918                                 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
919                                 pending.Sub(mnt)
920                         } else {
921                                 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
922                                 result.Add(mnt)
923                         }
924                         cond.Broadcast()
925                         mtx.Unlock()
926
927                         if err != nil && err != FullError && ctx.Err() == nil {
928                                 // The volume is not full but the
929                                 // write did not succeed.  Report the
930                                 // error and continue trying.
931                                 allFull.Store(false)
932                                 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
933                         }
934                 }()
935         }
936         mtx.Unlock()
937         wg.Wait()
938         if ctx.Err() != nil {
939                 return result, ErrClientDisconnect
940         }
941         if result.Done() {
942                 return result, nil
943         }
944
945         if result.totalReplication > 0 {
946                 // Some, but not all, of the storage classes were
947                 // satisfied. This qualifies as success.
948                 return result, nil
949         } else if allFull.Load().(bool) {
950                 log.Error("all volumes with qualifying storage classes are full")
951                 return putProgress{}, FullError
952         } else {
953                 // Already logged the non-full errors.
954                 return putProgress{}, GenericError
955         }
956 }
957
958 // CompareAndTouch looks for volumes where the given content already
959 // exists and its modification time can be updated (i.e., it is
960 // protected from garbage collection), and updates result accordingly.
961 // It returns when the result is Done() or all volumes have been
962 // checked.
963 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
964         log := ctxlog.FromContext(ctx)
965         for _, mnt := range volmgr.AllWritable() {
966                 if !result.Want(mnt) {
967                         continue
968                 }
969                 err := mnt.Compare(ctx, hash, buf)
970                 if ctx.Err() != nil {
971                         return nil
972                 } else if err == CollisionError {
973                         // Stop if we have a block with same hash but
974                         // different content. (It will be impossible
975                         // to tell which one is wanted if we have
976                         // both, so there's no point writing it even
977                         // on a different volume.)
978                         log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
979                         return CollisionError
980                 } else if os.IsNotExist(err) {
981                         // Block does not exist. This is the only
982                         // "normal" error: we don't log anything.
983                         continue
984                 } else if err != nil {
985                         // Couldn't open file, data is corrupt on
986                         // disk, etc.: log this abnormal condition,
987                         // and try the next volume.
988                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
989                         continue
990                 }
991                 if err := mnt.Touch(hash); err != nil {
992                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
993                         continue
994                 }
995                 // Compare and Touch both worked --> done.
996                 result.Add(mnt)
997                 if result.Done() {
998                         return nil
999                 }
1000         }
1001         return nil
1002 }
1003
1004 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1005
1006 // IsValidLocator returns true if the specified string is a valid Keep locator.
1007 //   When Keep is extended to support hash types other than MD5,
1008 //   this should be updated to cover those as well.
1009 //
1010 func IsValidLocator(loc string) bool {
1011         return validLocatorRe.MatchString(loc)
1012 }
1013
1014 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1015
1016 // GetAPIToken returns the OAuth2 token from the Authorization
1017 // header of a HTTP request, or an empty string if no matching
1018 // token is found.
1019 func GetAPIToken(req *http.Request) string {
1020         if auth, ok := req.Header["Authorization"]; ok {
1021                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1022                         return match[2]
1023                 }
1024         }
1025         return ""
1026 }
1027
1028 // canDelete returns true if the user identified by apiToken is
1029 // allowed to delete blocks.
1030 func (rtr *router) canDelete(apiToken string) bool {
1031         if apiToken == "" {
1032                 return false
1033         }
1034         // Blocks may be deleted only when Keep has been configured with a
1035         // data manager.
1036         if rtr.isSystemAuth(apiToken) {
1037                 return true
1038         }
1039         // TODO(twp): look up apiToken with the API server
1040         // return true if is_admin is true and if the token
1041         // has unlimited scope
1042         return false
1043 }
1044
1045 // isSystemAuth returns true if the given token is allowed to perform
1046 // system level actions like deleting data.
1047 func (rtr *router) isSystemAuth(token string) bool {
1048         return token != "" && token == rtr.cluster.SystemRootToken
1049 }