10467: Fix context usage: ensure cancel always gets called.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "sync"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Untrash moves blocks from trash back into store
59         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60
61         // Any request which does not match any of these routes gets
62         // 400 Bad Request.
63         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
64
65         return rest
66 }
67
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
71 }
72
73 // GetBlockHandler is a HandleFunc to address Get block requests.
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
75         ctx, cancel := contextForResponse(context.TODO(), resp)
76         defer cancel()
77
78         if theConfig.RequireSignatures {
79                 locator := req.URL.Path[1:] // strip leading slash
80                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
81                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
82                         return
83                 }
84         }
85
86         // TODO: Probe volumes to check whether the block _might_
87         // exist. Some volumes/types could support a quick existence
88         // check without causing other operations to suffer. If all
89         // volumes support that, and assure us the block definitely
90         // isn't here, we can return 404 now instead of waiting for a
91         // buffer.
92
93         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
94         if err != nil {
95                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
96                 return
97         }
98         defer bufs.Put(buf)
99
100         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
101         if err != nil {
102                 code := http.StatusInternalServerError
103                 if err, ok := err.(*KeepError); ok {
104                         code = err.HTTPCode
105                 }
106                 http.Error(resp, err.Error(), code)
107                 return
108         }
109
110         resp.Header().Set("Content-Length", strconv.Itoa(size))
111         resp.Header().Set("Content-Type", "application/octet-stream")
112         resp.Write(buf[:size])
113 }
114
115 // Return a new context that gets cancelled by resp's CloseNotifier.
116 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
117         ctx, cancel := context.WithCancel(parent)
118         if cn, ok := resp.(http.CloseNotifier); ok {
119                 go func(c <-chan bool) {
120                         select {
121                         case <-c:
122                                 theConfig.debugLogf("cancel context")
123                                 cancel()
124                         case <-ctx.Done():
125                         }
126                 }(cn.CloseNotify())
127         }
128         return ctx, cancel
129 }
130
131 // Get a buffer from the pool -- but give up and return a non-nil
132 // error if ctx ends before we get a buffer.
133 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
134         bufReady := make(chan []byte)
135         go func() {
136                 bufReady <- bufs.Get(bufSize)
137         }()
138         select {
139         case buf := <-bufReady:
140                 return buf, nil
141         case <-ctx.Done():
142                 go func() {
143                         // Even if closeNotifier happened first, we
144                         // need to keep waiting for our buf so we can
145                         // return it to the pool.
146                         bufs.Put(<-bufReady)
147                 }()
148                 return nil, ErrClientDisconnect
149         }
150 }
151
152 // PutBlockHandler is a HandleFunc to address Put block requests.
153 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
154         ctx, cancel := contextForResponse(context.TODO(), resp)
155         defer cancel()
156
157         hash := mux.Vars(req)["hash"]
158
159         // Detect as many error conditions as possible before reading
160         // the body: avoid transmitting data that will not end up
161         // being written anyway.
162
163         if req.ContentLength == -1 {
164                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
165                 return
166         }
167
168         if req.ContentLength > BlockSize {
169                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
170                 return
171         }
172
173         if len(KeepVM.AllWritable()) == 0 {
174                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
175                 return
176         }
177
178         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
179         if err != nil {
180                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
181                 return
182         }
183
184         _, err = io.ReadFull(req.Body, buf)
185         if err != nil {
186                 http.Error(resp, err.Error(), 500)
187                 bufs.Put(buf)
188                 return
189         }
190
191         replication, err := PutBlock(ctx, buf, hash)
192         bufs.Put(buf)
193
194         if err != nil {
195                 ke := err.(*KeepError)
196                 http.Error(resp, ke.Error(), ke.HTTPCode)
197                 return
198         }
199
200         // Success; add a size hint, sign the locator if possible, and
201         // return it to the client.
202         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
203         apiToken := GetAPIToken(req)
204         if theConfig.blobSigningKey != nil && apiToken != "" {
205                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
206                 returnHash = SignLocator(returnHash, apiToken, expiry)
207         }
208         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
209         resp.Write([]byte(returnHash + "\n"))
210 }
211
212 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
213 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
214         // Reject unauthorized requests.
215         if !IsSystemAuth(GetAPIToken(req)) {
216                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
217                 return
218         }
219
220         prefix := mux.Vars(req)["prefix"]
221
222         for _, vol := range KeepVM.AllReadable() {
223                 if err := vol.IndexTo(prefix, resp); err != nil {
224                         // The only errors returned by IndexTo are
225                         // write errors returned by resp.Write(),
226                         // which probably means the client has
227                         // disconnected and this error will never be
228                         // reported to the client -- but it will
229                         // appear in our own error log.
230                         http.Error(resp, err.Error(), http.StatusInternalServerError)
231                         return
232                 }
233         }
234         // An empty line at EOF is the only way the client can be
235         // assured the entire index was received.
236         resp.Write([]byte{'\n'})
237 }
238
239 // StatusHandler
240 //     Responds to /status.json requests with the current node status,
241 //     described in a JSON structure.
242 //
243 //     The data given in a status.json response includes:
244 //        volumes - a list of Keep volumes currently in use by this server
245 //          each volume is an object with the following fields:
246 //            * mount_point
247 //            * device_num (an integer identifying the underlying filesystem)
248 //            * bytes_free
249 //            * bytes_used
250
251 // PoolStatus struct
252 type PoolStatus struct {
253         Alloc uint64 `json:"BytesAllocated"`
254         Cap   int    `json:"BuffersMax"`
255         Len   int    `json:"BuffersInUse"`
256 }
257
258 // NodeStatus struct
259 type NodeStatus struct {
260         Volumes    []*VolumeStatus `json:"volumes"`
261         BufferPool PoolStatus
262         PullQueue  WorkQueueStatus
263         TrashQueue WorkQueueStatus
264         Memory     runtime.MemStats
265 }
266
267 var st NodeStatus
268 var stLock sync.Mutex
269
270 // StatusHandler addresses /status.json requests.
271 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
272         stLock.Lock()
273         readNodeStatus(&st)
274         jstat, err := json.Marshal(&st)
275         stLock.Unlock()
276         if err == nil {
277                 resp.Write(jstat)
278         } else {
279                 log.Printf("json.Marshal: %s", err)
280                 log.Printf("NodeStatus = %v", &st)
281                 http.Error(resp, err.Error(), 500)
282         }
283 }
284
285 // populate the given NodeStatus struct with current values.
286 func readNodeStatus(st *NodeStatus) {
287         vols := KeepVM.AllReadable()
288         if cap(st.Volumes) < len(vols) {
289                 st.Volumes = make([]*VolumeStatus, len(vols))
290         }
291         st.Volumes = st.Volumes[:0]
292         for _, vol := range vols {
293                 if s := vol.Status(); s != nil {
294                         st.Volumes = append(st.Volumes, s)
295                 }
296         }
297         st.BufferPool.Alloc = bufs.Alloc()
298         st.BufferPool.Cap = bufs.Cap()
299         st.BufferPool.Len = bufs.Len()
300         st.PullQueue = getWorkQueueStatus(pullq)
301         st.TrashQueue = getWorkQueueStatus(trashq)
302         runtime.ReadMemStats(&st.Memory)
303 }
304
305 // return a WorkQueueStatus for the given queue. If q is nil (which
306 // should never happen except in test suites), return a zero status
307 // value instead of crashing.
308 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
309         if q == nil {
310                 // This should only happen during tests.
311                 return WorkQueueStatus{}
312         }
313         return q.Status()
314 }
315
316 // DeleteHandler processes DELETE requests.
317 //
318 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
319 // from all connected volumes.
320 //
321 // Only the Data Manager, or an Arvados admin with scope "all", are
322 // allowed to issue DELETE requests.  If a DELETE request is not
323 // authenticated or is issued by a non-admin user, the server returns
324 // a PermissionError.
325 //
326 // Upon receiving a valid request from an authorized user,
327 // DeleteHandler deletes all copies of the specified block on local
328 // writable volumes.
329 //
330 // Response format:
331 //
332 // If the requested blocks was not found on any volume, the response
333 // code is HTTP 404 Not Found.
334 //
335 // Otherwise, the response code is 200 OK, with a response body
336 // consisting of the JSON message
337 //
338 //    {"copies_deleted":d,"copies_failed":f}
339 //
340 // where d and f are integers representing the number of blocks that
341 // were successfully and unsuccessfully deleted.
342 //
343 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
344         hash := mux.Vars(req)["hash"]
345
346         // Confirm that this user is an admin and has a token with unlimited scope.
347         var tok = GetAPIToken(req)
348         if tok == "" || !CanDelete(tok) {
349                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
350                 return
351         }
352
353         if !theConfig.EnableDelete {
354                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
355                 return
356         }
357
358         // Delete copies of this block from all available volumes.
359         // Report how many blocks were successfully deleted, and how
360         // many were found on writable volumes but not deleted.
361         var result struct {
362                 Deleted int `json:"copies_deleted"`
363                 Failed  int `json:"copies_failed"`
364         }
365         for _, vol := range KeepVM.AllWritable() {
366                 if err := vol.Trash(hash); err == nil {
367                         result.Deleted++
368                 } else if os.IsNotExist(err) {
369                         continue
370                 } else {
371                         result.Failed++
372                         log.Println("DeleteHandler:", err)
373                 }
374         }
375
376         var st int
377
378         if result.Deleted == 0 && result.Failed == 0 {
379                 st = http.StatusNotFound
380         } else {
381                 st = http.StatusOK
382         }
383
384         resp.WriteHeader(st)
385
386         if st == http.StatusOK {
387                 if body, err := json.Marshal(result); err == nil {
388                         resp.Write(body)
389                 } else {
390                         log.Printf("json.Marshal: %s (result = %v)", err, result)
391                         http.Error(resp, err.Error(), 500)
392                 }
393         }
394 }
395
396 /* PullHandler processes "PUT /pull" requests for the data manager.
397    The request body is a JSON message containing a list of pull
398    requests in the following format:
399
400    [
401       {
402          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
403          "servers":[
404                         "keep0.qr1hi.arvadosapi.com:25107",
405                         "keep1.qr1hi.arvadosapi.com:25108"
406                  ]
407           },
408           {
409                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
410                  "servers":[
411                         "10.0.1.5:25107",
412                         "10.0.1.6:25107",
413                         "10.0.1.7:25108"
414                  ]
415           },
416           ...
417    ]
418
419    Each pull request in the list consists of a block locator string
420    and an ordered list of servers.  Keepstore should try to fetch the
421    block from each server in turn.
422
423    If the request has not been sent by the Data Manager, return 401
424    Unauthorized.
425
426    If the JSON unmarshalling fails, return 400 Bad Request.
427 */
428
429 // PullRequest consists of a block locator and an ordered list of servers
430 type PullRequest struct {
431         Locator string   `json:"locator"`
432         Servers []string `json:"servers"`
433 }
434
435 // PullHandler processes "PUT /pull" requests for the data manager.
436 func PullHandler(resp http.ResponseWriter, req *http.Request) {
437         // Reject unauthorized requests.
438         if !IsSystemAuth(GetAPIToken(req)) {
439                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
440                 return
441         }
442
443         // Parse the request body.
444         var pr []PullRequest
445         r := json.NewDecoder(req.Body)
446         if err := r.Decode(&pr); err != nil {
447                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
448                 return
449         }
450
451         // We have a properly formatted pull list sent from the data
452         // manager.  Report success and send the list to the pull list
453         // manager for further handling.
454         resp.WriteHeader(http.StatusOK)
455         resp.Write([]byte(
456                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
457
458         plist := list.New()
459         for _, p := range pr {
460                 plist.PushBack(p)
461         }
462         pullq.ReplaceQueue(plist)
463 }
464
465 // TrashRequest consists of a block locator and it's Mtime
466 type TrashRequest struct {
467         Locator    string `json:"locator"`
468         BlockMtime int64  `json:"block_mtime"`
469 }
470
471 // TrashHandler processes /trash requests.
472 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
473         // Reject unauthorized requests.
474         if !IsSystemAuth(GetAPIToken(req)) {
475                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
476                 return
477         }
478
479         // Parse the request body.
480         var trash []TrashRequest
481         r := json.NewDecoder(req.Body)
482         if err := r.Decode(&trash); err != nil {
483                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
484                 return
485         }
486
487         // We have a properly formatted trash list sent from the data
488         // manager.  Report success and send the list to the trash work
489         // queue for further handling.
490         resp.WriteHeader(http.StatusOK)
491         resp.Write([]byte(
492                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
493
494         tlist := list.New()
495         for _, t := range trash {
496                 tlist.PushBack(t)
497         }
498         trashq.ReplaceQueue(tlist)
499 }
500
501 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
502 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
503         // Reject unauthorized requests.
504         if !IsSystemAuth(GetAPIToken(req)) {
505                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
506                 return
507         }
508
509         hash := mux.Vars(req)["hash"]
510
511         if len(KeepVM.AllWritable()) == 0 {
512                 http.Error(resp, "No writable volumes", http.StatusNotFound)
513                 return
514         }
515
516         var untrashedOn, failedOn []string
517         var numNotFound int
518         for _, vol := range KeepVM.AllWritable() {
519                 err := vol.Untrash(hash)
520
521                 if os.IsNotExist(err) {
522                         numNotFound++
523                 } else if err != nil {
524                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
525                         failedOn = append(failedOn, vol.String())
526                 } else {
527                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
528                         untrashedOn = append(untrashedOn, vol.String())
529                 }
530         }
531
532         if numNotFound == len(KeepVM.AllWritable()) {
533                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
534                 return
535         }
536
537         if len(failedOn) == len(KeepVM.AllWritable()) {
538                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
539         } else {
540                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
541                 if len(failedOn) > 0 {
542                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
543                 }
544                 resp.Write([]byte(respBody))
545         }
546 }
547
548 // GetBlock and PutBlock implement lower-level code for handling
549 // blocks by rooting through volumes connected to the local machine.
550 // Once the handler has determined that system policy permits the
551 // request, it calls these methods to perform the actual operation.
552 //
553 // TODO(twp): this code would probably be better located in the
554 // VolumeManager interface. As an abstraction, the VolumeManager
555 // should be the only part of the code that cares about which volume a
556 // block is stored on, so it should be responsible for figuring out
557 // which volume to check for fetching blocks, storing blocks, etc.
558
559 // GetBlock fetches the block identified by "hash" into the provided
560 // buf, and returns the data size.
561 //
562 // If the block cannot be found on any volume, returns NotFoundError.
563 //
564 // If the block found does not have the correct MD5 hash, returns
565 // DiskHashError.
566 //
567 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
568         // Attempt to read the requested hash from a keep volume.
569         errorToCaller := NotFoundError
570
571         for _, vol := range KeepVM.AllReadable() {
572                 size, err := vol.Get(ctx, hash, buf)
573                 select {
574                 case <-ctx.Done():
575                         return 0, ctx.Err()
576                 default:
577                 }
578                 if err != nil {
579                         // IsNotExist is an expected error and may be
580                         // ignored. All other errors are logged. In
581                         // any case we continue trying to read other
582                         // volumes. If all volumes report IsNotExist,
583                         // we return a NotFoundError.
584                         if !os.IsNotExist(err) {
585                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
586                         }
587                         continue
588                 }
589                 // Check the file checksum.
590                 //
591                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
592                 if filehash != hash {
593                         // TODO: Try harder to tell a sysadmin about
594                         // this.
595                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
596                                 vol, hash, filehash)
597                         errorToCaller = DiskHashError
598                         continue
599                 }
600                 if errorToCaller == DiskHashError {
601                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
602                                 vol, hash)
603                 }
604                 return size, nil
605         }
606         return 0, errorToCaller
607 }
608
609 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
610 //
611 // PutBlock(ctx, block, hash)
612 //   Stores the BLOCK (identified by the content id HASH) in Keep.
613 //
614 //   The MD5 checksum of the block must be identical to the content id HASH.
615 //   If not, an error is returned.
616 //
617 //   PutBlock stores the BLOCK on the first Keep volume with free space.
618 //   A failure code is returned to the user only if all volumes fail.
619 //
620 //   On success, PutBlock returns nil.
621 //   On failure, it returns a KeepError with one of the following codes:
622 //
623 //   500 Collision
624 //          A different block with the same hash already exists on this
625 //          Keep server.
626 //   422 MD5Fail
627 //          The MD5 hash of the BLOCK does not match the argument HASH.
628 //   503 Full
629 //          There was not enough space left in any Keep volume to store
630 //          the object.
631 //   500 Fail
632 //          The object could not be stored for some other reason (e.g.
633 //          all writes failed). The text of the error message should
634 //          provide as much detail as possible.
635 //
636 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
637         // Check that BLOCK's checksum matches HASH.
638         blockhash := fmt.Sprintf("%x", md5.Sum(block))
639         if blockhash != hash {
640                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
641                 return 0, RequestHashError
642         }
643
644         // If we already have this data, it's intact on disk, and we
645         // can update its timestamp, return success. If we have
646         // different data with the same hash, return failure.
647         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
648                 return n, err
649         }
650
651         // Choose a Keep volume to write to.
652         // If this volume fails, try all of the volumes in order.
653         if vol := KeepVM.NextWritable(); vol != nil {
654                 if err := vol.Put(context.TODO(), hash, block); err == nil {
655                         return vol.Replication(), nil // success!
656                 }
657         }
658
659         writables := KeepVM.AllWritable()
660         if len(writables) == 0 {
661                 log.Print("No writable volumes.")
662                 return 0, FullError
663         }
664
665         allFull := true
666         for _, vol := range writables {
667                 err := vol.Put(ctx, hash, block)
668                 select {
669                 case <-ctx.Done():
670                         return 0, ctx.Err()
671                 default:
672                 }
673                 if err == nil {
674                         return vol.Replication(), nil // success!
675                 }
676                 if err != FullError {
677                         // The volume is not full but the
678                         // write did not succeed.  Report the
679                         // error and continue trying.
680                         allFull = false
681                         log.Printf("%s: Write(%s): %s", vol, hash, err)
682                 }
683         }
684
685         if allFull {
686                 log.Print("All volumes are full.")
687                 return 0, FullError
688         }
689         // Already logged the non-full errors.
690         return 0, GenericError
691 }
692
693 // CompareAndTouch returns the current replication level if one of the
694 // volumes already has the given content and it successfully updates
695 // the relevant block's modification time in order to protect it from
696 // premature garbage collection. Otherwise, it returns a non-nil
697 // error.
698 func CompareAndTouch(hash string, buf []byte) (int, error) {
699         var bestErr error = NotFoundError
700         for _, vol := range KeepVM.AllWritable() {
701                 if err := vol.Compare(hash, buf); err == CollisionError {
702                         // Stop if we have a block with same hash but
703                         // different content. (It will be impossible
704                         // to tell which one is wanted if we have
705                         // both, so there's no point writing it even
706                         // on a different volume.)
707                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
708                         return 0, err
709                 } else if os.IsNotExist(err) {
710                         // Block does not exist. This is the only
711                         // "normal" error: we don't log anything.
712                         continue
713                 } else if err != nil {
714                         // Couldn't open file, data is corrupt on
715                         // disk, etc.: log this abnormal condition,
716                         // and try the next volume.
717                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
718                         continue
719                 }
720                 if err := vol.Touch(hash); err != nil {
721                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
722                         bestErr = err
723                         continue
724                 }
725                 // Compare and Touch both worked --> done.
726                 return vol.Replication(), nil
727         }
728         return 0, bestErr
729 }
730
731 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
732
733 // IsValidLocator returns true if the specified string is a valid Keep locator.
734 //   When Keep is extended to support hash types other than MD5,
735 //   this should be updated to cover those as well.
736 //
737 func IsValidLocator(loc string) bool {
738         return validLocatorRe.MatchString(loc)
739 }
740
741 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
742
743 // GetAPIToken returns the OAuth2 token from the Authorization
744 // header of a HTTP request, or an empty string if no matching
745 // token is found.
746 func GetAPIToken(req *http.Request) string {
747         if auth, ok := req.Header["Authorization"]; ok {
748                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
749                         return match[1]
750                 }
751         }
752         return ""
753 }
754
755 // IsExpired returns true if the given Unix timestamp (expressed as a
756 // hexadecimal string) is in the past, or if timestampHex cannot be
757 // parsed as a hexadecimal string.
758 func IsExpired(timestampHex string) bool {
759         ts, err := strconv.ParseInt(timestampHex, 16, 0)
760         if err != nil {
761                 log.Printf("IsExpired: %s", err)
762                 return true
763         }
764         return time.Unix(ts, 0).Before(time.Now())
765 }
766
767 // CanDelete returns true if the user identified by apiToken is
768 // allowed to delete blocks.
769 func CanDelete(apiToken string) bool {
770         if apiToken == "" {
771                 return false
772         }
773         // Blocks may be deleted only when Keep has been configured with a
774         // data manager.
775         if IsSystemAuth(apiToken) {
776                 return true
777         }
778         // TODO(twp): look up apiToken with the API server
779         // return true if is_admin is true and if the token
780         // has unlimited scope
781         return false
782 }
783
784 // IsSystemAuth returns true if the given token is allowed to perform
785 // system level actions like deleting data.
786 func IsSystemAuth(token string) bool {
787         return token != "" && token == theConfig.systemAuthToken
788 }