10467: Use ErrClientDisconnect. Convert type assertion panic to 500 error.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "sync"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Untrash moves blocks from trash back into store
59         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60
61         // Any request which does not match any of these routes gets
62         // 400 Bad Request.
63         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
64
65         return rest
66 }
67
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
71 }
72
73 // GetBlockHandler is a HandleFunc to address Get block requests.
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
75         ctx, cancel := contextForResponse(context.TODO(), resp)
76         defer cancel()
77
78         if theConfig.RequireSignatures {
79                 locator := req.URL.Path[1:] // strip leading slash
80                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
81                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
82                         return
83                 }
84         }
85
86         // TODO: Probe volumes to check whether the block _might_
87         // exist. Some volumes/types could support a quick existence
88         // check without causing other operations to suffer. If all
89         // volumes support that, and assure us the block definitely
90         // isn't here, we can return 404 now instead of waiting for a
91         // buffer.
92
93         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
94         if err != nil {
95                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
96                 return
97         }
98         defer bufs.Put(buf)
99
100         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
101         if err != nil {
102                 code := http.StatusInternalServerError
103                 if err, ok := err.(*KeepError); ok {
104                         code = err.HTTPCode
105                 }
106                 http.Error(resp, err.Error(), code)
107                 return
108         }
109
110         resp.Header().Set("Content-Length", strconv.Itoa(size))
111         resp.Header().Set("Content-Type", "application/octet-stream")
112         resp.Write(buf[:size])
113 }
114
115 // Return a new context that gets cancelled by resp's CloseNotifier.
116 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
117         ctx, cancel := context.WithCancel(parent)
118         if cn, ok := resp.(http.CloseNotifier); ok {
119                 go func(c <-chan bool) {
120                         select {
121                         case <-c:
122                                 theConfig.debugLogf("cancel context")
123                                 cancel()
124                         case <-ctx.Done():
125                         }
126                 }(cn.CloseNotify())
127         }
128         return ctx, cancel
129 }
130
131 // Get a buffer from the pool -- but give up and return a non-nil
132 // error if ctx ends before we get a buffer.
133 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
134         bufReady := make(chan []byte)
135         go func() {
136                 bufReady <- bufs.Get(bufSize)
137         }()
138         select {
139         case buf := <-bufReady:
140                 return buf, nil
141         case <-ctx.Done():
142                 go func() {
143                         // Even if closeNotifier happened first, we
144                         // need to keep waiting for our buf so we can
145                         // return it to the pool.
146                         bufs.Put(<-bufReady)
147                 }()
148                 return nil, ErrClientDisconnect
149         }
150 }
151
152 // PutBlockHandler is a HandleFunc to address Put block requests.
153 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
154         ctx, cancel := contextForResponse(context.TODO(), resp)
155         defer cancel()
156
157         hash := mux.Vars(req)["hash"]
158
159         // Detect as many error conditions as possible before reading
160         // the body: avoid transmitting data that will not end up
161         // being written anyway.
162
163         if req.ContentLength == -1 {
164                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
165                 return
166         }
167
168         if req.ContentLength > BlockSize {
169                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
170                 return
171         }
172
173         if len(KeepVM.AllWritable()) == 0 {
174                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
175                 return
176         }
177
178         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
179         if err != nil {
180                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
181                 return
182         }
183
184         _, err = io.ReadFull(req.Body, buf)
185         if err != nil {
186                 http.Error(resp, err.Error(), 500)
187                 bufs.Put(buf)
188                 return
189         }
190
191         replication, err := PutBlock(ctx, buf, hash)
192         bufs.Put(buf)
193
194         if err != nil {
195                 code := http.StatusInternalServerError
196                 if err, ok := err.(*KeepError); ok {
197                         code = err.HTTPCode
198                 }
199                 http.Error(resp, err.Error(), code)
200                 return
201         }
202
203         // Success; add a size hint, sign the locator if possible, and
204         // return it to the client.
205         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
206         apiToken := GetAPIToken(req)
207         if theConfig.blobSigningKey != nil && apiToken != "" {
208                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
209                 returnHash = SignLocator(returnHash, apiToken, expiry)
210         }
211         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
212         resp.Write([]byte(returnHash + "\n"))
213 }
214
215 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
216 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
217         // Reject unauthorized requests.
218         if !IsSystemAuth(GetAPIToken(req)) {
219                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
220                 return
221         }
222
223         prefix := mux.Vars(req)["prefix"]
224
225         for _, vol := range KeepVM.AllReadable() {
226                 if err := vol.IndexTo(prefix, resp); err != nil {
227                         // The only errors returned by IndexTo are
228                         // write errors returned by resp.Write(),
229                         // which probably means the client has
230                         // disconnected and this error will never be
231                         // reported to the client -- but it will
232                         // appear in our own error log.
233                         http.Error(resp, err.Error(), http.StatusInternalServerError)
234                         return
235                 }
236         }
237         // An empty line at EOF is the only way the client can be
238         // assured the entire index was received.
239         resp.Write([]byte{'\n'})
240 }
241
242 // StatusHandler
243 //     Responds to /status.json requests with the current node status,
244 //     described in a JSON structure.
245 //
246 //     The data given in a status.json response includes:
247 //        volumes - a list of Keep volumes currently in use by this server
248 //          each volume is an object with the following fields:
249 //            * mount_point
250 //            * device_num (an integer identifying the underlying filesystem)
251 //            * bytes_free
252 //            * bytes_used
253
254 // PoolStatus struct
255 type PoolStatus struct {
256         Alloc uint64 `json:"BytesAllocated"`
257         Cap   int    `json:"BuffersMax"`
258         Len   int    `json:"BuffersInUse"`
259 }
260
261 // NodeStatus struct
262 type NodeStatus struct {
263         Volumes    []*VolumeStatus `json:"volumes"`
264         BufferPool PoolStatus
265         PullQueue  WorkQueueStatus
266         TrashQueue WorkQueueStatus
267         Memory     runtime.MemStats
268 }
269
270 var st NodeStatus
271 var stLock sync.Mutex
272
273 // StatusHandler addresses /status.json requests.
274 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
275         stLock.Lock()
276         readNodeStatus(&st)
277         jstat, err := json.Marshal(&st)
278         stLock.Unlock()
279         if err == nil {
280                 resp.Write(jstat)
281         } else {
282                 log.Printf("json.Marshal: %s", err)
283                 log.Printf("NodeStatus = %v", &st)
284                 http.Error(resp, err.Error(), 500)
285         }
286 }
287
288 // populate the given NodeStatus struct with current values.
289 func readNodeStatus(st *NodeStatus) {
290         vols := KeepVM.AllReadable()
291         if cap(st.Volumes) < len(vols) {
292                 st.Volumes = make([]*VolumeStatus, len(vols))
293         }
294         st.Volumes = st.Volumes[:0]
295         for _, vol := range vols {
296                 if s := vol.Status(); s != nil {
297                         st.Volumes = append(st.Volumes, s)
298                 }
299         }
300         st.BufferPool.Alloc = bufs.Alloc()
301         st.BufferPool.Cap = bufs.Cap()
302         st.BufferPool.Len = bufs.Len()
303         st.PullQueue = getWorkQueueStatus(pullq)
304         st.TrashQueue = getWorkQueueStatus(trashq)
305         runtime.ReadMemStats(&st.Memory)
306 }
307
308 // return a WorkQueueStatus for the given queue. If q is nil (which
309 // should never happen except in test suites), return a zero status
310 // value instead of crashing.
311 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
312         if q == nil {
313                 // This should only happen during tests.
314                 return WorkQueueStatus{}
315         }
316         return q.Status()
317 }
318
319 // DeleteHandler processes DELETE requests.
320 //
321 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
322 // from all connected volumes.
323 //
324 // Only the Data Manager, or an Arvados admin with scope "all", are
325 // allowed to issue DELETE requests.  If a DELETE request is not
326 // authenticated or is issued by a non-admin user, the server returns
327 // a PermissionError.
328 //
329 // Upon receiving a valid request from an authorized user,
330 // DeleteHandler deletes all copies of the specified block on local
331 // writable volumes.
332 //
333 // Response format:
334 //
335 // If the requested blocks was not found on any volume, the response
336 // code is HTTP 404 Not Found.
337 //
338 // Otherwise, the response code is 200 OK, with a response body
339 // consisting of the JSON message
340 //
341 //    {"copies_deleted":d,"copies_failed":f}
342 //
343 // where d and f are integers representing the number of blocks that
344 // were successfully and unsuccessfully deleted.
345 //
346 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
347         hash := mux.Vars(req)["hash"]
348
349         // Confirm that this user is an admin and has a token with unlimited scope.
350         var tok = GetAPIToken(req)
351         if tok == "" || !CanDelete(tok) {
352                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
353                 return
354         }
355
356         if !theConfig.EnableDelete {
357                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
358                 return
359         }
360
361         // Delete copies of this block from all available volumes.
362         // Report how many blocks were successfully deleted, and how
363         // many were found on writable volumes but not deleted.
364         var result struct {
365                 Deleted int `json:"copies_deleted"`
366                 Failed  int `json:"copies_failed"`
367         }
368         for _, vol := range KeepVM.AllWritable() {
369                 if err := vol.Trash(hash); err == nil {
370                         result.Deleted++
371                 } else if os.IsNotExist(err) {
372                         continue
373                 } else {
374                         result.Failed++
375                         log.Println("DeleteHandler:", err)
376                 }
377         }
378
379         var st int
380
381         if result.Deleted == 0 && result.Failed == 0 {
382                 st = http.StatusNotFound
383         } else {
384                 st = http.StatusOK
385         }
386
387         resp.WriteHeader(st)
388
389         if st == http.StatusOK {
390                 if body, err := json.Marshal(result); err == nil {
391                         resp.Write(body)
392                 } else {
393                         log.Printf("json.Marshal: %s (result = %v)", err, result)
394                         http.Error(resp, err.Error(), 500)
395                 }
396         }
397 }
398
399 /* PullHandler processes "PUT /pull" requests for the data manager.
400    The request body is a JSON message containing a list of pull
401    requests in the following format:
402
403    [
404       {
405          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
406          "servers":[
407                         "keep0.qr1hi.arvadosapi.com:25107",
408                         "keep1.qr1hi.arvadosapi.com:25108"
409                  ]
410           },
411           {
412                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
413                  "servers":[
414                         "10.0.1.5:25107",
415                         "10.0.1.6:25107",
416                         "10.0.1.7:25108"
417                  ]
418           },
419           ...
420    ]
421
422    Each pull request in the list consists of a block locator string
423    and an ordered list of servers.  Keepstore should try to fetch the
424    block from each server in turn.
425
426    If the request has not been sent by the Data Manager, return 401
427    Unauthorized.
428
429    If the JSON unmarshalling fails, return 400 Bad Request.
430 */
431
432 // PullRequest consists of a block locator and an ordered list of servers
433 type PullRequest struct {
434         Locator string   `json:"locator"`
435         Servers []string `json:"servers"`
436 }
437
438 // PullHandler processes "PUT /pull" requests for the data manager.
439 func PullHandler(resp http.ResponseWriter, req *http.Request) {
440         // Reject unauthorized requests.
441         if !IsSystemAuth(GetAPIToken(req)) {
442                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
443                 return
444         }
445
446         // Parse the request body.
447         var pr []PullRequest
448         r := json.NewDecoder(req.Body)
449         if err := r.Decode(&pr); err != nil {
450                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
451                 return
452         }
453
454         // We have a properly formatted pull list sent from the data
455         // manager.  Report success and send the list to the pull list
456         // manager for further handling.
457         resp.WriteHeader(http.StatusOK)
458         resp.Write([]byte(
459                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
460
461         plist := list.New()
462         for _, p := range pr {
463                 plist.PushBack(p)
464         }
465         pullq.ReplaceQueue(plist)
466 }
467
468 // TrashRequest consists of a block locator and it's Mtime
469 type TrashRequest struct {
470         Locator    string `json:"locator"`
471         BlockMtime int64  `json:"block_mtime"`
472 }
473
474 // TrashHandler processes /trash requests.
475 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
476         // Reject unauthorized requests.
477         if !IsSystemAuth(GetAPIToken(req)) {
478                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
479                 return
480         }
481
482         // Parse the request body.
483         var trash []TrashRequest
484         r := json.NewDecoder(req.Body)
485         if err := r.Decode(&trash); err != nil {
486                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
487                 return
488         }
489
490         // We have a properly formatted trash list sent from the data
491         // manager.  Report success and send the list to the trash work
492         // queue for further handling.
493         resp.WriteHeader(http.StatusOK)
494         resp.Write([]byte(
495                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
496
497         tlist := list.New()
498         for _, t := range trash {
499                 tlist.PushBack(t)
500         }
501         trashq.ReplaceQueue(tlist)
502 }
503
504 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
505 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
506         // Reject unauthorized requests.
507         if !IsSystemAuth(GetAPIToken(req)) {
508                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
509                 return
510         }
511
512         hash := mux.Vars(req)["hash"]
513
514         if len(KeepVM.AllWritable()) == 0 {
515                 http.Error(resp, "No writable volumes", http.StatusNotFound)
516                 return
517         }
518
519         var untrashedOn, failedOn []string
520         var numNotFound int
521         for _, vol := range KeepVM.AllWritable() {
522                 err := vol.Untrash(hash)
523
524                 if os.IsNotExist(err) {
525                         numNotFound++
526                 } else if err != nil {
527                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
528                         failedOn = append(failedOn, vol.String())
529                 } else {
530                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
531                         untrashedOn = append(untrashedOn, vol.String())
532                 }
533         }
534
535         if numNotFound == len(KeepVM.AllWritable()) {
536                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
537                 return
538         }
539
540         if len(failedOn) == len(KeepVM.AllWritable()) {
541                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
542         } else {
543                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
544                 if len(failedOn) > 0 {
545                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
546                 }
547                 resp.Write([]byte(respBody))
548         }
549 }
550
551 // GetBlock and PutBlock implement lower-level code for handling
552 // blocks by rooting through volumes connected to the local machine.
553 // Once the handler has determined that system policy permits the
554 // request, it calls these methods to perform the actual operation.
555 //
556 // TODO(twp): this code would probably be better located in the
557 // VolumeManager interface. As an abstraction, the VolumeManager
558 // should be the only part of the code that cares about which volume a
559 // block is stored on, so it should be responsible for figuring out
560 // which volume to check for fetching blocks, storing blocks, etc.
561
562 // GetBlock fetches the block identified by "hash" into the provided
563 // buf, and returns the data size.
564 //
565 // If the block cannot be found on any volume, returns NotFoundError.
566 //
567 // If the block found does not have the correct MD5 hash, returns
568 // DiskHashError.
569 //
570 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
571         // Attempt to read the requested hash from a keep volume.
572         errorToCaller := NotFoundError
573
574         for _, vol := range KeepVM.AllReadable() {
575                 size, err := vol.Get(ctx, hash, buf)
576                 select {
577                 case <-ctx.Done():
578                         return 0, ErrClientDisconnect
579                 default:
580                 }
581                 if err != nil {
582                         // IsNotExist is an expected error and may be
583                         // ignored. All other errors are logged. In
584                         // any case we continue trying to read other
585                         // volumes. If all volumes report IsNotExist,
586                         // we return a NotFoundError.
587                         if !os.IsNotExist(err) {
588                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
589                         }
590                         continue
591                 }
592                 // Check the file checksum.
593                 //
594                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
595                 if filehash != hash {
596                         // TODO: Try harder to tell a sysadmin about
597                         // this.
598                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
599                                 vol, hash, filehash)
600                         errorToCaller = DiskHashError
601                         continue
602                 }
603                 if errorToCaller == DiskHashError {
604                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
605                                 vol, hash)
606                 }
607                 return size, nil
608         }
609         return 0, errorToCaller
610 }
611
612 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
613 //
614 // PutBlock(ctx, block, hash)
615 //   Stores the BLOCK (identified by the content id HASH) in Keep.
616 //
617 //   The MD5 checksum of the block must be identical to the content id HASH.
618 //   If not, an error is returned.
619 //
620 //   PutBlock stores the BLOCK on the first Keep volume with free space.
621 //   A failure code is returned to the user only if all volumes fail.
622 //
623 //   On success, PutBlock returns nil.
624 //   On failure, it returns a KeepError with one of the following codes:
625 //
626 //   500 Collision
627 //          A different block with the same hash already exists on this
628 //          Keep server.
629 //   422 MD5Fail
630 //          The MD5 hash of the BLOCK does not match the argument HASH.
631 //   503 Full
632 //          There was not enough space left in any Keep volume to store
633 //          the object.
634 //   500 Fail
635 //          The object could not be stored for some other reason (e.g.
636 //          all writes failed). The text of the error message should
637 //          provide as much detail as possible.
638 //
639 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
640         // Check that BLOCK's checksum matches HASH.
641         blockhash := fmt.Sprintf("%x", md5.Sum(block))
642         if blockhash != hash {
643                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
644                 return 0, RequestHashError
645         }
646
647         // If we already have this data, it's intact on disk, and we
648         // can update its timestamp, return success. If we have
649         // different data with the same hash, return failure.
650         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
651                 return n, err
652         }
653
654         // Choose a Keep volume to write to.
655         // If this volume fails, try all of the volumes in order.
656         if vol := KeepVM.NextWritable(); vol != nil {
657                 if err := vol.Put(ctx, hash, block); err == nil {
658                         return vol.Replication(), nil // success!
659                 }
660                 if ctx.Err() != nil {
661                         return 0, ErrClientDisconnect
662                 }
663         }
664
665         writables := KeepVM.AllWritable()
666         if len(writables) == 0 {
667                 log.Print("No writable volumes.")
668                 return 0, FullError
669         }
670
671         allFull := true
672         for _, vol := range writables {
673                 err := vol.Put(ctx, hash, block)
674                 if ctx.Err() != nil {
675                         return 0, ErrClientDisconnect
676                 }
677                 if err == nil {
678                         return vol.Replication(), nil // success!
679                 }
680                 if err != FullError {
681                         // The volume is not full but the
682                         // write did not succeed.  Report the
683                         // error and continue trying.
684                         allFull = false
685                         log.Printf("%s: Write(%s): %s", vol, hash, err)
686                 }
687         }
688
689         if allFull {
690                 log.Print("All volumes are full.")
691                 return 0, FullError
692         }
693         // Already logged the non-full errors.
694         return 0, GenericError
695 }
696
697 // CompareAndTouch returns the current replication level if one of the
698 // volumes already has the given content and it successfully updates
699 // the relevant block's modification time in order to protect it from
700 // premature garbage collection. Otherwise, it returns a non-nil
701 // error.
702 func CompareAndTouch(hash string, buf []byte) (int, error) {
703         var bestErr error = NotFoundError
704         for _, vol := range KeepVM.AllWritable() {
705                 if err := vol.Compare(hash, buf); err == CollisionError {
706                         // Stop if we have a block with same hash but
707                         // different content. (It will be impossible
708                         // to tell which one is wanted if we have
709                         // both, so there's no point writing it even
710                         // on a different volume.)
711                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
712                         return 0, err
713                 } else if os.IsNotExist(err) {
714                         // Block does not exist. This is the only
715                         // "normal" error: we don't log anything.
716                         continue
717                 } else if err != nil {
718                         // Couldn't open file, data is corrupt on
719                         // disk, etc.: log this abnormal condition,
720                         // and try the next volume.
721                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
722                         continue
723                 }
724                 if err := vol.Touch(hash); err != nil {
725                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
726                         bestErr = err
727                         continue
728                 }
729                 // Compare and Touch both worked --> done.
730                 return vol.Replication(), nil
731         }
732         return 0, bestErr
733 }
734
735 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
736
737 // IsValidLocator returns true if the specified string is a valid Keep locator.
738 //   When Keep is extended to support hash types other than MD5,
739 //   this should be updated to cover those as well.
740 //
741 func IsValidLocator(loc string) bool {
742         return validLocatorRe.MatchString(loc)
743 }
744
745 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
746
747 // GetAPIToken returns the OAuth2 token from the Authorization
748 // header of a HTTP request, or an empty string if no matching
749 // token is found.
750 func GetAPIToken(req *http.Request) string {
751         if auth, ok := req.Header["Authorization"]; ok {
752                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
753                         return match[1]
754                 }
755         }
756         return ""
757 }
758
759 // IsExpired returns true if the given Unix timestamp (expressed as a
760 // hexadecimal string) is in the past, or if timestampHex cannot be
761 // parsed as a hexadecimal string.
762 func IsExpired(timestampHex string) bool {
763         ts, err := strconv.ParseInt(timestampHex, 16, 0)
764         if err != nil {
765                 log.Printf("IsExpired: %s", err)
766                 return true
767         }
768         return time.Unix(ts, 0).Before(time.Now())
769 }
770
771 // CanDelete returns true if the user identified by apiToken is
772 // allowed to delete blocks.
773 func CanDelete(apiToken string) bool {
774         if apiToken == "" {
775                 return false
776         }
777         // Blocks may be deleted only when Keep has been configured with a
778         // data manager.
779         if IsSystemAuth(apiToken) {
780                 return true
781         }
782         // TODO(twp): look up apiToken with the API server
783         // return true if is_admin is true and if the token
784         // has unlimited scope
785         return false
786 }
787
788 // IsSystemAuth returns true if the given token is allowed to perform
789 // system level actions like deleting data.
790 func IsSystemAuth(token string) bool {
791         return token != "" && token == theConfig.systemAuthToken
792 }