11644: Add /mounts endpoint using random mount UUIDs assigned at runtime.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "io"
17         "net/http"
18         "os"
19         "regexp"
20         "runtime"
21         "strconv"
22         "strings"
23         "sync"
24         "time"
25
26         "github.com/gorilla/mux"
27
28         "git.curoverse.com/arvados.git/sdk/go/httpserver"
29         log "github.com/Sirupsen/logrus"
30 )
31
32 type router struct {
33         *mux.Router
34         limiter httpserver.RequestCounter
35 }
36
37 // MakeRESTRouter returns a new router that forwards all Keep requests
38 // to the appropriate handlers.
39 func MakeRESTRouter() *router {
40         rest := mux.NewRouter()
41         rtr := &router{Router: rest}
42
43         rest.HandleFunc(
44                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
45         rest.HandleFunc(
46                 `/{hash:[0-9a-f]{32}}+{hints}`,
47                 GetBlockHandler).Methods("GET", "HEAD")
48
49         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
50         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
51         // List all blocks stored here. Privileged client only.
52         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
53         // List blocks stored here whose hash has the given prefix.
54         // Privileged client only.
55         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
56
57         // Internals/debugging info (runtime.MemStats)
58         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
59
60         // List volumes: path, device number, bytes used/avail.
61         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
62
63         // List mounts: UUID, readonly, tier, device ID, ...
64         rest.HandleFunc(`/mounts`, rtr.Mounts).Methods("GET")
65
66         // Replace the current pull queue.
67         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
68
69         // Replace the current trash queue.
70         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
71
72         // Untrash moves blocks from trash back into store
73         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
74
75         // Any request which does not match any of these routes gets
76         // 400 Bad Request.
77         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
78
79         return rtr
80 }
81
82 // BadRequestHandler is a HandleFunc to address bad requests.
83 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
84         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
85 }
86
87 // GetBlockHandler is a HandleFunc to address Get block requests.
88 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
89         ctx, cancel := contextForResponse(context.TODO(), resp)
90         defer cancel()
91
92         if theConfig.RequireSignatures {
93                 locator := req.URL.Path[1:] // strip leading slash
94                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
95                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
96                         return
97                 }
98         }
99
100         // TODO: Probe volumes to check whether the block _might_
101         // exist. Some volumes/types could support a quick existence
102         // check without causing other operations to suffer. If all
103         // volumes support that, and assure us the block definitely
104         // isn't here, we can return 404 now instead of waiting for a
105         // buffer.
106
107         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
108         if err != nil {
109                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
110                 return
111         }
112         defer bufs.Put(buf)
113
114         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
115         if err != nil {
116                 code := http.StatusInternalServerError
117                 if err, ok := err.(*KeepError); ok {
118                         code = err.HTTPCode
119                 }
120                 http.Error(resp, err.Error(), code)
121                 return
122         }
123
124         resp.Header().Set("Content-Length", strconv.Itoa(size))
125         resp.Header().Set("Content-Type", "application/octet-stream")
126         resp.Write(buf[:size])
127 }
128
129 // Return a new context that gets cancelled by resp's CloseNotifier.
130 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
131         ctx, cancel := context.WithCancel(parent)
132         if cn, ok := resp.(http.CloseNotifier); ok {
133                 go func(c <-chan bool) {
134                         select {
135                         case <-c:
136                                 theConfig.debugLogf("cancel context")
137                                 cancel()
138                         case <-ctx.Done():
139                         }
140                 }(cn.CloseNotify())
141         }
142         return ctx, cancel
143 }
144
145 // Get a buffer from the pool -- but give up and return a non-nil
146 // error if ctx ends before we get a buffer.
147 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
148         bufReady := make(chan []byte)
149         go func() {
150                 bufReady <- bufs.Get(bufSize)
151         }()
152         select {
153         case buf := <-bufReady:
154                 return buf, nil
155         case <-ctx.Done():
156                 go func() {
157                         // Even if closeNotifier happened first, we
158                         // need to keep waiting for our buf so we can
159                         // return it to the pool.
160                         bufs.Put(<-bufReady)
161                 }()
162                 return nil, ErrClientDisconnect
163         }
164 }
165
166 // PutBlockHandler is a HandleFunc to address Put block requests.
167 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
168         ctx, cancel := contextForResponse(context.TODO(), resp)
169         defer cancel()
170
171         hash := mux.Vars(req)["hash"]
172
173         // Detect as many error conditions as possible before reading
174         // the body: avoid transmitting data that will not end up
175         // being written anyway.
176
177         if req.ContentLength == -1 {
178                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
179                 return
180         }
181
182         if req.ContentLength > BlockSize {
183                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
184                 return
185         }
186
187         if len(KeepVM.AllWritable()) == 0 {
188                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
189                 return
190         }
191
192         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
193         if err != nil {
194                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
195                 return
196         }
197
198         _, err = io.ReadFull(req.Body, buf)
199         if err != nil {
200                 http.Error(resp, err.Error(), 500)
201                 bufs.Put(buf)
202                 return
203         }
204
205         replication, err := PutBlock(ctx, buf, hash)
206         bufs.Put(buf)
207
208         if err != nil {
209                 code := http.StatusInternalServerError
210                 if err, ok := err.(*KeepError); ok {
211                         code = err.HTTPCode
212                 }
213                 http.Error(resp, err.Error(), code)
214                 return
215         }
216
217         // Success; add a size hint, sign the locator if possible, and
218         // return it to the client.
219         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
220         apiToken := GetAPIToken(req)
221         if theConfig.blobSigningKey != nil && apiToken != "" {
222                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
223                 returnHash = SignLocator(returnHash, apiToken, expiry)
224         }
225         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
226         resp.Write([]byte(returnHash + "\n"))
227 }
228
229 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
230 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
231         // Reject unauthorized requests.
232         if !IsSystemAuth(GetAPIToken(req)) {
233                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
234                 return
235         }
236
237         prefix := mux.Vars(req)["prefix"]
238
239         for _, vol := range KeepVM.AllReadable() {
240                 if err := vol.IndexTo(prefix, resp); err != nil {
241                         // The only errors returned by IndexTo are
242                         // write errors returned by resp.Write(),
243                         // which probably means the client has
244                         // disconnected and this error will never be
245                         // reported to the client -- but it will
246                         // appear in our own error log.
247                         http.Error(resp, err.Error(), http.StatusInternalServerError)
248                         return
249                 }
250         }
251         // An empty line at EOF is the only way the client can be
252         // assured the entire index was received.
253         resp.Write([]byte{'\n'})
254 }
255
256 // Mounts responds to "GET /mounts" requests.
257 func (rtr *router) Mounts(resp http.ResponseWriter, req *http.Request) {
258         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
259         if err != nil {
260                 http.Error(resp, err.Error(), http.StatusInternalServerError)
261         }
262 }
263
264 // PoolStatus struct
265 type PoolStatus struct {
266         Alloc uint64 `json:"BytesAllocated"`
267         Cap   int    `json:"BuffersMax"`
268         Len   int    `json:"BuffersInUse"`
269 }
270
271 type volumeStatusEnt struct {
272         Label         string
273         Status        *VolumeStatus `json:",omitempty"`
274         VolumeStats   *ioStats      `json:",omitempty"`
275         InternalStats interface{}   `json:",omitempty"`
276 }
277
278 // NodeStatus struct
279 type NodeStatus struct {
280         Volumes         []*volumeStatusEnt
281         BufferPool      PoolStatus
282         PullQueue       WorkQueueStatus
283         TrashQueue      WorkQueueStatus
284         RequestsCurrent int
285         RequestsMax     int
286 }
287
288 var st NodeStatus
289 var stLock sync.Mutex
290
291 // DebugHandler addresses /debug.json requests.
292 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
293         type debugStats struct {
294                 MemStats runtime.MemStats
295         }
296         var ds debugStats
297         runtime.ReadMemStats(&ds.MemStats)
298         err := json.NewEncoder(resp).Encode(&ds)
299         if err != nil {
300                 http.Error(resp, err.Error(), 500)
301         }
302 }
303
304 // StatusHandler addresses /status.json requests.
305 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
306         stLock.Lock()
307         rtr.readNodeStatus(&st)
308         jstat, err := json.Marshal(&st)
309         stLock.Unlock()
310         if err == nil {
311                 resp.Write(jstat)
312         } else {
313                 log.Printf("json.Marshal: %s", err)
314                 log.Printf("NodeStatus = %v", &st)
315                 http.Error(resp, err.Error(), 500)
316         }
317 }
318
319 // populate the given NodeStatus struct with current values.
320 func (rtr *router) readNodeStatus(st *NodeStatus) {
321         vols := KeepVM.AllReadable()
322         if cap(st.Volumes) < len(vols) {
323                 st.Volumes = make([]*volumeStatusEnt, len(vols))
324         }
325         st.Volumes = st.Volumes[:0]
326         for _, vol := range vols {
327                 var internalStats interface{}
328                 if vol, ok := vol.(InternalStatser); ok {
329                         internalStats = vol.InternalStats()
330                 }
331                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
332                         Label:         vol.String(),
333                         Status:        vol.Status(),
334                         InternalStats: internalStats,
335                         //VolumeStats: KeepVM.VolumeStats(vol),
336                 })
337         }
338         st.BufferPool.Alloc = bufs.Alloc()
339         st.BufferPool.Cap = bufs.Cap()
340         st.BufferPool.Len = bufs.Len()
341         st.PullQueue = getWorkQueueStatus(pullq)
342         st.TrashQueue = getWorkQueueStatus(trashq)
343         if rtr.limiter != nil {
344                 st.RequestsCurrent = rtr.limiter.Current()
345                 st.RequestsMax = rtr.limiter.Max()
346         }
347 }
348
349 // return a WorkQueueStatus for the given queue. If q is nil (which
350 // should never happen except in test suites), return a zero status
351 // value instead of crashing.
352 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
353         if q == nil {
354                 // This should only happen during tests.
355                 return WorkQueueStatus{}
356         }
357         return q.Status()
358 }
359
360 // DeleteHandler processes DELETE requests.
361 //
362 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
363 // from all connected volumes.
364 //
365 // Only the Data Manager, or an Arvados admin with scope "all", are
366 // allowed to issue DELETE requests.  If a DELETE request is not
367 // authenticated or is issued by a non-admin user, the server returns
368 // a PermissionError.
369 //
370 // Upon receiving a valid request from an authorized user,
371 // DeleteHandler deletes all copies of the specified block on local
372 // writable volumes.
373 //
374 // Response format:
375 //
376 // If the requested blocks was not found on any volume, the response
377 // code is HTTP 404 Not Found.
378 //
379 // Otherwise, the response code is 200 OK, with a response body
380 // consisting of the JSON message
381 //
382 //    {"copies_deleted":d,"copies_failed":f}
383 //
384 // where d and f are integers representing the number of blocks that
385 // were successfully and unsuccessfully deleted.
386 //
387 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
388         hash := mux.Vars(req)["hash"]
389
390         // Confirm that this user is an admin and has a token with unlimited scope.
391         var tok = GetAPIToken(req)
392         if tok == "" || !CanDelete(tok) {
393                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
394                 return
395         }
396
397         if !theConfig.EnableDelete {
398                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
399                 return
400         }
401
402         // Delete copies of this block from all available volumes.
403         // Report how many blocks were successfully deleted, and how
404         // many were found on writable volumes but not deleted.
405         var result struct {
406                 Deleted int `json:"copies_deleted"`
407                 Failed  int `json:"copies_failed"`
408         }
409         for _, vol := range KeepVM.AllWritable() {
410                 if err := vol.Trash(hash); err == nil {
411                         result.Deleted++
412                 } else if os.IsNotExist(err) {
413                         continue
414                 } else {
415                         result.Failed++
416                         log.Println("DeleteHandler:", err)
417                 }
418         }
419
420         var st int
421
422         if result.Deleted == 0 && result.Failed == 0 {
423                 st = http.StatusNotFound
424         } else {
425                 st = http.StatusOK
426         }
427
428         resp.WriteHeader(st)
429
430         if st == http.StatusOK {
431                 if body, err := json.Marshal(result); err == nil {
432                         resp.Write(body)
433                 } else {
434                         log.Printf("json.Marshal: %s (result = %v)", err, result)
435                         http.Error(resp, err.Error(), 500)
436                 }
437         }
438 }
439
440 /* PullHandler processes "PUT /pull" requests for the data manager.
441    The request body is a JSON message containing a list of pull
442    requests in the following format:
443
444    [
445       {
446          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
447          "servers":[
448                         "keep0.qr1hi.arvadosapi.com:25107",
449                         "keep1.qr1hi.arvadosapi.com:25108"
450                  ]
451           },
452           {
453                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
454                  "servers":[
455                         "10.0.1.5:25107",
456                         "10.0.1.6:25107",
457                         "10.0.1.7:25108"
458                  ]
459           },
460           ...
461    ]
462
463    Each pull request in the list consists of a block locator string
464    and an ordered list of servers.  Keepstore should try to fetch the
465    block from each server in turn.
466
467    If the request has not been sent by the Data Manager, return 401
468    Unauthorized.
469
470    If the JSON unmarshalling fails, return 400 Bad Request.
471 */
472
473 // PullRequest consists of a block locator and an ordered list of servers
474 type PullRequest struct {
475         Locator string   `json:"locator"`
476         Servers []string `json:"servers"`
477 }
478
479 // PullHandler processes "PUT /pull" requests for the data manager.
480 func PullHandler(resp http.ResponseWriter, req *http.Request) {
481         // Reject unauthorized requests.
482         if !IsSystemAuth(GetAPIToken(req)) {
483                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
484                 return
485         }
486
487         // Parse the request body.
488         var pr []PullRequest
489         r := json.NewDecoder(req.Body)
490         if err := r.Decode(&pr); err != nil {
491                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
492                 return
493         }
494
495         // We have a properly formatted pull list sent from the data
496         // manager.  Report success and send the list to the pull list
497         // manager for further handling.
498         resp.WriteHeader(http.StatusOK)
499         resp.Write([]byte(
500                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
501
502         plist := list.New()
503         for _, p := range pr {
504                 plist.PushBack(p)
505         }
506         pullq.ReplaceQueue(plist)
507 }
508
509 // TrashRequest consists of a block locator and it's Mtime
510 type TrashRequest struct {
511         Locator    string `json:"locator"`
512         BlockMtime int64  `json:"block_mtime"`
513 }
514
515 // TrashHandler processes /trash requests.
516 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
517         // Reject unauthorized requests.
518         if !IsSystemAuth(GetAPIToken(req)) {
519                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
520                 return
521         }
522
523         // Parse the request body.
524         var trash []TrashRequest
525         r := json.NewDecoder(req.Body)
526         if err := r.Decode(&trash); err != nil {
527                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
528                 return
529         }
530
531         // We have a properly formatted trash list sent from the data
532         // manager.  Report success and send the list to the trash work
533         // queue for further handling.
534         resp.WriteHeader(http.StatusOK)
535         resp.Write([]byte(
536                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
537
538         tlist := list.New()
539         for _, t := range trash {
540                 tlist.PushBack(t)
541         }
542         trashq.ReplaceQueue(tlist)
543 }
544
545 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
546 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
547         // Reject unauthorized requests.
548         if !IsSystemAuth(GetAPIToken(req)) {
549                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
550                 return
551         }
552
553         hash := mux.Vars(req)["hash"]
554
555         if len(KeepVM.AllWritable()) == 0 {
556                 http.Error(resp, "No writable volumes", http.StatusNotFound)
557                 return
558         }
559
560         var untrashedOn, failedOn []string
561         var numNotFound int
562         for _, vol := range KeepVM.AllWritable() {
563                 err := vol.Untrash(hash)
564
565                 if os.IsNotExist(err) {
566                         numNotFound++
567                 } else if err != nil {
568                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
569                         failedOn = append(failedOn, vol.String())
570                 } else {
571                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
572                         untrashedOn = append(untrashedOn, vol.String())
573                 }
574         }
575
576         if numNotFound == len(KeepVM.AllWritable()) {
577                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
578                 return
579         }
580
581         if len(failedOn) == len(KeepVM.AllWritable()) {
582                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
583         } else {
584                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
585                 if len(failedOn) > 0 {
586                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
587                 }
588                 resp.Write([]byte(respBody))
589         }
590 }
591
592 // GetBlock and PutBlock implement lower-level code for handling
593 // blocks by rooting through volumes connected to the local machine.
594 // Once the handler has determined that system policy permits the
595 // request, it calls these methods to perform the actual operation.
596 //
597 // TODO(twp): this code would probably be better located in the
598 // VolumeManager interface. As an abstraction, the VolumeManager
599 // should be the only part of the code that cares about which volume a
600 // block is stored on, so it should be responsible for figuring out
601 // which volume to check for fetching blocks, storing blocks, etc.
602
603 // GetBlock fetches the block identified by "hash" into the provided
604 // buf, and returns the data size.
605 //
606 // If the block cannot be found on any volume, returns NotFoundError.
607 //
608 // If the block found does not have the correct MD5 hash, returns
609 // DiskHashError.
610 //
611 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
612         // Attempt to read the requested hash from a keep volume.
613         errorToCaller := NotFoundError
614
615         for _, vol := range KeepVM.AllReadable() {
616                 size, err := vol.Get(ctx, hash, buf)
617                 select {
618                 case <-ctx.Done():
619                         return 0, ErrClientDisconnect
620                 default:
621                 }
622                 if err != nil {
623                         // IsNotExist is an expected error and may be
624                         // ignored. All other errors are logged. In
625                         // any case we continue trying to read other
626                         // volumes. If all volumes report IsNotExist,
627                         // we return a NotFoundError.
628                         if !os.IsNotExist(err) {
629                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
630                         }
631                         continue
632                 }
633                 // Check the file checksum.
634                 //
635                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
636                 if filehash != hash {
637                         // TODO: Try harder to tell a sysadmin about
638                         // this.
639                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
640                                 vol, hash, filehash)
641                         errorToCaller = DiskHashError
642                         continue
643                 }
644                 if errorToCaller == DiskHashError {
645                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
646                                 vol, hash)
647                 }
648                 return size, nil
649         }
650         return 0, errorToCaller
651 }
652
653 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
654 //
655 // PutBlock(ctx, block, hash)
656 //   Stores the BLOCK (identified by the content id HASH) in Keep.
657 //
658 //   The MD5 checksum of the block must be identical to the content id HASH.
659 //   If not, an error is returned.
660 //
661 //   PutBlock stores the BLOCK on the first Keep volume with free space.
662 //   A failure code is returned to the user only if all volumes fail.
663 //
664 //   On success, PutBlock returns nil.
665 //   On failure, it returns a KeepError with one of the following codes:
666 //
667 //   500 Collision
668 //          A different block with the same hash already exists on this
669 //          Keep server.
670 //   422 MD5Fail
671 //          The MD5 hash of the BLOCK does not match the argument HASH.
672 //   503 Full
673 //          There was not enough space left in any Keep volume to store
674 //          the object.
675 //   500 Fail
676 //          The object could not be stored for some other reason (e.g.
677 //          all writes failed). The text of the error message should
678 //          provide as much detail as possible.
679 //
680 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
681         // Check that BLOCK's checksum matches HASH.
682         blockhash := fmt.Sprintf("%x", md5.Sum(block))
683         if blockhash != hash {
684                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
685                 return 0, RequestHashError
686         }
687
688         // If we already have this data, it's intact on disk, and we
689         // can update its timestamp, return success. If we have
690         // different data with the same hash, return failure.
691         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
692                 return n, err
693         } else if ctx.Err() != nil {
694                 return 0, ErrClientDisconnect
695         }
696
697         // Choose a Keep volume to write to.
698         // If this volume fails, try all of the volumes in order.
699         if vol := KeepVM.NextWritable(); vol != nil {
700                 if err := vol.Put(ctx, hash, block); err == nil {
701                         return vol.Replication(), nil // success!
702                 }
703                 if ctx.Err() != nil {
704                         return 0, ErrClientDisconnect
705                 }
706         }
707
708         writables := KeepVM.AllWritable()
709         if len(writables) == 0 {
710                 log.Print("No writable volumes.")
711                 return 0, FullError
712         }
713
714         allFull := true
715         for _, vol := range writables {
716                 err := vol.Put(ctx, hash, block)
717                 if ctx.Err() != nil {
718                         return 0, ErrClientDisconnect
719                 }
720                 if err == nil {
721                         return vol.Replication(), nil // success!
722                 }
723                 if err != FullError {
724                         // The volume is not full but the
725                         // write did not succeed.  Report the
726                         // error and continue trying.
727                         allFull = false
728                         log.Printf("%s: Write(%s): %s", vol, hash, err)
729                 }
730         }
731
732         if allFull {
733                 log.Print("All volumes are full.")
734                 return 0, FullError
735         }
736         // Already logged the non-full errors.
737         return 0, GenericError
738 }
739
740 // CompareAndTouch returns the current replication level if one of the
741 // volumes already has the given content and it successfully updates
742 // the relevant block's modification time in order to protect it from
743 // premature garbage collection. Otherwise, it returns a non-nil
744 // error.
745 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
746         var bestErr error = NotFoundError
747         for _, vol := range KeepVM.AllWritable() {
748                 err := vol.Compare(ctx, hash, buf)
749                 if ctx.Err() != nil {
750                         return 0, ctx.Err()
751                 } else if err == CollisionError {
752                         // Stop if we have a block with same hash but
753                         // different content. (It will be impossible
754                         // to tell which one is wanted if we have
755                         // both, so there's no point writing it even
756                         // on a different volume.)
757                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
758                         return 0, err
759                 } else if os.IsNotExist(err) {
760                         // Block does not exist. This is the only
761                         // "normal" error: we don't log anything.
762                         continue
763                 } else if err != nil {
764                         // Couldn't open file, data is corrupt on
765                         // disk, etc.: log this abnormal condition,
766                         // and try the next volume.
767                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
768                         continue
769                 }
770                 if err := vol.Touch(hash); err != nil {
771                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
772                         bestErr = err
773                         continue
774                 }
775                 // Compare and Touch both worked --> done.
776                 return vol.Replication(), nil
777         }
778         return 0, bestErr
779 }
780
781 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
782
783 // IsValidLocator returns true if the specified string is a valid Keep locator.
784 //   When Keep is extended to support hash types other than MD5,
785 //   this should be updated to cover those as well.
786 //
787 func IsValidLocator(loc string) bool {
788         return validLocatorRe.MatchString(loc)
789 }
790
791 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
792
793 // GetAPIToken returns the OAuth2 token from the Authorization
794 // header of a HTTP request, or an empty string if no matching
795 // token is found.
796 func GetAPIToken(req *http.Request) string {
797         if auth, ok := req.Header["Authorization"]; ok {
798                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
799                         return match[1]
800                 }
801         }
802         return ""
803 }
804
805 // IsExpired returns true if the given Unix timestamp (expressed as a
806 // hexadecimal string) is in the past, or if timestampHex cannot be
807 // parsed as a hexadecimal string.
808 func IsExpired(timestampHex string) bool {
809         ts, err := strconv.ParseInt(timestampHex, 16, 0)
810         if err != nil {
811                 log.Printf("IsExpired: %s", err)
812                 return true
813         }
814         return time.Unix(ts, 0).Before(time.Now())
815 }
816
817 // CanDelete returns true if the user identified by apiToken is
818 // allowed to delete blocks.
819 func CanDelete(apiToken string) bool {
820         if apiToken == "" {
821                 return false
822         }
823         // Blocks may be deleted only when Keep has been configured with a
824         // data manager.
825         if IsSystemAuth(apiToken) {
826                 return true
827         }
828         // TODO(twp): look up apiToken with the API server
829         // return true if is_admin is true and if the token
830         // has unlimited scope
831         return false
832 }
833
834 // IsSystemAuth returns true if the given token is allowed to perform
835 // system level actions like deleting data.
836 func IsSystemAuth(token string) bool {
837         return token != "" && token == theConfig.systemAuthToken
838 }