11906: pass a func() to healthCheckDo from HealthCheckPingHandler
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // REST handlers for Keep are implemented here.
8 //
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler    (GET /index, GET /index/prefix)
12 // StatusHandler   (GET /status.json)
13
14 import (
15         "container/list"
16         "context"
17         "crypto/md5"
18         "encoding/json"
19         "fmt"
20         "io"
21         "net/http"
22         "os"
23         "regexp"
24         "runtime"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gorilla/mux"
31
32         "git.curoverse.com/arvados.git/sdk/go/httpserver"
33         log "github.com/Sirupsen/logrus"
34 )
35
36 type router struct {
37         *mux.Router
38         limiter httpserver.RequestCounter
39 }
40
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() *router {
44         rest := mux.NewRouter()
45         rtr := &router{Router: rest}
46
47         rest.HandleFunc(
48                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
49         rest.HandleFunc(
50                 `/{hash:[0-9a-f]{32}}+{hints}`,
51                 GetBlockHandler).Methods("GET", "HEAD")
52
53         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
54         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55         // List all blocks stored here. Privileged client only.
56         rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57         // List blocks stored here whose hash has the given prefix.
58         // Privileged client only.
59         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
60
61         // Internals/debugging info (runtime.MemStats)
62         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
63
64         // List volumes: path, device number, bytes used/avail.
65         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
66
67         // List mounts: UUID, readonly, tier, device ID, ...
68         rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69         rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70         rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
71
72         // Replace the current pull queue.
73         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
74
75         // Replace the current trash queue.
76         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
77
78         // Untrash moves blocks from trash back into store
79         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
80
81         // Health check ping
82         rest.HandleFunc(`/_health/ping`, HealthCheckPingHandler).Methods("GET")
83
84         // Any request which does not match any of these routes gets
85         // 400 Bad Request.
86         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
87
88         return rtr
89 }
90
91 // BadRequestHandler is a HandleFunc to address bad requests.
92 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
93         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
94 }
95
96 // GetBlockHandler is a HandleFunc to address Get block requests.
97 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
98         ctx, cancel := contextForResponse(context.TODO(), resp)
99         defer cancel()
100
101         if theConfig.RequireSignatures {
102                 locator := req.URL.Path[1:] // strip leading slash
103                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
104                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
105                         return
106                 }
107         }
108
109         // TODO: Probe volumes to check whether the block _might_
110         // exist. Some volumes/types could support a quick existence
111         // check without causing other operations to suffer. If all
112         // volumes support that, and assure us the block definitely
113         // isn't here, we can return 404 now instead of waiting for a
114         // buffer.
115
116         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
117         if err != nil {
118                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
119                 return
120         }
121         defer bufs.Put(buf)
122
123         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
124         if err != nil {
125                 code := http.StatusInternalServerError
126                 if err, ok := err.(*KeepError); ok {
127                         code = err.HTTPCode
128                 }
129                 http.Error(resp, err.Error(), code)
130                 return
131         }
132
133         resp.Header().Set("Content-Length", strconv.Itoa(size))
134         resp.Header().Set("Content-Type", "application/octet-stream")
135         resp.Write(buf[:size])
136 }
137
138 // Return a new context that gets cancelled by resp's CloseNotifier.
139 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
140         ctx, cancel := context.WithCancel(parent)
141         if cn, ok := resp.(http.CloseNotifier); ok {
142                 go func(c <-chan bool) {
143                         select {
144                         case <-c:
145                                 theConfig.debugLogf("cancel context")
146                                 cancel()
147                         case <-ctx.Done():
148                         }
149                 }(cn.CloseNotify())
150         }
151         return ctx, cancel
152 }
153
154 // Get a buffer from the pool -- but give up and return a non-nil
155 // error if ctx ends before we get a buffer.
156 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
157         bufReady := make(chan []byte)
158         go func() {
159                 bufReady <- bufs.Get(bufSize)
160         }()
161         select {
162         case buf := <-bufReady:
163                 return buf, nil
164         case <-ctx.Done():
165                 go func() {
166                         // Even if closeNotifier happened first, we
167                         // need to keep waiting for our buf so we can
168                         // return it to the pool.
169                         bufs.Put(<-bufReady)
170                 }()
171                 return nil, ErrClientDisconnect
172         }
173 }
174
175 // PutBlockHandler is a HandleFunc to address Put block requests.
176 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
177         ctx, cancel := contextForResponse(context.TODO(), resp)
178         defer cancel()
179
180         hash := mux.Vars(req)["hash"]
181
182         // Detect as many error conditions as possible before reading
183         // the body: avoid transmitting data that will not end up
184         // being written anyway.
185
186         if req.ContentLength == -1 {
187                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
188                 return
189         }
190
191         if req.ContentLength > BlockSize {
192                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
193                 return
194         }
195
196         if len(KeepVM.AllWritable()) == 0 {
197                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
198                 return
199         }
200
201         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
202         if err != nil {
203                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
204                 return
205         }
206
207         _, err = io.ReadFull(req.Body, buf)
208         if err != nil {
209                 http.Error(resp, err.Error(), 500)
210                 bufs.Put(buf)
211                 return
212         }
213
214         replication, err := PutBlock(ctx, buf, hash)
215         bufs.Put(buf)
216
217         if err != nil {
218                 code := http.StatusInternalServerError
219                 if err, ok := err.(*KeepError); ok {
220                         code = err.HTTPCode
221                 }
222                 http.Error(resp, err.Error(), code)
223                 return
224         }
225
226         // Success; add a size hint, sign the locator if possible, and
227         // return it to the client.
228         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
229         apiToken := GetAPIToken(req)
230         if theConfig.blobSigningKey != nil && apiToken != "" {
231                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
232                 returnHash = SignLocator(returnHash, apiToken, expiry)
233         }
234         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
235         resp.Write([]byte(returnHash + "\n"))
236 }
237
238 // IndexHandler responds to "/index", "/index/{prefix}", and
239 // "/mounts/{uuid}/blocks" requests.
240 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
241         if !IsSystemAuth(GetAPIToken(req)) {
242                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
243                 return
244         }
245
246         prefix := mux.Vars(req)["prefix"]
247         if prefix == "" {
248                 req.ParseForm()
249                 prefix = req.Form.Get("prefix")
250         }
251
252         uuid := mux.Vars(req)["uuid"]
253
254         var vols []Volume
255         if uuid == "" {
256                 vols = KeepVM.AllReadable()
257         } else if v := KeepVM.Lookup(uuid, false); v == nil {
258                 http.Error(resp, "mount not found", http.StatusNotFound)
259                 return
260         } else {
261                 vols = []Volume{v}
262         }
263
264         for _, v := range vols {
265                 if err := v.IndexTo(prefix, resp); err != nil {
266                         // The only errors returned by IndexTo are
267                         // write errors returned by resp.Write(),
268                         // which probably means the client has
269                         // disconnected and this error will never be
270                         // reported to the client -- but it will
271                         // appear in our own error log.
272                         http.Error(resp, err.Error(), http.StatusInternalServerError)
273                         return
274                 }
275         }
276         // An empty line at EOF is the only way the client can be
277         // assured the entire index was received.
278         resp.Write([]byte{'\n'})
279 }
280
281 // MountsHandler responds to "GET /mounts" requests.
282 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
283         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
284         if err != nil {
285                 http.Error(resp, err.Error(), http.StatusInternalServerError)
286         }
287 }
288
289 // PoolStatus struct
290 type PoolStatus struct {
291         Alloc uint64 `json:"BytesAllocated"`
292         Cap   int    `json:"BuffersMax"`
293         Len   int    `json:"BuffersInUse"`
294 }
295
296 type volumeStatusEnt struct {
297         Label         string
298         Status        *VolumeStatus `json:",omitempty"`
299         VolumeStats   *ioStats      `json:",omitempty"`
300         InternalStats interface{}   `json:",omitempty"`
301 }
302
303 // NodeStatus struct
304 type NodeStatus struct {
305         Volumes         []*volumeStatusEnt
306         BufferPool      PoolStatus
307         PullQueue       WorkQueueStatus
308         TrashQueue      WorkQueueStatus
309         RequestsCurrent int
310         RequestsMax     int
311 }
312
313 var st NodeStatus
314 var stLock sync.Mutex
315
316 // DebugHandler addresses /debug.json requests.
317 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
318         type debugStats struct {
319                 MemStats runtime.MemStats
320         }
321         var ds debugStats
322         runtime.ReadMemStats(&ds.MemStats)
323         err := json.NewEncoder(resp).Encode(&ds)
324         if err != nil {
325                 http.Error(resp, err.Error(), 500)
326         }
327 }
328
329 // StatusHandler addresses /status.json requests.
330 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
331         stLock.Lock()
332         rtr.readNodeStatus(&st)
333         jstat, err := json.Marshal(&st)
334         stLock.Unlock()
335         if err == nil {
336                 resp.Write(jstat)
337         } else {
338                 log.Printf("json.Marshal: %s", err)
339                 log.Printf("NodeStatus = %v", &st)
340                 http.Error(resp, err.Error(), 500)
341         }
342 }
343
344 // populate the given NodeStatus struct with current values.
345 func (rtr *router) readNodeStatus(st *NodeStatus) {
346         vols := KeepVM.AllReadable()
347         if cap(st.Volumes) < len(vols) {
348                 st.Volumes = make([]*volumeStatusEnt, len(vols))
349         }
350         st.Volumes = st.Volumes[:0]
351         for _, vol := range vols {
352                 var internalStats interface{}
353                 if vol, ok := vol.(InternalStatser); ok {
354                         internalStats = vol.InternalStats()
355                 }
356                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
357                         Label:         vol.String(),
358                         Status:        vol.Status(),
359                         InternalStats: internalStats,
360                         //VolumeStats: KeepVM.VolumeStats(vol),
361                 })
362         }
363         st.BufferPool.Alloc = bufs.Alloc()
364         st.BufferPool.Cap = bufs.Cap()
365         st.BufferPool.Len = bufs.Len()
366         st.PullQueue = getWorkQueueStatus(pullq)
367         st.TrashQueue = getWorkQueueStatus(trashq)
368         if rtr.limiter != nil {
369                 st.RequestsCurrent = rtr.limiter.Current()
370                 st.RequestsMax = rtr.limiter.Max()
371         }
372 }
373
374 // return a WorkQueueStatus for the given queue. If q is nil (which
375 // should never happen except in test suites), return a zero status
376 // value instead of crashing.
377 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
378         if q == nil {
379                 // This should only happen during tests.
380                 return WorkQueueStatus{}
381         }
382         return q.Status()
383 }
384
385 // DeleteHandler processes DELETE requests.
386 //
387 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
388 // from all connected volumes.
389 //
390 // Only the Data Manager, or an Arvados admin with scope "all", are
391 // allowed to issue DELETE requests.  If a DELETE request is not
392 // authenticated or is issued by a non-admin user, the server returns
393 // a PermissionError.
394 //
395 // Upon receiving a valid request from an authorized user,
396 // DeleteHandler deletes all copies of the specified block on local
397 // writable volumes.
398 //
399 // Response format:
400 //
401 // If the requested blocks was not found on any volume, the response
402 // code is HTTP 404 Not Found.
403 //
404 // Otherwise, the response code is 200 OK, with a response body
405 // consisting of the JSON message
406 //
407 //    {"copies_deleted":d,"copies_failed":f}
408 //
409 // where d and f are integers representing the number of blocks that
410 // were successfully and unsuccessfully deleted.
411 //
412 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
413         hash := mux.Vars(req)["hash"]
414
415         // Confirm that this user is an admin and has a token with unlimited scope.
416         var tok = GetAPIToken(req)
417         if tok == "" || !CanDelete(tok) {
418                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
419                 return
420         }
421
422         if !theConfig.EnableDelete {
423                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
424                 return
425         }
426
427         // Delete copies of this block from all available volumes.
428         // Report how many blocks were successfully deleted, and how
429         // many were found on writable volumes but not deleted.
430         var result struct {
431                 Deleted int `json:"copies_deleted"`
432                 Failed  int `json:"copies_failed"`
433         }
434         for _, vol := range KeepVM.AllWritable() {
435                 if err := vol.Trash(hash); err == nil {
436                         result.Deleted++
437                 } else if os.IsNotExist(err) {
438                         continue
439                 } else {
440                         result.Failed++
441                         log.Println("DeleteHandler:", err)
442                 }
443         }
444
445         var st int
446
447         if result.Deleted == 0 && result.Failed == 0 {
448                 st = http.StatusNotFound
449         } else {
450                 st = http.StatusOK
451         }
452
453         resp.WriteHeader(st)
454
455         if st == http.StatusOK {
456                 if body, err := json.Marshal(result); err == nil {
457                         resp.Write(body)
458                 } else {
459                         log.Printf("json.Marshal: %s (result = %v)", err, result)
460                         http.Error(resp, err.Error(), 500)
461                 }
462         }
463 }
464
465 /* PullHandler processes "PUT /pull" requests for the data manager.
466    The request body is a JSON message containing a list of pull
467    requests in the following format:
468
469    [
470       {
471          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
472          "servers":[
473                         "keep0.qr1hi.arvadosapi.com:25107",
474                         "keep1.qr1hi.arvadosapi.com:25108"
475                  ]
476           },
477           {
478                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
479                  "servers":[
480                         "10.0.1.5:25107",
481                         "10.0.1.6:25107",
482                         "10.0.1.7:25108"
483                  ]
484           },
485           ...
486    ]
487
488    Each pull request in the list consists of a block locator string
489    and an ordered list of servers.  Keepstore should try to fetch the
490    block from each server in turn.
491
492    If the request has not been sent by the Data Manager, return 401
493    Unauthorized.
494
495    If the JSON unmarshalling fails, return 400 Bad Request.
496 */
497
498 // PullRequest consists of a block locator and an ordered list of servers
499 type PullRequest struct {
500         Locator string   `json:"locator"`
501         Servers []string `json:"servers"`
502
503         // Destination mount, or "" for "anywhere"
504         MountUUID string
505 }
506
507 // PullHandler processes "PUT /pull" requests for the data manager.
508 func PullHandler(resp http.ResponseWriter, req *http.Request) {
509         // Reject unauthorized requests.
510         if !IsSystemAuth(GetAPIToken(req)) {
511                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
512                 return
513         }
514
515         // Parse the request body.
516         var pr []PullRequest
517         r := json.NewDecoder(req.Body)
518         if err := r.Decode(&pr); err != nil {
519                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
520                 return
521         }
522
523         // We have a properly formatted pull list sent from the data
524         // manager.  Report success and send the list to the pull list
525         // manager for further handling.
526         resp.WriteHeader(http.StatusOK)
527         resp.Write([]byte(
528                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
529
530         plist := list.New()
531         for _, p := range pr {
532                 plist.PushBack(p)
533         }
534         pullq.ReplaceQueue(plist)
535 }
536
537 // TrashRequest consists of a block locator and it's Mtime
538 type TrashRequest struct {
539         Locator    string `json:"locator"`
540         BlockMtime int64  `json:"block_mtime"`
541
542         // Target mount, or "" for "everywhere"
543         MountUUID string
544 }
545
546 // TrashHandler processes /trash requests.
547 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
548         // Reject unauthorized requests.
549         if !IsSystemAuth(GetAPIToken(req)) {
550                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
551                 return
552         }
553
554         // Parse the request body.
555         var trash []TrashRequest
556         r := json.NewDecoder(req.Body)
557         if err := r.Decode(&trash); err != nil {
558                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
559                 return
560         }
561
562         // We have a properly formatted trash list sent from the data
563         // manager.  Report success and send the list to the trash work
564         // queue for further handling.
565         resp.WriteHeader(http.StatusOK)
566         resp.Write([]byte(
567                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
568
569         tlist := list.New()
570         for _, t := range trash {
571                 tlist.PushBack(t)
572         }
573         trashq.ReplaceQueue(tlist)
574 }
575
576 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
577 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
578         // Reject unauthorized requests.
579         if !IsSystemAuth(GetAPIToken(req)) {
580                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
581                 return
582         }
583
584         hash := mux.Vars(req)["hash"]
585
586         if len(KeepVM.AllWritable()) == 0 {
587                 http.Error(resp, "No writable volumes", http.StatusNotFound)
588                 return
589         }
590
591         var untrashedOn, failedOn []string
592         var numNotFound int
593         for _, vol := range KeepVM.AllWritable() {
594                 err := vol.Untrash(hash)
595
596                 if os.IsNotExist(err) {
597                         numNotFound++
598                 } else if err != nil {
599                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
600                         failedOn = append(failedOn, vol.String())
601                 } else {
602                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
603                         untrashedOn = append(untrashedOn, vol.String())
604                 }
605         }
606
607         if numNotFound == len(KeepVM.AllWritable()) {
608                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
609                 return
610         }
611
612         if len(failedOn) == len(KeepVM.AllWritable()) {
613                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
614         } else {
615                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
616                 if len(failedOn) > 0 {
617                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
618                 }
619                 resp.Write([]byte(respBody))
620         }
621 }
622
623 // HealthCheckPingHandler processes "GET /_health/ping" requests
624 func HealthCheckPingHandler(resp http.ResponseWriter, req *http.Request) {
625         fn := func() interface{} {
626                 return map[string]string{"health": "OK"}
627         }
628
629         healthCheckDo(resp, req, fn)
630 }
631
632 // Any health check handlers can pass this "func" which returns json to healthCheckDo
633 type healthCheckFunc func() interface{}
634
635 func healthCheckDo(resp http.ResponseWriter, req *http.Request, fn healthCheckFunc) {
636         msg, code := healthCheckAuth(resp, req)
637         if msg != "" {
638                 http.Error(resp, msg, code)
639                 return
640         }
641
642         ok, err := json.Marshal(fn())
643         if err != nil {
644                 http.Error(resp, err.Error(), 500)
645                 return
646         }
647
648         resp.Write(ok)
649 }
650
651 func healthCheckAuth(resp http.ResponseWriter, req *http.Request) (string, int) {
652         if theConfig.ManagementToken == "" {
653                 return "disabled", http.StatusNotFound
654         } else if h := req.Header.Get("Authorization"); h == "" {
655                 return "authorization required", http.StatusUnauthorized
656         } else if h != "Bearer "+theConfig.ManagementToken {
657                 return "authorization error", http.StatusForbidden
658         }
659         return "", 0
660 }
661
662 // GetBlock and PutBlock implement lower-level code for handling
663 // blocks by rooting through volumes connected to the local machine.
664 // Once the handler has determined that system policy permits the
665 // request, it calls these methods to perform the actual operation.
666 //
667 // TODO(twp): this code would probably be better located in the
668 // VolumeManager interface. As an abstraction, the VolumeManager
669 // should be the only part of the code that cares about which volume a
670 // block is stored on, so it should be responsible for figuring out
671 // which volume to check for fetching blocks, storing blocks, etc.
672
673 // GetBlock fetches the block identified by "hash" into the provided
674 // buf, and returns the data size.
675 //
676 // If the block cannot be found on any volume, returns NotFoundError.
677 //
678 // If the block found does not have the correct MD5 hash, returns
679 // DiskHashError.
680 //
681 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
682         // Attempt to read the requested hash from a keep volume.
683         errorToCaller := NotFoundError
684
685         for _, vol := range KeepVM.AllReadable() {
686                 size, err := vol.Get(ctx, hash, buf)
687                 select {
688                 case <-ctx.Done():
689                         return 0, ErrClientDisconnect
690                 default:
691                 }
692                 if err != nil {
693                         // IsNotExist is an expected error and may be
694                         // ignored. All other errors are logged. In
695                         // any case we continue trying to read other
696                         // volumes. If all volumes report IsNotExist,
697                         // we return a NotFoundError.
698                         if !os.IsNotExist(err) {
699                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
700                         }
701                         continue
702                 }
703                 // Check the file checksum.
704                 //
705                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
706                 if filehash != hash {
707                         // TODO: Try harder to tell a sysadmin about
708                         // this.
709                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
710                                 vol, hash, filehash)
711                         errorToCaller = DiskHashError
712                         continue
713                 }
714                 if errorToCaller == DiskHashError {
715                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
716                                 vol, hash)
717                 }
718                 return size, nil
719         }
720         return 0, errorToCaller
721 }
722
723 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
724 //
725 // PutBlock(ctx, block, hash)
726 //   Stores the BLOCK (identified by the content id HASH) in Keep.
727 //
728 //   The MD5 checksum of the block must be identical to the content id HASH.
729 //   If not, an error is returned.
730 //
731 //   PutBlock stores the BLOCK on the first Keep volume with free space.
732 //   A failure code is returned to the user only if all volumes fail.
733 //
734 //   On success, PutBlock returns nil.
735 //   On failure, it returns a KeepError with one of the following codes:
736 //
737 //   500 Collision
738 //          A different block with the same hash already exists on this
739 //          Keep server.
740 //   422 MD5Fail
741 //          The MD5 hash of the BLOCK does not match the argument HASH.
742 //   503 Full
743 //          There was not enough space left in any Keep volume to store
744 //          the object.
745 //   500 Fail
746 //          The object could not be stored for some other reason (e.g.
747 //          all writes failed). The text of the error message should
748 //          provide as much detail as possible.
749 //
750 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
751         // Check that BLOCK's checksum matches HASH.
752         blockhash := fmt.Sprintf("%x", md5.Sum(block))
753         if blockhash != hash {
754                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
755                 return 0, RequestHashError
756         }
757
758         // If we already have this data, it's intact on disk, and we
759         // can update its timestamp, return success. If we have
760         // different data with the same hash, return failure.
761         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
762                 return n, err
763         } else if ctx.Err() != nil {
764                 return 0, ErrClientDisconnect
765         }
766
767         // Choose a Keep volume to write to.
768         // If this volume fails, try all of the volumes in order.
769         if vol := KeepVM.NextWritable(); vol != nil {
770                 if err := vol.Put(ctx, hash, block); err == nil {
771                         return vol.Replication(), nil // success!
772                 }
773                 if ctx.Err() != nil {
774                         return 0, ErrClientDisconnect
775                 }
776         }
777
778         writables := KeepVM.AllWritable()
779         if len(writables) == 0 {
780                 log.Print("No writable volumes.")
781                 return 0, FullError
782         }
783
784         allFull := true
785         for _, vol := range writables {
786                 err := vol.Put(ctx, hash, block)
787                 if ctx.Err() != nil {
788                         return 0, ErrClientDisconnect
789                 }
790                 if err == nil {
791                         return vol.Replication(), nil // success!
792                 }
793                 if err != FullError {
794                         // The volume is not full but the
795                         // write did not succeed.  Report the
796                         // error and continue trying.
797                         allFull = false
798                         log.Printf("%s: Write(%s): %s", vol, hash, err)
799                 }
800         }
801
802         if allFull {
803                 log.Print("All volumes are full.")
804                 return 0, FullError
805         }
806         // Already logged the non-full errors.
807         return 0, GenericError
808 }
809
810 // CompareAndTouch returns the current replication level if one of the
811 // volumes already has the given content and it successfully updates
812 // the relevant block's modification time in order to protect it from
813 // premature garbage collection. Otherwise, it returns a non-nil
814 // error.
815 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
816         var bestErr error = NotFoundError
817         for _, vol := range KeepVM.AllWritable() {
818                 err := vol.Compare(ctx, hash, buf)
819                 if ctx.Err() != nil {
820                         return 0, ctx.Err()
821                 } else if err == CollisionError {
822                         // Stop if we have a block with same hash but
823                         // different content. (It will be impossible
824                         // to tell which one is wanted if we have
825                         // both, so there's no point writing it even
826                         // on a different volume.)
827                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
828                         return 0, err
829                 } else if os.IsNotExist(err) {
830                         // Block does not exist. This is the only
831                         // "normal" error: we don't log anything.
832                         continue
833                 } else if err != nil {
834                         // Couldn't open file, data is corrupt on
835                         // disk, etc.: log this abnormal condition,
836                         // and try the next volume.
837                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
838                         continue
839                 }
840                 if err := vol.Touch(hash); err != nil {
841                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
842                         bestErr = err
843                         continue
844                 }
845                 // Compare and Touch both worked --> done.
846                 return vol.Replication(), nil
847         }
848         return 0, bestErr
849 }
850
851 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
852
853 // IsValidLocator returns true if the specified string is a valid Keep locator.
854 //   When Keep is extended to support hash types other than MD5,
855 //   this should be updated to cover those as well.
856 //
857 func IsValidLocator(loc string) bool {
858         return validLocatorRe.MatchString(loc)
859 }
860
861 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
862
863 // GetAPIToken returns the OAuth2 token from the Authorization
864 // header of a HTTP request, or an empty string if no matching
865 // token is found.
866 func GetAPIToken(req *http.Request) string {
867         if auth, ok := req.Header["Authorization"]; ok {
868                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
869                         return match[1]
870                 }
871         }
872         return ""
873 }
874
875 // IsExpired returns true if the given Unix timestamp (expressed as a
876 // hexadecimal string) is in the past, or if timestampHex cannot be
877 // parsed as a hexadecimal string.
878 func IsExpired(timestampHex string) bool {
879         ts, err := strconv.ParseInt(timestampHex, 16, 0)
880         if err != nil {
881                 log.Printf("IsExpired: %s", err)
882                 return true
883         }
884         return time.Unix(ts, 0).Before(time.Now())
885 }
886
887 // CanDelete returns true if the user identified by apiToken is
888 // allowed to delete blocks.
889 func CanDelete(apiToken string) bool {
890         if apiToken == "" {
891                 return false
892         }
893         // Blocks may be deleted only when Keep has been configured with a
894         // data manager.
895         if IsSystemAuth(apiToken) {
896                 return true
897         }
898         // TODO(twp): look up apiToken with the API server
899         // return true if is_admin is true and if the token
900         // has unlimited scope
901         return false
902 }
903
904 // IsSystemAuth returns true if the given token is allowed to perform
905 // system level actions like deleting data.
906 func IsSystemAuth(token string) bool {
907         return token != "" && token == theConfig.systemAuthToken
908 }