Warn if MaxKeepBlobBuffers > MaxConcurrentRequests.
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "git.arvados.org/arvados.git/sdk/go/health"
26         "git.arvados.org/arvados.git/sdk/go/httpserver"
27         "github.com/gorilla/mux"
28         "github.com/prometheus/client_golang/prometheus"
29         "github.com/sirupsen/logrus"
30 )
31
32 type router struct {
33         *mux.Router
34         cluster     *arvados.Cluster
35         logger      logrus.FieldLogger
36         remoteProxy remoteProxy
37         metrics     *nodeMetrics
38         volmgr      *RRVolumeManager
39         pullq       *WorkQueue
40         trashq      *WorkQueue
41 }
42
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
46         rtr := &router{
47                 Router:  mux.NewRouter(),
48                 cluster: cluster,
49                 logger:  ctxlog.FromContext(ctx),
50                 metrics: &nodeMetrics{reg: reg},
51                 volmgr:  volmgr,
52                 pullq:   pullq,
53                 trashq:  trashq,
54         }
55
56         rtr.HandleFunc(
57                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
58         rtr.HandleFunc(
59                 `/{hash:[0-9a-f]{32}}+{hints}`,
60                 rtr.handleGET).Methods("GET", "HEAD")
61
62         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64         // List all blocks stored here. Privileged client only.
65         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66         // List blocks stored here whose hash has the given prefix.
67         // Privileged client only.
68         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69
70         // Internals/debugging info (runtime.MemStats)
71         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
72
73         // List volumes: path, device number, bytes used/avail.
74         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
75
76         // List mounts: UUID, readonly, tier, device ID, ...
77         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
78         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
79         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
80
81         // Replace the current pull queue.
82         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
83
84         // Replace the current trash queue.
85         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
86
87         // Untrash moves blocks from trash back into store
88         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
89
90         rtr.Handle("/_health/{check}", &health.Handler{
91                 Token:  cluster.ManagementToken,
92                 Prefix: "/_health/",
93         }).Methods("GET")
94
95         // Any request which does not match any of these routes gets
96         // 400 Bad Request.
97         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
98
99         rtr.metrics.setupBufferPoolMetrics(bufs)
100         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
101         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
102
103         return rtr
104 }
105
106 // BadRequestHandler is a HandleFunc to address bad requests.
107 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
108         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
109 }
110
111 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
112         ctx, cancel := contextForResponse(context.TODO(), resp)
113         defer cancel()
114
115         locator := req.URL.Path[1:]
116         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
117                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
118                 return
119         }
120
121         if rtr.cluster.Collections.BlobSigning {
122                 locator := req.URL.Path[1:] // strip leading slash
123                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
124                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
125                         return
126                 }
127         }
128
129         // TODO: Probe volumes to check whether the block _might_
130         // exist. Some volumes/types could support a quick existence
131         // check without causing other operations to suffer. If all
132         // volumes support that, and assure us the block definitely
133         // isn't here, we can return 404 now instead of waiting for a
134         // buffer.
135
136         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
137         if err != nil {
138                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
139                 return
140         }
141         defer bufs.Put(buf)
142
143         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
144         if err != nil {
145                 code := http.StatusInternalServerError
146                 if err, ok := err.(*KeepError); ok {
147                         code = err.HTTPCode
148                 }
149                 http.Error(resp, err.Error(), code)
150                 return
151         }
152
153         resp.Header().Set("Content-Length", strconv.Itoa(size))
154         resp.Header().Set("Content-Type", "application/octet-stream")
155         resp.Write(buf[:size])
156 }
157
158 // Return a new context that gets cancelled by resp's CloseNotifier.
159 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
160         ctx, cancel := context.WithCancel(parent)
161         if cn, ok := resp.(http.CloseNotifier); ok {
162                 go func(c <-chan bool) {
163                         select {
164                         case <-c:
165                                 cancel()
166                         case <-ctx.Done():
167                         }
168                 }(cn.CloseNotify())
169         }
170         return ctx, cancel
171 }
172
173 // Get a buffer from the pool -- but give up and return a non-nil
174 // error if ctx ends before we get a buffer.
175 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
176         bufReady := make(chan []byte)
177         go func() {
178                 bufReady <- bufs.Get(bufSize)
179         }()
180         select {
181         case buf := <-bufReady:
182                 return buf, nil
183         case <-ctx.Done():
184                 go func() {
185                         // Even if closeNotifier happened first, we
186                         // need to keep waiting for our buf so we can
187                         // return it to the pool.
188                         bufs.Put(<-bufReady)
189                 }()
190                 return nil, ErrClientDisconnect
191         }
192 }
193
194 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
195         ctx, cancel := contextForResponse(context.TODO(), resp)
196         defer cancel()
197
198         hash := mux.Vars(req)["hash"]
199
200         // Detect as many error conditions as possible before reading
201         // the body: avoid transmitting data that will not end up
202         // being written anyway.
203
204         if req.ContentLength == -1 {
205                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
206                 return
207         }
208
209         if req.ContentLength > BlockSize {
210                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
211                 return
212         }
213
214         if len(rtr.volmgr.AllWritable()) == 0 {
215                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
216                 return
217         }
218
219         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
220         if err != nil {
221                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
222                 return
223         }
224
225         _, err = io.ReadFull(req.Body, buf)
226         if err != nil {
227                 http.Error(resp, err.Error(), 500)
228                 bufs.Put(buf)
229                 return
230         }
231
232         replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
233         bufs.Put(buf)
234
235         if err != nil {
236                 code := http.StatusInternalServerError
237                 if err, ok := err.(*KeepError); ok {
238                         code = err.HTTPCode
239                 }
240                 http.Error(resp, err.Error(), code)
241                 return
242         }
243
244         // Success; add a size hint, sign the locator if possible, and
245         // return it to the client.
246         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
247         apiToken := GetAPIToken(req)
248         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
249                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
250                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
251         }
252         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
253         resp.Write([]byte(returnHash + "\n"))
254 }
255
256 // IndexHandler responds to "/index", "/index/{prefix}", and
257 // "/mounts/{uuid}/blocks" requests.
258 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
259         if !rtr.isSystemAuth(GetAPIToken(req)) {
260                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
261                 return
262         }
263
264         prefix := mux.Vars(req)["prefix"]
265         if prefix == "" {
266                 req.ParseForm()
267                 prefix = req.Form.Get("prefix")
268         }
269
270         uuid := mux.Vars(req)["uuid"]
271
272         var vols []*VolumeMount
273         if uuid == "" {
274                 vols = rtr.volmgr.AllReadable()
275         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
276                 http.Error(resp, "mount not found", http.StatusNotFound)
277                 return
278         } else {
279                 vols = []*VolumeMount{mnt}
280         }
281
282         for _, v := range vols {
283                 if err := v.IndexTo(prefix, resp); err != nil {
284                         // We can't send an error status/message to
285                         // the client because IndexTo() might have
286                         // already written body content. All we can do
287                         // is log the error in our own logs.
288                         //
289                         // The client must notice the lack of trailing
290                         // newline as an indication that the response
291                         // is incomplete.
292                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
293                         return
294                 }
295         }
296         // An empty line at EOF is the only way the client can be
297         // assured the entire index was received.
298         resp.Write([]byte{'\n'})
299 }
300
301 // MountsHandler responds to "GET /mounts" requests.
302 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
303         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
304         if err != nil {
305                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
306         }
307 }
308
309 // PoolStatus struct
310 type PoolStatus struct {
311         Alloc uint64 `json:"BytesAllocatedCumulative"`
312         Cap   int    `json:"BuffersMax"`
313         Len   int    `json:"BuffersInUse"`
314 }
315
316 type volumeStatusEnt struct {
317         Label         string
318         Status        *VolumeStatus `json:",omitempty"`
319         VolumeStats   *ioStats      `json:",omitempty"`
320         InternalStats interface{}   `json:",omitempty"`
321 }
322
323 // NodeStatus struct
324 type NodeStatus struct {
325         Volumes         []*volumeStatusEnt
326         BufferPool      PoolStatus
327         PullQueue       WorkQueueStatus
328         TrashQueue      WorkQueueStatus
329         RequestsCurrent int
330         RequestsMax     int
331         Version         string
332 }
333
334 var st NodeStatus
335 var stLock sync.Mutex
336
337 // DebugHandler addresses /debug.json requests.
338 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
339         type debugStats struct {
340                 MemStats runtime.MemStats
341         }
342         var ds debugStats
343         runtime.ReadMemStats(&ds.MemStats)
344         data, err := json.Marshal(&ds)
345         if err != nil {
346                 http.Error(resp, err.Error(), http.StatusInternalServerError)
347                 return
348         }
349         resp.Write(data)
350 }
351
352 // StatusHandler addresses /status.json requests.
353 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
354         stLock.Lock()
355         rtr.readNodeStatus(&st)
356         data, err := json.Marshal(&st)
357         stLock.Unlock()
358         if err != nil {
359                 http.Error(resp, err.Error(), http.StatusInternalServerError)
360                 return
361         }
362         resp.Write(data)
363 }
364
365 // populate the given NodeStatus struct with current values.
366 func (rtr *router) readNodeStatus(st *NodeStatus) {
367         st.Version = version
368         vols := rtr.volmgr.AllReadable()
369         if cap(st.Volumes) < len(vols) {
370                 st.Volumes = make([]*volumeStatusEnt, len(vols))
371         }
372         st.Volumes = st.Volumes[:0]
373         for _, vol := range vols {
374                 var internalStats interface{}
375                 if vol, ok := vol.Volume.(InternalStatser); ok {
376                         internalStats = vol.InternalStats()
377                 }
378                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
379                         Label:         vol.String(),
380                         Status:        vol.Status(),
381                         InternalStats: internalStats,
382                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
383                 })
384         }
385         st.BufferPool.Alloc = bufs.Alloc()
386         st.BufferPool.Cap = bufs.Cap()
387         st.BufferPool.Len = bufs.Len()
388         st.PullQueue = getWorkQueueStatus(rtr.pullq)
389         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
390 }
391
392 // return a WorkQueueStatus for the given queue. If q is nil (which
393 // should never happen except in test suites), return a zero status
394 // value instead of crashing.
395 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
396         if q == nil {
397                 // This should only happen during tests.
398                 return WorkQueueStatus{}
399         }
400         return q.Status()
401 }
402
403 // handleDELETE processes DELETE requests.
404 //
405 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
406 // from all connected volumes.
407 //
408 // Only the Data Manager, or an Arvados admin with scope "all", are
409 // allowed to issue DELETE requests.  If a DELETE request is not
410 // authenticated or is issued by a non-admin user, the server returns
411 // a PermissionError.
412 //
413 // Upon receiving a valid request from an authorized user,
414 // handleDELETE deletes all copies of the specified block on local
415 // writable volumes.
416 //
417 // Response format:
418 //
419 // If the requested blocks was not found on any volume, the response
420 // code is HTTP 404 Not Found.
421 //
422 // Otherwise, the response code is 200 OK, with a response body
423 // consisting of the JSON message
424 //
425 //    {"copies_deleted":d,"copies_failed":f}
426 //
427 // where d and f are integers representing the number of blocks that
428 // were successfully and unsuccessfully deleted.
429 //
430 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
431         hash := mux.Vars(req)["hash"]
432
433         // Confirm that this user is an admin and has a token with unlimited scope.
434         var tok = GetAPIToken(req)
435         if tok == "" || !rtr.canDelete(tok) {
436                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
437                 return
438         }
439
440         if !rtr.cluster.Collections.BlobTrash {
441                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
442                 return
443         }
444
445         // Delete copies of this block from all available volumes.
446         // Report how many blocks were successfully deleted, and how
447         // many were found on writable volumes but not deleted.
448         var result struct {
449                 Deleted int `json:"copies_deleted"`
450                 Failed  int `json:"copies_failed"`
451         }
452         for _, vol := range rtr.volmgr.AllWritable() {
453                 if err := vol.Trash(hash); err == nil {
454                         result.Deleted++
455                 } else if os.IsNotExist(err) {
456                         continue
457                 } else {
458                         result.Failed++
459                         ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
460                 }
461         }
462         if result.Deleted == 0 && result.Failed == 0 {
463                 resp.WriteHeader(http.StatusNotFound)
464                 return
465         }
466         body, err := json.Marshal(result)
467         if err != nil {
468                 http.Error(resp, err.Error(), http.StatusInternalServerError)
469                 return
470         }
471         resp.Write(body)
472 }
473
474 /* PullHandler processes "PUT /pull" requests for the data manager.
475    The request body is a JSON message containing a list of pull
476    requests in the following format:
477
478    [
479       {
480          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
481          "servers":[
482                         "keep0.qr1hi.arvadosapi.com:25107",
483                         "keep1.qr1hi.arvadosapi.com:25108"
484                  ]
485           },
486           {
487                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
488                  "servers":[
489                         "10.0.1.5:25107",
490                         "10.0.1.6:25107",
491                         "10.0.1.7:25108"
492                  ]
493           },
494           ...
495    ]
496
497    Each pull request in the list consists of a block locator string
498    and an ordered list of servers.  Keepstore should try to fetch the
499    block from each server in turn.
500
501    If the request has not been sent by the Data Manager, return 401
502    Unauthorized.
503
504    If the JSON unmarshalling fails, return 400 Bad Request.
505 */
506
507 // PullRequest consists of a block locator and an ordered list of servers
508 type PullRequest struct {
509         Locator string   `json:"locator"`
510         Servers []string `json:"servers"`
511
512         // Destination mount, or "" for "anywhere"
513         MountUUID string `json:"mount_uuid"`
514 }
515
516 // PullHandler processes "PUT /pull" requests for the data manager.
517 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
518         // Reject unauthorized requests.
519         if !rtr.isSystemAuth(GetAPIToken(req)) {
520                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
521                 return
522         }
523
524         // Parse the request body.
525         var pr []PullRequest
526         r := json.NewDecoder(req.Body)
527         if err := r.Decode(&pr); err != nil {
528                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
529                 return
530         }
531
532         // We have a properly formatted pull list sent from the data
533         // manager.  Report success and send the list to the pull list
534         // manager for further handling.
535         resp.WriteHeader(http.StatusOK)
536         resp.Write([]byte(
537                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
538
539         plist := list.New()
540         for _, p := range pr {
541                 plist.PushBack(p)
542         }
543         rtr.pullq.ReplaceQueue(plist)
544 }
545
546 // TrashRequest consists of a block locator and its Mtime
547 type TrashRequest struct {
548         Locator    string `json:"locator"`
549         BlockMtime int64  `json:"block_mtime"`
550
551         // Target mount, or "" for "everywhere"
552         MountUUID string `json:"mount_uuid"`
553 }
554
555 // TrashHandler processes /trash requests.
556 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
557         // Reject unauthorized requests.
558         if !rtr.isSystemAuth(GetAPIToken(req)) {
559                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
560                 return
561         }
562
563         // Parse the request body.
564         var trash []TrashRequest
565         r := json.NewDecoder(req.Body)
566         if err := r.Decode(&trash); err != nil {
567                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
568                 return
569         }
570
571         // We have a properly formatted trash list sent from the data
572         // manager.  Report success and send the list to the trash work
573         // queue for further handling.
574         resp.WriteHeader(http.StatusOK)
575         resp.Write([]byte(
576                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
577
578         tlist := list.New()
579         for _, t := range trash {
580                 tlist.PushBack(t)
581         }
582         rtr.trashq.ReplaceQueue(tlist)
583 }
584
585 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
586 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
587         // Reject unauthorized requests.
588         if !rtr.isSystemAuth(GetAPIToken(req)) {
589                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
590                 return
591         }
592
593         log := ctxlog.FromContext(req.Context())
594         hash := mux.Vars(req)["hash"]
595
596         if len(rtr.volmgr.AllWritable()) == 0 {
597                 http.Error(resp, "No writable volumes", http.StatusNotFound)
598                 return
599         }
600
601         var untrashedOn, failedOn []string
602         var numNotFound int
603         for _, vol := range rtr.volmgr.AllWritable() {
604                 err := vol.Untrash(hash)
605
606                 if os.IsNotExist(err) {
607                         numNotFound++
608                 } else if err != nil {
609                         log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
610                         failedOn = append(failedOn, vol.String())
611                 } else {
612                         log.Infof("Untrashed %v on volume %v", hash, vol.String())
613                         untrashedOn = append(untrashedOn, vol.String())
614                 }
615         }
616
617         if numNotFound == len(rtr.volmgr.AllWritable()) {
618                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
619         } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
620                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
621         } else {
622                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
623                 if len(failedOn) > 0 {
624                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
625                         http.Error(resp, respBody, http.StatusInternalServerError)
626                 } else {
627                         fmt.Fprintln(resp, respBody)
628                 }
629         }
630 }
631
632 // GetBlock and PutBlock implement lower-level code for handling
633 // blocks by rooting through volumes connected to the local machine.
634 // Once the handler has determined that system policy permits the
635 // request, it calls these methods to perform the actual operation.
636 //
637 // TODO(twp): this code would probably be better located in the
638 // VolumeManager interface. As an abstraction, the VolumeManager
639 // should be the only part of the code that cares about which volume a
640 // block is stored on, so it should be responsible for figuring out
641 // which volume to check for fetching blocks, storing blocks, etc.
642
643 // GetBlock fetches the block identified by "hash" into the provided
644 // buf, and returns the data size.
645 //
646 // If the block cannot be found on any volume, returns NotFoundError.
647 //
648 // If the block found does not have the correct MD5 hash, returns
649 // DiskHashError.
650 //
651 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
652         log := ctxlog.FromContext(ctx)
653
654         // Attempt to read the requested hash from a keep volume.
655         errorToCaller := NotFoundError
656
657         for _, vol := range volmgr.AllReadable() {
658                 size, err := vol.Get(ctx, hash, buf)
659                 select {
660                 case <-ctx.Done():
661                         return 0, ErrClientDisconnect
662                 default:
663                 }
664                 if err != nil {
665                         // IsNotExist is an expected error and may be
666                         // ignored. All other errors are logged. In
667                         // any case we continue trying to read other
668                         // volumes. If all volumes report IsNotExist,
669                         // we return a NotFoundError.
670                         if !os.IsNotExist(err) {
671                                 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
672                         }
673                         // If some volume returns a transient error, return it to the caller
674                         // instead of "Not found" so it can retry.
675                         if err == VolumeBusyError {
676                                 errorToCaller = err.(*KeepError)
677                         }
678                         continue
679                 }
680                 // Check the file checksum.
681                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
682                 if filehash != hash {
683                         // TODO: Try harder to tell a sysadmin about
684                         // this.
685                         log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
686                         errorToCaller = DiskHashError
687                         continue
688                 }
689                 if errorToCaller == DiskHashError {
690                         log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
691                 }
692                 return size, nil
693         }
694         return 0, errorToCaller
695 }
696
697 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
698 //
699 // PutBlock(ctx, block, hash)
700 //   Stores the BLOCK (identified by the content id HASH) in Keep.
701 //
702 //   The MD5 checksum of the block must be identical to the content id HASH.
703 //   If not, an error is returned.
704 //
705 //   PutBlock stores the BLOCK on the first Keep volume with free space.
706 //   A failure code is returned to the user only if all volumes fail.
707 //
708 //   On success, PutBlock returns nil.
709 //   On failure, it returns a KeepError with one of the following codes:
710 //
711 //   500 Collision
712 //          A different block with the same hash already exists on this
713 //          Keep server.
714 //   422 MD5Fail
715 //          The MD5 hash of the BLOCK does not match the argument HASH.
716 //   503 Full
717 //          There was not enough space left in any Keep volume to store
718 //          the object.
719 //   500 Fail
720 //          The object could not be stored for some other reason (e.g.
721 //          all writes failed). The text of the error message should
722 //          provide as much detail as possible.
723 //
724 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
725         log := ctxlog.FromContext(ctx)
726
727         // Check that BLOCK's checksum matches HASH.
728         blockhash := fmt.Sprintf("%x", md5.Sum(block))
729         if blockhash != hash {
730                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
731                 return 0, RequestHashError
732         }
733
734         // If we already have this data, it's intact on disk, and we
735         // can update its timestamp, return success. If we have
736         // different data with the same hash, return failure.
737         if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
738                 return n, err
739         } else if ctx.Err() != nil {
740                 return 0, ErrClientDisconnect
741         }
742
743         // Choose a Keep volume to write to.
744         // If this volume fails, try all of the volumes in order.
745         if mnt := volmgr.NextWritable(); mnt != nil {
746                 if err := mnt.Put(ctx, hash, block); err != nil {
747                         log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
748                 } else {
749                         return mnt.Replication, nil // success!
750                 }
751         }
752         if ctx.Err() != nil {
753                 return 0, ErrClientDisconnect
754         }
755
756         writables := volmgr.AllWritable()
757         if len(writables) == 0 {
758                 log.Error("no writable volumes")
759                 return 0, FullError
760         }
761
762         allFull := true
763         for _, vol := range writables {
764                 err := vol.Put(ctx, hash, block)
765                 if ctx.Err() != nil {
766                         return 0, ErrClientDisconnect
767                 }
768                 switch err {
769                 case nil:
770                         return vol.Replication, nil // success!
771                 case FullError:
772                         continue
773                 default:
774                         // The volume is not full but the
775                         // write did not succeed.  Report the
776                         // error and continue trying.
777                         allFull = false
778                         log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
779                 }
780         }
781
782         if allFull {
783                 log.Error("all volumes are full")
784                 return 0, FullError
785         }
786         // Already logged the non-full errors.
787         return 0, GenericError
788 }
789
790 // CompareAndTouch returns the current replication level if one of the
791 // volumes already has the given content and it successfully updates
792 // the relevant block's modification time in order to protect it from
793 // premature garbage collection. Otherwise, it returns a non-nil
794 // error.
795 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
796         log := ctxlog.FromContext(ctx)
797         var bestErr error = NotFoundError
798         for _, mnt := range volmgr.AllWritable() {
799                 err := mnt.Compare(ctx, hash, buf)
800                 if ctx.Err() != nil {
801                         return 0, ctx.Err()
802                 } else if err == CollisionError {
803                         // Stop if we have a block with same hash but
804                         // different content. (It will be impossible
805                         // to tell which one is wanted if we have
806                         // both, so there's no point writing it even
807                         // on a different volume.)
808                         log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
809                         return 0, err
810                 } else if os.IsNotExist(err) {
811                         // Block does not exist. This is the only
812                         // "normal" error: we don't log anything.
813                         continue
814                 } else if err != nil {
815                         // Couldn't open file, data is corrupt on
816                         // disk, etc.: log this abnormal condition,
817                         // and try the next volume.
818                         log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
819                         continue
820                 }
821                 if err := mnt.Touch(hash); err != nil {
822                         log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
823                         bestErr = err
824                         continue
825                 }
826                 // Compare and Touch both worked --> done.
827                 return mnt.Replication, nil
828         }
829         return 0, bestErr
830 }
831
832 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
833
834 // IsValidLocator returns true if the specified string is a valid Keep locator.
835 //   When Keep is extended to support hash types other than MD5,
836 //   this should be updated to cover those as well.
837 //
838 func IsValidLocator(loc string) bool {
839         return validLocatorRe.MatchString(loc)
840 }
841
842 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
843
844 // GetAPIToken returns the OAuth2 token from the Authorization
845 // header of a HTTP request, or an empty string if no matching
846 // token is found.
847 func GetAPIToken(req *http.Request) string {
848         if auth, ok := req.Header["Authorization"]; ok {
849                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
850                         return match[2]
851                 }
852         }
853         return ""
854 }
855
856 // canDelete returns true if the user identified by apiToken is
857 // allowed to delete blocks.
858 func (rtr *router) canDelete(apiToken string) bool {
859         if apiToken == "" {
860                 return false
861         }
862         // Blocks may be deleted only when Keep has been configured with a
863         // data manager.
864         if rtr.isSystemAuth(apiToken) {
865                 return true
866         }
867         // TODO(twp): look up apiToken with the API server
868         // return true if is_admin is true and if the token
869         // has unlimited scope
870         return false
871 }
872
873 // isSystemAuth returns true if the given token is allowed to perform
874 // system level actions like deleting data.
875 func (rtr *router) isSystemAuth(token string) bool {
876         return token != "" && token == rtr.cluster.SystemRootToken
877 }