10484: Serve MemStats at /debug.json instead of /status.json.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "sync"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // Internals/debugging info (runtime.MemStats)
50         rest.HandleFunc(`/debug.json`, DebugHandler).Methods("GET", "HEAD")
51
52         // List volumes: path, device number, bytes used/avail.
53         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
54
55         // Replace the current pull queue.
56         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
57
58         // Replace the current trash queue.
59         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
60
61         // Untrash moves blocks from trash back into store
62         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
63
64         // Any request which does not match any of these routes gets
65         // 400 Bad Request.
66         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
67
68         return rest
69 }
70
71 // BadRequestHandler is a HandleFunc to address bad requests.
72 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
73         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
74 }
75
76 // GetBlockHandler is a HandleFunc to address Get block requests.
77 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
78         ctx, cancel := contextForResponse(context.TODO(), resp)
79         defer cancel()
80
81         if theConfig.RequireSignatures {
82                 locator := req.URL.Path[1:] // strip leading slash
83                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
84                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
85                         return
86                 }
87         }
88
89         // TODO: Probe volumes to check whether the block _might_
90         // exist. Some volumes/types could support a quick existence
91         // check without causing other operations to suffer. If all
92         // volumes support that, and assure us the block definitely
93         // isn't here, we can return 404 now instead of waiting for a
94         // buffer.
95
96         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
97         if err != nil {
98                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
99                 return
100         }
101         defer bufs.Put(buf)
102
103         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
104         if err != nil {
105                 code := http.StatusInternalServerError
106                 if err, ok := err.(*KeepError); ok {
107                         code = err.HTTPCode
108                 }
109                 http.Error(resp, err.Error(), code)
110                 return
111         }
112
113         resp.Header().Set("Content-Length", strconv.Itoa(size))
114         resp.Header().Set("Content-Type", "application/octet-stream")
115         resp.Write(buf[:size])
116 }
117
118 // Return a new context that gets cancelled by resp's CloseNotifier.
119 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
120         ctx, cancel := context.WithCancel(parent)
121         if cn, ok := resp.(http.CloseNotifier); ok {
122                 go func(c <-chan bool) {
123                         select {
124                         case <-c:
125                                 theConfig.debugLogf("cancel context")
126                                 cancel()
127                         case <-ctx.Done():
128                         }
129                 }(cn.CloseNotify())
130         }
131         return ctx, cancel
132 }
133
134 // Get a buffer from the pool -- but give up and return a non-nil
135 // error if ctx ends before we get a buffer.
136 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
137         bufReady := make(chan []byte)
138         go func() {
139                 bufReady <- bufs.Get(bufSize)
140         }()
141         select {
142         case buf := <-bufReady:
143                 return buf, nil
144         case <-ctx.Done():
145                 go func() {
146                         // Even if closeNotifier happened first, we
147                         // need to keep waiting for our buf so we can
148                         // return it to the pool.
149                         bufs.Put(<-bufReady)
150                 }()
151                 return nil, ErrClientDisconnect
152         }
153 }
154
155 // PutBlockHandler is a HandleFunc to address Put block requests.
156 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
157         ctx, cancel := contextForResponse(context.TODO(), resp)
158         defer cancel()
159
160         hash := mux.Vars(req)["hash"]
161
162         // Detect as many error conditions as possible before reading
163         // the body: avoid transmitting data that will not end up
164         // being written anyway.
165
166         if req.ContentLength == -1 {
167                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
168                 return
169         }
170
171         if req.ContentLength > BlockSize {
172                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
173                 return
174         }
175
176         if len(KeepVM.AllWritable()) == 0 {
177                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
178                 return
179         }
180
181         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
182         if err != nil {
183                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
184                 return
185         }
186
187         _, err = io.ReadFull(req.Body, buf)
188         if err != nil {
189                 http.Error(resp, err.Error(), 500)
190                 bufs.Put(buf)
191                 return
192         }
193
194         replication, err := PutBlock(ctx, buf, hash)
195         bufs.Put(buf)
196
197         if err != nil {
198                 code := http.StatusInternalServerError
199                 if err, ok := err.(*KeepError); ok {
200                         code = err.HTTPCode
201                 }
202                 http.Error(resp, err.Error(), code)
203                 return
204         }
205
206         // Success; add a size hint, sign the locator if possible, and
207         // return it to the client.
208         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
209         apiToken := GetAPIToken(req)
210         if theConfig.blobSigningKey != nil && apiToken != "" {
211                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
212                 returnHash = SignLocator(returnHash, apiToken, expiry)
213         }
214         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
215         resp.Write([]byte(returnHash + "\n"))
216 }
217
218 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
219 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
220         // Reject unauthorized requests.
221         if !IsSystemAuth(GetAPIToken(req)) {
222                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
223                 return
224         }
225
226         prefix := mux.Vars(req)["prefix"]
227
228         for _, vol := range KeepVM.AllReadable() {
229                 if err := vol.IndexTo(prefix, resp); err != nil {
230                         // The only errors returned by IndexTo are
231                         // write errors returned by resp.Write(),
232                         // which probably means the client has
233                         // disconnected and this error will never be
234                         // reported to the client -- but it will
235                         // appear in our own error log.
236                         http.Error(resp, err.Error(), http.StatusInternalServerError)
237                         return
238                 }
239         }
240         // An empty line at EOF is the only way the client can be
241         // assured the entire index was received.
242         resp.Write([]byte{'\n'})
243 }
244
245 // PoolStatus struct
246 type PoolStatus struct {
247         Alloc uint64 `json:"BytesAllocated"`
248         Cap   int    `json:"BuffersMax"`
249         Len   int    `json:"BuffersInUse"`
250 }
251
252 // NodeStatus struct
253 type NodeStatus struct {
254         Volumes    []*VolumeStatus `json:"volumes"`
255         BufferPool PoolStatus
256         PullQueue  WorkQueueStatus
257         TrashQueue WorkQueueStatus
258 }
259
260 var st NodeStatus
261 var stLock sync.Mutex
262
263 // DebugHandler addresses /debug.json requests.
264 func DebugHandler(resp http.ResponseWriter, req *http.Request) {
265         type debugStats struct {
266                 MemStats runtime.MemStats
267         }
268         var ds debugStats
269         runtime.ReadMemStats(&ds.MemStats)
270         err := json.NewEncoder(resp).Encode(&ds)
271         if err != nil {
272                 http.Error(resp, err.Error(), 500)
273         }
274 }
275
276 // StatusHandler addresses /status.json requests.
277 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
278         stLock.Lock()
279         readNodeStatus(&st)
280         jstat, err := json.Marshal(&st)
281         stLock.Unlock()
282         if err == nil {
283                 resp.Write(jstat)
284         } else {
285                 log.Printf("json.Marshal: %s", err)
286                 log.Printf("NodeStatus = %v", &st)
287                 http.Error(resp, err.Error(), 500)
288         }
289 }
290
291 // populate the given NodeStatus struct with current values.
292 func readNodeStatus(st *NodeStatus) {
293         vols := KeepVM.AllReadable()
294         if cap(st.Volumes) < len(vols) {
295                 st.Volumes = make([]*VolumeStatus, len(vols))
296         }
297         st.Volumes = st.Volumes[:0]
298         for _, vol := range vols {
299                 if s := vol.Status(); s != nil {
300                         st.Volumes = append(st.Volumes, s)
301                 }
302         }
303         st.BufferPool.Alloc = bufs.Alloc()
304         st.BufferPool.Cap = bufs.Cap()
305         st.BufferPool.Len = bufs.Len()
306         st.PullQueue = getWorkQueueStatus(pullq)
307         st.TrashQueue = getWorkQueueStatus(trashq)
308 }
309
310 // return a WorkQueueStatus for the given queue. If q is nil (which
311 // should never happen except in test suites), return a zero status
312 // value instead of crashing.
313 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
314         if q == nil {
315                 // This should only happen during tests.
316                 return WorkQueueStatus{}
317         }
318         return q.Status()
319 }
320
321 // DeleteHandler processes DELETE requests.
322 //
323 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
324 // from all connected volumes.
325 //
326 // Only the Data Manager, or an Arvados admin with scope "all", are
327 // allowed to issue DELETE requests.  If a DELETE request is not
328 // authenticated or is issued by a non-admin user, the server returns
329 // a PermissionError.
330 //
331 // Upon receiving a valid request from an authorized user,
332 // DeleteHandler deletes all copies of the specified block on local
333 // writable volumes.
334 //
335 // Response format:
336 //
337 // If the requested blocks was not found on any volume, the response
338 // code is HTTP 404 Not Found.
339 //
340 // Otherwise, the response code is 200 OK, with a response body
341 // consisting of the JSON message
342 //
343 //    {"copies_deleted":d,"copies_failed":f}
344 //
345 // where d and f are integers representing the number of blocks that
346 // were successfully and unsuccessfully deleted.
347 //
348 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
349         hash := mux.Vars(req)["hash"]
350
351         // Confirm that this user is an admin and has a token with unlimited scope.
352         var tok = GetAPIToken(req)
353         if tok == "" || !CanDelete(tok) {
354                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
355                 return
356         }
357
358         if !theConfig.EnableDelete {
359                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
360                 return
361         }
362
363         // Delete copies of this block from all available volumes.
364         // Report how many blocks were successfully deleted, and how
365         // many were found on writable volumes but not deleted.
366         var result struct {
367                 Deleted int `json:"copies_deleted"`
368                 Failed  int `json:"copies_failed"`
369         }
370         for _, vol := range KeepVM.AllWritable() {
371                 if err := vol.Trash(hash); err == nil {
372                         result.Deleted++
373                 } else if os.IsNotExist(err) {
374                         continue
375                 } else {
376                         result.Failed++
377                         log.Println("DeleteHandler:", err)
378                 }
379         }
380
381         var st int
382
383         if result.Deleted == 0 && result.Failed == 0 {
384                 st = http.StatusNotFound
385         } else {
386                 st = http.StatusOK
387         }
388
389         resp.WriteHeader(st)
390
391         if st == http.StatusOK {
392                 if body, err := json.Marshal(result); err == nil {
393                         resp.Write(body)
394                 } else {
395                         log.Printf("json.Marshal: %s (result = %v)", err, result)
396                         http.Error(resp, err.Error(), 500)
397                 }
398         }
399 }
400
401 /* PullHandler processes "PUT /pull" requests for the data manager.
402    The request body is a JSON message containing a list of pull
403    requests in the following format:
404
405    [
406       {
407          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
408          "servers":[
409                         "keep0.qr1hi.arvadosapi.com:25107",
410                         "keep1.qr1hi.arvadosapi.com:25108"
411                  ]
412           },
413           {
414                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
415                  "servers":[
416                         "10.0.1.5:25107",
417                         "10.0.1.6:25107",
418                         "10.0.1.7:25108"
419                  ]
420           },
421           ...
422    ]
423
424    Each pull request in the list consists of a block locator string
425    and an ordered list of servers.  Keepstore should try to fetch the
426    block from each server in turn.
427
428    If the request has not been sent by the Data Manager, return 401
429    Unauthorized.
430
431    If the JSON unmarshalling fails, return 400 Bad Request.
432 */
433
434 // PullRequest consists of a block locator and an ordered list of servers
435 type PullRequest struct {
436         Locator string   `json:"locator"`
437         Servers []string `json:"servers"`
438 }
439
440 // PullHandler processes "PUT /pull" requests for the data manager.
441 func PullHandler(resp http.ResponseWriter, req *http.Request) {
442         // Reject unauthorized requests.
443         if !IsSystemAuth(GetAPIToken(req)) {
444                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
445                 return
446         }
447
448         // Parse the request body.
449         var pr []PullRequest
450         r := json.NewDecoder(req.Body)
451         if err := r.Decode(&pr); err != nil {
452                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
453                 return
454         }
455
456         // We have a properly formatted pull list sent from the data
457         // manager.  Report success and send the list to the pull list
458         // manager for further handling.
459         resp.WriteHeader(http.StatusOK)
460         resp.Write([]byte(
461                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
462
463         plist := list.New()
464         for _, p := range pr {
465                 plist.PushBack(p)
466         }
467         pullq.ReplaceQueue(plist)
468 }
469
470 // TrashRequest consists of a block locator and it's Mtime
471 type TrashRequest struct {
472         Locator    string `json:"locator"`
473         BlockMtime int64  `json:"block_mtime"`
474 }
475
476 // TrashHandler processes /trash requests.
477 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
478         // Reject unauthorized requests.
479         if !IsSystemAuth(GetAPIToken(req)) {
480                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
481                 return
482         }
483
484         // Parse the request body.
485         var trash []TrashRequest
486         r := json.NewDecoder(req.Body)
487         if err := r.Decode(&trash); err != nil {
488                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
489                 return
490         }
491
492         // We have a properly formatted trash list sent from the data
493         // manager.  Report success and send the list to the trash work
494         // queue for further handling.
495         resp.WriteHeader(http.StatusOK)
496         resp.Write([]byte(
497                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
498
499         tlist := list.New()
500         for _, t := range trash {
501                 tlist.PushBack(t)
502         }
503         trashq.ReplaceQueue(tlist)
504 }
505
506 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
507 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
508         // Reject unauthorized requests.
509         if !IsSystemAuth(GetAPIToken(req)) {
510                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
511                 return
512         }
513
514         hash := mux.Vars(req)["hash"]
515
516         if len(KeepVM.AllWritable()) == 0 {
517                 http.Error(resp, "No writable volumes", http.StatusNotFound)
518                 return
519         }
520
521         var untrashedOn, failedOn []string
522         var numNotFound int
523         for _, vol := range KeepVM.AllWritable() {
524                 err := vol.Untrash(hash)
525
526                 if os.IsNotExist(err) {
527                         numNotFound++
528                 } else if err != nil {
529                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
530                         failedOn = append(failedOn, vol.String())
531                 } else {
532                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
533                         untrashedOn = append(untrashedOn, vol.String())
534                 }
535         }
536
537         if numNotFound == len(KeepVM.AllWritable()) {
538                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
539                 return
540         }
541
542         if len(failedOn) == len(KeepVM.AllWritable()) {
543                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
544         } else {
545                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
546                 if len(failedOn) > 0 {
547                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
548                 }
549                 resp.Write([]byte(respBody))
550         }
551 }
552
553 // GetBlock and PutBlock implement lower-level code for handling
554 // blocks by rooting through volumes connected to the local machine.
555 // Once the handler has determined that system policy permits the
556 // request, it calls these methods to perform the actual operation.
557 //
558 // TODO(twp): this code would probably be better located in the
559 // VolumeManager interface. As an abstraction, the VolumeManager
560 // should be the only part of the code that cares about which volume a
561 // block is stored on, so it should be responsible for figuring out
562 // which volume to check for fetching blocks, storing blocks, etc.
563
564 // GetBlock fetches the block identified by "hash" into the provided
565 // buf, and returns the data size.
566 //
567 // If the block cannot be found on any volume, returns NotFoundError.
568 //
569 // If the block found does not have the correct MD5 hash, returns
570 // DiskHashError.
571 //
572 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
573         // Attempt to read the requested hash from a keep volume.
574         errorToCaller := NotFoundError
575
576         for _, vol := range KeepVM.AllReadable() {
577                 size, err := vol.Get(ctx, hash, buf)
578                 select {
579                 case <-ctx.Done():
580                         return 0, ErrClientDisconnect
581                 default:
582                 }
583                 if err != nil {
584                         // IsNotExist is an expected error and may be
585                         // ignored. All other errors are logged. In
586                         // any case we continue trying to read other
587                         // volumes. If all volumes report IsNotExist,
588                         // we return a NotFoundError.
589                         if !os.IsNotExist(err) {
590                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
591                         }
592                         continue
593                 }
594                 // Check the file checksum.
595                 //
596                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
597                 if filehash != hash {
598                         // TODO: Try harder to tell a sysadmin about
599                         // this.
600                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
601                                 vol, hash, filehash)
602                         errorToCaller = DiskHashError
603                         continue
604                 }
605                 if errorToCaller == DiskHashError {
606                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
607                                 vol, hash)
608                 }
609                 return size, nil
610         }
611         return 0, errorToCaller
612 }
613
614 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
615 //
616 // PutBlock(ctx, block, hash)
617 //   Stores the BLOCK (identified by the content id HASH) in Keep.
618 //
619 //   The MD5 checksum of the block must be identical to the content id HASH.
620 //   If not, an error is returned.
621 //
622 //   PutBlock stores the BLOCK on the first Keep volume with free space.
623 //   A failure code is returned to the user only if all volumes fail.
624 //
625 //   On success, PutBlock returns nil.
626 //   On failure, it returns a KeepError with one of the following codes:
627 //
628 //   500 Collision
629 //          A different block with the same hash already exists on this
630 //          Keep server.
631 //   422 MD5Fail
632 //          The MD5 hash of the BLOCK does not match the argument HASH.
633 //   503 Full
634 //          There was not enough space left in any Keep volume to store
635 //          the object.
636 //   500 Fail
637 //          The object could not be stored for some other reason (e.g.
638 //          all writes failed). The text of the error message should
639 //          provide as much detail as possible.
640 //
641 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
642         // Check that BLOCK's checksum matches HASH.
643         blockhash := fmt.Sprintf("%x", md5.Sum(block))
644         if blockhash != hash {
645                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
646                 return 0, RequestHashError
647         }
648
649         // If we already have this data, it's intact on disk, and we
650         // can update its timestamp, return success. If we have
651         // different data with the same hash, return failure.
652         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
653                 return n, err
654         } else if ctx.Err() != nil {
655                 return 0, ErrClientDisconnect
656         }
657
658         // Choose a Keep volume to write to.
659         // If this volume fails, try all of the volumes in order.
660         if vol := KeepVM.NextWritable(); vol != nil {
661                 if err := vol.Put(ctx, hash, block); err == nil {
662                         return vol.Replication(), nil // success!
663                 }
664                 if ctx.Err() != nil {
665                         return 0, ErrClientDisconnect
666                 }
667         }
668
669         writables := KeepVM.AllWritable()
670         if len(writables) == 0 {
671                 log.Print("No writable volumes.")
672                 return 0, FullError
673         }
674
675         allFull := true
676         for _, vol := range writables {
677                 err := vol.Put(ctx, hash, block)
678                 if ctx.Err() != nil {
679                         return 0, ErrClientDisconnect
680                 }
681                 if err == nil {
682                         return vol.Replication(), nil // success!
683                 }
684                 if err != FullError {
685                         // The volume is not full but the
686                         // write did not succeed.  Report the
687                         // error and continue trying.
688                         allFull = false
689                         log.Printf("%s: Write(%s): %s", vol, hash, err)
690                 }
691         }
692
693         if allFull {
694                 log.Print("All volumes are full.")
695                 return 0, FullError
696         }
697         // Already logged the non-full errors.
698         return 0, GenericError
699 }
700
701 // CompareAndTouch returns the current replication level if one of the
702 // volumes already has the given content and it successfully updates
703 // the relevant block's modification time in order to protect it from
704 // premature garbage collection. Otherwise, it returns a non-nil
705 // error.
706 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
707         var bestErr error = NotFoundError
708         for _, vol := range KeepVM.AllWritable() {
709                 err := vol.Compare(ctx, hash, buf)
710                 if ctx.Err() != nil {
711                         return 0, ctx.Err()
712                 } else if err == CollisionError {
713                         // Stop if we have a block with same hash but
714                         // different content. (It will be impossible
715                         // to tell which one is wanted if we have
716                         // both, so there's no point writing it even
717                         // on a different volume.)
718                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
719                         return 0, err
720                 } else if os.IsNotExist(err) {
721                         // Block does not exist. This is the only
722                         // "normal" error: we don't log anything.
723                         continue
724                 } else if err != nil {
725                         // Couldn't open file, data is corrupt on
726                         // disk, etc.: log this abnormal condition,
727                         // and try the next volume.
728                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
729                         continue
730                 }
731                 if err := vol.Touch(hash); err != nil {
732                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
733                         bestErr = err
734                         continue
735                 }
736                 // Compare and Touch both worked --> done.
737                 return vol.Replication(), nil
738         }
739         return 0, bestErr
740 }
741
742 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
743
744 // IsValidLocator returns true if the specified string is a valid Keep locator.
745 //   When Keep is extended to support hash types other than MD5,
746 //   this should be updated to cover those as well.
747 //
748 func IsValidLocator(loc string) bool {
749         return validLocatorRe.MatchString(loc)
750 }
751
752 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
753
754 // GetAPIToken returns the OAuth2 token from the Authorization
755 // header of a HTTP request, or an empty string if no matching
756 // token is found.
757 func GetAPIToken(req *http.Request) string {
758         if auth, ok := req.Header["Authorization"]; ok {
759                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
760                         return match[1]
761                 }
762         }
763         return ""
764 }
765
766 // IsExpired returns true if the given Unix timestamp (expressed as a
767 // hexadecimal string) is in the past, or if timestampHex cannot be
768 // parsed as a hexadecimal string.
769 func IsExpired(timestampHex string) bool {
770         ts, err := strconv.ParseInt(timestampHex, 16, 0)
771         if err != nil {
772                 log.Printf("IsExpired: %s", err)
773                 return true
774         }
775         return time.Unix(ts, 0).Before(time.Now())
776 }
777
778 // CanDelete returns true if the user identified by apiToken is
779 // allowed to delete blocks.
780 func CanDelete(apiToken string) bool {
781         if apiToken == "" {
782                 return false
783         }
784         // Blocks may be deleted only when Keep has been configured with a
785         // data manager.
786         if IsSystemAuth(apiToken) {
787                 return true
788         }
789         // TODO(twp): look up apiToken with the API server
790         // return true if is_admin is true and if the token
791         // has unlimited scope
792         return false
793 }
794
795 // IsSystemAuth returns true if the given token is allowed to perform
796 // system level actions like deleting data.
797 func IsSystemAuth(token string) bool {
798         return token != "" && token == theConfig.systemAuthToken
799 }