a188c47c53451ae9d3f7c3f9dbf64f6b03377d92
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "strings"
24         "sync"
25         "time"
26 )
27
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
30 //
31 func MakeRESTRouter() *mux.Router {
32         rest := mux.NewRouter()
33
34         rest.HandleFunc(
35                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}+{hints}`,
38                 GetBlockHandler).Methods("GET", "HEAD")
39
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42         // List all blocks stored here. Privileged client only.
43         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44         // List blocks stored here whose hash has the given prefix.
45         // Privileged client only.
46         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47
48         // List volumes: path, device number, bytes used/avail.
49         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50
51         // Replace the current pull queue.
52         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53
54         // Replace the current trash queue.
55         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56
57         // Untrash moves blocks from trash back into store
58         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
59
60         // Any request which does not match any of these routes gets
61         // 400 Bad Request.
62         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
63
64         return rest
65 }
66
67 // BadRequestHandler is a HandleFunc to address bad requests.
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
70 }
71
72 // GetBlockHandler is a HandleFunc to address Get block requests.
73 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
74         if enforcePermissions {
75                 locator := req.URL.Path[1:] // strip leading slash
76                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
77                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
78                         return
79                 }
80         }
81
82         block, err := GetBlock(mux.Vars(req)["hash"])
83         if err != nil {
84                 // This type assertion is safe because the only errors
85                 // GetBlock can return are DiskHashError or NotFoundError.
86                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
87                 return
88         }
89         defer bufs.Put(block)
90
91         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
92         resp.Header().Set("Content-Type", "application/octet-stream")
93         resp.Write(block)
94 }
95
96 var errClientDisconnected = fmt.Errorf("client disconnected")
97
98 // Get a buffer from the pool -- but give up and return a non-nil
99 // error if resp implements http.CloseNotifier and tells us that the
100 // client has disconnected before we get a buffer.
101 func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte, error) {
102         var closeNotifier <-chan bool
103         if resp, ok := resp.(http.CloseNotifier); ok {
104                 closeNotifier = resp.CloseNotify()
105         }
106         var buf []byte
107         bufReady := make(chan []byte)
108         go func() {
109                 bufReady <- bufs.Get(bufSize)
110                 close(bufReady)
111         }()
112         select {
113         case buf = <-bufReady:
114                 return buf, nil
115         case <-closeNotifier:
116                 go func() {
117                         // Even if closeNotifier happened first, we
118                         // need to keep waiting for our buf so we can
119                         // return it to the pool.
120                         bufs.Put(<-bufReady)
121                 }()
122                 return nil, errClientDisconnected
123         }
124 }
125
126 // PutBlockHandler is a HandleFunc to address Put block requests.
127 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
128         hash := mux.Vars(req)["hash"]
129
130         // Detect as many error conditions as possible before reading
131         // the body: avoid transmitting data that will not end up
132         // being written anyway.
133
134         if req.ContentLength == -1 {
135                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
136                 return
137         }
138
139         if req.ContentLength > BlockSize {
140                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
141                 return
142         }
143
144         if len(KeepVM.AllWritable()) == 0 {
145                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
146                 return
147         }
148
149         buf, err := getBufferForResponseWriter(resp, int(req.ContentLength))
150         if err != nil {
151                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
152                 return
153         }
154
155         _, err = io.ReadFull(req.Body, buf)
156         if err != nil {
157                 http.Error(resp, err.Error(), 500)
158                 bufs.Put(buf)
159                 return
160         }
161
162         replication, err := PutBlock(buf, hash)
163         bufs.Put(buf)
164
165         if err != nil {
166                 ke := err.(*KeepError)
167                 http.Error(resp, ke.Error(), ke.HTTPCode)
168                 return
169         }
170
171         // Success; add a size hint, sign the locator if possible, and
172         // return it to the client.
173         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
174         apiToken := GetApiToken(req)
175         if PermissionSecret != nil && apiToken != "" {
176                 expiry := time.Now().Add(blobSignatureTTL)
177                 returnHash = SignLocator(returnHash, apiToken, expiry)
178         }
179         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
180         resp.Write([]byte(returnHash + "\n"))
181 }
182
183 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
184 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
185         // Reject unauthorized requests.
186         if !IsDataManagerToken(GetApiToken(req)) {
187                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
188                 return
189         }
190
191         prefix := mux.Vars(req)["prefix"]
192
193         for _, vol := range KeepVM.AllReadable() {
194                 if err := vol.IndexTo(prefix, resp); err != nil {
195                         // The only errors returned by IndexTo are
196                         // write errors returned by resp.Write(),
197                         // which probably means the client has
198                         // disconnected and this error will never be
199                         // reported to the client -- but it will
200                         // appear in our own error log.
201                         http.Error(resp, err.Error(), http.StatusInternalServerError)
202                         return
203                 }
204         }
205         // An empty line at EOF is the only way the client can be
206         // assured the entire index was received.
207         resp.Write([]byte{'\n'})
208 }
209
210 // StatusHandler
211 //     Responds to /status.json requests with the current node status,
212 //     described in a JSON structure.
213 //
214 //     The data given in a status.json response includes:
215 //        volumes - a list of Keep volumes currently in use by this server
216 //          each volume is an object with the following fields:
217 //            * mount_point
218 //            * device_num (an integer identifying the underlying filesystem)
219 //            * bytes_free
220 //            * bytes_used
221
222 // PoolStatus struct
223 type PoolStatus struct {
224         Alloc uint64 `json:"BytesAllocated"`
225         Cap   int    `json:"BuffersMax"`
226         Len   int    `json:"BuffersInUse"`
227 }
228
229 // NodeStatus struct
230 type NodeStatus struct {
231         Volumes    []*VolumeStatus `json:"volumes"`
232         BufferPool PoolStatus
233         PullQueue  WorkQueueStatus
234         TrashQueue WorkQueueStatus
235         Memory     runtime.MemStats
236 }
237
238 var st NodeStatus
239 var stLock sync.Mutex
240
241 // StatusHandler addresses /status.json requests.
242 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
243         stLock.Lock()
244         readNodeStatus(&st)
245         jstat, err := json.Marshal(&st)
246         stLock.Unlock()
247         if err == nil {
248                 resp.Write(jstat)
249         } else {
250                 log.Printf("json.Marshal: %s", err)
251                 log.Printf("NodeStatus = %v", &st)
252                 http.Error(resp, err.Error(), 500)
253         }
254 }
255
256 // populate the given NodeStatus struct with current values.
257 func readNodeStatus(st *NodeStatus) {
258         vols := KeepVM.AllReadable()
259         if cap(st.Volumes) < len(vols) {
260                 st.Volumes = make([]*VolumeStatus, len(vols))
261         }
262         st.Volumes = st.Volumes[:0]
263         for _, vol := range vols {
264                 if s := vol.Status(); s != nil {
265                         st.Volumes = append(st.Volumes, s)
266                 }
267         }
268         st.BufferPool.Alloc = bufs.Alloc()
269         st.BufferPool.Cap = bufs.Cap()
270         st.BufferPool.Len = bufs.Len()
271         st.PullQueue = getWorkQueueStatus(pullq)
272         st.TrashQueue = getWorkQueueStatus(trashq)
273         runtime.ReadMemStats(&st.Memory)
274 }
275
276 // return a WorkQueueStatus for the given queue. If q is nil (which
277 // should never happen except in test suites), return a zero status
278 // value instead of crashing.
279 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
280         if q == nil {
281                 // This should only happen during tests.
282                 return WorkQueueStatus{}
283         }
284         return q.Status()
285 }
286
287 // DeleteHandler processes DELETE requests.
288 //
289 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
290 // from all connected volumes.
291 //
292 // Only the Data Manager, or an Arvados admin with scope "all", are
293 // allowed to issue DELETE requests.  If a DELETE request is not
294 // authenticated or is issued by a non-admin user, the server returns
295 // a PermissionError.
296 //
297 // Upon receiving a valid request from an authorized user,
298 // DeleteHandler deletes all copies of the specified block on local
299 // writable volumes.
300 //
301 // Response format:
302 //
303 // If the requested blocks was not found on any volume, the response
304 // code is HTTP 404 Not Found.
305 //
306 // Otherwise, the response code is 200 OK, with a response body
307 // consisting of the JSON message
308 //
309 //    {"copies_deleted":d,"copies_failed":f}
310 //
311 // where d and f are integers representing the number of blocks that
312 // were successfully and unsuccessfully deleted.
313 //
314 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
315         hash := mux.Vars(req)["hash"]
316
317         // Confirm that this user is an admin and has a token with unlimited scope.
318         var tok = GetApiToken(req)
319         if tok == "" || !CanDelete(tok) {
320                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
321                 return
322         }
323
324         if neverDelete {
325                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
326                 return
327         }
328
329         // Delete copies of this block from all available volumes.
330         // Report how many blocks were successfully deleted, and how
331         // many were found on writable volumes but not deleted.
332         var result struct {
333                 Deleted int `json:"copies_deleted"`
334                 Failed  int `json:"copies_failed"`
335         }
336         for _, vol := range KeepVM.AllWritable() {
337                 if err := vol.Trash(hash); err == nil {
338                         result.Deleted++
339                 } else if os.IsNotExist(err) {
340                         continue
341                 } else {
342                         result.Failed++
343                         log.Println("DeleteHandler:", err)
344                 }
345         }
346
347         var st int
348
349         if result.Deleted == 0 && result.Failed == 0 {
350                 st = http.StatusNotFound
351         } else {
352                 st = http.StatusOK
353         }
354
355         resp.WriteHeader(st)
356
357         if st == http.StatusOK {
358                 if body, err := json.Marshal(result); err == nil {
359                         resp.Write(body)
360                 } else {
361                         log.Printf("json.Marshal: %s (result = %v)", err, result)
362                         http.Error(resp, err.Error(), 500)
363                 }
364         }
365 }
366
367 /* PullHandler processes "PUT /pull" requests for the data manager.
368    The request body is a JSON message containing a list of pull
369    requests in the following format:
370
371    [
372       {
373          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
374          "servers":[
375                         "keep0.qr1hi.arvadosapi.com:25107",
376                         "keep1.qr1hi.arvadosapi.com:25108"
377                  ]
378           },
379           {
380                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
381                  "servers":[
382                         "10.0.1.5:25107",
383                         "10.0.1.6:25107",
384                         "10.0.1.7:25108"
385                  ]
386           },
387           ...
388    ]
389
390    Each pull request in the list consists of a block locator string
391    and an ordered list of servers.  Keepstore should try to fetch the
392    block from each server in turn.
393
394    If the request has not been sent by the Data Manager, return 401
395    Unauthorized.
396
397    If the JSON unmarshalling fails, return 400 Bad Request.
398 */
399
400 // PullRequest consists of a block locator and an ordered list of servers
401 type PullRequest struct {
402         Locator string   `json:"locator"`
403         Servers []string `json:"servers"`
404 }
405
406 // PullHandler processes "PUT /pull" requests for the data manager.
407 func PullHandler(resp http.ResponseWriter, req *http.Request) {
408         // Reject unauthorized requests.
409         if !IsDataManagerToken(GetApiToken(req)) {
410                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
411                 return
412         }
413
414         // Parse the request body.
415         var pr []PullRequest
416         r := json.NewDecoder(req.Body)
417         if err := r.Decode(&pr); err != nil {
418                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
419                 return
420         }
421
422         // We have a properly formatted pull list sent from the data
423         // manager.  Report success and send the list to the pull list
424         // manager for further handling.
425         resp.WriteHeader(http.StatusOK)
426         resp.Write([]byte(
427                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
428
429         plist := list.New()
430         for _, p := range pr {
431                 plist.PushBack(p)
432         }
433         pullq.ReplaceQueue(plist)
434 }
435
436 // TrashRequest consists of a block locator and it's Mtime
437 type TrashRequest struct {
438         Locator    string `json:"locator"`
439         BlockMtime int64  `json:"block_mtime"`
440 }
441
442 // TrashHandler processes /trash requests.
443 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
444         // Reject unauthorized requests.
445         if !IsDataManagerToken(GetApiToken(req)) {
446                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
447                 return
448         }
449
450         // Parse the request body.
451         var trash []TrashRequest
452         r := json.NewDecoder(req.Body)
453         if err := r.Decode(&trash); err != nil {
454                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
455                 return
456         }
457
458         // We have a properly formatted trash list sent from the data
459         // manager.  Report success and send the list to the trash work
460         // queue for further handling.
461         resp.WriteHeader(http.StatusOK)
462         resp.Write([]byte(
463                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
464
465         tlist := list.New()
466         for _, t := range trash {
467                 tlist.PushBack(t)
468         }
469         trashq.ReplaceQueue(tlist)
470 }
471
472 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
473 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
474         // Reject unauthorized requests.
475         if !IsDataManagerToken(GetApiToken(req)) {
476                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
477                 return
478         }
479
480         hash := mux.Vars(req)["hash"]
481
482         if len(KeepVM.AllWritable()) == 0 {
483                 http.Error(resp, "No writable volumes", http.StatusNotFound)
484                 return
485         }
486
487         var untrashedOn, failedOn []string
488         var numNotFound int
489         for _, vol := range KeepVM.AllWritable() {
490                 err := vol.Untrash(hash)
491
492                 if os.IsNotExist(err) {
493                         numNotFound++
494                 } else if err != nil {
495                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
496                         failedOn = append(failedOn, vol.String())
497                 } else {
498                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
499                         untrashedOn = append(untrashedOn, vol.String())
500                 }
501         }
502
503         if numNotFound == len(KeepVM.AllWritable()) {
504                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
505                 return
506         }
507
508         if len(failedOn) == len(KeepVM.AllWritable()) {
509                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
510         } else {
511                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
512                 if len(failedOn) > 0 {
513                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
514                 }
515                 resp.Write([]byte(respBody))
516         }
517 }
518
519 // ==============================
520 // GetBlock and PutBlock implement lower-level code for handling
521 // blocks by rooting through volumes connected to the local machine.
522 // Once the handler has determined that system policy permits the
523 // request, it calls these methods to perform the actual operation.
524 //
525 // TODO(twp): this code would probably be better located in the
526 // VolumeManager interface. As an abstraction, the VolumeManager
527 // should be the only part of the code that cares about which volume a
528 // block is stored on, so it should be responsible for figuring out
529 // which volume to check for fetching blocks, storing blocks, etc.
530 // ==============================
531
532 // GetBlock fetches and returns the block identified by "hash".
533 //
534 // On success, GetBlock returns a byte slice with the block data, and
535 // a nil error.
536 //
537 // If the block cannot be found on any volume, returns NotFoundError.
538 //
539 // If the block found does not have the correct MD5 hash, returns
540 // DiskHashError.
541 //
542 func GetBlock(hash string) ([]byte, error) {
543         // Attempt to read the requested hash from a keep volume.
544         errorToCaller := NotFoundError
545
546         for _, vol := range KeepVM.AllReadable() {
547                 buf, err := vol.Get(hash)
548                 if err != nil {
549                         // IsNotExist is an expected error and may be
550                         // ignored. All other errors are logged. In
551                         // any case we continue trying to read other
552                         // volumes. If all volumes report IsNotExist,
553                         // we return a NotFoundError.
554                         if !os.IsNotExist(err) {
555                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
556                         }
557                         continue
558                 }
559                 // Check the file checksum.
560                 //
561                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
562                 if filehash != hash {
563                         // TODO: Try harder to tell a sysadmin about
564                         // this.
565                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
566                                 vol, hash, filehash)
567                         errorToCaller = DiskHashError
568                         bufs.Put(buf)
569                         continue
570                 }
571                 if errorToCaller == DiskHashError {
572                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
573                                 vol, hash)
574                 }
575                 return buf, nil
576         }
577         return nil, errorToCaller
578 }
579
580 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
581 //
582 // PutBlock(block, hash)
583 //   Stores the BLOCK (identified by the content id HASH) in Keep.
584 //
585 //   The MD5 checksum of the block must be identical to the content id HASH.
586 //   If not, an error is returned.
587 //
588 //   PutBlock stores the BLOCK on the first Keep volume with free space.
589 //   A failure code is returned to the user only if all volumes fail.
590 //
591 //   On success, PutBlock returns nil.
592 //   On failure, it returns a KeepError with one of the following codes:
593 //
594 //   500 Collision
595 //          A different block with the same hash already exists on this
596 //          Keep server.
597 //   422 MD5Fail
598 //          The MD5 hash of the BLOCK does not match the argument HASH.
599 //   503 Full
600 //          There was not enough space left in any Keep volume to store
601 //          the object.
602 //   500 Fail
603 //          The object could not be stored for some other reason (e.g.
604 //          all writes failed). The text of the error message should
605 //          provide as much detail as possible.
606 //
607 func PutBlock(block []byte, hash string) (int, error) {
608         // Check that BLOCK's checksum matches HASH.
609         blockhash := fmt.Sprintf("%x", md5.Sum(block))
610         if blockhash != hash {
611                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
612                 return 0, RequestHashError
613         }
614
615         // If we already have this data, it's intact on disk, and we
616         // can update its timestamp, return success. If we have
617         // different data with the same hash, return failure.
618         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
619                 return n, err
620         }
621
622         // Choose a Keep volume to write to.
623         // If this volume fails, try all of the volumes in order.
624         if vol := KeepVM.NextWritable(); vol != nil {
625                 if err := vol.Put(hash, block); err == nil {
626                         return vol.Replication(), nil // success!
627                 }
628         }
629
630         writables := KeepVM.AllWritable()
631         if len(writables) == 0 {
632                 log.Print("No writable volumes.")
633                 return 0, FullError
634         }
635
636         allFull := true
637         for _, vol := range writables {
638                 err := vol.Put(hash, block)
639                 if err == nil {
640                         return vol.Replication(), nil // success!
641                 }
642                 if err != FullError {
643                         // The volume is not full but the
644                         // write did not succeed.  Report the
645                         // error and continue trying.
646                         allFull = false
647                         log.Printf("%s: Write(%s): %s", vol, hash, err)
648                 }
649         }
650
651         if allFull {
652                 log.Print("All volumes are full.")
653                 return 0, FullError
654         }
655         // Already logged the non-full errors.
656         return 0, GenericError
657 }
658
659 // CompareAndTouch returns the current replication level if one of the
660 // volumes already has the given content and it successfully updates
661 // the relevant block's modification time in order to protect it from
662 // premature garbage collection. Otherwise, it returns a non-nil
663 // error.
664 func CompareAndTouch(hash string, buf []byte) (int, error) {
665         var bestErr error = NotFoundError
666         for _, vol := range KeepVM.AllWritable() {
667                 if err := vol.Compare(hash, buf); err == CollisionError {
668                         // Stop if we have a block with same hash but
669                         // different content. (It will be impossible
670                         // to tell which one is wanted if we have
671                         // both, so there's no point writing it even
672                         // on a different volume.)
673                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
674                         return 0, err
675                 } else if os.IsNotExist(err) {
676                         // Block does not exist. This is the only
677                         // "normal" error: we don't log anything.
678                         continue
679                 } else if err != nil {
680                         // Couldn't open file, data is corrupt on
681                         // disk, etc.: log this abnormal condition,
682                         // and try the next volume.
683                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
684                         continue
685                 }
686                 if err := vol.Touch(hash); err != nil {
687                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
688                         bestErr = err
689                         continue
690                 }
691                 // Compare and Touch both worked --> done.
692                 return vol.Replication(), nil
693         }
694         return 0, bestErr
695 }
696
697 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
698
699 // IsValidLocator returns true if the specified string is a valid Keep locator.
700 //   When Keep is extended to support hash types other than MD5,
701 //   this should be updated to cover those as well.
702 //
703 func IsValidLocator(loc string) bool {
704         return validLocatorRe.MatchString(loc)
705 }
706
707 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
708
709 // GetApiToken returns the OAuth2 token from the Authorization
710 // header of a HTTP request, or an empty string if no matching
711 // token is found.
712 func GetApiToken(req *http.Request) string {
713         if auth, ok := req.Header["Authorization"]; ok {
714                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
715                         return match[1]
716                 }
717         }
718         return ""
719 }
720
721 // IsExpired returns true if the given Unix timestamp (expressed as a
722 // hexadecimal string) is in the past, or if timestampHex cannot be
723 // parsed as a hexadecimal string.
724 func IsExpired(timestampHex string) bool {
725         ts, err := strconv.ParseInt(timestampHex, 16, 0)
726         if err != nil {
727                 log.Printf("IsExpired: %s", err)
728                 return true
729         }
730         return time.Unix(ts, 0).Before(time.Now())
731 }
732
733 // CanDelete returns true if the user identified by apiToken is
734 // allowed to delete blocks.
735 func CanDelete(apiToken string) bool {
736         if apiToken == "" {
737                 return false
738         }
739         // Blocks may be deleted only when Keep has been configured with a
740         // data manager.
741         if IsDataManagerToken(apiToken) {
742                 return true
743         }
744         // TODO(twp): look up apiToken with the API server
745         // return true if is_admin is true and if the token
746         // has unlimited scope
747         return false
748 }
749
750 // IsDataManagerToken returns true if apiToken represents the data
751 // manager's token.
752 func IsDataManagerToken(apiToken string) bool {
753         return dataManagerToken != "" && apiToken == dataManagerToken
754 }