54b8b485e1dc99d491bd94d4b5b888b60b990b13
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "strings"
24         "sync"
25         "time"
26 )
27
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
30 //
31 func MakeRESTRouter() *mux.Router {
32         rest := mux.NewRouter()
33
34         rest.HandleFunc(
35                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}+{hints}`,
38                 GetBlockHandler).Methods("GET", "HEAD")
39
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42         // List all blocks stored here. Privileged client only.
43         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44         // List blocks stored here whose hash has the given prefix.
45         // Privileged client only.
46         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47
48         // List volumes: path, device number, bytes used/avail.
49         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50
51         // Replace the current pull queue.
52         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53
54         // Replace the current trash queue.
55         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56
57         // Untrash moves blocks from trash back into store
58         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
59
60         // Any request which does not match any of these routes gets
61         // 400 Bad Request.
62         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
63
64         return rest
65 }
66
67 // BadRequestHandler is a HandleFunc to address bad requests.
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
70 }
71
72 // GetBlockHandler is a HandleFunc to address Get block requests.
73 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
74         if theConfig.RequireSignatures {
75                 locator := req.URL.Path[1:] // strip leading slash
76                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
77                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
78                         return
79                 }
80         }
81
82         // TODO: Probe volumes to check whether the block _might_
83         // exist. Some volumes/types could support a quick existence
84         // check without causing other operations to suffer. If all
85         // volumes support that, and assure us the block definitely
86         // isn't here, we can return 404 now instead of waiting for a
87         // buffer.
88
89         buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
90         if err != nil {
91                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
92                 return
93         }
94         defer bufs.Put(buf)
95
96         size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
97         if err != nil {
98                 code := http.StatusInternalServerError
99                 if err, ok := err.(*KeepError); ok {
100                         code = err.HTTPCode
101                 }
102                 http.Error(resp, err.Error(), code)
103                 return
104         }
105
106         resp.Header().Set("Content-Length", strconv.Itoa(size))
107         resp.Header().Set("Content-Type", "application/octet-stream")
108         resp.Write(buf[:size])
109 }
110
111 // Get a buffer from the pool -- but give up and return a non-nil
112 // error if resp implements http.CloseNotifier and tells us that the
113 // client has disconnected before we get a buffer.
114 func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
115         var closeNotifier <-chan bool
116         if resp, ok := resp.(http.CloseNotifier); ok {
117                 closeNotifier = resp.CloseNotify()
118         }
119         var buf []byte
120         bufReady := make(chan []byte)
121         go func() {
122                 bufReady <- bufs.Get(bufSize)
123                 close(bufReady)
124         }()
125         select {
126         case buf = <-bufReady:
127                 return buf, nil
128         case <-closeNotifier:
129                 go func() {
130                         // Even if closeNotifier happened first, we
131                         // need to keep waiting for our buf so we can
132                         // return it to the pool.
133                         bufs.Put(<-bufReady)
134                 }()
135                 return nil, ErrClientDisconnect
136         }
137 }
138
139 // PutBlockHandler is a HandleFunc to address Put block requests.
140 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
141         hash := mux.Vars(req)["hash"]
142
143         // Detect as many error conditions as possible before reading
144         // the body: avoid transmitting data that will not end up
145         // being written anyway.
146
147         if req.ContentLength == -1 {
148                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
149                 return
150         }
151
152         if req.ContentLength > BlockSize {
153                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
154                 return
155         }
156
157         if len(KeepVM.AllWritable()) == 0 {
158                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
159                 return
160         }
161
162         buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
163         if err != nil {
164                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
165                 return
166         }
167
168         _, err = io.ReadFull(req.Body, buf)
169         if err != nil {
170                 http.Error(resp, err.Error(), 500)
171                 bufs.Put(buf)
172                 return
173         }
174
175         replication, err := PutBlock(buf, hash)
176         bufs.Put(buf)
177
178         if err != nil {
179                 ke := err.(*KeepError)
180                 http.Error(resp, ke.Error(), ke.HTTPCode)
181                 return
182         }
183
184         // Success; add a size hint, sign the locator if possible, and
185         // return it to the client.
186         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
187         apiToken := GetAPIToken(req)
188         if theConfig.blobSigningKey != nil && apiToken != "" {
189                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
190                 returnHash = SignLocator(returnHash, apiToken, expiry)
191         }
192         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
193         resp.Write([]byte(returnHash + "\n"))
194 }
195
196 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
197 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
198         // Reject unauthorized requests.
199         if !IsSystemAuth(GetAPIToken(req)) {
200                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
201                 return
202         }
203
204         prefix := mux.Vars(req)["prefix"]
205
206         for _, vol := range KeepVM.AllReadable() {
207                 if err := vol.IndexTo(prefix, resp); err != nil {
208                         // The only errors returned by IndexTo are
209                         // write errors returned by resp.Write(),
210                         // which probably means the client has
211                         // disconnected and this error will never be
212                         // reported to the client -- but it will
213                         // appear in our own error log.
214                         http.Error(resp, err.Error(), http.StatusInternalServerError)
215                         return
216                 }
217         }
218         // An empty line at EOF is the only way the client can be
219         // assured the entire index was received.
220         resp.Write([]byte{'\n'})
221 }
222
223 // StatusHandler
224 //     Responds to /status.json requests with the current node status,
225 //     described in a JSON structure.
226 //
227 //     The data given in a status.json response includes:
228 //        volumes - a list of Keep volumes currently in use by this server
229 //          each volume is an object with the following fields:
230 //            * mount_point
231 //            * device_num (an integer identifying the underlying filesystem)
232 //            * bytes_free
233 //            * bytes_used
234
235 // PoolStatus struct
236 type PoolStatus struct {
237         Alloc uint64 `json:"BytesAllocated"`
238         Cap   int    `json:"BuffersMax"`
239         Len   int    `json:"BuffersInUse"`
240 }
241
242 // NodeStatus struct
243 type NodeStatus struct {
244         Volumes    []*VolumeStatus `json:"volumes"`
245         BufferPool PoolStatus
246         PullQueue  WorkQueueStatus
247         TrashQueue WorkQueueStatus
248         Memory     runtime.MemStats
249 }
250
251 var st NodeStatus
252 var stLock sync.Mutex
253
254 // StatusHandler addresses /status.json requests.
255 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
256         stLock.Lock()
257         readNodeStatus(&st)
258         jstat, err := json.Marshal(&st)
259         stLock.Unlock()
260         if err == nil {
261                 resp.Write(jstat)
262         } else {
263                 log.Printf("json.Marshal: %s", err)
264                 log.Printf("NodeStatus = %v", &st)
265                 http.Error(resp, err.Error(), 500)
266         }
267 }
268
269 // populate the given NodeStatus struct with current values.
270 func readNodeStatus(st *NodeStatus) {
271         vols := KeepVM.AllReadable()
272         if cap(st.Volumes) < len(vols) {
273                 st.Volumes = make([]*VolumeStatus, len(vols))
274         }
275         st.Volumes = st.Volumes[:0]
276         for _, vol := range vols {
277                 if s := vol.Status(); s != nil {
278                         st.Volumes = append(st.Volumes, s)
279                 }
280         }
281         st.BufferPool.Alloc = bufs.Alloc()
282         st.BufferPool.Cap = bufs.Cap()
283         st.BufferPool.Len = bufs.Len()
284         st.PullQueue = getWorkQueueStatus(pullq)
285         st.TrashQueue = getWorkQueueStatus(trashq)
286         runtime.ReadMemStats(&st.Memory)
287 }
288
289 // return a WorkQueueStatus for the given queue. If q is nil (which
290 // should never happen except in test suites), return a zero status
291 // value instead of crashing.
292 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
293         if q == nil {
294                 // This should only happen during tests.
295                 return WorkQueueStatus{}
296         }
297         return q.Status()
298 }
299
300 // DeleteHandler processes DELETE requests.
301 //
302 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
303 // from all connected volumes.
304 //
305 // Only the Data Manager, or an Arvados admin with scope "all", are
306 // allowed to issue DELETE requests.  If a DELETE request is not
307 // authenticated or is issued by a non-admin user, the server returns
308 // a PermissionError.
309 //
310 // Upon receiving a valid request from an authorized user,
311 // DeleteHandler deletes all copies of the specified block on local
312 // writable volumes.
313 //
314 // Response format:
315 //
316 // If the requested blocks was not found on any volume, the response
317 // code is HTTP 404 Not Found.
318 //
319 // Otherwise, the response code is 200 OK, with a response body
320 // consisting of the JSON message
321 //
322 //    {"copies_deleted":d,"copies_failed":f}
323 //
324 // where d and f are integers representing the number of blocks that
325 // were successfully and unsuccessfully deleted.
326 //
327 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
328         hash := mux.Vars(req)["hash"]
329
330         // Confirm that this user is an admin and has a token with unlimited scope.
331         var tok = GetAPIToken(req)
332         if tok == "" || !CanDelete(tok) {
333                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
334                 return
335         }
336
337         if !theConfig.EnableDelete {
338                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
339                 return
340         }
341
342         // Delete copies of this block from all available volumes.
343         // Report how many blocks were successfully deleted, and how
344         // many were found on writable volumes but not deleted.
345         var result struct {
346                 Deleted int `json:"copies_deleted"`
347                 Failed  int `json:"copies_failed"`
348         }
349         for _, vol := range KeepVM.AllWritable() {
350                 if err := vol.Trash(hash); err == nil {
351                         result.Deleted++
352                 } else if os.IsNotExist(err) {
353                         continue
354                 } else {
355                         result.Failed++
356                         log.Println("DeleteHandler:", err)
357                 }
358         }
359
360         var st int
361
362         if result.Deleted == 0 && result.Failed == 0 {
363                 st = http.StatusNotFound
364         } else {
365                 st = http.StatusOK
366         }
367
368         resp.WriteHeader(st)
369
370         if st == http.StatusOK {
371                 if body, err := json.Marshal(result); err == nil {
372                         resp.Write(body)
373                 } else {
374                         log.Printf("json.Marshal: %s (result = %v)", err, result)
375                         http.Error(resp, err.Error(), 500)
376                 }
377         }
378 }
379
380 /* PullHandler processes "PUT /pull" requests for the data manager.
381    The request body is a JSON message containing a list of pull
382    requests in the following format:
383
384    [
385       {
386          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
387          "servers":[
388                         "keep0.qr1hi.arvadosapi.com:25107",
389                         "keep1.qr1hi.arvadosapi.com:25108"
390                  ]
391           },
392           {
393                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
394                  "servers":[
395                         "10.0.1.5:25107",
396                         "10.0.1.6:25107",
397                         "10.0.1.7:25108"
398                  ]
399           },
400           ...
401    ]
402
403    Each pull request in the list consists of a block locator string
404    and an ordered list of servers.  Keepstore should try to fetch the
405    block from each server in turn.
406
407    If the request has not been sent by the Data Manager, return 401
408    Unauthorized.
409
410    If the JSON unmarshalling fails, return 400 Bad Request.
411 */
412
413 // PullRequest consists of a block locator and an ordered list of servers
414 type PullRequest struct {
415         Locator string   `json:"locator"`
416         Servers []string `json:"servers"`
417 }
418
419 // PullHandler processes "PUT /pull" requests for the data manager.
420 func PullHandler(resp http.ResponseWriter, req *http.Request) {
421         // Reject unauthorized requests.
422         if !IsSystemAuth(GetAPIToken(req)) {
423                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
424                 return
425         }
426
427         // Parse the request body.
428         var pr []PullRequest
429         r := json.NewDecoder(req.Body)
430         if err := r.Decode(&pr); err != nil {
431                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
432                 return
433         }
434
435         // We have a properly formatted pull list sent from the data
436         // manager.  Report success and send the list to the pull list
437         // manager for further handling.
438         resp.WriteHeader(http.StatusOK)
439         resp.Write([]byte(
440                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
441
442         plist := list.New()
443         for _, p := range pr {
444                 plist.PushBack(p)
445         }
446         pullq.ReplaceQueue(plist)
447 }
448
449 // TrashRequest consists of a block locator and it's Mtime
450 type TrashRequest struct {
451         Locator    string `json:"locator"`
452         BlockMtime int64  `json:"block_mtime"`
453 }
454
455 // TrashHandler processes /trash requests.
456 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
457         // Reject unauthorized requests.
458         if !IsSystemAuth(GetAPIToken(req)) {
459                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
460                 return
461         }
462
463         // Parse the request body.
464         var trash []TrashRequest
465         r := json.NewDecoder(req.Body)
466         if err := r.Decode(&trash); err != nil {
467                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
468                 return
469         }
470
471         // We have a properly formatted trash list sent from the data
472         // manager.  Report success and send the list to the trash work
473         // queue for further handling.
474         resp.WriteHeader(http.StatusOK)
475         resp.Write([]byte(
476                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
477
478         tlist := list.New()
479         for _, t := range trash {
480                 tlist.PushBack(t)
481         }
482         trashq.ReplaceQueue(tlist)
483 }
484
485 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
486 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
487         // Reject unauthorized requests.
488         if !IsSystemAuth(GetAPIToken(req)) {
489                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
490                 return
491         }
492
493         hash := mux.Vars(req)["hash"]
494
495         if len(KeepVM.AllWritable()) == 0 {
496                 http.Error(resp, "No writable volumes", http.StatusNotFound)
497                 return
498         }
499
500         var untrashedOn, failedOn []string
501         var numNotFound int
502         for _, vol := range KeepVM.AllWritable() {
503                 err := vol.Untrash(hash)
504
505                 if os.IsNotExist(err) {
506                         numNotFound++
507                 } else if err != nil {
508                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
509                         failedOn = append(failedOn, vol.String())
510                 } else {
511                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
512                         untrashedOn = append(untrashedOn, vol.String())
513                 }
514         }
515
516         if numNotFound == len(KeepVM.AllWritable()) {
517                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
518                 return
519         }
520
521         if len(failedOn) == len(KeepVM.AllWritable()) {
522                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
523         } else {
524                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
525                 if len(failedOn) > 0 {
526                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
527                 }
528                 resp.Write([]byte(respBody))
529         }
530 }
531
532 // GetBlock and PutBlock implement lower-level code for handling
533 // blocks by rooting through volumes connected to the local machine.
534 // Once the handler has determined that system policy permits the
535 // request, it calls these methods to perform the actual operation.
536 //
537 // TODO(twp): this code would probably be better located in the
538 // VolumeManager interface. As an abstraction, the VolumeManager
539 // should be the only part of the code that cares about which volume a
540 // block is stored on, so it should be responsible for figuring out
541 // which volume to check for fetching blocks, storing blocks, etc.
542
543 // GetBlock fetches the block identified by "hash" into the provided
544 // buf, and returns the data size.
545 //
546 // If the block cannot be found on any volume, returns NotFoundError.
547 //
548 // If the block found does not have the correct MD5 hash, returns
549 // DiskHashError.
550 //
551 func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
552         // Attempt to read the requested hash from a keep volume.
553         errorToCaller := NotFoundError
554
555         for _, vol := range KeepVM.AllReadable() {
556                 size, err := vol.Get(hash, buf)
557                 if err != nil {
558                         // IsNotExist is an expected error and may be
559                         // ignored. All other errors are logged. In
560                         // any case we continue trying to read other
561                         // volumes. If all volumes report IsNotExist,
562                         // we return a NotFoundError.
563                         if !os.IsNotExist(err) {
564                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
565                         }
566                         continue
567                 }
568                 // Check the file checksum.
569                 //
570                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
571                 if filehash != hash {
572                         // TODO: Try harder to tell a sysadmin about
573                         // this.
574                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
575                                 vol, hash, filehash)
576                         errorToCaller = DiskHashError
577                         continue
578                 }
579                 if errorToCaller == DiskHashError {
580                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
581                                 vol, hash)
582                 }
583                 return size, nil
584         }
585         return 0, errorToCaller
586 }
587
588 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
589 //
590 // PutBlock(block, hash)
591 //   Stores the BLOCK (identified by the content id HASH) in Keep.
592 //
593 //   The MD5 checksum of the block must be identical to the content id HASH.
594 //   If not, an error is returned.
595 //
596 //   PutBlock stores the BLOCK on the first Keep volume with free space.
597 //   A failure code is returned to the user only if all volumes fail.
598 //
599 //   On success, PutBlock returns nil.
600 //   On failure, it returns a KeepError with one of the following codes:
601 //
602 //   500 Collision
603 //          A different block with the same hash already exists on this
604 //          Keep server.
605 //   422 MD5Fail
606 //          The MD5 hash of the BLOCK does not match the argument HASH.
607 //   503 Full
608 //          There was not enough space left in any Keep volume to store
609 //          the object.
610 //   500 Fail
611 //          The object could not be stored for some other reason (e.g.
612 //          all writes failed). The text of the error message should
613 //          provide as much detail as possible.
614 //
615 func PutBlock(block []byte, hash string) (int, error) {
616         // Check that BLOCK's checksum matches HASH.
617         blockhash := fmt.Sprintf("%x", md5.Sum(block))
618         if blockhash != hash {
619                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
620                 return 0, RequestHashError
621         }
622
623         // If we already have this data, it's intact on disk, and we
624         // can update its timestamp, return success. If we have
625         // different data with the same hash, return failure.
626         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
627                 return n, err
628         }
629
630         // Choose a Keep volume to write to.
631         // If this volume fails, try all of the volumes in order.
632         if vol := KeepVM.NextWritable(); vol != nil {
633                 if err := vol.Put(hash, block); err == nil {
634                         return vol.Replication(), nil // success!
635                 }
636         }
637
638         writables := KeepVM.AllWritable()
639         if len(writables) == 0 {
640                 log.Print("No writable volumes.")
641                 return 0, FullError
642         }
643
644         allFull := true
645         for _, vol := range writables {
646                 err := vol.Put(hash, block)
647                 if err == nil {
648                         return vol.Replication(), nil // success!
649                 }
650                 if err != FullError {
651                         // The volume is not full but the
652                         // write did not succeed.  Report the
653                         // error and continue trying.
654                         allFull = false
655                         log.Printf("%s: Write(%s): %s", vol, hash, err)
656                 }
657         }
658
659         if allFull {
660                 log.Print("All volumes are full.")
661                 return 0, FullError
662         }
663         // Already logged the non-full errors.
664         return 0, GenericError
665 }
666
667 // CompareAndTouch returns the current replication level if one of the
668 // volumes already has the given content and it successfully updates
669 // the relevant block's modification time in order to protect it from
670 // premature garbage collection. Otherwise, it returns a non-nil
671 // error.
672 func CompareAndTouch(hash string, buf []byte) (int, error) {
673         var bestErr error = NotFoundError
674         for _, vol := range KeepVM.AllWritable() {
675                 if err := vol.Compare(hash, buf); err == CollisionError {
676                         // Stop if we have a block with same hash but
677                         // different content. (It will be impossible
678                         // to tell which one is wanted if we have
679                         // both, so there's no point writing it even
680                         // on a different volume.)
681                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
682                         return 0, err
683                 } else if os.IsNotExist(err) {
684                         // Block does not exist. This is the only
685                         // "normal" error: we don't log anything.
686                         continue
687                 } else if err != nil {
688                         // Couldn't open file, data is corrupt on
689                         // disk, etc.: log this abnormal condition,
690                         // and try the next volume.
691                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
692                         continue
693                 }
694                 if err := vol.Touch(hash); err != nil {
695                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
696                         bestErr = err
697                         continue
698                 }
699                 // Compare and Touch both worked --> done.
700                 return vol.Replication(), nil
701         }
702         return 0, bestErr
703 }
704
705 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
706
707 // IsValidLocator returns true if the specified string is a valid Keep locator.
708 //   When Keep is extended to support hash types other than MD5,
709 //   this should be updated to cover those as well.
710 //
711 func IsValidLocator(loc string) bool {
712         return validLocatorRe.MatchString(loc)
713 }
714
715 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
716
717 // GetAPIToken returns the OAuth2 token from the Authorization
718 // header of a HTTP request, or an empty string if no matching
719 // token is found.
720 func GetAPIToken(req *http.Request) string {
721         if auth, ok := req.Header["Authorization"]; ok {
722                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
723                         return match[1]
724                 }
725         }
726         return ""
727 }
728
729 // IsExpired returns true if the given Unix timestamp (expressed as a
730 // hexadecimal string) is in the past, or if timestampHex cannot be
731 // parsed as a hexadecimal string.
732 func IsExpired(timestampHex string) bool {
733         ts, err := strconv.ParseInt(timestampHex, 16, 0)
734         if err != nil {
735                 log.Printf("IsExpired: %s", err)
736                 return true
737         }
738         return time.Unix(ts, 0).Before(time.Now())
739 }
740
741 // CanDelete returns true if the user identified by apiToken is
742 // allowed to delete blocks.
743 func CanDelete(apiToken string) bool {
744         if apiToken == "" {
745                 return false
746         }
747         // Blocks may be deleted only when Keep has been configured with a
748         // data manager.
749         if IsSystemAuth(apiToken) {
750                 return true
751         }
752         // TODO(twp): look up apiToken with the API server
753         // return true if is_admin is true and if the token
754         // has unlimited scope
755         return false
756 }
757
758 // IsSystemAuth returns true if the given token is allowed to perform
759 // system level actions like deleting data.
760 func IsSystemAuth(token string) bool {
761         return token != "" && token == theConfig.systemAuthToken
762 }