ac2d71228f823a9ab64c24d82e2c4fe6091648bf
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "sync"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Untrash moves blocks from trash back into store
59         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60
61         // Any request which does not match any of these routes gets
62         // 400 Bad Request.
63         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
64
65         return rest
66 }
67
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
71 }
72
73 // GetBlockHandler is a HandleFunc to address Get block requests.
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
75         ctx := contextForResponse(context.TODO(), resp)
76
77         if theConfig.RequireSignatures {
78                 locator := req.URL.Path[1:] // strip leading slash
79                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
80                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
81                         return
82                 }
83         }
84
85         // TODO: Probe volumes to check whether the block _might_
86         // exist. Some volumes/types could support a quick existence
87         // check without causing other operations to suffer. If all
88         // volumes support that, and assure us the block definitely
89         // isn't here, we can return 404 now instead of waiting for a
90         // buffer.
91
92         buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
93         if err != nil {
94                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
95                 return
96         }
97         defer bufs.Put(buf)
98
99         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
100         if err != nil {
101                 code := http.StatusInternalServerError
102                 if err, ok := err.(*KeepError); ok {
103                         code = err.HTTPCode
104                 }
105                 http.Error(resp, err.Error(), code)
106                 return
107         }
108
109         resp.Header().Set("Content-Length", strconv.Itoa(size))
110         resp.Header().Set("Content-Type", "application/octet-stream")
111         resp.Write(buf[:size])
112 }
113
114 // Return a new context that gets cancelled by resp's
115 // CloseNotifier. If resp does not implement http.CloseNotifier,
116 // return parent.
117 func contextForResponse(parent context.Context, resp http.ResponseWriter) context.Context {
118         cn, ok := resp.(http.CloseNotifier)
119         if !ok {
120                 return parent
121         }
122         ctx, cancel := context.WithCancel(parent)
123         go func() {
124                 <-cn.CloseNotify()
125                 cancel()
126         }()
127         return ctx
128 }
129
130 // Get a buffer from the pool -- but give up and return a non-nil
131 // error if resp implements http.CloseNotifier and tells us that the
132 // client has disconnected before we get a buffer.
133 func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
134         var closeNotifier <-chan bool
135         if resp, ok := resp.(http.CloseNotifier); ok {
136                 closeNotifier = resp.CloseNotify()
137         }
138         var buf []byte
139         bufReady := make(chan []byte)
140         go func() {
141                 bufReady <- bufs.Get(bufSize)
142                 close(bufReady)
143         }()
144         select {
145         case buf = <-bufReady:
146                 return buf, nil
147         case <-closeNotifier:
148                 go func() {
149                         // Even if closeNotifier happened first, we
150                         // need to keep waiting for our buf so we can
151                         // return it to the pool.
152                         bufs.Put(<-bufReady)
153                 }()
154                 return nil, ErrClientDisconnect
155         }
156 }
157
158 // PutBlockHandler is a HandleFunc to address Put block requests.
159 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
160         hash := mux.Vars(req)["hash"]
161
162         // Detect as many error conditions as possible before reading
163         // the body: avoid transmitting data that will not end up
164         // being written anyway.
165
166         if req.ContentLength == -1 {
167                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
168                 return
169         }
170
171         if req.ContentLength > BlockSize {
172                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
173                 return
174         }
175
176         if len(KeepVM.AllWritable()) == 0 {
177                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
178                 return
179         }
180
181         buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
182         if err != nil {
183                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
184                 return
185         }
186
187         _, err = io.ReadFull(req.Body, buf)
188         if err != nil {
189                 http.Error(resp, err.Error(), 500)
190                 bufs.Put(buf)
191                 return
192         }
193
194         replication, err := PutBlock(buf, hash)
195         bufs.Put(buf)
196
197         if err != nil {
198                 ke := err.(*KeepError)
199                 http.Error(resp, ke.Error(), ke.HTTPCode)
200                 return
201         }
202
203         // Success; add a size hint, sign the locator if possible, and
204         // return it to the client.
205         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
206         apiToken := GetAPIToken(req)
207         if theConfig.blobSigningKey != nil && apiToken != "" {
208                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
209                 returnHash = SignLocator(returnHash, apiToken, expiry)
210         }
211         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
212         resp.Write([]byte(returnHash + "\n"))
213 }
214
215 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
216 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
217         // Reject unauthorized requests.
218         if !IsSystemAuth(GetAPIToken(req)) {
219                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
220                 return
221         }
222
223         prefix := mux.Vars(req)["prefix"]
224
225         for _, vol := range KeepVM.AllReadable() {
226                 if err := vol.IndexTo(prefix, resp); err != nil {
227                         // The only errors returned by IndexTo are
228                         // write errors returned by resp.Write(),
229                         // which probably means the client has
230                         // disconnected and this error will never be
231                         // reported to the client -- but it will
232                         // appear in our own error log.
233                         http.Error(resp, err.Error(), http.StatusInternalServerError)
234                         return
235                 }
236         }
237         // An empty line at EOF is the only way the client can be
238         // assured the entire index was received.
239         resp.Write([]byte{'\n'})
240 }
241
242 // StatusHandler
243 //     Responds to /status.json requests with the current node status,
244 //     described in a JSON structure.
245 //
246 //     The data given in a status.json response includes:
247 //        volumes - a list of Keep volumes currently in use by this server
248 //          each volume is an object with the following fields:
249 //            * mount_point
250 //            * device_num (an integer identifying the underlying filesystem)
251 //            * bytes_free
252 //            * bytes_used
253
254 // PoolStatus struct
255 type PoolStatus struct {
256         Alloc uint64 `json:"BytesAllocated"`
257         Cap   int    `json:"BuffersMax"`
258         Len   int    `json:"BuffersInUse"`
259 }
260
261 // NodeStatus struct
262 type NodeStatus struct {
263         Volumes    []*VolumeStatus `json:"volumes"`
264         BufferPool PoolStatus
265         PullQueue  WorkQueueStatus
266         TrashQueue WorkQueueStatus
267         Memory     runtime.MemStats
268 }
269
270 var st NodeStatus
271 var stLock sync.Mutex
272
273 // StatusHandler addresses /status.json requests.
274 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
275         stLock.Lock()
276         readNodeStatus(&st)
277         jstat, err := json.Marshal(&st)
278         stLock.Unlock()
279         if err == nil {
280                 resp.Write(jstat)
281         } else {
282                 log.Printf("json.Marshal: %s", err)
283                 log.Printf("NodeStatus = %v", &st)
284                 http.Error(resp, err.Error(), 500)
285         }
286 }
287
288 // populate the given NodeStatus struct with current values.
289 func readNodeStatus(st *NodeStatus) {
290         vols := KeepVM.AllReadable()
291         if cap(st.Volumes) < len(vols) {
292                 st.Volumes = make([]*VolumeStatus, len(vols))
293         }
294         st.Volumes = st.Volumes[:0]
295         for _, vol := range vols {
296                 if s := vol.Status(); s != nil {
297                         st.Volumes = append(st.Volumes, s)
298                 }
299         }
300         st.BufferPool.Alloc = bufs.Alloc()
301         st.BufferPool.Cap = bufs.Cap()
302         st.BufferPool.Len = bufs.Len()
303         st.PullQueue = getWorkQueueStatus(pullq)
304         st.TrashQueue = getWorkQueueStatus(trashq)
305         runtime.ReadMemStats(&st.Memory)
306 }
307
308 // return a WorkQueueStatus for the given queue. If q is nil (which
309 // should never happen except in test suites), return a zero status
310 // value instead of crashing.
311 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
312         if q == nil {
313                 // This should only happen during tests.
314                 return WorkQueueStatus{}
315         }
316         return q.Status()
317 }
318
319 // DeleteHandler processes DELETE requests.
320 //
321 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
322 // from all connected volumes.
323 //
324 // Only the Data Manager, or an Arvados admin with scope "all", are
325 // allowed to issue DELETE requests.  If a DELETE request is not
326 // authenticated or is issued by a non-admin user, the server returns
327 // a PermissionError.
328 //
329 // Upon receiving a valid request from an authorized user,
330 // DeleteHandler deletes all copies of the specified block on local
331 // writable volumes.
332 //
333 // Response format:
334 //
335 // If the requested blocks was not found on any volume, the response
336 // code is HTTP 404 Not Found.
337 //
338 // Otherwise, the response code is 200 OK, with a response body
339 // consisting of the JSON message
340 //
341 //    {"copies_deleted":d,"copies_failed":f}
342 //
343 // where d and f are integers representing the number of blocks that
344 // were successfully and unsuccessfully deleted.
345 //
346 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
347         hash := mux.Vars(req)["hash"]
348
349         // Confirm that this user is an admin and has a token with unlimited scope.
350         var tok = GetAPIToken(req)
351         if tok == "" || !CanDelete(tok) {
352                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
353                 return
354         }
355
356         if !theConfig.EnableDelete {
357                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
358                 return
359         }
360
361         // Delete copies of this block from all available volumes.
362         // Report how many blocks were successfully deleted, and how
363         // many were found on writable volumes but not deleted.
364         var result struct {
365                 Deleted int `json:"copies_deleted"`
366                 Failed  int `json:"copies_failed"`
367         }
368         for _, vol := range KeepVM.AllWritable() {
369                 if err := vol.Trash(hash); err == nil {
370                         result.Deleted++
371                 } else if os.IsNotExist(err) {
372                         continue
373                 } else {
374                         result.Failed++
375                         log.Println("DeleteHandler:", err)
376                 }
377         }
378
379         var st int
380
381         if result.Deleted == 0 && result.Failed == 0 {
382                 st = http.StatusNotFound
383         } else {
384                 st = http.StatusOK
385         }
386
387         resp.WriteHeader(st)
388
389         if st == http.StatusOK {
390                 if body, err := json.Marshal(result); err == nil {
391                         resp.Write(body)
392                 } else {
393                         log.Printf("json.Marshal: %s (result = %v)", err, result)
394                         http.Error(resp, err.Error(), 500)
395                 }
396         }
397 }
398
399 /* PullHandler processes "PUT /pull" requests for the data manager.
400    The request body is a JSON message containing a list of pull
401    requests in the following format:
402
403    [
404       {
405          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
406          "servers":[
407                         "keep0.qr1hi.arvadosapi.com:25107",
408                         "keep1.qr1hi.arvadosapi.com:25108"
409                  ]
410           },
411           {
412                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
413                  "servers":[
414                         "10.0.1.5:25107",
415                         "10.0.1.6:25107",
416                         "10.0.1.7:25108"
417                  ]
418           },
419           ...
420    ]
421
422    Each pull request in the list consists of a block locator string
423    and an ordered list of servers.  Keepstore should try to fetch the
424    block from each server in turn.
425
426    If the request has not been sent by the Data Manager, return 401
427    Unauthorized.
428
429    If the JSON unmarshalling fails, return 400 Bad Request.
430 */
431
432 // PullRequest consists of a block locator and an ordered list of servers
433 type PullRequest struct {
434         Locator string   `json:"locator"`
435         Servers []string `json:"servers"`
436 }
437
438 // PullHandler processes "PUT /pull" requests for the data manager.
439 func PullHandler(resp http.ResponseWriter, req *http.Request) {
440         // Reject unauthorized requests.
441         if !IsSystemAuth(GetAPIToken(req)) {
442                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
443                 return
444         }
445
446         // Parse the request body.
447         var pr []PullRequest
448         r := json.NewDecoder(req.Body)
449         if err := r.Decode(&pr); err != nil {
450                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
451                 return
452         }
453
454         // We have a properly formatted pull list sent from the data
455         // manager.  Report success and send the list to the pull list
456         // manager for further handling.
457         resp.WriteHeader(http.StatusOK)
458         resp.Write([]byte(
459                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
460
461         plist := list.New()
462         for _, p := range pr {
463                 plist.PushBack(p)
464         }
465         pullq.ReplaceQueue(plist)
466 }
467
468 // TrashRequest consists of a block locator and it's Mtime
469 type TrashRequest struct {
470         Locator    string `json:"locator"`
471         BlockMtime int64  `json:"block_mtime"`
472 }
473
474 // TrashHandler processes /trash requests.
475 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
476         // Reject unauthorized requests.
477         if !IsSystemAuth(GetAPIToken(req)) {
478                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
479                 return
480         }
481
482         // Parse the request body.
483         var trash []TrashRequest
484         r := json.NewDecoder(req.Body)
485         if err := r.Decode(&trash); err != nil {
486                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
487                 return
488         }
489
490         // We have a properly formatted trash list sent from the data
491         // manager.  Report success and send the list to the trash work
492         // queue for further handling.
493         resp.WriteHeader(http.StatusOK)
494         resp.Write([]byte(
495                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
496
497         tlist := list.New()
498         for _, t := range trash {
499                 tlist.PushBack(t)
500         }
501         trashq.ReplaceQueue(tlist)
502 }
503
504 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
505 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
506         // Reject unauthorized requests.
507         if !IsSystemAuth(GetAPIToken(req)) {
508                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
509                 return
510         }
511
512         hash := mux.Vars(req)["hash"]
513
514         if len(KeepVM.AllWritable()) == 0 {
515                 http.Error(resp, "No writable volumes", http.StatusNotFound)
516                 return
517         }
518
519         var untrashedOn, failedOn []string
520         var numNotFound int
521         for _, vol := range KeepVM.AllWritable() {
522                 err := vol.Untrash(hash)
523
524                 if os.IsNotExist(err) {
525                         numNotFound++
526                 } else if err != nil {
527                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
528                         failedOn = append(failedOn, vol.String())
529                 } else {
530                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
531                         untrashedOn = append(untrashedOn, vol.String())
532                 }
533         }
534
535         if numNotFound == len(KeepVM.AllWritable()) {
536                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
537                 return
538         }
539
540         if len(failedOn) == len(KeepVM.AllWritable()) {
541                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
542         } else {
543                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
544                 if len(failedOn) > 0 {
545                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
546                 }
547                 resp.Write([]byte(respBody))
548         }
549 }
550
551 // GetBlock and PutBlock implement lower-level code for handling
552 // blocks by rooting through volumes connected to the local machine.
553 // Once the handler has determined that system policy permits the
554 // request, it calls these methods to perform the actual operation.
555 //
556 // TODO(twp): this code would probably be better located in the
557 // VolumeManager interface. As an abstraction, the VolumeManager
558 // should be the only part of the code that cares about which volume a
559 // block is stored on, so it should be responsible for figuring out
560 // which volume to check for fetching blocks, storing blocks, etc.
561
562 // GetBlock fetches the block identified by "hash" into the provided
563 // buf, and returns the data size.
564 //
565 // If the block cannot be found on any volume, returns NotFoundError.
566 //
567 // If the block found does not have the correct MD5 hash, returns
568 // DiskHashError.
569 //
570 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
571         // Attempt to read the requested hash from a keep volume.
572         errorToCaller := NotFoundError
573
574         for _, vol := range KeepVM.AllReadable() {
575                 size, err := vol.Get(ctx, hash, buf)
576                 select {
577                 case <-ctx.Done():
578                         return 0, ctx.Err()
579                 default:
580                 }
581                 if err != nil {
582                         // IsNotExist is an expected error and may be
583                         // ignored. All other errors are logged. In
584                         // any case we continue trying to read other
585                         // volumes. If all volumes report IsNotExist,
586                         // we return a NotFoundError.
587                         if !os.IsNotExist(err) {
588                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
589                         }
590                         continue
591                 }
592                 // Check the file checksum.
593                 //
594                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
595                 if filehash != hash {
596                         // TODO: Try harder to tell a sysadmin about
597                         // this.
598                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
599                                 vol, hash, filehash)
600                         errorToCaller = DiskHashError
601                         continue
602                 }
603                 if errorToCaller == DiskHashError {
604                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
605                                 vol, hash)
606                 }
607                 return size, nil
608         }
609         return 0, errorToCaller
610 }
611
612 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
613 //
614 // PutBlock(block, hash)
615 //   Stores the BLOCK (identified by the content id HASH) in Keep.
616 //
617 //   The MD5 checksum of the block must be identical to the content id HASH.
618 //   If not, an error is returned.
619 //
620 //   PutBlock stores the BLOCK on the first Keep volume with free space.
621 //   A failure code is returned to the user only if all volumes fail.
622 //
623 //   On success, PutBlock returns nil.
624 //   On failure, it returns a KeepError with one of the following codes:
625 //
626 //   500 Collision
627 //          A different block with the same hash already exists on this
628 //          Keep server.
629 //   422 MD5Fail
630 //          The MD5 hash of the BLOCK does not match the argument HASH.
631 //   503 Full
632 //          There was not enough space left in any Keep volume to store
633 //          the object.
634 //   500 Fail
635 //          The object could not be stored for some other reason (e.g.
636 //          all writes failed). The text of the error message should
637 //          provide as much detail as possible.
638 //
639 func PutBlock(block []byte, hash string) (int, error) {
640         // Check that BLOCK's checksum matches HASH.
641         blockhash := fmt.Sprintf("%x", md5.Sum(block))
642         if blockhash != hash {
643                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
644                 return 0, RequestHashError
645         }
646
647         // If we already have this data, it's intact on disk, and we
648         // can update its timestamp, return success. If we have
649         // different data with the same hash, return failure.
650         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
651                 return n, err
652         }
653
654         // Choose a Keep volume to write to.
655         // If this volume fails, try all of the volumes in order.
656         if vol := KeepVM.NextWritable(); vol != nil {
657                 if err := vol.Put(hash, block); err == nil {
658                         return vol.Replication(), nil // success!
659                 }
660         }
661
662         writables := KeepVM.AllWritable()
663         if len(writables) == 0 {
664                 log.Print("No writable volumes.")
665                 return 0, FullError
666         }
667
668         allFull := true
669         for _, vol := range writables {
670                 err := vol.Put(hash, block)
671                 if err == nil {
672                         return vol.Replication(), nil // success!
673                 }
674                 if err != FullError {
675                         // The volume is not full but the
676                         // write did not succeed.  Report the
677                         // error and continue trying.
678                         allFull = false
679                         log.Printf("%s: Write(%s): %s", vol, hash, err)
680                 }
681         }
682
683         if allFull {
684                 log.Print("All volumes are full.")
685                 return 0, FullError
686         }
687         // Already logged the non-full errors.
688         return 0, GenericError
689 }
690
691 // CompareAndTouch returns the current replication level if one of the
692 // volumes already has the given content and it successfully updates
693 // the relevant block's modification time in order to protect it from
694 // premature garbage collection. Otherwise, it returns a non-nil
695 // error.
696 func CompareAndTouch(hash string, buf []byte) (int, error) {
697         var bestErr error = NotFoundError
698         for _, vol := range KeepVM.AllWritable() {
699                 if err := vol.Compare(hash, buf); err == CollisionError {
700                         // Stop if we have a block with same hash but
701                         // different content. (It will be impossible
702                         // to tell which one is wanted if we have
703                         // both, so there's no point writing it even
704                         // on a different volume.)
705                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
706                         return 0, err
707                 } else if os.IsNotExist(err) {
708                         // Block does not exist. This is the only
709                         // "normal" error: we don't log anything.
710                         continue
711                 } else if err != nil {
712                         // Couldn't open file, data is corrupt on
713                         // disk, etc.: log this abnormal condition,
714                         // and try the next volume.
715                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
716                         continue
717                 }
718                 if err := vol.Touch(hash); err != nil {
719                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
720                         bestErr = err
721                         continue
722                 }
723                 // Compare and Touch both worked --> done.
724                 return vol.Replication(), nil
725         }
726         return 0, bestErr
727 }
728
729 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
730
731 // IsValidLocator returns true if the specified string is a valid Keep locator.
732 //   When Keep is extended to support hash types other than MD5,
733 //   this should be updated to cover those as well.
734 //
735 func IsValidLocator(loc string) bool {
736         return validLocatorRe.MatchString(loc)
737 }
738
739 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
740
741 // GetAPIToken returns the OAuth2 token from the Authorization
742 // header of a HTTP request, or an empty string if no matching
743 // token is found.
744 func GetAPIToken(req *http.Request) string {
745         if auth, ok := req.Header["Authorization"]; ok {
746                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
747                         return match[1]
748                 }
749         }
750         return ""
751 }
752
753 // IsExpired returns true if the given Unix timestamp (expressed as a
754 // hexadecimal string) is in the past, or if timestampHex cannot be
755 // parsed as a hexadecimal string.
756 func IsExpired(timestampHex string) bool {
757         ts, err := strconv.ParseInt(timestampHex, 16, 0)
758         if err != nil {
759                 log.Printf("IsExpired: %s", err)
760                 return true
761         }
762         return time.Unix(ts, 0).Before(time.Now())
763 }
764
765 // CanDelete returns true if the user identified by apiToken is
766 // allowed to delete blocks.
767 func CanDelete(apiToken string) bool {
768         if apiToken == "" {
769                 return false
770         }
771         // Blocks may be deleted only when Keep has been configured with a
772         // data manager.
773         if IsSystemAuth(apiToken) {
774                 return true
775         }
776         // TODO(twp): look up apiToken with the API server
777         // return true if is_admin is true and if the token
778         // has unlimited scope
779         return false
780 }
781
782 // IsSystemAuth returns true if the given token is allowed to perform
783 // system level actions like deleting data.
784 func IsSystemAuth(token string) bool {
785         return token != "" && token == theConfig.systemAuthToken
786 }