10467: Abort S3 request and release buffer if caller disconnects while server is...
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "sync"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Untrash moves blocks from trash back into store
59         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60
61         // Any request which does not match any of these routes gets
62         // 400 Bad Request.
63         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
64
65         return rest
66 }
67
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
71 }
72
73 // GetBlockHandler is a HandleFunc to address Get block requests.
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
75         if theConfig.RequireSignatures {
76                 locator := req.URL.Path[1:] // strip leading slash
77                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
78                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
79                         return
80                 }
81         }
82
83         // TODO: Probe volumes to check whether the block _might_
84         // exist. Some volumes/types could support a quick existence
85         // check without causing other operations to suffer. If all
86         // volumes support that, and assure us the block definitely
87         // isn't here, we can return 404 now instead of waiting for a
88         // buffer.
89
90         buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
91         if err != nil {
92                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
93                 return
94         }
95         defer bufs.Put(buf)
96
97         ctx, cancel := context.WithCancel(context.TODO())
98         if resp, ok := resp.(http.CloseNotifier); ok {
99                 go func() {
100                         <-resp.CloseNotify()
101                         cancel()
102                 }()
103         }
104         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
105         if err != nil {
106                 code := http.StatusInternalServerError
107                 if err, ok := err.(*KeepError); ok {
108                         code = err.HTTPCode
109                 }
110                 http.Error(resp, err.Error(), code)
111                 return
112         }
113
114         resp.Header().Set("Content-Length", strconv.Itoa(size))
115         resp.Header().Set("Content-Type", "application/octet-stream")
116         resp.Write(buf[:size])
117 }
118
119 // Get a buffer from the pool -- but give up and return a non-nil
120 // error if resp implements http.CloseNotifier and tells us that the
121 // client has disconnected before we get a buffer.
122 func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
123         var closeNotifier <-chan bool
124         if resp, ok := resp.(http.CloseNotifier); ok {
125                 closeNotifier = resp.CloseNotify()
126         }
127         var buf []byte
128         bufReady := make(chan []byte)
129         go func() {
130                 bufReady <- bufs.Get(bufSize)
131                 close(bufReady)
132         }()
133         select {
134         case buf = <-bufReady:
135                 return buf, nil
136         case <-closeNotifier:
137                 go func() {
138                         // Even if closeNotifier happened first, we
139                         // need to keep waiting for our buf so we can
140                         // return it to the pool.
141                         bufs.Put(<-bufReady)
142                 }()
143                 return nil, ErrClientDisconnect
144         }
145 }
146
147 // PutBlockHandler is a HandleFunc to address Put block requests.
148 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
149         hash := mux.Vars(req)["hash"]
150
151         // Detect as many error conditions as possible before reading
152         // the body: avoid transmitting data that will not end up
153         // being written anyway.
154
155         if req.ContentLength == -1 {
156                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
157                 return
158         }
159
160         if req.ContentLength > BlockSize {
161                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
162                 return
163         }
164
165         if len(KeepVM.AllWritable()) == 0 {
166                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
167                 return
168         }
169
170         buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
171         if err != nil {
172                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
173                 return
174         }
175
176         _, err = io.ReadFull(req.Body, buf)
177         if err != nil {
178                 http.Error(resp, err.Error(), 500)
179                 bufs.Put(buf)
180                 return
181         }
182
183         replication, err := PutBlock(buf, hash)
184         bufs.Put(buf)
185
186         if err != nil {
187                 ke := err.(*KeepError)
188                 http.Error(resp, ke.Error(), ke.HTTPCode)
189                 return
190         }
191
192         // Success; add a size hint, sign the locator if possible, and
193         // return it to the client.
194         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
195         apiToken := GetAPIToken(req)
196         if theConfig.blobSigningKey != nil && apiToken != "" {
197                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
198                 returnHash = SignLocator(returnHash, apiToken, expiry)
199         }
200         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
201         resp.Write([]byte(returnHash + "\n"))
202 }
203
204 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
205 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
206         // Reject unauthorized requests.
207         if !IsSystemAuth(GetAPIToken(req)) {
208                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
209                 return
210         }
211
212         prefix := mux.Vars(req)["prefix"]
213
214         for _, vol := range KeepVM.AllReadable() {
215                 if err := vol.IndexTo(prefix, resp); err != nil {
216                         // The only errors returned by IndexTo are
217                         // write errors returned by resp.Write(),
218                         // which probably means the client has
219                         // disconnected and this error will never be
220                         // reported to the client -- but it will
221                         // appear in our own error log.
222                         http.Error(resp, err.Error(), http.StatusInternalServerError)
223                         return
224                 }
225         }
226         // An empty line at EOF is the only way the client can be
227         // assured the entire index was received.
228         resp.Write([]byte{'\n'})
229 }
230
231 // StatusHandler
232 //     Responds to /status.json requests with the current node status,
233 //     described in a JSON structure.
234 //
235 //     The data given in a status.json response includes:
236 //        volumes - a list of Keep volumes currently in use by this server
237 //          each volume is an object with the following fields:
238 //            * mount_point
239 //            * device_num (an integer identifying the underlying filesystem)
240 //            * bytes_free
241 //            * bytes_used
242
243 // PoolStatus struct
244 type PoolStatus struct {
245         Alloc uint64 `json:"BytesAllocated"`
246         Cap   int    `json:"BuffersMax"`
247         Len   int    `json:"BuffersInUse"`
248 }
249
250 // NodeStatus struct
251 type NodeStatus struct {
252         Volumes    []*VolumeStatus `json:"volumes"`
253         BufferPool PoolStatus
254         PullQueue  WorkQueueStatus
255         TrashQueue WorkQueueStatus
256         Memory     runtime.MemStats
257 }
258
259 var st NodeStatus
260 var stLock sync.Mutex
261
262 // StatusHandler addresses /status.json requests.
263 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
264         stLock.Lock()
265         readNodeStatus(&st)
266         jstat, err := json.Marshal(&st)
267         stLock.Unlock()
268         if err == nil {
269                 resp.Write(jstat)
270         } else {
271                 log.Printf("json.Marshal: %s", err)
272                 log.Printf("NodeStatus = %v", &st)
273                 http.Error(resp, err.Error(), 500)
274         }
275 }
276
277 // populate the given NodeStatus struct with current values.
278 func readNodeStatus(st *NodeStatus) {
279         vols := KeepVM.AllReadable()
280         if cap(st.Volumes) < len(vols) {
281                 st.Volumes = make([]*VolumeStatus, len(vols))
282         }
283         st.Volumes = st.Volumes[:0]
284         for _, vol := range vols {
285                 if s := vol.Status(); s != nil {
286                         st.Volumes = append(st.Volumes, s)
287                 }
288         }
289         st.BufferPool.Alloc = bufs.Alloc()
290         st.BufferPool.Cap = bufs.Cap()
291         st.BufferPool.Len = bufs.Len()
292         st.PullQueue = getWorkQueueStatus(pullq)
293         st.TrashQueue = getWorkQueueStatus(trashq)
294         runtime.ReadMemStats(&st.Memory)
295 }
296
297 // return a WorkQueueStatus for the given queue. If q is nil (which
298 // should never happen except in test suites), return a zero status
299 // value instead of crashing.
300 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
301         if q == nil {
302                 // This should only happen during tests.
303                 return WorkQueueStatus{}
304         }
305         return q.Status()
306 }
307
308 // DeleteHandler processes DELETE requests.
309 //
310 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
311 // from all connected volumes.
312 //
313 // Only the Data Manager, or an Arvados admin with scope "all", are
314 // allowed to issue DELETE requests.  If a DELETE request is not
315 // authenticated or is issued by a non-admin user, the server returns
316 // a PermissionError.
317 //
318 // Upon receiving a valid request from an authorized user,
319 // DeleteHandler deletes all copies of the specified block on local
320 // writable volumes.
321 //
322 // Response format:
323 //
324 // If the requested blocks was not found on any volume, the response
325 // code is HTTP 404 Not Found.
326 //
327 // Otherwise, the response code is 200 OK, with a response body
328 // consisting of the JSON message
329 //
330 //    {"copies_deleted":d,"copies_failed":f}
331 //
332 // where d and f are integers representing the number of blocks that
333 // were successfully and unsuccessfully deleted.
334 //
335 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
336         hash := mux.Vars(req)["hash"]
337
338         // Confirm that this user is an admin and has a token with unlimited scope.
339         var tok = GetAPIToken(req)
340         if tok == "" || !CanDelete(tok) {
341                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
342                 return
343         }
344
345         if !theConfig.EnableDelete {
346                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
347                 return
348         }
349
350         // Delete copies of this block from all available volumes.
351         // Report how many blocks were successfully deleted, and how
352         // many were found on writable volumes but not deleted.
353         var result struct {
354                 Deleted int `json:"copies_deleted"`
355                 Failed  int `json:"copies_failed"`
356         }
357         for _, vol := range KeepVM.AllWritable() {
358                 if err := vol.Trash(hash); err == nil {
359                         result.Deleted++
360                 } else if os.IsNotExist(err) {
361                         continue
362                 } else {
363                         result.Failed++
364                         log.Println("DeleteHandler:", err)
365                 }
366         }
367
368         var st int
369
370         if result.Deleted == 0 && result.Failed == 0 {
371                 st = http.StatusNotFound
372         } else {
373                 st = http.StatusOK
374         }
375
376         resp.WriteHeader(st)
377
378         if st == http.StatusOK {
379                 if body, err := json.Marshal(result); err == nil {
380                         resp.Write(body)
381                 } else {
382                         log.Printf("json.Marshal: %s (result = %v)", err, result)
383                         http.Error(resp, err.Error(), 500)
384                 }
385         }
386 }
387
388 /* PullHandler processes "PUT /pull" requests for the data manager.
389    The request body is a JSON message containing a list of pull
390    requests in the following format:
391
392    [
393       {
394          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
395          "servers":[
396                         "keep0.qr1hi.arvadosapi.com:25107",
397                         "keep1.qr1hi.arvadosapi.com:25108"
398                  ]
399           },
400           {
401                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
402                  "servers":[
403                         "10.0.1.5:25107",
404                         "10.0.1.6:25107",
405                         "10.0.1.7:25108"
406                  ]
407           },
408           ...
409    ]
410
411    Each pull request in the list consists of a block locator string
412    and an ordered list of servers.  Keepstore should try to fetch the
413    block from each server in turn.
414
415    If the request has not been sent by the Data Manager, return 401
416    Unauthorized.
417
418    If the JSON unmarshalling fails, return 400 Bad Request.
419 */
420
421 // PullRequest consists of a block locator and an ordered list of servers
422 type PullRequest struct {
423         Locator string   `json:"locator"`
424         Servers []string `json:"servers"`
425 }
426
427 // PullHandler processes "PUT /pull" requests for the data manager.
428 func PullHandler(resp http.ResponseWriter, req *http.Request) {
429         // Reject unauthorized requests.
430         if !IsSystemAuth(GetAPIToken(req)) {
431                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
432                 return
433         }
434
435         // Parse the request body.
436         var pr []PullRequest
437         r := json.NewDecoder(req.Body)
438         if err := r.Decode(&pr); err != nil {
439                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
440                 return
441         }
442
443         // We have a properly formatted pull list sent from the data
444         // manager.  Report success and send the list to the pull list
445         // manager for further handling.
446         resp.WriteHeader(http.StatusOK)
447         resp.Write([]byte(
448                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
449
450         plist := list.New()
451         for _, p := range pr {
452                 plist.PushBack(p)
453         }
454         pullq.ReplaceQueue(plist)
455 }
456
457 // TrashRequest consists of a block locator and it's Mtime
458 type TrashRequest struct {
459         Locator    string `json:"locator"`
460         BlockMtime int64  `json:"block_mtime"`
461 }
462
463 // TrashHandler processes /trash requests.
464 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
465         // Reject unauthorized requests.
466         if !IsSystemAuth(GetAPIToken(req)) {
467                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
468                 return
469         }
470
471         // Parse the request body.
472         var trash []TrashRequest
473         r := json.NewDecoder(req.Body)
474         if err := r.Decode(&trash); err != nil {
475                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
476                 return
477         }
478
479         // We have a properly formatted trash list sent from the data
480         // manager.  Report success and send the list to the trash work
481         // queue for further handling.
482         resp.WriteHeader(http.StatusOK)
483         resp.Write([]byte(
484                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
485
486         tlist := list.New()
487         for _, t := range trash {
488                 tlist.PushBack(t)
489         }
490         trashq.ReplaceQueue(tlist)
491 }
492
493 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
494 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
495         // Reject unauthorized requests.
496         if !IsSystemAuth(GetAPIToken(req)) {
497                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
498                 return
499         }
500
501         hash := mux.Vars(req)["hash"]
502
503         if len(KeepVM.AllWritable()) == 0 {
504                 http.Error(resp, "No writable volumes", http.StatusNotFound)
505                 return
506         }
507
508         var untrashedOn, failedOn []string
509         var numNotFound int
510         for _, vol := range KeepVM.AllWritable() {
511                 err := vol.Untrash(hash)
512
513                 if os.IsNotExist(err) {
514                         numNotFound++
515                 } else if err != nil {
516                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
517                         failedOn = append(failedOn, vol.String())
518                 } else {
519                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
520                         untrashedOn = append(untrashedOn, vol.String())
521                 }
522         }
523
524         if numNotFound == len(KeepVM.AllWritable()) {
525                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
526                 return
527         }
528
529         if len(failedOn) == len(KeepVM.AllWritable()) {
530                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
531         } else {
532                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
533                 if len(failedOn) > 0 {
534                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
535                 }
536                 resp.Write([]byte(respBody))
537         }
538 }
539
540 // GetBlock and PutBlock implement lower-level code for handling
541 // blocks by rooting through volumes connected to the local machine.
542 // Once the handler has determined that system policy permits the
543 // request, it calls these methods to perform the actual operation.
544 //
545 // TODO(twp): this code would probably be better located in the
546 // VolumeManager interface. As an abstraction, the VolumeManager
547 // should be the only part of the code that cares about which volume a
548 // block is stored on, so it should be responsible for figuring out
549 // which volume to check for fetching blocks, storing blocks, etc.
550
551 // GetBlock fetches the block identified by "hash" into the provided
552 // buf, and returns the data size.
553 //
554 // If the block cannot be found on any volume, returns NotFoundError.
555 //
556 // If the block found does not have the correct MD5 hash, returns
557 // DiskHashError.
558 //
559 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
560         // Attempt to read the requested hash from a keep volume.
561         errorToCaller := NotFoundError
562
563         for _, vol := range KeepVM.AllReadable() {
564                 size, err := vol.Get(ctx, hash, buf)
565                 if err != nil {
566                         // IsNotExist is an expected error and may be
567                         // ignored. All other errors are logged. In
568                         // any case we continue trying to read other
569                         // volumes. If all volumes report IsNotExist,
570                         // we return a NotFoundError.
571                         if !os.IsNotExist(err) {
572                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
573                         }
574                         continue
575                 }
576                 // Check the file checksum.
577                 //
578                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
579                 if filehash != hash {
580                         // TODO: Try harder to tell a sysadmin about
581                         // this.
582                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
583                                 vol, hash, filehash)
584                         errorToCaller = DiskHashError
585                         continue
586                 }
587                 if errorToCaller == DiskHashError {
588                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
589                                 vol, hash)
590                 }
591                 return size, nil
592         }
593         return 0, errorToCaller
594 }
595
596 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
597 //
598 // PutBlock(block, hash)
599 //   Stores the BLOCK (identified by the content id HASH) in Keep.
600 //
601 //   The MD5 checksum of the block must be identical to the content id HASH.
602 //   If not, an error is returned.
603 //
604 //   PutBlock stores the BLOCK on the first Keep volume with free space.
605 //   A failure code is returned to the user only if all volumes fail.
606 //
607 //   On success, PutBlock returns nil.
608 //   On failure, it returns a KeepError with one of the following codes:
609 //
610 //   500 Collision
611 //          A different block with the same hash already exists on this
612 //          Keep server.
613 //   422 MD5Fail
614 //          The MD5 hash of the BLOCK does not match the argument HASH.
615 //   503 Full
616 //          There was not enough space left in any Keep volume to store
617 //          the object.
618 //   500 Fail
619 //          The object could not be stored for some other reason (e.g.
620 //          all writes failed). The text of the error message should
621 //          provide as much detail as possible.
622 //
623 func PutBlock(block []byte, hash string) (int, error) {
624         // Check that BLOCK's checksum matches HASH.
625         blockhash := fmt.Sprintf("%x", md5.Sum(block))
626         if blockhash != hash {
627                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
628                 return 0, RequestHashError
629         }
630
631         // If we already have this data, it's intact on disk, and we
632         // can update its timestamp, return success. If we have
633         // different data with the same hash, return failure.
634         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
635                 return n, err
636         }
637
638         // Choose a Keep volume to write to.
639         // If this volume fails, try all of the volumes in order.
640         if vol := KeepVM.NextWritable(); vol != nil {
641                 if err := vol.Put(hash, block); err == nil {
642                         return vol.Replication(), nil // success!
643                 }
644         }
645
646         writables := KeepVM.AllWritable()
647         if len(writables) == 0 {
648                 log.Print("No writable volumes.")
649                 return 0, FullError
650         }
651
652         allFull := true
653         for _, vol := range writables {
654                 err := vol.Put(hash, block)
655                 if err == nil {
656                         return vol.Replication(), nil // success!
657                 }
658                 if err != FullError {
659                         // The volume is not full but the
660                         // write did not succeed.  Report the
661                         // error and continue trying.
662                         allFull = false
663                         log.Printf("%s: Write(%s): %s", vol, hash, err)
664                 }
665         }
666
667         if allFull {
668                 log.Print("All volumes are full.")
669                 return 0, FullError
670         }
671         // Already logged the non-full errors.
672         return 0, GenericError
673 }
674
675 // CompareAndTouch returns the current replication level if one of the
676 // volumes already has the given content and it successfully updates
677 // the relevant block's modification time in order to protect it from
678 // premature garbage collection. Otherwise, it returns a non-nil
679 // error.
680 func CompareAndTouch(hash string, buf []byte) (int, error) {
681         var bestErr error = NotFoundError
682         for _, vol := range KeepVM.AllWritable() {
683                 if err := vol.Compare(hash, buf); err == CollisionError {
684                         // Stop if we have a block with same hash but
685                         // different content. (It will be impossible
686                         // to tell which one is wanted if we have
687                         // both, so there's no point writing it even
688                         // on a different volume.)
689                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
690                         return 0, err
691                 } else if os.IsNotExist(err) {
692                         // Block does not exist. This is the only
693                         // "normal" error: we don't log anything.
694                         continue
695                 } else if err != nil {
696                         // Couldn't open file, data is corrupt on
697                         // disk, etc.: log this abnormal condition,
698                         // and try the next volume.
699                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
700                         continue
701                 }
702                 if err := vol.Touch(hash); err != nil {
703                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
704                         bestErr = err
705                         continue
706                 }
707                 // Compare and Touch both worked --> done.
708                 return vol.Replication(), nil
709         }
710         return 0, bestErr
711 }
712
713 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
714
715 // IsValidLocator returns true if the specified string is a valid Keep locator.
716 //   When Keep is extended to support hash types other than MD5,
717 //   this should be updated to cover those as well.
718 //
719 func IsValidLocator(loc string) bool {
720         return validLocatorRe.MatchString(loc)
721 }
722
723 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
724
725 // GetAPIToken returns the OAuth2 token from the Authorization
726 // header of a HTTP request, or an empty string if no matching
727 // token is found.
728 func GetAPIToken(req *http.Request) string {
729         if auth, ok := req.Header["Authorization"]; ok {
730                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
731                         return match[1]
732                 }
733         }
734         return ""
735 }
736
737 // IsExpired returns true if the given Unix timestamp (expressed as a
738 // hexadecimal string) is in the past, or if timestampHex cannot be
739 // parsed as a hexadecimal string.
740 func IsExpired(timestampHex string) bool {
741         ts, err := strconv.ParseInt(timestampHex, 16, 0)
742         if err != nil {
743                 log.Printf("IsExpired: %s", err)
744                 return true
745         }
746         return time.Unix(ts, 0).Before(time.Now())
747 }
748
749 // CanDelete returns true if the user identified by apiToken is
750 // allowed to delete blocks.
751 func CanDelete(apiToken string) bool {
752         if apiToken == "" {
753                 return false
754         }
755         // Blocks may be deleted only when Keep has been configured with a
756         // data manager.
757         if IsSystemAuth(apiToken) {
758                 return true
759         }
760         // TODO(twp): look up apiToken with the API server
761         // return true if is_admin is true and if the token
762         // has unlimited scope
763         return false
764 }
765
766 // IsSystemAuth returns true if the given token is allowed to perform
767 // system level actions like deleting data.
768 func IsSystemAuth(token string) bool {
769         return token != "" && token == theConfig.systemAuthToken
770 }