Merge branch '15713-controller-error-log'
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "log"
15         "net/http"
16         "os"
17         "regexp"
18         "runtime"
19         "strconv"
20         "strings"
21         "sync"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
26         "git.curoverse.com/arvados.git/sdk/go/health"
27         "git.curoverse.com/arvados.git/sdk/go/httpserver"
28         "github.com/gorilla/mux"
29         "github.com/prometheus/client_golang/prometheus"
30         "github.com/sirupsen/logrus"
31 )
32
33 type router struct {
34         *mux.Router
35         cluster     *arvados.Cluster
36         logger      logrus.FieldLogger
37         remoteProxy remoteProxy
38         metrics     *nodeMetrics
39         volmgr      *RRVolumeManager
40         pullq       *WorkQueue
41         trashq      *WorkQueue
42 }
43
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47         rtr := &router{
48                 Router:  mux.NewRouter(),
49                 cluster: cluster,
50                 logger:  ctxlog.FromContext(ctx),
51                 metrics: &nodeMetrics{reg: reg},
52                 volmgr:  volmgr,
53                 pullq:   pullq,
54                 trashq:  trashq,
55         }
56
57         rtr.HandleFunc(
58                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59         rtr.HandleFunc(
60                 `/{hash:[0-9a-f]{32}}+{hints}`,
61                 rtr.handleGET).Methods("GET", "HEAD")
62
63         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65         // List all blocks stored here. Privileged client only.
66         rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67         // List blocks stored here whose hash has the given prefix.
68         // Privileged client only.
69         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
70
71         // Internals/debugging info (runtime.MemStats)
72         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
73
74         // List volumes: path, device number, bytes used/avail.
75         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
76
77         // List mounts: UUID, readonly, tier, device ID, ...
78         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
79         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
80         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
81
82         // Replace the current pull queue.
83         rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
84
85         // Replace the current trash queue.
86         rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
87
88         // Untrash moves blocks from trash back into store
89         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
90
91         rtr.Handle("/_health/{check}", &health.Handler{
92                 Token:  cluster.ManagementToken,
93                 Prefix: "/_health/",
94         }).Methods("GET")
95
96         // Any request which does not match any of these routes gets
97         // 400 Bad Request.
98         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
99
100         rtr.metrics.setupBufferPoolMetrics(bufs)
101         rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
102         rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
103
104         return rtr
105 }
106
107 // BadRequestHandler is a HandleFunc to address bad requests.
108 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
109         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
110 }
111
112 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
113         ctx, cancel := contextForResponse(context.TODO(), resp)
114         defer cancel()
115
116         locator := req.URL.Path[1:]
117         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
119                 return
120         }
121
122         if rtr.cluster.Collections.BlobSigning {
123                 locator := req.URL.Path[1:] // strip leading slash
124                 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
126                         return
127                 }
128         }
129
130         // TODO: Probe volumes to check whether the block _might_
131         // exist. Some volumes/types could support a quick existence
132         // check without causing other operations to suffer. If all
133         // volumes support that, and assure us the block definitely
134         // isn't here, we can return 404 now instead of waiting for a
135         // buffer.
136
137         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
138         if err != nil {
139                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
140                 return
141         }
142         defer bufs.Put(buf)
143
144         size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
145         if err != nil {
146                 code := http.StatusInternalServerError
147                 if err, ok := err.(*KeepError); ok {
148                         code = err.HTTPCode
149                 }
150                 http.Error(resp, err.Error(), code)
151                 return
152         }
153
154         resp.Header().Set("Content-Length", strconv.Itoa(size))
155         resp.Header().Set("Content-Type", "application/octet-stream")
156         resp.Write(buf[:size])
157 }
158
159 // Return a new context that gets cancelled by resp's CloseNotifier.
160 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
161         ctx, cancel := context.WithCancel(parent)
162         if cn, ok := resp.(http.CloseNotifier); ok {
163                 go func(c <-chan bool) {
164                         select {
165                         case <-c:
166                                 cancel()
167                         case <-ctx.Done():
168                         }
169                 }(cn.CloseNotify())
170         }
171         return ctx, cancel
172 }
173
174 // Get a buffer from the pool -- but give up and return a non-nil
175 // error if ctx ends before we get a buffer.
176 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
177         bufReady := make(chan []byte)
178         go func() {
179                 bufReady <- bufs.Get(bufSize)
180         }()
181         select {
182         case buf := <-bufReady:
183                 return buf, nil
184         case <-ctx.Done():
185                 go func() {
186                         // Even if closeNotifier happened first, we
187                         // need to keep waiting for our buf so we can
188                         // return it to the pool.
189                         bufs.Put(<-bufReady)
190                 }()
191                 return nil, ErrClientDisconnect
192         }
193 }
194
195 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
196         ctx, cancel := contextForResponse(context.TODO(), resp)
197         defer cancel()
198
199         hash := mux.Vars(req)["hash"]
200
201         // Detect as many error conditions as possible before reading
202         // the body: avoid transmitting data that will not end up
203         // being written anyway.
204
205         if req.ContentLength == -1 {
206                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
207                 return
208         }
209
210         if req.ContentLength > BlockSize {
211                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
212                 return
213         }
214
215         if len(rtr.volmgr.AllWritable()) == 0 {
216                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
217                 return
218         }
219
220         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
221         if err != nil {
222                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
223                 return
224         }
225
226         _, err = io.ReadFull(req.Body, buf)
227         if err != nil {
228                 http.Error(resp, err.Error(), 500)
229                 bufs.Put(buf)
230                 return
231         }
232
233         replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
234         bufs.Put(buf)
235
236         if err != nil {
237                 code := http.StatusInternalServerError
238                 if err, ok := err.(*KeepError); ok {
239                         code = err.HTTPCode
240                 }
241                 http.Error(resp, err.Error(), code)
242                 return
243         }
244
245         // Success; add a size hint, sign the locator if possible, and
246         // return it to the client.
247         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
248         apiToken := GetAPIToken(req)
249         if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
250                 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
251                 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
252         }
253         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
254         resp.Write([]byte(returnHash + "\n"))
255 }
256
257 // IndexHandler responds to "/index", "/index/{prefix}", and
258 // "/mounts/{uuid}/blocks" requests.
259 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
260         if !rtr.isSystemAuth(GetAPIToken(req)) {
261                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
262                 return
263         }
264
265         prefix := mux.Vars(req)["prefix"]
266         if prefix == "" {
267                 req.ParseForm()
268                 prefix = req.Form.Get("prefix")
269         }
270
271         uuid := mux.Vars(req)["uuid"]
272
273         var vols []*VolumeMount
274         if uuid == "" {
275                 vols = rtr.volmgr.AllReadable()
276         } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
277                 http.Error(resp, "mount not found", http.StatusNotFound)
278                 return
279         } else {
280                 vols = []*VolumeMount{mnt}
281         }
282
283         for _, v := range vols {
284                 if err := v.IndexTo(prefix, resp); err != nil {
285                         // We can't send an error message to the
286                         // client because we might have already sent
287                         // headers and index content. All we can do is
288                         // log the error in our own logs, and (in
289                         // cases where headers haven't been sent yet)
290                         // set a 500 status.
291                         //
292                         // If headers have already been sent, the
293                         // client must notice the lack of trailing
294                         // newline as an indication that the response
295                         // is incomplete.
296                         log.Printf("index error from volume %s: %s", v, err)
297                         http.Error(resp, "", http.StatusInternalServerError)
298                         return
299                 }
300         }
301         // An empty line at EOF is the only way the client can be
302         // assured the entire index was received.
303         resp.Write([]byte{'\n'})
304 }
305
306 // MountsHandler responds to "GET /mounts" requests.
307 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
308         err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
309         if err != nil {
310                 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
311         }
312 }
313
314 // PoolStatus struct
315 type PoolStatus struct {
316         Alloc uint64 `json:"BytesAllocatedCumulative"`
317         Cap   int    `json:"BuffersMax"`
318         Len   int    `json:"BuffersInUse"`
319 }
320
321 type volumeStatusEnt struct {
322         Label         string
323         Status        *VolumeStatus `json:",omitempty"`
324         VolumeStats   *ioStats      `json:",omitempty"`
325         InternalStats interface{}   `json:",omitempty"`
326 }
327
328 // NodeStatus struct
329 type NodeStatus struct {
330         Volumes         []*volumeStatusEnt
331         BufferPool      PoolStatus
332         PullQueue       WorkQueueStatus
333         TrashQueue      WorkQueueStatus
334         RequestsCurrent int
335         RequestsMax     int
336         Version         string
337 }
338
339 var st NodeStatus
340 var stLock sync.Mutex
341
342 // DebugHandler addresses /debug.json requests.
343 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
344         type debugStats struct {
345                 MemStats runtime.MemStats
346         }
347         var ds debugStats
348         runtime.ReadMemStats(&ds.MemStats)
349         err := json.NewEncoder(resp).Encode(&ds)
350         if err != nil {
351                 http.Error(resp, err.Error(), 500)
352         }
353 }
354
355 // StatusHandler addresses /status.json requests.
356 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
357         stLock.Lock()
358         rtr.readNodeStatus(&st)
359         jstat, err := json.Marshal(&st)
360         stLock.Unlock()
361         if err == nil {
362                 resp.Write(jstat)
363         } else {
364                 log.Printf("json.Marshal: %s", err)
365                 log.Printf("NodeStatus = %v", &st)
366                 http.Error(resp, err.Error(), 500)
367         }
368 }
369
370 // populate the given NodeStatus struct with current values.
371 func (rtr *router) readNodeStatus(st *NodeStatus) {
372         st.Version = version
373         vols := rtr.volmgr.AllReadable()
374         if cap(st.Volumes) < len(vols) {
375                 st.Volumes = make([]*volumeStatusEnt, len(vols))
376         }
377         st.Volumes = st.Volumes[:0]
378         for _, vol := range vols {
379                 var internalStats interface{}
380                 if vol, ok := vol.Volume.(InternalStatser); ok {
381                         internalStats = vol.InternalStats()
382                 }
383                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
384                         Label:         vol.String(),
385                         Status:        vol.Status(),
386                         InternalStats: internalStats,
387                         //VolumeStats: rtr.volmgr.VolumeStats(vol),
388                 })
389         }
390         st.BufferPool.Alloc = bufs.Alloc()
391         st.BufferPool.Cap = bufs.Cap()
392         st.BufferPool.Len = bufs.Len()
393         st.PullQueue = getWorkQueueStatus(rtr.pullq)
394         st.TrashQueue = getWorkQueueStatus(rtr.trashq)
395 }
396
397 // return a WorkQueueStatus for the given queue. If q is nil (which
398 // should never happen except in test suites), return a zero status
399 // value instead of crashing.
400 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
401         if q == nil {
402                 // This should only happen during tests.
403                 return WorkQueueStatus{}
404         }
405         return q.Status()
406 }
407
408 // handleDELETE processes DELETE requests.
409 //
410 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
411 // from all connected volumes.
412 //
413 // Only the Data Manager, or an Arvados admin with scope "all", are
414 // allowed to issue DELETE requests.  If a DELETE request is not
415 // authenticated or is issued by a non-admin user, the server returns
416 // a PermissionError.
417 //
418 // Upon receiving a valid request from an authorized user,
419 // handleDELETE deletes all copies of the specified block on local
420 // writable volumes.
421 //
422 // Response format:
423 //
424 // If the requested blocks was not found on any volume, the response
425 // code is HTTP 404 Not Found.
426 //
427 // Otherwise, the response code is 200 OK, with a response body
428 // consisting of the JSON message
429 //
430 //    {"copies_deleted":d,"copies_failed":f}
431 //
432 // where d and f are integers representing the number of blocks that
433 // were successfully and unsuccessfully deleted.
434 //
435 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
436         hash := mux.Vars(req)["hash"]
437
438         // Confirm that this user is an admin and has a token with unlimited scope.
439         var tok = GetAPIToken(req)
440         if tok == "" || !rtr.canDelete(tok) {
441                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
442                 return
443         }
444
445         if !rtr.cluster.Collections.BlobTrash {
446                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
447                 return
448         }
449
450         // Delete copies of this block from all available volumes.
451         // Report how many blocks were successfully deleted, and how
452         // many were found on writable volumes but not deleted.
453         var result struct {
454                 Deleted int `json:"copies_deleted"`
455                 Failed  int `json:"copies_failed"`
456         }
457         for _, vol := range rtr.volmgr.AllWritable() {
458                 if err := vol.Trash(hash); err == nil {
459                         result.Deleted++
460                 } else if os.IsNotExist(err) {
461                         continue
462                 } else {
463                         result.Failed++
464                         log.Println("DeleteHandler:", err)
465                 }
466         }
467
468         var st int
469
470         if result.Deleted == 0 && result.Failed == 0 {
471                 st = http.StatusNotFound
472         } else {
473                 st = http.StatusOK
474         }
475
476         resp.WriteHeader(st)
477
478         if st == http.StatusOK {
479                 if body, err := json.Marshal(result); err == nil {
480                         resp.Write(body)
481                 } else {
482                         log.Printf("json.Marshal: %s (result = %v)", err, result)
483                         http.Error(resp, err.Error(), 500)
484                 }
485         }
486 }
487
488 /* PullHandler processes "PUT /pull" requests for the data manager.
489    The request body is a JSON message containing a list of pull
490    requests in the following format:
491
492    [
493       {
494          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
495          "servers":[
496                         "keep0.qr1hi.arvadosapi.com:25107",
497                         "keep1.qr1hi.arvadosapi.com:25108"
498                  ]
499           },
500           {
501                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
502                  "servers":[
503                         "10.0.1.5:25107",
504                         "10.0.1.6:25107",
505                         "10.0.1.7:25108"
506                  ]
507           },
508           ...
509    ]
510
511    Each pull request in the list consists of a block locator string
512    and an ordered list of servers.  Keepstore should try to fetch the
513    block from each server in turn.
514
515    If the request has not been sent by the Data Manager, return 401
516    Unauthorized.
517
518    If the JSON unmarshalling fails, return 400 Bad Request.
519 */
520
521 // PullRequest consists of a block locator and an ordered list of servers
522 type PullRequest struct {
523         Locator string   `json:"locator"`
524         Servers []string `json:"servers"`
525
526         // Destination mount, or "" for "anywhere"
527         MountUUID string `json:"mount_uuid"`
528 }
529
530 // PullHandler processes "PUT /pull" requests for the data manager.
531 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
532         // Reject unauthorized requests.
533         if !rtr.isSystemAuth(GetAPIToken(req)) {
534                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
535                 return
536         }
537
538         // Parse the request body.
539         var pr []PullRequest
540         r := json.NewDecoder(req.Body)
541         if err := r.Decode(&pr); err != nil {
542                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
543                 return
544         }
545
546         // We have a properly formatted pull list sent from the data
547         // manager.  Report success and send the list to the pull list
548         // manager for further handling.
549         resp.WriteHeader(http.StatusOK)
550         resp.Write([]byte(
551                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
552
553         plist := list.New()
554         for _, p := range pr {
555                 plist.PushBack(p)
556         }
557         rtr.pullq.ReplaceQueue(plist)
558 }
559
560 // TrashRequest consists of a block locator and its Mtime
561 type TrashRequest struct {
562         Locator    string `json:"locator"`
563         BlockMtime int64  `json:"block_mtime"`
564
565         // Target mount, or "" for "everywhere"
566         MountUUID string `json:"mount_uuid"`
567 }
568
569 // TrashHandler processes /trash requests.
570 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
571         // Reject unauthorized requests.
572         if !rtr.isSystemAuth(GetAPIToken(req)) {
573                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
574                 return
575         }
576
577         // Parse the request body.
578         var trash []TrashRequest
579         r := json.NewDecoder(req.Body)
580         if err := r.Decode(&trash); err != nil {
581                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
582                 return
583         }
584
585         // We have a properly formatted trash list sent from the data
586         // manager.  Report success and send the list to the trash work
587         // queue for further handling.
588         resp.WriteHeader(http.StatusOK)
589         resp.Write([]byte(
590                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
591
592         tlist := list.New()
593         for _, t := range trash {
594                 tlist.PushBack(t)
595         }
596         rtr.trashq.ReplaceQueue(tlist)
597 }
598
599 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
600 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
601         // Reject unauthorized requests.
602         if !rtr.isSystemAuth(GetAPIToken(req)) {
603                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
604                 return
605         }
606
607         hash := mux.Vars(req)["hash"]
608
609         if len(rtr.volmgr.AllWritable()) == 0 {
610                 http.Error(resp, "No writable volumes", http.StatusNotFound)
611                 return
612         }
613
614         var untrashedOn, failedOn []string
615         var numNotFound int
616         for _, vol := range rtr.volmgr.AllWritable() {
617                 err := vol.Untrash(hash)
618
619                 if os.IsNotExist(err) {
620                         numNotFound++
621                 } else if err != nil {
622                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
623                         failedOn = append(failedOn, vol.String())
624                 } else {
625                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
626                         untrashedOn = append(untrashedOn, vol.String())
627                 }
628         }
629
630         if numNotFound == len(rtr.volmgr.AllWritable()) {
631                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
632                 return
633         }
634
635         if len(failedOn) == len(rtr.volmgr.AllWritable()) {
636                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
637         } else {
638                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
639                 if len(failedOn) > 0 {
640                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
641                 }
642                 resp.Write([]byte(respBody))
643         }
644 }
645
646 // GetBlock and PutBlock implement lower-level code for handling
647 // blocks by rooting through volumes connected to the local machine.
648 // Once the handler has determined that system policy permits the
649 // request, it calls these methods to perform the actual operation.
650 //
651 // TODO(twp): this code would probably be better located in the
652 // VolumeManager interface. As an abstraction, the VolumeManager
653 // should be the only part of the code that cares about which volume a
654 // block is stored on, so it should be responsible for figuring out
655 // which volume to check for fetching blocks, storing blocks, etc.
656
657 // GetBlock fetches the block identified by "hash" into the provided
658 // buf, and returns the data size.
659 //
660 // If the block cannot be found on any volume, returns NotFoundError.
661 //
662 // If the block found does not have the correct MD5 hash, returns
663 // DiskHashError.
664 //
665 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
666         // Attempt to read the requested hash from a keep volume.
667         errorToCaller := NotFoundError
668
669         for _, vol := range volmgr.AllReadable() {
670                 size, err := vol.Get(ctx, hash, buf)
671                 select {
672                 case <-ctx.Done():
673                         return 0, ErrClientDisconnect
674                 default:
675                 }
676                 if err != nil {
677                         // IsNotExist is an expected error and may be
678                         // ignored. All other errors are logged. In
679                         // any case we continue trying to read other
680                         // volumes. If all volumes report IsNotExist,
681                         // we return a NotFoundError.
682                         if !os.IsNotExist(err) {
683                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
684                         }
685                         // If some volume returns a transient error, return it to the caller
686                         // instead of "Not found" so it can retry.
687                         if err == VolumeBusyError {
688                                 errorToCaller = err.(*KeepError)
689                         }
690                         continue
691                 }
692                 // Check the file checksum.
693                 //
694                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
695                 if filehash != hash {
696                         // TODO: Try harder to tell a sysadmin about
697                         // this.
698                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
699                                 vol, hash, filehash)
700                         errorToCaller = DiskHashError
701                         continue
702                 }
703                 if errorToCaller == DiskHashError {
704                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
705                                 vol, hash)
706                 }
707                 return size, nil
708         }
709         return 0, errorToCaller
710 }
711
712 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
713 //
714 // PutBlock(ctx, block, hash)
715 //   Stores the BLOCK (identified by the content id HASH) in Keep.
716 //
717 //   The MD5 checksum of the block must be identical to the content id HASH.
718 //   If not, an error is returned.
719 //
720 //   PutBlock stores the BLOCK on the first Keep volume with free space.
721 //   A failure code is returned to the user only if all volumes fail.
722 //
723 //   On success, PutBlock returns nil.
724 //   On failure, it returns a KeepError with one of the following codes:
725 //
726 //   500 Collision
727 //          A different block with the same hash already exists on this
728 //          Keep server.
729 //   422 MD5Fail
730 //          The MD5 hash of the BLOCK does not match the argument HASH.
731 //   503 Full
732 //          There was not enough space left in any Keep volume to store
733 //          the object.
734 //   500 Fail
735 //          The object could not be stored for some other reason (e.g.
736 //          all writes failed). The text of the error message should
737 //          provide as much detail as possible.
738 //
739 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
740         // Check that BLOCK's checksum matches HASH.
741         blockhash := fmt.Sprintf("%x", md5.Sum(block))
742         if blockhash != hash {
743                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
744                 return 0, RequestHashError
745         }
746
747         // If we already have this data, it's intact on disk, and we
748         // can update its timestamp, return success. If we have
749         // different data with the same hash, return failure.
750         if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
751                 return n, err
752         } else if ctx.Err() != nil {
753                 return 0, ErrClientDisconnect
754         }
755
756         // Choose a Keep volume to write to.
757         // If this volume fails, try all of the volumes in order.
758         if mnt := volmgr.NextWritable(); mnt != nil {
759                 if err := mnt.Put(ctx, hash, block); err == nil {
760                         return mnt.Replication, nil // success!
761                 }
762                 if ctx.Err() != nil {
763                         return 0, ErrClientDisconnect
764                 }
765         }
766
767         writables := volmgr.AllWritable()
768         if len(writables) == 0 {
769                 log.Print("No writable volumes.")
770                 return 0, FullError
771         }
772
773         allFull := true
774         for _, vol := range writables {
775                 err := vol.Put(ctx, hash, block)
776                 if ctx.Err() != nil {
777                         return 0, ErrClientDisconnect
778                 }
779                 if err == nil {
780                         return vol.Replication, nil // success!
781                 }
782                 if err != FullError {
783                         // The volume is not full but the
784                         // write did not succeed.  Report the
785                         // error and continue trying.
786                         allFull = false
787                         log.Printf("%s: Write(%s): %s", vol, hash, err)
788                 }
789         }
790
791         if allFull {
792                 log.Print("All volumes are full.")
793                 return 0, FullError
794         }
795         // Already logged the non-full errors.
796         return 0, GenericError
797 }
798
799 // CompareAndTouch returns the current replication level if one of the
800 // volumes already has the given content and it successfully updates
801 // the relevant block's modification time in order to protect it from
802 // premature garbage collection. Otherwise, it returns a non-nil
803 // error.
804 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
805         var bestErr error = NotFoundError
806         for _, mnt := range volmgr.AllWritable() {
807                 err := mnt.Compare(ctx, hash, buf)
808                 if ctx.Err() != nil {
809                         return 0, ctx.Err()
810                 } else if err == CollisionError {
811                         // Stop if we have a block with same hash but
812                         // different content. (It will be impossible
813                         // to tell which one is wanted if we have
814                         // both, so there's no point writing it even
815                         // on a different volume.)
816                         log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
817                         return 0, err
818                 } else if os.IsNotExist(err) {
819                         // Block does not exist. This is the only
820                         // "normal" error: we don't log anything.
821                         continue
822                 } else if err != nil {
823                         // Couldn't open file, data is corrupt on
824                         // disk, etc.: log this abnormal condition,
825                         // and try the next volume.
826                         log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
827                         continue
828                 }
829                 if err := mnt.Touch(hash); err != nil {
830                         log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
831                         bestErr = err
832                         continue
833                 }
834                 // Compare and Touch both worked --> done.
835                 return mnt.Replication, nil
836         }
837         return 0, bestErr
838 }
839
840 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
841
842 // IsValidLocator returns true if the specified string is a valid Keep locator.
843 //   When Keep is extended to support hash types other than MD5,
844 //   this should be updated to cover those as well.
845 //
846 func IsValidLocator(loc string) bool {
847         return validLocatorRe.MatchString(loc)
848 }
849
850 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
851
852 // GetAPIToken returns the OAuth2 token from the Authorization
853 // header of a HTTP request, or an empty string if no matching
854 // token is found.
855 func GetAPIToken(req *http.Request) string {
856         if auth, ok := req.Header["Authorization"]; ok {
857                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
858                         return match[2]
859                 }
860         }
861         return ""
862 }
863
864 // IsExpired returns true if the given Unix timestamp (expressed as a
865 // hexadecimal string) is in the past, or if timestampHex cannot be
866 // parsed as a hexadecimal string.
867 func IsExpired(timestampHex string) bool {
868         ts, err := strconv.ParseInt(timestampHex, 16, 0)
869         if err != nil {
870                 log.Printf("IsExpired: %s", err)
871                 return true
872         }
873         return time.Unix(ts, 0).Before(time.Now())
874 }
875
876 // canDelete returns true if the user identified by apiToken is
877 // allowed to delete blocks.
878 func (rtr *router) canDelete(apiToken string) bool {
879         if apiToken == "" {
880                 return false
881         }
882         // Blocks may be deleted only when Keep has been configured with a
883         // data manager.
884         if rtr.isSystemAuth(apiToken) {
885                 return true
886         }
887         // TODO(twp): look up apiToken with the API server
888         // return true if is_admin is true and if the token
889         // has unlimited scope
890         return false
891 }
892
893 // isSystemAuth returns true if the given token is allowed to perform
894 // system level actions like deleting data.
895 func (rtr *router) isSystemAuth(token string) bool {
896         return token != "" && token == rtr.cluster.SystemRootToken
897 }