11065: Merge branch 'master' into 11065-optional-audit-logging
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // REST handlers for Keep are implemented here.
8 //
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler    (GET /index, GET /index/prefix)
12 // StatusHandler   (GET /status.json)
13
14 import (
15         "container/list"
16         "context"
17         "crypto/md5"
18         "encoding/json"
19         "fmt"
20         "io"
21         "net/http"
22         "os"
23         "regexp"
24         "runtime"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gorilla/mux"
31
32         "git.curoverse.com/arvados.git/sdk/go/health"
33         "git.curoverse.com/arvados.git/sdk/go/httpserver"
34 )
35
36 type router struct {
37         *mux.Router
38         limiter httpserver.RequestCounter
39 }
40
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() http.Handler {
44         rtr := &router{Router: mux.NewRouter()}
45
46         rtr.HandleFunc(
47                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
48         rtr.HandleFunc(
49                 `/{hash:[0-9a-f]{32}}+{hints}`,
50                 GetBlockHandler).Methods("GET", "HEAD")
51
52         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
53         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
54         // List all blocks stored here. Privileged client only.
55         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
56         // List blocks stored here whose hash has the given prefix.
57         // Privileged client only.
58         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
59
60         // Internals/debugging info (runtime.MemStats)
61         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
62
63         // List volumes: path, device number, bytes used/avail.
64         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
65
66         // List mounts: UUID, readonly, tier, device ID, ...
67         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
68         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
69         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
70
71         // Replace the current pull queue.
72         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73
74         // Replace the current trash queue.
75         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
76
77         // Untrash moves blocks from trash back into store
78         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
79
80         rtr.Handle("/_health/{check}", &health.Handler{
81                 Token:  theConfig.ManagementToken,
82                 Prefix: "/_health/",
83         }).Methods("GET")
84
85         // Any request which does not match any of these routes gets
86         // 400 Bad Request.
87         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
88
89         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
90
91         stack := httpserver.Instrument(nil, nil,
92                 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
93         return stack.ServeAPI(stack)
94 }
95
96 // BadRequestHandler is a HandleFunc to address bad requests.
97 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
98         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
99 }
100
101 // GetBlockHandler is a HandleFunc to address Get block requests.
102 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
103         ctx, cancel := contextForResponse(context.TODO(), resp)
104         defer cancel()
105
106         if theConfig.RequireSignatures {
107                 locator := req.URL.Path[1:] // strip leading slash
108                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
109                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
110                         return
111                 }
112         }
113
114         // TODO: Probe volumes to check whether the block _might_
115         // exist. Some volumes/types could support a quick existence
116         // check without causing other operations to suffer. If all
117         // volumes support that, and assure us the block definitely
118         // isn't here, we can return 404 now instead of waiting for a
119         // buffer.
120
121         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
122         if err != nil {
123                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
124                 return
125         }
126         defer bufs.Put(buf)
127
128         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
129         if err != nil {
130                 code := http.StatusInternalServerError
131                 if err, ok := err.(*KeepError); ok {
132                         code = err.HTTPCode
133                 }
134                 http.Error(resp, err.Error(), code)
135                 return
136         }
137
138         resp.Header().Set("Content-Length", strconv.Itoa(size))
139         resp.Header().Set("Content-Type", "application/octet-stream")
140         resp.Write(buf[:size])
141 }
142
143 // Return a new context that gets cancelled by resp's CloseNotifier.
144 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
145         ctx, cancel := context.WithCancel(parent)
146         if cn, ok := resp.(http.CloseNotifier); ok {
147                 go func(c <-chan bool) {
148                         select {
149                         case <-c:
150                                 theConfig.debugLogf("cancel context")
151                                 cancel()
152                         case <-ctx.Done():
153                         }
154                 }(cn.CloseNotify())
155         }
156         return ctx, cancel
157 }
158
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162         bufReady := make(chan []byte)
163         go func() {
164                 bufReady <- bufs.Get(bufSize)
165         }()
166         select {
167         case buf := <-bufReady:
168                 return buf, nil
169         case <-ctx.Done():
170                 go func() {
171                         // Even if closeNotifier happened first, we
172                         // need to keep waiting for our buf so we can
173                         // return it to the pool.
174                         bufs.Put(<-bufReady)
175                 }()
176                 return nil, ErrClientDisconnect
177         }
178 }
179
180 // PutBlockHandler is a HandleFunc to address Put block requests.
181 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
182         ctx, cancel := contextForResponse(context.TODO(), resp)
183         defer cancel()
184
185         hash := mux.Vars(req)["hash"]
186
187         // Detect as many error conditions as possible before reading
188         // the body: avoid transmitting data that will not end up
189         // being written anyway.
190
191         if req.ContentLength == -1 {
192                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
193                 return
194         }
195
196         if req.ContentLength > BlockSize {
197                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
198                 return
199         }
200
201         if len(KeepVM.AllWritable()) == 0 {
202                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
203                 return
204         }
205
206         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
207         if err != nil {
208                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
209                 return
210         }
211
212         _, err = io.ReadFull(req.Body, buf)
213         if err != nil {
214                 http.Error(resp, err.Error(), 500)
215                 bufs.Put(buf)
216                 return
217         }
218
219         replication, err := PutBlock(ctx, buf, hash)
220         bufs.Put(buf)
221
222         if err != nil {
223                 code := http.StatusInternalServerError
224                 if err, ok := err.(*KeepError); ok {
225                         code = err.HTTPCode
226                 }
227                 http.Error(resp, err.Error(), code)
228                 return
229         }
230
231         // Success; add a size hint, sign the locator if possible, and
232         // return it to the client.
233         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
234         apiToken := GetAPIToken(req)
235         if theConfig.blobSigningKey != nil && apiToken != "" {
236                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
237                 returnHash = SignLocator(returnHash, apiToken, expiry)
238         }
239         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
240         resp.Write([]byte(returnHash + "\n"))
241 }
242
243 // IndexHandler responds to "/index", "/index/{prefix}", and
244 // "/mounts/{uuid}/blocks" requests.
245 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
246         if !IsSystemAuth(GetAPIToken(req)) {
247                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
248                 return
249         }
250
251         prefix := mux.Vars(req)["prefix"]
252         if prefix == "" {
253                 req.ParseForm()
254                 prefix = req.Form.Get("prefix")
255         }
256
257         uuid := mux.Vars(req)["uuid"]
258
259         var vols []Volume
260         if uuid == "" {
261                 vols = KeepVM.AllReadable()
262         } else if v := KeepVM.Lookup(uuid, false); v == nil {
263                 http.Error(resp, "mount not found", http.StatusNotFound)
264                 return
265         } else {
266                 vols = []Volume{v}
267         }
268
269         for _, v := range vols {
270                 if err := v.IndexTo(prefix, resp); err != nil {
271                         // The only errors returned by IndexTo are
272                         // write errors returned by resp.Write(),
273                         // which probably means the client has
274                         // disconnected and this error will never be
275                         // reported to the client -- but it will
276                         // appear in our own error log.
277                         http.Error(resp, err.Error(), http.StatusInternalServerError)
278                         return
279                 }
280         }
281         // An empty line at EOF is the only way the client can be
282         // assured the entire index was received.
283         resp.Write([]byte{'\n'})
284 }
285
286 // MountsHandler responds to "GET /mounts" requests.
287 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
288         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
289         if err != nil {
290                 http.Error(resp, err.Error(), http.StatusInternalServerError)
291         }
292 }
293
294 // PoolStatus struct
295 type PoolStatus struct {
296         Alloc uint64 `json:"BytesAllocatedCumulative"`
297         Cap   int    `json:"BuffersMax"`
298         Len   int    `json:"BuffersInUse"`
299 }
300
301 type volumeStatusEnt struct {
302         Label         string
303         Status        *VolumeStatus `json:",omitempty"`
304         VolumeStats   *ioStats      `json:",omitempty"`
305         InternalStats interface{}   `json:",omitempty"`
306 }
307
308 // NodeStatus struct
309 type NodeStatus struct {
310         Volumes         []*volumeStatusEnt
311         BufferPool      PoolStatus
312         PullQueue       WorkQueueStatus
313         TrashQueue      WorkQueueStatus
314         RequestsCurrent int
315         RequestsMax     int
316         Version         string
317 }
318
319 var st NodeStatus
320 var stLock sync.Mutex
321
322 // DebugHandler addresses /debug.json requests.
323 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
324         type debugStats struct {
325                 MemStats runtime.MemStats
326         }
327         var ds debugStats
328         runtime.ReadMemStats(&ds.MemStats)
329         err := json.NewEncoder(resp).Encode(&ds)
330         if err != nil {
331                 http.Error(resp, err.Error(), 500)
332         }
333 }
334
335 // StatusHandler addresses /status.json requests.
336 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
337         stLock.Lock()
338         rtr.readNodeStatus(&st)
339         jstat, err := json.Marshal(&st)
340         stLock.Unlock()
341         if err == nil {
342                 resp.Write(jstat)
343         } else {
344                 log.Printf("json.Marshal: %s", err)
345                 log.Printf("NodeStatus = %v", &st)
346                 http.Error(resp, err.Error(), 500)
347         }
348 }
349
350 // populate the given NodeStatus struct with current values.
351 func (rtr *router) readNodeStatus(st *NodeStatus) {
352         st.Version = version
353         vols := KeepVM.AllReadable()
354         if cap(st.Volumes) < len(vols) {
355                 st.Volumes = make([]*volumeStatusEnt, len(vols))
356         }
357         st.Volumes = st.Volumes[:0]
358         for _, vol := range vols {
359                 var internalStats interface{}
360                 if vol, ok := vol.(InternalStatser); ok {
361                         internalStats = vol.InternalStats()
362                 }
363                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
364                         Label:         vol.String(),
365                         Status:        vol.Status(),
366                         InternalStats: internalStats,
367                         //VolumeStats: KeepVM.VolumeStats(vol),
368                 })
369         }
370         st.BufferPool.Alloc = bufs.Alloc()
371         st.BufferPool.Cap = bufs.Cap()
372         st.BufferPool.Len = bufs.Len()
373         st.PullQueue = getWorkQueueStatus(pullq)
374         st.TrashQueue = getWorkQueueStatus(trashq)
375         if rtr.limiter != nil {
376                 st.RequestsCurrent = rtr.limiter.Current()
377                 st.RequestsMax = rtr.limiter.Max()
378         }
379 }
380
381 // return a WorkQueueStatus for the given queue. If q is nil (which
382 // should never happen except in test suites), return a zero status
383 // value instead of crashing.
384 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
385         if q == nil {
386                 // This should only happen during tests.
387                 return WorkQueueStatus{}
388         }
389         return q.Status()
390 }
391
392 // DeleteHandler processes DELETE requests.
393 //
394 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
395 // from all connected volumes.
396 //
397 // Only the Data Manager, or an Arvados admin with scope "all", are
398 // allowed to issue DELETE requests.  If a DELETE request is not
399 // authenticated or is issued by a non-admin user, the server returns
400 // a PermissionError.
401 //
402 // Upon receiving a valid request from an authorized user,
403 // DeleteHandler deletes all copies of the specified block on local
404 // writable volumes.
405 //
406 // Response format:
407 //
408 // If the requested blocks was not found on any volume, the response
409 // code is HTTP 404 Not Found.
410 //
411 // Otherwise, the response code is 200 OK, with a response body
412 // consisting of the JSON message
413 //
414 //    {"copies_deleted":d,"copies_failed":f}
415 //
416 // where d and f are integers representing the number of blocks that
417 // were successfully and unsuccessfully deleted.
418 //
419 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
420         hash := mux.Vars(req)["hash"]
421
422         // Confirm that this user is an admin and has a token with unlimited scope.
423         var tok = GetAPIToken(req)
424         if tok == "" || !CanDelete(tok) {
425                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
426                 return
427         }
428
429         if !theConfig.EnableDelete {
430                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
431                 return
432         }
433
434         // Delete copies of this block from all available volumes.
435         // Report how many blocks were successfully deleted, and how
436         // many were found on writable volumes but not deleted.
437         var result struct {
438                 Deleted int `json:"copies_deleted"`
439                 Failed  int `json:"copies_failed"`
440         }
441         for _, vol := range KeepVM.AllWritable() {
442                 if err := vol.Trash(hash); err == nil {
443                         result.Deleted++
444                 } else if os.IsNotExist(err) {
445                         continue
446                 } else {
447                         result.Failed++
448                         log.Println("DeleteHandler:", err)
449                 }
450         }
451
452         var st int
453
454         if result.Deleted == 0 && result.Failed == 0 {
455                 st = http.StatusNotFound
456         } else {
457                 st = http.StatusOK
458         }
459
460         resp.WriteHeader(st)
461
462         if st == http.StatusOK {
463                 if body, err := json.Marshal(result); err == nil {
464                         resp.Write(body)
465                 } else {
466                         log.Printf("json.Marshal: %s (result = %v)", err, result)
467                         http.Error(resp, err.Error(), 500)
468                 }
469         }
470 }
471
472 /* PullHandler processes "PUT /pull" requests for the data manager.
473    The request body is a JSON message containing a list of pull
474    requests in the following format:
475
476    [
477       {
478          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
479          "servers":[
480                         "keep0.qr1hi.arvadosapi.com:25107",
481                         "keep1.qr1hi.arvadosapi.com:25108"
482                  ]
483           },
484           {
485                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
486                  "servers":[
487                         "10.0.1.5:25107",
488                         "10.0.1.6:25107",
489                         "10.0.1.7:25108"
490                  ]
491           },
492           ...
493    ]
494
495    Each pull request in the list consists of a block locator string
496    and an ordered list of servers.  Keepstore should try to fetch the
497    block from each server in turn.
498
499    If the request has not been sent by the Data Manager, return 401
500    Unauthorized.
501
502    If the JSON unmarshalling fails, return 400 Bad Request.
503 */
504
505 // PullRequest consists of a block locator and an ordered list of servers
506 type PullRequest struct {
507         Locator string   `json:"locator"`
508         Servers []string `json:"servers"`
509
510         // Destination mount, or "" for "anywhere"
511         MountUUID string `json:"mount_uuid"`
512 }
513
514 // PullHandler processes "PUT /pull" requests for the data manager.
515 func PullHandler(resp http.ResponseWriter, req *http.Request) {
516         // Reject unauthorized requests.
517         if !IsSystemAuth(GetAPIToken(req)) {
518                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
519                 return
520         }
521
522         // Parse the request body.
523         var pr []PullRequest
524         r := json.NewDecoder(req.Body)
525         if err := r.Decode(&pr); err != nil {
526                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
527                 return
528         }
529
530         // We have a properly formatted pull list sent from the data
531         // manager.  Report success and send the list to the pull list
532         // manager for further handling.
533         resp.WriteHeader(http.StatusOK)
534         resp.Write([]byte(
535                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
536
537         plist := list.New()
538         for _, p := range pr {
539                 plist.PushBack(p)
540         }
541         pullq.ReplaceQueue(plist)
542 }
543
544 // TrashRequest consists of a block locator and its Mtime
545 type TrashRequest struct {
546         Locator    string `json:"locator"`
547         BlockMtime int64  `json:"block_mtime"`
548
549         // Target mount, or "" for "everywhere"
550         MountUUID string `json:"mount_uuid"`
551 }
552
553 // TrashHandler processes /trash requests.
554 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
555         // Reject unauthorized requests.
556         if !IsSystemAuth(GetAPIToken(req)) {
557                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
558                 return
559         }
560
561         // Parse the request body.
562         var trash []TrashRequest
563         r := json.NewDecoder(req.Body)
564         if err := r.Decode(&trash); err != nil {
565                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
566                 return
567         }
568
569         // We have a properly formatted trash list sent from the data
570         // manager.  Report success and send the list to the trash work
571         // queue for further handling.
572         resp.WriteHeader(http.StatusOK)
573         resp.Write([]byte(
574                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
575
576         tlist := list.New()
577         for _, t := range trash {
578                 tlist.PushBack(t)
579         }
580         trashq.ReplaceQueue(tlist)
581 }
582
583 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
584 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
585         // Reject unauthorized requests.
586         if !IsSystemAuth(GetAPIToken(req)) {
587                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
588                 return
589         }
590
591         hash := mux.Vars(req)["hash"]
592
593         if len(KeepVM.AllWritable()) == 0 {
594                 http.Error(resp, "No writable volumes", http.StatusNotFound)
595                 return
596         }
597
598         var untrashedOn, failedOn []string
599         var numNotFound int
600         for _, vol := range KeepVM.AllWritable() {
601                 err := vol.Untrash(hash)
602
603                 if os.IsNotExist(err) {
604                         numNotFound++
605                 } else if err != nil {
606                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
607                         failedOn = append(failedOn, vol.String())
608                 } else {
609                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
610                         untrashedOn = append(untrashedOn, vol.String())
611                 }
612         }
613
614         if numNotFound == len(KeepVM.AllWritable()) {
615                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
616                 return
617         }
618
619         if len(failedOn) == len(KeepVM.AllWritable()) {
620                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
621         } else {
622                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
623                 if len(failedOn) > 0 {
624                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
625                 }
626                 resp.Write([]byte(respBody))
627         }
628 }
629
630 // GetBlock and PutBlock implement lower-level code for handling
631 // blocks by rooting through volumes connected to the local machine.
632 // Once the handler has determined that system policy permits the
633 // request, it calls these methods to perform the actual operation.
634 //
635 // TODO(twp): this code would probably be better located in the
636 // VolumeManager interface. As an abstraction, the VolumeManager
637 // should be the only part of the code that cares about which volume a
638 // block is stored on, so it should be responsible for figuring out
639 // which volume to check for fetching blocks, storing blocks, etc.
640
641 // GetBlock fetches the block identified by "hash" into the provided
642 // buf, and returns the data size.
643 //
644 // If the block cannot be found on any volume, returns NotFoundError.
645 //
646 // If the block found does not have the correct MD5 hash, returns
647 // DiskHashError.
648 //
649 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
650         // Attempt to read the requested hash from a keep volume.
651         errorToCaller := NotFoundError
652
653         for _, vol := range KeepVM.AllReadable() {
654                 size, err := vol.Get(ctx, hash, buf)
655                 select {
656                 case <-ctx.Done():
657                         return 0, ErrClientDisconnect
658                 default:
659                 }
660                 if err != nil {
661                         // IsNotExist is an expected error and may be
662                         // ignored. All other errors are logged. In
663                         // any case we continue trying to read other
664                         // volumes. If all volumes report IsNotExist,
665                         // we return a NotFoundError.
666                         if !os.IsNotExist(err) {
667                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
668                         }
669                         continue
670                 }
671                 // Check the file checksum.
672                 //
673                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
674                 if filehash != hash {
675                         // TODO: Try harder to tell a sysadmin about
676                         // this.
677                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
678                                 vol, hash, filehash)
679                         errorToCaller = DiskHashError
680                         continue
681                 }
682                 if errorToCaller == DiskHashError {
683                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
684                                 vol, hash)
685                 }
686                 return size, nil
687         }
688         return 0, errorToCaller
689 }
690
691 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
692 //
693 // PutBlock(ctx, block, hash)
694 //   Stores the BLOCK (identified by the content id HASH) in Keep.
695 //
696 //   The MD5 checksum of the block must be identical to the content id HASH.
697 //   If not, an error is returned.
698 //
699 //   PutBlock stores the BLOCK on the first Keep volume with free space.
700 //   A failure code is returned to the user only if all volumes fail.
701 //
702 //   On success, PutBlock returns nil.
703 //   On failure, it returns a KeepError with one of the following codes:
704 //
705 //   500 Collision
706 //          A different block with the same hash already exists on this
707 //          Keep server.
708 //   422 MD5Fail
709 //          The MD5 hash of the BLOCK does not match the argument HASH.
710 //   503 Full
711 //          There was not enough space left in any Keep volume to store
712 //          the object.
713 //   500 Fail
714 //          The object could not be stored for some other reason (e.g.
715 //          all writes failed). The text of the error message should
716 //          provide as much detail as possible.
717 //
718 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
719         // Check that BLOCK's checksum matches HASH.
720         blockhash := fmt.Sprintf("%x", md5.Sum(block))
721         if blockhash != hash {
722                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
723                 return 0, RequestHashError
724         }
725
726         // If we already have this data, it's intact on disk, and we
727         // can update its timestamp, return success. If we have
728         // different data with the same hash, return failure.
729         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
730                 return n, err
731         } else if ctx.Err() != nil {
732                 return 0, ErrClientDisconnect
733         }
734
735         // Choose a Keep volume to write to.
736         // If this volume fails, try all of the volumes in order.
737         if vol := KeepVM.NextWritable(); vol != nil {
738                 if err := vol.Put(ctx, hash, block); err == nil {
739                         return vol.Replication(), nil // success!
740                 }
741                 if ctx.Err() != nil {
742                         return 0, ErrClientDisconnect
743                 }
744         }
745
746         writables := KeepVM.AllWritable()
747         if len(writables) == 0 {
748                 log.Print("No writable volumes.")
749                 return 0, FullError
750         }
751
752         allFull := true
753         for _, vol := range writables {
754                 err := vol.Put(ctx, hash, block)
755                 if ctx.Err() != nil {
756                         return 0, ErrClientDisconnect
757                 }
758                 if err == nil {
759                         return vol.Replication(), nil // success!
760                 }
761                 if err != FullError {
762                         // The volume is not full but the
763                         // write did not succeed.  Report the
764                         // error and continue trying.
765                         allFull = false
766                         log.Printf("%s: Write(%s): %s", vol, hash, err)
767                 }
768         }
769
770         if allFull {
771                 log.Print("All volumes are full.")
772                 return 0, FullError
773         }
774         // Already logged the non-full errors.
775         return 0, GenericError
776 }
777
778 // CompareAndTouch returns the current replication level if one of the
779 // volumes already has the given content and it successfully updates
780 // the relevant block's modification time in order to protect it from
781 // premature garbage collection. Otherwise, it returns a non-nil
782 // error.
783 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
784         var bestErr error = NotFoundError
785         for _, vol := range KeepVM.AllWritable() {
786                 err := vol.Compare(ctx, hash, buf)
787                 if ctx.Err() != nil {
788                         return 0, ctx.Err()
789                 } else if err == CollisionError {
790                         // Stop if we have a block with same hash but
791                         // different content. (It will be impossible
792                         // to tell which one is wanted if we have
793                         // both, so there's no point writing it even
794                         // on a different volume.)
795                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
796                         return 0, err
797                 } else if os.IsNotExist(err) {
798                         // Block does not exist. This is the only
799                         // "normal" error: we don't log anything.
800                         continue
801                 } else if err != nil {
802                         // Couldn't open file, data is corrupt on
803                         // disk, etc.: log this abnormal condition,
804                         // and try the next volume.
805                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
806                         continue
807                 }
808                 if err := vol.Touch(hash); err != nil {
809                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
810                         bestErr = err
811                         continue
812                 }
813                 // Compare and Touch both worked --> done.
814                 return vol.Replication(), nil
815         }
816         return 0, bestErr
817 }
818
819 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
820
821 // IsValidLocator returns true if the specified string is a valid Keep locator.
822 //   When Keep is extended to support hash types other than MD5,
823 //   this should be updated to cover those as well.
824 //
825 func IsValidLocator(loc string) bool {
826         return validLocatorRe.MatchString(loc)
827 }
828
829 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
830
831 // GetAPIToken returns the OAuth2 token from the Authorization
832 // header of a HTTP request, or an empty string if no matching
833 // token is found.
834 func GetAPIToken(req *http.Request) string {
835         if auth, ok := req.Header["Authorization"]; ok {
836                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
837                         return match[1]
838                 }
839         }
840         return ""
841 }
842
843 // IsExpired returns true if the given Unix timestamp (expressed as a
844 // hexadecimal string) is in the past, or if timestampHex cannot be
845 // parsed as a hexadecimal string.
846 func IsExpired(timestampHex string) bool {
847         ts, err := strconv.ParseInt(timestampHex, 16, 0)
848         if err != nil {
849                 log.Printf("IsExpired: %s", err)
850                 return true
851         }
852         return time.Unix(ts, 0).Before(time.Now())
853 }
854
855 // CanDelete returns true if the user identified by apiToken is
856 // allowed to delete blocks.
857 func CanDelete(apiToken string) bool {
858         if apiToken == "" {
859                 return false
860         }
861         // Blocks may be deleted only when Keep has been configured with a
862         // data manager.
863         if IsSystemAuth(apiToken) {
864                 return true
865         }
866         // TODO(twp): look up apiToken with the API server
867         // return true if is_admin is true and if the token
868         // has unlimited scope
869         return false
870 }
871
872 // IsSystemAuth returns true if the given token is allowed to perform
873 // system level actions like deleting data.
874 func IsSystemAuth(token string) bool {
875         return token != "" && token == theConfig.systemAuthToken
876 }