10467: Return "context done" error instead of 404 if client hangs up during GET.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "sync"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Untrash moves blocks from trash back into store
59         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60
61         // Any request which does not match any of these routes gets
62         // 400 Bad Request.
63         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
64
65         return rest
66 }
67
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
71 }
72
73 // GetBlockHandler is a HandleFunc to address Get block requests.
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
75         if theConfig.RequireSignatures {
76                 locator := req.URL.Path[1:] // strip leading slash
77                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
78                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
79                         return
80                 }
81         }
82
83         // TODO: Probe volumes to check whether the block _might_
84         // exist. Some volumes/types could support a quick existence
85         // check without causing other operations to suffer. If all
86         // volumes support that, and assure us the block definitely
87         // isn't here, we can return 404 now instead of waiting for a
88         // buffer.
89
90         buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
91         if err != nil {
92                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
93                 return
94         }
95         defer bufs.Put(buf)
96
97         ctx, cancel := context.WithCancel(context.TODO())
98         if resp, ok := resp.(http.CloseNotifier); ok {
99                 go func() {
100                         <-resp.CloseNotify()
101                         cancel()
102                 }()
103         }
104         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
105         if err != nil {
106                 code := http.StatusInternalServerError
107                 if err, ok := err.(*KeepError); ok {
108                         code = err.HTTPCode
109                 }
110                 http.Error(resp, err.Error(), code)
111                 return
112         }
113
114         resp.Header().Set("Content-Length", strconv.Itoa(size))
115         resp.Header().Set("Content-Type", "application/octet-stream")
116         resp.Write(buf[:size])
117 }
118
119 // Get a buffer from the pool -- but give up and return a non-nil
120 // error if resp implements http.CloseNotifier and tells us that the
121 // client has disconnected before we get a buffer.
122 func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
123         var closeNotifier <-chan bool
124         if resp, ok := resp.(http.CloseNotifier); ok {
125                 closeNotifier = resp.CloseNotify()
126         }
127         var buf []byte
128         bufReady := make(chan []byte)
129         go func() {
130                 bufReady <- bufs.Get(bufSize)
131                 close(bufReady)
132         }()
133         select {
134         case buf = <-bufReady:
135                 return buf, nil
136         case <-closeNotifier:
137                 go func() {
138                         // Even if closeNotifier happened first, we
139                         // need to keep waiting for our buf so we can
140                         // return it to the pool.
141                         bufs.Put(<-bufReady)
142                 }()
143                 return nil, ErrClientDisconnect
144         }
145 }
146
147 // PutBlockHandler is a HandleFunc to address Put block requests.
148 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
149         hash := mux.Vars(req)["hash"]
150
151         // Detect as many error conditions as possible before reading
152         // the body: avoid transmitting data that will not end up
153         // being written anyway.
154
155         if req.ContentLength == -1 {
156                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
157                 return
158         }
159
160         if req.ContentLength > BlockSize {
161                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
162                 return
163         }
164
165         if len(KeepVM.AllWritable()) == 0 {
166                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
167                 return
168         }
169
170         buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
171         if err != nil {
172                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
173                 return
174         }
175
176         _, err = io.ReadFull(req.Body, buf)
177         if err != nil {
178                 http.Error(resp, err.Error(), 500)
179                 bufs.Put(buf)
180                 return
181         }
182
183         replication, err := PutBlock(buf, hash)
184         bufs.Put(buf)
185
186         if err != nil {
187                 ke := err.(*KeepError)
188                 http.Error(resp, ke.Error(), ke.HTTPCode)
189                 return
190         }
191
192         // Success; add a size hint, sign the locator if possible, and
193         // return it to the client.
194         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
195         apiToken := GetAPIToken(req)
196         if theConfig.blobSigningKey != nil && apiToken != "" {
197                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
198                 returnHash = SignLocator(returnHash, apiToken, expiry)
199         }
200         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
201         resp.Write([]byte(returnHash + "\n"))
202 }
203
204 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
205 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
206         // Reject unauthorized requests.
207         if !IsSystemAuth(GetAPIToken(req)) {
208                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
209                 return
210         }
211
212         prefix := mux.Vars(req)["prefix"]
213
214         for _, vol := range KeepVM.AllReadable() {
215                 if err := vol.IndexTo(prefix, resp); err != nil {
216                         // The only errors returned by IndexTo are
217                         // write errors returned by resp.Write(),
218                         // which probably means the client has
219                         // disconnected and this error will never be
220                         // reported to the client -- but it will
221                         // appear in our own error log.
222                         http.Error(resp, err.Error(), http.StatusInternalServerError)
223                         return
224                 }
225         }
226         // An empty line at EOF is the only way the client can be
227         // assured the entire index was received.
228         resp.Write([]byte{'\n'})
229 }
230
231 // StatusHandler
232 //     Responds to /status.json requests with the current node status,
233 //     described in a JSON structure.
234 //
235 //     The data given in a status.json response includes:
236 //        volumes - a list of Keep volumes currently in use by this server
237 //          each volume is an object with the following fields:
238 //            * mount_point
239 //            * device_num (an integer identifying the underlying filesystem)
240 //            * bytes_free
241 //            * bytes_used
242
243 // PoolStatus struct
244 type PoolStatus struct {
245         Alloc uint64 `json:"BytesAllocated"`
246         Cap   int    `json:"BuffersMax"`
247         Len   int    `json:"BuffersInUse"`
248 }
249
250 // NodeStatus struct
251 type NodeStatus struct {
252         Volumes    []*VolumeStatus `json:"volumes"`
253         BufferPool PoolStatus
254         PullQueue  WorkQueueStatus
255         TrashQueue WorkQueueStatus
256         Memory     runtime.MemStats
257 }
258
259 var st NodeStatus
260 var stLock sync.Mutex
261
262 // StatusHandler addresses /status.json requests.
263 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
264         stLock.Lock()
265         readNodeStatus(&st)
266         jstat, err := json.Marshal(&st)
267         stLock.Unlock()
268         if err == nil {
269                 resp.Write(jstat)
270         } else {
271                 log.Printf("json.Marshal: %s", err)
272                 log.Printf("NodeStatus = %v", &st)
273                 http.Error(resp, err.Error(), 500)
274         }
275 }
276
277 // populate the given NodeStatus struct with current values.
278 func readNodeStatus(st *NodeStatus) {
279         vols := KeepVM.AllReadable()
280         if cap(st.Volumes) < len(vols) {
281                 st.Volumes = make([]*VolumeStatus, len(vols))
282         }
283         st.Volumes = st.Volumes[:0]
284         for _, vol := range vols {
285                 if s := vol.Status(); s != nil {
286                         st.Volumes = append(st.Volumes, s)
287                 }
288         }
289         st.BufferPool.Alloc = bufs.Alloc()
290         st.BufferPool.Cap = bufs.Cap()
291         st.BufferPool.Len = bufs.Len()
292         st.PullQueue = getWorkQueueStatus(pullq)
293         st.TrashQueue = getWorkQueueStatus(trashq)
294         runtime.ReadMemStats(&st.Memory)
295 }
296
297 // return a WorkQueueStatus for the given queue. If q is nil (which
298 // should never happen except in test suites), return a zero status
299 // value instead of crashing.
300 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
301         if q == nil {
302                 // This should only happen during tests.
303                 return WorkQueueStatus{}
304         }
305         return q.Status()
306 }
307
308 // DeleteHandler processes DELETE requests.
309 //
310 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
311 // from all connected volumes.
312 //
313 // Only the Data Manager, or an Arvados admin with scope "all", are
314 // allowed to issue DELETE requests.  If a DELETE request is not
315 // authenticated or is issued by a non-admin user, the server returns
316 // a PermissionError.
317 //
318 // Upon receiving a valid request from an authorized user,
319 // DeleteHandler deletes all copies of the specified block on local
320 // writable volumes.
321 //
322 // Response format:
323 //
324 // If the requested blocks was not found on any volume, the response
325 // code is HTTP 404 Not Found.
326 //
327 // Otherwise, the response code is 200 OK, with a response body
328 // consisting of the JSON message
329 //
330 //    {"copies_deleted":d,"copies_failed":f}
331 //
332 // where d and f are integers representing the number of blocks that
333 // were successfully and unsuccessfully deleted.
334 //
335 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
336         hash := mux.Vars(req)["hash"]
337
338         // Confirm that this user is an admin and has a token with unlimited scope.
339         var tok = GetAPIToken(req)
340         if tok == "" || !CanDelete(tok) {
341                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
342                 return
343         }
344
345         if !theConfig.EnableDelete {
346                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
347                 return
348         }
349
350         // Delete copies of this block from all available volumes.
351         // Report how many blocks were successfully deleted, and how
352         // many were found on writable volumes but not deleted.
353         var result struct {
354                 Deleted int `json:"copies_deleted"`
355                 Failed  int `json:"copies_failed"`
356         }
357         for _, vol := range KeepVM.AllWritable() {
358                 if err := vol.Trash(hash); err == nil {
359                         result.Deleted++
360                 } else if os.IsNotExist(err) {
361                         continue
362                 } else {
363                         result.Failed++
364                         log.Println("DeleteHandler:", err)
365                 }
366         }
367
368         var st int
369
370         if result.Deleted == 0 && result.Failed == 0 {
371                 st = http.StatusNotFound
372         } else {
373                 st = http.StatusOK
374         }
375
376         resp.WriteHeader(st)
377
378         if st == http.StatusOK {
379                 if body, err := json.Marshal(result); err == nil {
380                         resp.Write(body)
381                 } else {
382                         log.Printf("json.Marshal: %s (result = %v)", err, result)
383                         http.Error(resp, err.Error(), 500)
384                 }
385         }
386 }
387
388 /* PullHandler processes "PUT /pull" requests for the data manager.
389    The request body is a JSON message containing a list of pull
390    requests in the following format:
391
392    [
393       {
394          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
395          "servers":[
396                         "keep0.qr1hi.arvadosapi.com:25107",
397                         "keep1.qr1hi.arvadosapi.com:25108"
398                  ]
399           },
400           {
401                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
402                  "servers":[
403                         "10.0.1.5:25107",
404                         "10.0.1.6:25107",
405                         "10.0.1.7:25108"
406                  ]
407           },
408           ...
409    ]
410
411    Each pull request in the list consists of a block locator string
412    and an ordered list of servers.  Keepstore should try to fetch the
413    block from each server in turn.
414
415    If the request has not been sent by the Data Manager, return 401
416    Unauthorized.
417
418    If the JSON unmarshalling fails, return 400 Bad Request.
419 */
420
421 // PullRequest consists of a block locator and an ordered list of servers
422 type PullRequest struct {
423         Locator string   `json:"locator"`
424         Servers []string `json:"servers"`
425 }
426
427 // PullHandler processes "PUT /pull" requests for the data manager.
428 func PullHandler(resp http.ResponseWriter, req *http.Request) {
429         // Reject unauthorized requests.
430         if !IsSystemAuth(GetAPIToken(req)) {
431                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
432                 return
433         }
434
435         // Parse the request body.
436         var pr []PullRequest
437         r := json.NewDecoder(req.Body)
438         if err := r.Decode(&pr); err != nil {
439                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
440                 return
441         }
442
443         // We have a properly formatted pull list sent from the data
444         // manager.  Report success and send the list to the pull list
445         // manager for further handling.
446         resp.WriteHeader(http.StatusOK)
447         resp.Write([]byte(
448                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
449
450         plist := list.New()
451         for _, p := range pr {
452                 plist.PushBack(p)
453         }
454         pullq.ReplaceQueue(plist)
455 }
456
457 // TrashRequest consists of a block locator and it's Mtime
458 type TrashRequest struct {
459         Locator    string `json:"locator"`
460         BlockMtime int64  `json:"block_mtime"`
461 }
462
463 // TrashHandler processes /trash requests.
464 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
465         // Reject unauthorized requests.
466         if !IsSystemAuth(GetAPIToken(req)) {
467                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
468                 return
469         }
470
471         // Parse the request body.
472         var trash []TrashRequest
473         r := json.NewDecoder(req.Body)
474         if err := r.Decode(&trash); err != nil {
475                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
476                 return
477         }
478
479         // We have a properly formatted trash list sent from the data
480         // manager.  Report success and send the list to the trash work
481         // queue for further handling.
482         resp.WriteHeader(http.StatusOK)
483         resp.Write([]byte(
484                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
485
486         tlist := list.New()
487         for _, t := range trash {
488                 tlist.PushBack(t)
489         }
490         trashq.ReplaceQueue(tlist)
491 }
492
493 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
494 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
495         // Reject unauthorized requests.
496         if !IsSystemAuth(GetAPIToken(req)) {
497                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
498                 return
499         }
500
501         hash := mux.Vars(req)["hash"]
502
503         if len(KeepVM.AllWritable()) == 0 {
504                 http.Error(resp, "No writable volumes", http.StatusNotFound)
505                 return
506         }
507
508         var untrashedOn, failedOn []string
509         var numNotFound int
510         for _, vol := range KeepVM.AllWritable() {
511                 err := vol.Untrash(hash)
512
513                 if os.IsNotExist(err) {
514                         numNotFound++
515                 } else if err != nil {
516                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
517                         failedOn = append(failedOn, vol.String())
518                 } else {
519                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
520                         untrashedOn = append(untrashedOn, vol.String())
521                 }
522         }
523
524         if numNotFound == len(KeepVM.AllWritable()) {
525                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
526                 return
527         }
528
529         if len(failedOn) == len(KeepVM.AllWritable()) {
530                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
531         } else {
532                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
533                 if len(failedOn) > 0 {
534                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
535                 }
536                 resp.Write([]byte(respBody))
537         }
538 }
539
540 // GetBlock and PutBlock implement lower-level code for handling
541 // blocks by rooting through volumes connected to the local machine.
542 // Once the handler has determined that system policy permits the
543 // request, it calls these methods to perform the actual operation.
544 //
545 // TODO(twp): this code would probably be better located in the
546 // VolumeManager interface. As an abstraction, the VolumeManager
547 // should be the only part of the code that cares about which volume a
548 // block is stored on, so it should be responsible for figuring out
549 // which volume to check for fetching blocks, storing blocks, etc.
550
551 // GetBlock fetches the block identified by "hash" into the provided
552 // buf, and returns the data size.
553 //
554 // If the block cannot be found on any volume, returns NotFoundError.
555 //
556 // If the block found does not have the correct MD5 hash, returns
557 // DiskHashError.
558 //
559 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
560         // Attempt to read the requested hash from a keep volume.
561         errorToCaller := NotFoundError
562
563         for _, vol := range KeepVM.AllReadable() {
564                 size, err := vol.Get(ctx, hash, buf)
565                 select {
566                 case <-ctx.Done():
567                         return 0, ctx.Err()
568                 default:
569                 }
570                 if err != nil {
571                         // IsNotExist is an expected error and may be
572                         // ignored. All other errors are logged. In
573                         // any case we continue trying to read other
574                         // volumes. If all volumes report IsNotExist,
575                         // we return a NotFoundError.
576                         if !os.IsNotExist(err) {
577                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
578                         }
579                         continue
580                 }
581                 // Check the file checksum.
582                 //
583                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
584                 if filehash != hash {
585                         // TODO: Try harder to tell a sysadmin about
586                         // this.
587                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
588                                 vol, hash, filehash)
589                         errorToCaller = DiskHashError
590                         continue
591                 }
592                 if errorToCaller == DiskHashError {
593                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
594                                 vol, hash)
595                 }
596                 return size, nil
597         }
598         return 0, errorToCaller
599 }
600
601 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
602 //
603 // PutBlock(block, hash)
604 //   Stores the BLOCK (identified by the content id HASH) in Keep.
605 //
606 //   The MD5 checksum of the block must be identical to the content id HASH.
607 //   If not, an error is returned.
608 //
609 //   PutBlock stores the BLOCK on the first Keep volume with free space.
610 //   A failure code is returned to the user only if all volumes fail.
611 //
612 //   On success, PutBlock returns nil.
613 //   On failure, it returns a KeepError with one of the following codes:
614 //
615 //   500 Collision
616 //          A different block with the same hash already exists on this
617 //          Keep server.
618 //   422 MD5Fail
619 //          The MD5 hash of the BLOCK does not match the argument HASH.
620 //   503 Full
621 //          There was not enough space left in any Keep volume to store
622 //          the object.
623 //   500 Fail
624 //          The object could not be stored for some other reason (e.g.
625 //          all writes failed). The text of the error message should
626 //          provide as much detail as possible.
627 //
628 func PutBlock(block []byte, hash string) (int, error) {
629         // Check that BLOCK's checksum matches HASH.
630         blockhash := fmt.Sprintf("%x", md5.Sum(block))
631         if blockhash != hash {
632                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
633                 return 0, RequestHashError
634         }
635
636         // If we already have this data, it's intact on disk, and we
637         // can update its timestamp, return success. If we have
638         // different data with the same hash, return failure.
639         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
640                 return n, err
641         }
642
643         // Choose a Keep volume to write to.
644         // If this volume fails, try all of the volumes in order.
645         if vol := KeepVM.NextWritable(); vol != nil {
646                 if err := vol.Put(hash, block); err == nil {
647                         return vol.Replication(), nil // success!
648                 }
649         }
650
651         writables := KeepVM.AllWritable()
652         if len(writables) == 0 {
653                 log.Print("No writable volumes.")
654                 return 0, FullError
655         }
656
657         allFull := true
658         for _, vol := range writables {
659                 err := vol.Put(hash, block)
660                 if err == nil {
661                         return vol.Replication(), nil // success!
662                 }
663                 if err != FullError {
664                         // The volume is not full but the
665                         // write did not succeed.  Report the
666                         // error and continue trying.
667                         allFull = false
668                         log.Printf("%s: Write(%s): %s", vol, hash, err)
669                 }
670         }
671
672         if allFull {
673                 log.Print("All volumes are full.")
674                 return 0, FullError
675         }
676         // Already logged the non-full errors.
677         return 0, GenericError
678 }
679
680 // CompareAndTouch returns the current replication level if one of the
681 // volumes already has the given content and it successfully updates
682 // the relevant block's modification time in order to protect it from
683 // premature garbage collection. Otherwise, it returns a non-nil
684 // error.
685 func CompareAndTouch(hash string, buf []byte) (int, error) {
686         var bestErr error = NotFoundError
687         for _, vol := range KeepVM.AllWritable() {
688                 if err := vol.Compare(hash, buf); err == CollisionError {
689                         // Stop if we have a block with same hash but
690                         // different content. (It will be impossible
691                         // to tell which one is wanted if we have
692                         // both, so there's no point writing it even
693                         // on a different volume.)
694                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
695                         return 0, err
696                 } else if os.IsNotExist(err) {
697                         // Block does not exist. This is the only
698                         // "normal" error: we don't log anything.
699                         continue
700                 } else if err != nil {
701                         // Couldn't open file, data is corrupt on
702                         // disk, etc.: log this abnormal condition,
703                         // and try the next volume.
704                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
705                         continue
706                 }
707                 if err := vol.Touch(hash); err != nil {
708                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
709                         bestErr = err
710                         continue
711                 }
712                 // Compare and Touch both worked --> done.
713                 return vol.Replication(), nil
714         }
715         return 0, bestErr
716 }
717
718 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
719
720 // IsValidLocator returns true if the specified string is a valid Keep locator.
721 //   When Keep is extended to support hash types other than MD5,
722 //   this should be updated to cover those as well.
723 //
724 func IsValidLocator(loc string) bool {
725         return validLocatorRe.MatchString(loc)
726 }
727
728 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
729
730 // GetAPIToken returns the OAuth2 token from the Authorization
731 // header of a HTTP request, or an empty string if no matching
732 // token is found.
733 func GetAPIToken(req *http.Request) string {
734         if auth, ok := req.Header["Authorization"]; ok {
735                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
736                         return match[1]
737                 }
738         }
739         return ""
740 }
741
742 // IsExpired returns true if the given Unix timestamp (expressed as a
743 // hexadecimal string) is in the past, or if timestampHex cannot be
744 // parsed as a hexadecimal string.
745 func IsExpired(timestampHex string) bool {
746         ts, err := strconv.ParseInt(timestampHex, 16, 0)
747         if err != nil {
748                 log.Printf("IsExpired: %s", err)
749                 return true
750         }
751         return time.Unix(ts, 0).Before(time.Now())
752 }
753
754 // CanDelete returns true if the user identified by apiToken is
755 // allowed to delete blocks.
756 func CanDelete(apiToken string) bool {
757         if apiToken == "" {
758                 return false
759         }
760         // Blocks may be deleted only when Keep has been configured with a
761         // data manager.
762         if IsSystemAuth(apiToken) {
763                 return true
764         }
765         // TODO(twp): look up apiToken with the API server
766         // return true if is_admin is true and if the token
767         // has unlimited scope
768         return false
769 }
770
771 // IsSystemAuth returns true if the given token is allowed to perform
772 // system level actions like deleting data.
773 func IsSystemAuth(token string) bool {
774         return token != "" && token == theConfig.systemAuthToken
775 }