Merge branch 'master' into 11454-wb-federated-search
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // REST handlers for Keep are implemented here.
8 //
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler    (GET /index, GET /index/prefix)
12 // StatusHandler   (GET /status.json)
13
14 import (
15         "container/list"
16         "context"
17         "crypto/md5"
18         "encoding/json"
19         "fmt"
20         "io"
21         "net/http"
22         "os"
23         "regexp"
24         "runtime"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gorilla/mux"
31
32         "git.curoverse.com/arvados.git/sdk/go/health"
33         "git.curoverse.com/arvados.git/sdk/go/httpserver"
34         log "github.com/Sirupsen/logrus"
35 )
36
37 type router struct {
38         *mux.Router
39         limiter httpserver.RequestCounter
40 }
41
42 // MakeRESTRouter returns a new router that forwards all Keep requests
43 // to the appropriate handlers.
44 func MakeRESTRouter() *router {
45         rest := mux.NewRouter()
46         rtr := &router{Router: rest}
47
48         rest.HandleFunc(
49                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
50         rest.HandleFunc(
51                 `/{hash:[0-9a-f]{32}}+{hints}`,
52                 GetBlockHandler).Methods("GET", "HEAD")
53
54         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
55         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
56         // List all blocks stored here. Privileged client only.
57         rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
58         // List blocks stored here whose hash has the given prefix.
59         // Privileged client only.
60         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61
62         // Internals/debugging info (runtime.MemStats)
63         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64
65         // List volumes: path, device number, bytes used/avail.
66         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67
68         // List mounts: UUID, readonly, tier, device ID, ...
69         rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
70         rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
71         rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72
73         // Replace the current pull queue.
74         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75
76         // Replace the current trash queue.
77         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78
79         // Untrash moves blocks from trash back into store
80         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
81
82         rest.Handle("/_health/{check}", &health.Handler{
83                 Token:  theConfig.ManagementToken,
84                 Prefix: "/_health/",
85         }).Methods("GET")
86
87         // Any request which does not match any of these routes gets
88         // 400 Bad Request.
89         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
90
91         return rtr
92 }
93
94 // BadRequestHandler is a HandleFunc to address bad requests.
95 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
96         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
97 }
98
99 // GetBlockHandler is a HandleFunc to address Get block requests.
100 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
101         ctx, cancel := contextForResponse(context.TODO(), resp)
102         defer cancel()
103
104         if theConfig.RequireSignatures {
105                 locator := req.URL.Path[1:] // strip leading slash
106                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
107                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
108                         return
109                 }
110         }
111
112         // TODO: Probe volumes to check whether the block _might_
113         // exist. Some volumes/types could support a quick existence
114         // check without causing other operations to suffer. If all
115         // volumes support that, and assure us the block definitely
116         // isn't here, we can return 404 now instead of waiting for a
117         // buffer.
118
119         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
120         if err != nil {
121                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
122                 return
123         }
124         defer bufs.Put(buf)
125
126         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
127         if err != nil {
128                 code := http.StatusInternalServerError
129                 if err, ok := err.(*KeepError); ok {
130                         code = err.HTTPCode
131                 }
132                 http.Error(resp, err.Error(), code)
133                 return
134         }
135
136         resp.Header().Set("Content-Length", strconv.Itoa(size))
137         resp.Header().Set("Content-Type", "application/octet-stream")
138         resp.Write(buf[:size])
139 }
140
141 // Return a new context that gets cancelled by resp's CloseNotifier.
142 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
143         ctx, cancel := context.WithCancel(parent)
144         if cn, ok := resp.(http.CloseNotifier); ok {
145                 go func(c <-chan bool) {
146                         select {
147                         case <-c:
148                                 theConfig.debugLogf("cancel context")
149                                 cancel()
150                         case <-ctx.Done():
151                         }
152                 }(cn.CloseNotify())
153         }
154         return ctx, cancel
155 }
156
157 // Get a buffer from the pool -- but give up and return a non-nil
158 // error if ctx ends before we get a buffer.
159 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
160         bufReady := make(chan []byte)
161         go func() {
162                 bufReady <- bufs.Get(bufSize)
163         }()
164         select {
165         case buf := <-bufReady:
166                 return buf, nil
167         case <-ctx.Done():
168                 go func() {
169                         // Even if closeNotifier happened first, we
170                         // need to keep waiting for our buf so we can
171                         // return it to the pool.
172                         bufs.Put(<-bufReady)
173                 }()
174                 return nil, ErrClientDisconnect
175         }
176 }
177
178 // PutBlockHandler is a HandleFunc to address Put block requests.
179 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
180         ctx, cancel := contextForResponse(context.TODO(), resp)
181         defer cancel()
182
183         hash := mux.Vars(req)["hash"]
184
185         // Detect as many error conditions as possible before reading
186         // the body: avoid transmitting data that will not end up
187         // being written anyway.
188
189         if req.ContentLength == -1 {
190                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
191                 return
192         }
193
194         if req.ContentLength > BlockSize {
195                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
196                 return
197         }
198
199         if len(KeepVM.AllWritable()) == 0 {
200                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
201                 return
202         }
203
204         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
205         if err != nil {
206                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
207                 return
208         }
209
210         _, err = io.ReadFull(req.Body, buf)
211         if err != nil {
212                 http.Error(resp, err.Error(), 500)
213                 bufs.Put(buf)
214                 return
215         }
216
217         replication, err := PutBlock(ctx, buf, hash)
218         bufs.Put(buf)
219
220         if err != nil {
221                 code := http.StatusInternalServerError
222                 if err, ok := err.(*KeepError); ok {
223                         code = err.HTTPCode
224                 }
225                 http.Error(resp, err.Error(), code)
226                 return
227         }
228
229         // Success; add a size hint, sign the locator if possible, and
230         // return it to the client.
231         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
232         apiToken := GetAPIToken(req)
233         if theConfig.blobSigningKey != nil && apiToken != "" {
234                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
235                 returnHash = SignLocator(returnHash, apiToken, expiry)
236         }
237         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
238         resp.Write([]byte(returnHash + "\n"))
239 }
240
241 // IndexHandler responds to "/index", "/index/{prefix}", and
242 // "/mounts/{uuid}/blocks" requests.
243 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
244         if !IsSystemAuth(GetAPIToken(req)) {
245                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
246                 return
247         }
248
249         prefix := mux.Vars(req)["prefix"]
250         if prefix == "" {
251                 req.ParseForm()
252                 prefix = req.Form.Get("prefix")
253         }
254
255         uuid := mux.Vars(req)["uuid"]
256
257         var vols []Volume
258         if uuid == "" {
259                 vols = KeepVM.AllReadable()
260         } else if v := KeepVM.Lookup(uuid, false); v == nil {
261                 http.Error(resp, "mount not found", http.StatusNotFound)
262                 return
263         } else {
264                 vols = []Volume{v}
265         }
266
267         for _, v := range vols {
268                 if err := v.IndexTo(prefix, resp); err != nil {
269                         // The only errors returned by IndexTo are
270                         // write errors returned by resp.Write(),
271                         // which probably means the client has
272                         // disconnected and this error will never be
273                         // reported to the client -- but it will
274                         // appear in our own error log.
275                         http.Error(resp, err.Error(), http.StatusInternalServerError)
276                         return
277                 }
278         }
279         // An empty line at EOF is the only way the client can be
280         // assured the entire index was received.
281         resp.Write([]byte{'\n'})
282 }
283
284 // MountsHandler responds to "GET /mounts" requests.
285 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
286         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
287         if err != nil {
288                 http.Error(resp, err.Error(), http.StatusInternalServerError)
289         }
290 }
291
292 // PoolStatus struct
293 type PoolStatus struct {
294         Alloc uint64 `json:"BytesAllocatedCumulative"`
295         Cap   int    `json:"BuffersMax"`
296         Len   int    `json:"BuffersInUse"`
297 }
298
299 type volumeStatusEnt struct {
300         Label         string
301         Status        *VolumeStatus `json:",omitempty"`
302         VolumeStats   *ioStats      `json:",omitempty"`
303         InternalStats interface{}   `json:",omitempty"`
304 }
305
306 // NodeStatus struct
307 type NodeStatus struct {
308         Volumes         []*volumeStatusEnt
309         BufferPool      PoolStatus
310         PullQueue       WorkQueueStatus
311         TrashQueue      WorkQueueStatus
312         RequestsCurrent int
313         RequestsMax     int
314         Version         string
315 }
316
317 var st NodeStatus
318 var stLock sync.Mutex
319
320 // DebugHandler addresses /debug.json requests.
321 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
322         type debugStats struct {
323                 MemStats runtime.MemStats
324         }
325         var ds debugStats
326         runtime.ReadMemStats(&ds.MemStats)
327         err := json.NewEncoder(resp).Encode(&ds)
328         if err != nil {
329                 http.Error(resp, err.Error(), 500)
330         }
331 }
332
333 // StatusHandler addresses /status.json requests.
334 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
335         stLock.Lock()
336         rtr.readNodeStatus(&st)
337         jstat, err := json.Marshal(&st)
338         stLock.Unlock()
339         if err == nil {
340                 resp.Write(jstat)
341         } else {
342                 log.Printf("json.Marshal: %s", err)
343                 log.Printf("NodeStatus = %v", &st)
344                 http.Error(resp, err.Error(), 500)
345         }
346 }
347
348 // populate the given NodeStatus struct with current values.
349 func (rtr *router) readNodeStatus(st *NodeStatus) {
350         st.Version = version
351         vols := KeepVM.AllReadable()
352         if cap(st.Volumes) < len(vols) {
353                 st.Volumes = make([]*volumeStatusEnt, len(vols))
354         }
355         st.Volumes = st.Volumes[:0]
356         for _, vol := range vols {
357                 var internalStats interface{}
358                 if vol, ok := vol.(InternalStatser); ok {
359                         internalStats = vol.InternalStats()
360                 }
361                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
362                         Label:         vol.String(),
363                         Status:        vol.Status(),
364                         InternalStats: internalStats,
365                         //VolumeStats: KeepVM.VolumeStats(vol),
366                 })
367         }
368         st.BufferPool.Alloc = bufs.Alloc()
369         st.BufferPool.Cap = bufs.Cap()
370         st.BufferPool.Len = bufs.Len()
371         st.PullQueue = getWorkQueueStatus(pullq)
372         st.TrashQueue = getWorkQueueStatus(trashq)
373         if rtr.limiter != nil {
374                 st.RequestsCurrent = rtr.limiter.Current()
375                 st.RequestsMax = rtr.limiter.Max()
376         }
377 }
378
379 // return a WorkQueueStatus for the given queue. If q is nil (which
380 // should never happen except in test suites), return a zero status
381 // value instead of crashing.
382 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
383         if q == nil {
384                 // This should only happen during tests.
385                 return WorkQueueStatus{}
386         }
387         return q.Status()
388 }
389
390 // DeleteHandler processes DELETE requests.
391 //
392 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
393 // from all connected volumes.
394 //
395 // Only the Data Manager, or an Arvados admin with scope "all", are
396 // allowed to issue DELETE requests.  If a DELETE request is not
397 // authenticated or is issued by a non-admin user, the server returns
398 // a PermissionError.
399 //
400 // Upon receiving a valid request from an authorized user,
401 // DeleteHandler deletes all copies of the specified block on local
402 // writable volumes.
403 //
404 // Response format:
405 //
406 // If the requested blocks was not found on any volume, the response
407 // code is HTTP 404 Not Found.
408 //
409 // Otherwise, the response code is 200 OK, with a response body
410 // consisting of the JSON message
411 //
412 //    {"copies_deleted":d,"copies_failed":f}
413 //
414 // where d and f are integers representing the number of blocks that
415 // were successfully and unsuccessfully deleted.
416 //
417 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
418         hash := mux.Vars(req)["hash"]
419
420         // Confirm that this user is an admin and has a token with unlimited scope.
421         var tok = GetAPIToken(req)
422         if tok == "" || !CanDelete(tok) {
423                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
424                 return
425         }
426
427         if !theConfig.EnableDelete {
428                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
429                 return
430         }
431
432         // Delete copies of this block from all available volumes.
433         // Report how many blocks were successfully deleted, and how
434         // many were found on writable volumes but not deleted.
435         var result struct {
436                 Deleted int `json:"copies_deleted"`
437                 Failed  int `json:"copies_failed"`
438         }
439         for _, vol := range KeepVM.AllWritable() {
440                 if err := vol.Trash(hash); err == nil {
441                         result.Deleted++
442                 } else if os.IsNotExist(err) {
443                         continue
444                 } else {
445                         result.Failed++
446                         log.Println("DeleteHandler:", err)
447                 }
448         }
449
450         var st int
451
452         if result.Deleted == 0 && result.Failed == 0 {
453                 st = http.StatusNotFound
454         } else {
455                 st = http.StatusOK
456         }
457
458         resp.WriteHeader(st)
459
460         if st == http.StatusOK {
461                 if body, err := json.Marshal(result); err == nil {
462                         resp.Write(body)
463                 } else {
464                         log.Printf("json.Marshal: %s (result = %v)", err, result)
465                         http.Error(resp, err.Error(), 500)
466                 }
467         }
468 }
469
470 /* PullHandler processes "PUT /pull" requests for the data manager.
471    The request body is a JSON message containing a list of pull
472    requests in the following format:
473
474    [
475       {
476          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
477          "servers":[
478                         "keep0.qr1hi.arvadosapi.com:25107",
479                         "keep1.qr1hi.arvadosapi.com:25108"
480                  ]
481           },
482           {
483                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
484                  "servers":[
485                         "10.0.1.5:25107",
486                         "10.0.1.6:25107",
487                         "10.0.1.7:25108"
488                  ]
489           },
490           ...
491    ]
492
493    Each pull request in the list consists of a block locator string
494    and an ordered list of servers.  Keepstore should try to fetch the
495    block from each server in turn.
496
497    If the request has not been sent by the Data Manager, return 401
498    Unauthorized.
499
500    If the JSON unmarshalling fails, return 400 Bad Request.
501 */
502
503 // PullRequest consists of a block locator and an ordered list of servers
504 type PullRequest struct {
505         Locator string   `json:"locator"`
506         Servers []string `json:"servers"`
507
508         // Destination mount, or "" for "anywhere"
509         MountUUID string
510 }
511
512 // PullHandler processes "PUT /pull" requests for the data manager.
513 func PullHandler(resp http.ResponseWriter, req *http.Request) {
514         // Reject unauthorized requests.
515         if !IsSystemAuth(GetAPIToken(req)) {
516                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
517                 return
518         }
519
520         // Parse the request body.
521         var pr []PullRequest
522         r := json.NewDecoder(req.Body)
523         if err := r.Decode(&pr); err != nil {
524                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
525                 return
526         }
527
528         // We have a properly formatted pull list sent from the data
529         // manager.  Report success and send the list to the pull list
530         // manager for further handling.
531         resp.WriteHeader(http.StatusOK)
532         resp.Write([]byte(
533                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
534
535         plist := list.New()
536         for _, p := range pr {
537                 plist.PushBack(p)
538         }
539         pullq.ReplaceQueue(plist)
540 }
541
542 // TrashRequest consists of a block locator and it's Mtime
543 type TrashRequest struct {
544         Locator    string `json:"locator"`
545         BlockMtime int64  `json:"block_mtime"`
546
547         // Target mount, or "" for "everywhere"
548         MountUUID string
549 }
550
551 // TrashHandler processes /trash requests.
552 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
553         // Reject unauthorized requests.
554         if !IsSystemAuth(GetAPIToken(req)) {
555                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
556                 return
557         }
558
559         // Parse the request body.
560         var trash []TrashRequest
561         r := json.NewDecoder(req.Body)
562         if err := r.Decode(&trash); err != nil {
563                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
564                 return
565         }
566
567         // We have a properly formatted trash list sent from the data
568         // manager.  Report success and send the list to the trash work
569         // queue for further handling.
570         resp.WriteHeader(http.StatusOK)
571         resp.Write([]byte(
572                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
573
574         tlist := list.New()
575         for _, t := range trash {
576                 tlist.PushBack(t)
577         }
578         trashq.ReplaceQueue(tlist)
579 }
580
581 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
582 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
583         // Reject unauthorized requests.
584         if !IsSystemAuth(GetAPIToken(req)) {
585                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
586                 return
587         }
588
589         hash := mux.Vars(req)["hash"]
590
591         if len(KeepVM.AllWritable()) == 0 {
592                 http.Error(resp, "No writable volumes", http.StatusNotFound)
593                 return
594         }
595
596         var untrashedOn, failedOn []string
597         var numNotFound int
598         for _, vol := range KeepVM.AllWritable() {
599                 err := vol.Untrash(hash)
600
601                 if os.IsNotExist(err) {
602                         numNotFound++
603                 } else if err != nil {
604                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
605                         failedOn = append(failedOn, vol.String())
606                 } else {
607                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
608                         untrashedOn = append(untrashedOn, vol.String())
609                 }
610         }
611
612         if numNotFound == len(KeepVM.AllWritable()) {
613                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
614                 return
615         }
616
617         if len(failedOn) == len(KeepVM.AllWritable()) {
618                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
619         } else {
620                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
621                 if len(failedOn) > 0 {
622                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
623                 }
624                 resp.Write([]byte(respBody))
625         }
626 }
627
628 // GetBlock and PutBlock implement lower-level code for handling
629 // blocks by rooting through volumes connected to the local machine.
630 // Once the handler has determined that system policy permits the
631 // request, it calls these methods to perform the actual operation.
632 //
633 // TODO(twp): this code would probably be better located in the
634 // VolumeManager interface. As an abstraction, the VolumeManager
635 // should be the only part of the code that cares about which volume a
636 // block is stored on, so it should be responsible for figuring out
637 // which volume to check for fetching blocks, storing blocks, etc.
638
639 // GetBlock fetches the block identified by "hash" into the provided
640 // buf, and returns the data size.
641 //
642 // If the block cannot be found on any volume, returns NotFoundError.
643 //
644 // If the block found does not have the correct MD5 hash, returns
645 // DiskHashError.
646 //
647 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
648         // Attempt to read the requested hash from a keep volume.
649         errorToCaller := NotFoundError
650
651         for _, vol := range KeepVM.AllReadable() {
652                 size, err := vol.Get(ctx, hash, buf)
653                 select {
654                 case <-ctx.Done():
655                         return 0, ErrClientDisconnect
656                 default:
657                 }
658                 if err != nil {
659                         // IsNotExist is an expected error and may be
660                         // ignored. All other errors are logged. In
661                         // any case we continue trying to read other
662                         // volumes. If all volumes report IsNotExist,
663                         // we return a NotFoundError.
664                         if !os.IsNotExist(err) {
665                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
666                         }
667                         continue
668                 }
669                 // Check the file checksum.
670                 //
671                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
672                 if filehash != hash {
673                         // TODO: Try harder to tell a sysadmin about
674                         // this.
675                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
676                                 vol, hash, filehash)
677                         errorToCaller = DiskHashError
678                         continue
679                 }
680                 if errorToCaller == DiskHashError {
681                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
682                                 vol, hash)
683                 }
684                 return size, nil
685         }
686         return 0, errorToCaller
687 }
688
689 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
690 //
691 // PutBlock(ctx, block, hash)
692 //   Stores the BLOCK (identified by the content id HASH) in Keep.
693 //
694 //   The MD5 checksum of the block must be identical to the content id HASH.
695 //   If not, an error is returned.
696 //
697 //   PutBlock stores the BLOCK on the first Keep volume with free space.
698 //   A failure code is returned to the user only if all volumes fail.
699 //
700 //   On success, PutBlock returns nil.
701 //   On failure, it returns a KeepError with one of the following codes:
702 //
703 //   500 Collision
704 //          A different block with the same hash already exists on this
705 //          Keep server.
706 //   422 MD5Fail
707 //          The MD5 hash of the BLOCK does not match the argument HASH.
708 //   503 Full
709 //          There was not enough space left in any Keep volume to store
710 //          the object.
711 //   500 Fail
712 //          The object could not be stored for some other reason (e.g.
713 //          all writes failed). The text of the error message should
714 //          provide as much detail as possible.
715 //
716 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
717         // Check that BLOCK's checksum matches HASH.
718         blockhash := fmt.Sprintf("%x", md5.Sum(block))
719         if blockhash != hash {
720                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
721                 return 0, RequestHashError
722         }
723
724         // If we already have this data, it's intact on disk, and we
725         // can update its timestamp, return success. If we have
726         // different data with the same hash, return failure.
727         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
728                 return n, err
729         } else if ctx.Err() != nil {
730                 return 0, ErrClientDisconnect
731         }
732
733         // Choose a Keep volume to write to.
734         // If this volume fails, try all of the volumes in order.
735         if vol := KeepVM.NextWritable(); vol != nil {
736                 if err := vol.Put(ctx, hash, block); err == nil {
737                         return vol.Replication(), nil // success!
738                 }
739                 if ctx.Err() != nil {
740                         return 0, ErrClientDisconnect
741                 }
742         }
743
744         writables := KeepVM.AllWritable()
745         if len(writables) == 0 {
746                 log.Print("No writable volumes.")
747                 return 0, FullError
748         }
749
750         allFull := true
751         for _, vol := range writables {
752                 err := vol.Put(ctx, hash, block)
753                 if ctx.Err() != nil {
754                         return 0, ErrClientDisconnect
755                 }
756                 if err == nil {
757                         return vol.Replication(), nil // success!
758                 }
759                 if err != FullError {
760                         // The volume is not full but the
761                         // write did not succeed.  Report the
762                         // error and continue trying.
763                         allFull = false
764                         log.Printf("%s: Write(%s): %s", vol, hash, err)
765                 }
766         }
767
768         if allFull {
769                 log.Print("All volumes are full.")
770                 return 0, FullError
771         }
772         // Already logged the non-full errors.
773         return 0, GenericError
774 }
775
776 // CompareAndTouch returns the current replication level if one of the
777 // volumes already has the given content and it successfully updates
778 // the relevant block's modification time in order to protect it from
779 // premature garbage collection. Otherwise, it returns a non-nil
780 // error.
781 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
782         var bestErr error = NotFoundError
783         for _, vol := range KeepVM.AllWritable() {
784                 err := vol.Compare(ctx, hash, buf)
785                 if ctx.Err() != nil {
786                         return 0, ctx.Err()
787                 } else if err == CollisionError {
788                         // Stop if we have a block with same hash but
789                         // different content. (It will be impossible
790                         // to tell which one is wanted if we have
791                         // both, so there's no point writing it even
792                         // on a different volume.)
793                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
794                         return 0, err
795                 } else if os.IsNotExist(err) {
796                         // Block does not exist. This is the only
797                         // "normal" error: we don't log anything.
798                         continue
799                 } else if err != nil {
800                         // Couldn't open file, data is corrupt on
801                         // disk, etc.: log this abnormal condition,
802                         // and try the next volume.
803                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
804                         continue
805                 }
806                 if err := vol.Touch(hash); err != nil {
807                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
808                         bestErr = err
809                         continue
810                 }
811                 // Compare and Touch both worked --> done.
812                 return vol.Replication(), nil
813         }
814         return 0, bestErr
815 }
816
817 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
818
819 // IsValidLocator returns true if the specified string is a valid Keep locator.
820 //   When Keep is extended to support hash types other than MD5,
821 //   this should be updated to cover those as well.
822 //
823 func IsValidLocator(loc string) bool {
824         return validLocatorRe.MatchString(loc)
825 }
826
827 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
828
829 // GetAPIToken returns the OAuth2 token from the Authorization
830 // header of a HTTP request, or an empty string if no matching
831 // token is found.
832 func GetAPIToken(req *http.Request) string {
833         if auth, ok := req.Header["Authorization"]; ok {
834                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
835                         return match[1]
836                 }
837         }
838         return ""
839 }
840
841 // IsExpired returns true if the given Unix timestamp (expressed as a
842 // hexadecimal string) is in the past, or if timestampHex cannot be
843 // parsed as a hexadecimal string.
844 func IsExpired(timestampHex string) bool {
845         ts, err := strconv.ParseInt(timestampHex, 16, 0)
846         if err != nil {
847                 log.Printf("IsExpired: %s", err)
848                 return true
849         }
850         return time.Unix(ts, 0).Before(time.Now())
851 }
852
853 // CanDelete returns true if the user identified by apiToken is
854 // allowed to delete blocks.
855 func CanDelete(apiToken string) bool {
856         if apiToken == "" {
857                 return false
858         }
859         // Blocks may be deleted only when Keep has been configured with a
860         // data manager.
861         if IsSystemAuth(apiToken) {
862                 return true
863         }
864         // TODO(twp): look up apiToken with the API server
865         // return true if is_admin is true and if the token
866         // has unlimited scope
867         return false
868 }
869
870 // IsSystemAuth returns true if the given token is allowed to perform
871 // system level actions like deleting data.
872 func IsSystemAuth(token string) bool {
873         return token != "" && token == theConfig.systemAuthToken
874 }