11158: Handle trash/untrash events for collections in projects.
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // REST handlers for Keep are implemented here.
8 //
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler    (GET /index, GET /index/prefix)
12 // StatusHandler   (GET /status.json)
13
14 import (
15         "container/list"
16         "context"
17         "crypto/md5"
18         "encoding/json"
19         "fmt"
20         "io"
21         "net/http"
22         "os"
23         "regexp"
24         "runtime"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gorilla/mux"
31
32         "git.curoverse.com/arvados.git/sdk/go/health"
33         "git.curoverse.com/arvados.git/sdk/go/httpserver"
34         log "github.com/Sirupsen/logrus"
35 )
36
37 type router struct {
38         *mux.Router
39         limiter httpserver.RequestCounter
40 }
41
42 // MakeRESTRouter returns a new router that forwards all Keep requests
43 // to the appropriate handlers.
44 func MakeRESTRouter() *router {
45         rest := mux.NewRouter()
46         rtr := &router{Router: rest}
47
48         rest.HandleFunc(
49                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
50         rest.HandleFunc(
51                 `/{hash:[0-9a-f]{32}}+{hints}`,
52                 GetBlockHandler).Methods("GET", "HEAD")
53
54         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
55         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
56         // List all blocks stored here. Privileged client only.
57         rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
58         // List blocks stored here whose hash has the given prefix.
59         // Privileged client only.
60         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61
62         // Internals/debugging info (runtime.MemStats)
63         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64
65         // List volumes: path, device number, bytes used/avail.
66         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67
68         // List mounts: UUID, readonly, tier, device ID, ...
69         rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
70         rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
71         rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72
73         // Replace the current pull queue.
74         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75
76         // Replace the current trash queue.
77         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78
79         // Untrash moves blocks from trash back into store
80         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
81
82         rest.Handle("/_health/{check}", &health.Handler{
83                 Token:  theConfig.ManagementToken,
84                 Prefix: "/_health/",
85         }).Methods("GET")
86
87         // Any request which does not match any of these routes gets
88         // 400 Bad Request.
89         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
90
91         return rtr
92 }
93
94 // BadRequestHandler is a HandleFunc to address bad requests.
95 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
96         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
97 }
98
99 // GetBlockHandler is a HandleFunc to address Get block requests.
100 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
101         ctx, cancel := contextForResponse(context.TODO(), resp)
102         defer cancel()
103
104         if theConfig.RequireSignatures {
105                 locator := req.URL.Path[1:] // strip leading slash
106                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
107                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
108                         return
109                 }
110         }
111
112         // TODO: Probe volumes to check whether the block _might_
113         // exist. Some volumes/types could support a quick existence
114         // check without causing other operations to suffer. If all
115         // volumes support that, and assure us the block definitely
116         // isn't here, we can return 404 now instead of waiting for a
117         // buffer.
118
119         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
120         if err != nil {
121                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
122                 return
123         }
124         defer bufs.Put(buf)
125
126         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
127         if err != nil {
128                 code := http.StatusInternalServerError
129                 if err, ok := err.(*KeepError); ok {
130                         code = err.HTTPCode
131                 }
132                 http.Error(resp, err.Error(), code)
133                 return
134         }
135
136         resp.Header().Set("Content-Length", strconv.Itoa(size))
137         resp.Header().Set("Content-Type", "application/octet-stream")
138         resp.Write(buf[:size])
139 }
140
141 // Return a new context that gets cancelled by resp's CloseNotifier.
142 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
143         ctx, cancel := context.WithCancel(parent)
144         if cn, ok := resp.(http.CloseNotifier); ok {
145                 go func(c <-chan bool) {
146                         select {
147                         case <-c:
148                                 theConfig.debugLogf("cancel context")
149                                 cancel()
150                         case <-ctx.Done():
151                         }
152                 }(cn.CloseNotify())
153         }
154         return ctx, cancel
155 }
156
157 // Get a buffer from the pool -- but give up and return a non-nil
158 // error if ctx ends before we get a buffer.
159 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
160         bufReady := make(chan []byte)
161         go func() {
162                 bufReady <- bufs.Get(bufSize)
163         }()
164         select {
165         case buf := <-bufReady:
166                 return buf, nil
167         case <-ctx.Done():
168                 go func() {
169                         // Even if closeNotifier happened first, we
170                         // need to keep waiting for our buf so we can
171                         // return it to the pool.
172                         bufs.Put(<-bufReady)
173                 }()
174                 return nil, ErrClientDisconnect
175         }
176 }
177
178 // PutBlockHandler is a HandleFunc to address Put block requests.
179 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
180         ctx, cancel := contextForResponse(context.TODO(), resp)
181         defer cancel()
182
183         hash := mux.Vars(req)["hash"]
184
185         // Detect as many error conditions as possible before reading
186         // the body: avoid transmitting data that will not end up
187         // being written anyway.
188
189         if req.ContentLength == -1 {
190                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
191                 return
192         }
193
194         if req.ContentLength > BlockSize {
195                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
196                 return
197         }
198
199         if len(KeepVM.AllWritable()) == 0 {
200                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
201                 return
202         }
203
204         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
205         if err != nil {
206                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
207                 return
208         }
209
210         _, err = io.ReadFull(req.Body, buf)
211         if err != nil {
212                 http.Error(resp, err.Error(), 500)
213                 bufs.Put(buf)
214                 return
215         }
216
217         replication, err := PutBlock(ctx, buf, hash)
218         bufs.Put(buf)
219
220         if err != nil {
221                 code := http.StatusInternalServerError
222                 if err, ok := err.(*KeepError); ok {
223                         code = err.HTTPCode
224                 }
225                 http.Error(resp, err.Error(), code)
226                 return
227         }
228
229         // Success; add a size hint, sign the locator if possible, and
230         // return it to the client.
231         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
232         apiToken := GetAPIToken(req)
233         if theConfig.blobSigningKey != nil && apiToken != "" {
234                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
235                 returnHash = SignLocator(returnHash, apiToken, expiry)
236         }
237         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
238         resp.Write([]byte(returnHash + "\n"))
239 }
240
241 // IndexHandler responds to "/index", "/index/{prefix}", and
242 // "/mounts/{uuid}/blocks" requests.
243 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
244         if !IsSystemAuth(GetAPIToken(req)) {
245                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
246                 return
247         }
248
249         prefix := mux.Vars(req)["prefix"]
250         if prefix == "" {
251                 req.ParseForm()
252                 prefix = req.Form.Get("prefix")
253         }
254
255         uuid := mux.Vars(req)["uuid"]
256
257         var vols []Volume
258         if uuid == "" {
259                 vols = KeepVM.AllReadable()
260         } else if v := KeepVM.Lookup(uuid, false); v == nil {
261                 http.Error(resp, "mount not found", http.StatusNotFound)
262                 return
263         } else {
264                 vols = []Volume{v}
265         }
266
267         for _, v := range vols {
268                 if err := v.IndexTo(prefix, resp); err != nil {
269                         // The only errors returned by IndexTo are
270                         // write errors returned by resp.Write(),
271                         // which probably means the client has
272                         // disconnected and this error will never be
273                         // reported to the client -- but it will
274                         // appear in our own error log.
275                         http.Error(resp, err.Error(), http.StatusInternalServerError)
276                         return
277                 }
278         }
279         // An empty line at EOF is the only way the client can be
280         // assured the entire index was received.
281         resp.Write([]byte{'\n'})
282 }
283
284 // MountsHandler responds to "GET /mounts" requests.
285 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
286         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
287         if err != nil {
288                 http.Error(resp, err.Error(), http.StatusInternalServerError)
289         }
290 }
291
292 // PoolStatus struct
293 type PoolStatus struct {
294         Alloc uint64 `json:"BytesAllocated"`
295         Cap   int    `json:"BuffersMax"`
296         Len   int    `json:"BuffersInUse"`
297 }
298
299 type volumeStatusEnt struct {
300         Label         string
301         Status        *VolumeStatus `json:",omitempty"`
302         VolumeStats   *ioStats      `json:",omitempty"`
303         InternalStats interface{}   `json:",omitempty"`
304 }
305
306 // NodeStatus struct
307 type NodeStatus struct {
308         Volumes         []*volumeStatusEnt
309         BufferPool      PoolStatus
310         PullQueue       WorkQueueStatus
311         TrashQueue      WorkQueueStatus
312         RequestsCurrent int
313         RequestsMax     int
314 }
315
316 var st NodeStatus
317 var stLock sync.Mutex
318
319 // DebugHandler addresses /debug.json requests.
320 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
321         type debugStats struct {
322                 MemStats runtime.MemStats
323         }
324         var ds debugStats
325         runtime.ReadMemStats(&ds.MemStats)
326         err := json.NewEncoder(resp).Encode(&ds)
327         if err != nil {
328                 http.Error(resp, err.Error(), 500)
329         }
330 }
331
332 // StatusHandler addresses /status.json requests.
333 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
334         stLock.Lock()
335         rtr.readNodeStatus(&st)
336         jstat, err := json.Marshal(&st)
337         stLock.Unlock()
338         if err == nil {
339                 resp.Write(jstat)
340         } else {
341                 log.Printf("json.Marshal: %s", err)
342                 log.Printf("NodeStatus = %v", &st)
343                 http.Error(resp, err.Error(), 500)
344         }
345 }
346
347 // populate the given NodeStatus struct with current values.
348 func (rtr *router) readNodeStatus(st *NodeStatus) {
349         vols := KeepVM.AllReadable()
350         if cap(st.Volumes) < len(vols) {
351                 st.Volumes = make([]*volumeStatusEnt, len(vols))
352         }
353         st.Volumes = st.Volumes[:0]
354         for _, vol := range vols {
355                 var internalStats interface{}
356                 if vol, ok := vol.(InternalStatser); ok {
357                         internalStats = vol.InternalStats()
358                 }
359                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
360                         Label:         vol.String(),
361                         Status:        vol.Status(),
362                         InternalStats: internalStats,
363                         //VolumeStats: KeepVM.VolumeStats(vol),
364                 })
365         }
366         st.BufferPool.Alloc = bufs.Alloc()
367         st.BufferPool.Cap = bufs.Cap()
368         st.BufferPool.Len = bufs.Len()
369         st.PullQueue = getWorkQueueStatus(pullq)
370         st.TrashQueue = getWorkQueueStatus(trashq)
371         if rtr.limiter != nil {
372                 st.RequestsCurrent = rtr.limiter.Current()
373                 st.RequestsMax = rtr.limiter.Max()
374         }
375 }
376
377 // return a WorkQueueStatus for the given queue. If q is nil (which
378 // should never happen except in test suites), return a zero status
379 // value instead of crashing.
380 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
381         if q == nil {
382                 // This should only happen during tests.
383                 return WorkQueueStatus{}
384         }
385         return q.Status()
386 }
387
388 // DeleteHandler processes DELETE requests.
389 //
390 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
391 // from all connected volumes.
392 //
393 // Only the Data Manager, or an Arvados admin with scope "all", are
394 // allowed to issue DELETE requests.  If a DELETE request is not
395 // authenticated or is issued by a non-admin user, the server returns
396 // a PermissionError.
397 //
398 // Upon receiving a valid request from an authorized user,
399 // DeleteHandler deletes all copies of the specified block on local
400 // writable volumes.
401 //
402 // Response format:
403 //
404 // If the requested blocks was not found on any volume, the response
405 // code is HTTP 404 Not Found.
406 //
407 // Otherwise, the response code is 200 OK, with a response body
408 // consisting of the JSON message
409 //
410 //    {"copies_deleted":d,"copies_failed":f}
411 //
412 // where d and f are integers representing the number of blocks that
413 // were successfully and unsuccessfully deleted.
414 //
415 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
416         hash := mux.Vars(req)["hash"]
417
418         // Confirm that this user is an admin and has a token with unlimited scope.
419         var tok = GetAPIToken(req)
420         if tok == "" || !CanDelete(tok) {
421                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
422                 return
423         }
424
425         if !theConfig.EnableDelete {
426                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
427                 return
428         }
429
430         // Delete copies of this block from all available volumes.
431         // Report how many blocks were successfully deleted, and how
432         // many were found on writable volumes but not deleted.
433         var result struct {
434                 Deleted int `json:"copies_deleted"`
435                 Failed  int `json:"copies_failed"`
436         }
437         for _, vol := range KeepVM.AllWritable() {
438                 if err := vol.Trash(hash); err == nil {
439                         result.Deleted++
440                 } else if os.IsNotExist(err) {
441                         continue
442                 } else {
443                         result.Failed++
444                         log.Println("DeleteHandler:", err)
445                 }
446         }
447
448         var st int
449
450         if result.Deleted == 0 && result.Failed == 0 {
451                 st = http.StatusNotFound
452         } else {
453                 st = http.StatusOK
454         }
455
456         resp.WriteHeader(st)
457
458         if st == http.StatusOK {
459                 if body, err := json.Marshal(result); err == nil {
460                         resp.Write(body)
461                 } else {
462                         log.Printf("json.Marshal: %s (result = %v)", err, result)
463                         http.Error(resp, err.Error(), 500)
464                 }
465         }
466 }
467
468 /* PullHandler processes "PUT /pull" requests for the data manager.
469    The request body is a JSON message containing a list of pull
470    requests in the following format:
471
472    [
473       {
474          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
475          "servers":[
476                         "keep0.qr1hi.arvadosapi.com:25107",
477                         "keep1.qr1hi.arvadosapi.com:25108"
478                  ]
479           },
480           {
481                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
482                  "servers":[
483                         "10.0.1.5:25107",
484                         "10.0.1.6:25107",
485                         "10.0.1.7:25108"
486                  ]
487           },
488           ...
489    ]
490
491    Each pull request in the list consists of a block locator string
492    and an ordered list of servers.  Keepstore should try to fetch the
493    block from each server in turn.
494
495    If the request has not been sent by the Data Manager, return 401
496    Unauthorized.
497
498    If the JSON unmarshalling fails, return 400 Bad Request.
499 */
500
501 // PullRequest consists of a block locator and an ordered list of servers
502 type PullRequest struct {
503         Locator string   `json:"locator"`
504         Servers []string `json:"servers"`
505
506         // Destination mount, or "" for "anywhere"
507         MountUUID string
508 }
509
510 // PullHandler processes "PUT /pull" requests for the data manager.
511 func PullHandler(resp http.ResponseWriter, req *http.Request) {
512         // Reject unauthorized requests.
513         if !IsSystemAuth(GetAPIToken(req)) {
514                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
515                 return
516         }
517
518         // Parse the request body.
519         var pr []PullRequest
520         r := json.NewDecoder(req.Body)
521         if err := r.Decode(&pr); err != nil {
522                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
523                 return
524         }
525
526         // We have a properly formatted pull list sent from the data
527         // manager.  Report success and send the list to the pull list
528         // manager for further handling.
529         resp.WriteHeader(http.StatusOK)
530         resp.Write([]byte(
531                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
532
533         plist := list.New()
534         for _, p := range pr {
535                 plist.PushBack(p)
536         }
537         pullq.ReplaceQueue(plist)
538 }
539
540 // TrashRequest consists of a block locator and it's Mtime
541 type TrashRequest struct {
542         Locator    string `json:"locator"`
543         BlockMtime int64  `json:"block_mtime"`
544
545         // Target mount, or "" for "everywhere"
546         MountUUID string
547 }
548
549 // TrashHandler processes /trash requests.
550 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
551         // Reject unauthorized requests.
552         if !IsSystemAuth(GetAPIToken(req)) {
553                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
554                 return
555         }
556
557         // Parse the request body.
558         var trash []TrashRequest
559         r := json.NewDecoder(req.Body)
560         if err := r.Decode(&trash); err != nil {
561                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
562                 return
563         }
564
565         // We have a properly formatted trash list sent from the data
566         // manager.  Report success and send the list to the trash work
567         // queue for further handling.
568         resp.WriteHeader(http.StatusOK)
569         resp.Write([]byte(
570                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
571
572         tlist := list.New()
573         for _, t := range trash {
574                 tlist.PushBack(t)
575         }
576         trashq.ReplaceQueue(tlist)
577 }
578
579 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
580 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
581         // Reject unauthorized requests.
582         if !IsSystemAuth(GetAPIToken(req)) {
583                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
584                 return
585         }
586
587         hash := mux.Vars(req)["hash"]
588
589         if len(KeepVM.AllWritable()) == 0 {
590                 http.Error(resp, "No writable volumes", http.StatusNotFound)
591                 return
592         }
593
594         var untrashedOn, failedOn []string
595         var numNotFound int
596         for _, vol := range KeepVM.AllWritable() {
597                 err := vol.Untrash(hash)
598
599                 if os.IsNotExist(err) {
600                         numNotFound++
601                 } else if err != nil {
602                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
603                         failedOn = append(failedOn, vol.String())
604                 } else {
605                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
606                         untrashedOn = append(untrashedOn, vol.String())
607                 }
608         }
609
610         if numNotFound == len(KeepVM.AllWritable()) {
611                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
612                 return
613         }
614
615         if len(failedOn) == len(KeepVM.AllWritable()) {
616                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
617         } else {
618                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
619                 if len(failedOn) > 0 {
620                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
621                 }
622                 resp.Write([]byte(respBody))
623         }
624 }
625
626 // GetBlock and PutBlock implement lower-level code for handling
627 // blocks by rooting through volumes connected to the local machine.
628 // Once the handler has determined that system policy permits the
629 // request, it calls these methods to perform the actual operation.
630 //
631 // TODO(twp): this code would probably be better located in the
632 // VolumeManager interface. As an abstraction, the VolumeManager
633 // should be the only part of the code that cares about which volume a
634 // block is stored on, so it should be responsible for figuring out
635 // which volume to check for fetching blocks, storing blocks, etc.
636
637 // GetBlock fetches the block identified by "hash" into the provided
638 // buf, and returns the data size.
639 //
640 // If the block cannot be found on any volume, returns NotFoundError.
641 //
642 // If the block found does not have the correct MD5 hash, returns
643 // DiskHashError.
644 //
645 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
646         // Attempt to read the requested hash from a keep volume.
647         errorToCaller := NotFoundError
648
649         for _, vol := range KeepVM.AllReadable() {
650                 size, err := vol.Get(ctx, hash, buf)
651                 select {
652                 case <-ctx.Done():
653                         return 0, ErrClientDisconnect
654                 default:
655                 }
656                 if err != nil {
657                         // IsNotExist is an expected error and may be
658                         // ignored. All other errors are logged. In
659                         // any case we continue trying to read other
660                         // volumes. If all volumes report IsNotExist,
661                         // we return a NotFoundError.
662                         if !os.IsNotExist(err) {
663                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
664                         }
665                         continue
666                 }
667                 // Check the file checksum.
668                 //
669                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
670                 if filehash != hash {
671                         // TODO: Try harder to tell a sysadmin about
672                         // this.
673                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
674                                 vol, hash, filehash)
675                         errorToCaller = DiskHashError
676                         continue
677                 }
678                 if errorToCaller == DiskHashError {
679                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
680                                 vol, hash)
681                 }
682                 return size, nil
683         }
684         return 0, errorToCaller
685 }
686
687 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
688 //
689 // PutBlock(ctx, block, hash)
690 //   Stores the BLOCK (identified by the content id HASH) in Keep.
691 //
692 //   The MD5 checksum of the block must be identical to the content id HASH.
693 //   If not, an error is returned.
694 //
695 //   PutBlock stores the BLOCK on the first Keep volume with free space.
696 //   A failure code is returned to the user only if all volumes fail.
697 //
698 //   On success, PutBlock returns nil.
699 //   On failure, it returns a KeepError with one of the following codes:
700 //
701 //   500 Collision
702 //          A different block with the same hash already exists on this
703 //          Keep server.
704 //   422 MD5Fail
705 //          The MD5 hash of the BLOCK does not match the argument HASH.
706 //   503 Full
707 //          There was not enough space left in any Keep volume to store
708 //          the object.
709 //   500 Fail
710 //          The object could not be stored for some other reason (e.g.
711 //          all writes failed). The text of the error message should
712 //          provide as much detail as possible.
713 //
714 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
715         // Check that BLOCK's checksum matches HASH.
716         blockhash := fmt.Sprintf("%x", md5.Sum(block))
717         if blockhash != hash {
718                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
719                 return 0, RequestHashError
720         }
721
722         // If we already have this data, it's intact on disk, and we
723         // can update its timestamp, return success. If we have
724         // different data with the same hash, return failure.
725         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
726                 return n, err
727         } else if ctx.Err() != nil {
728                 return 0, ErrClientDisconnect
729         }
730
731         // Choose a Keep volume to write to.
732         // If this volume fails, try all of the volumes in order.
733         if vol := KeepVM.NextWritable(); vol != nil {
734                 if err := vol.Put(ctx, hash, block); err == nil {
735                         return vol.Replication(), nil // success!
736                 }
737                 if ctx.Err() != nil {
738                         return 0, ErrClientDisconnect
739                 }
740         }
741
742         writables := KeepVM.AllWritable()
743         if len(writables) == 0 {
744                 log.Print("No writable volumes.")
745                 return 0, FullError
746         }
747
748         allFull := true
749         for _, vol := range writables {
750                 err := vol.Put(ctx, hash, block)
751                 if ctx.Err() != nil {
752                         return 0, ErrClientDisconnect
753                 }
754                 if err == nil {
755                         return vol.Replication(), nil // success!
756                 }
757                 if err != FullError {
758                         // The volume is not full but the
759                         // write did not succeed.  Report the
760                         // error and continue trying.
761                         allFull = false
762                         log.Printf("%s: Write(%s): %s", vol, hash, err)
763                 }
764         }
765
766         if allFull {
767                 log.Print("All volumes are full.")
768                 return 0, FullError
769         }
770         // Already logged the non-full errors.
771         return 0, GenericError
772 }
773
774 // CompareAndTouch returns the current replication level if one of the
775 // volumes already has the given content and it successfully updates
776 // the relevant block's modification time in order to protect it from
777 // premature garbage collection. Otherwise, it returns a non-nil
778 // error.
779 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
780         var bestErr error = NotFoundError
781         for _, vol := range KeepVM.AllWritable() {
782                 err := vol.Compare(ctx, hash, buf)
783                 if ctx.Err() != nil {
784                         return 0, ctx.Err()
785                 } else if err == CollisionError {
786                         // Stop if we have a block with same hash but
787                         // different content. (It will be impossible
788                         // to tell which one is wanted if we have
789                         // both, so there's no point writing it even
790                         // on a different volume.)
791                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
792                         return 0, err
793                 } else if os.IsNotExist(err) {
794                         // Block does not exist. This is the only
795                         // "normal" error: we don't log anything.
796                         continue
797                 } else if err != nil {
798                         // Couldn't open file, data is corrupt on
799                         // disk, etc.: log this abnormal condition,
800                         // and try the next volume.
801                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
802                         continue
803                 }
804                 if err := vol.Touch(hash); err != nil {
805                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
806                         bestErr = err
807                         continue
808                 }
809                 // Compare and Touch both worked --> done.
810                 return vol.Replication(), nil
811         }
812         return 0, bestErr
813 }
814
815 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
816
817 // IsValidLocator returns true if the specified string is a valid Keep locator.
818 //   When Keep is extended to support hash types other than MD5,
819 //   this should be updated to cover those as well.
820 //
821 func IsValidLocator(loc string) bool {
822         return validLocatorRe.MatchString(loc)
823 }
824
825 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
826
827 // GetAPIToken returns the OAuth2 token from the Authorization
828 // header of a HTTP request, or an empty string if no matching
829 // token is found.
830 func GetAPIToken(req *http.Request) string {
831         if auth, ok := req.Header["Authorization"]; ok {
832                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
833                         return match[1]
834                 }
835         }
836         return ""
837 }
838
839 // IsExpired returns true if the given Unix timestamp (expressed as a
840 // hexadecimal string) is in the past, or if timestampHex cannot be
841 // parsed as a hexadecimal string.
842 func IsExpired(timestampHex string) bool {
843         ts, err := strconv.ParseInt(timestampHex, 16, 0)
844         if err != nil {
845                 log.Printf("IsExpired: %s", err)
846                 return true
847         }
848         return time.Unix(ts, 0).Before(time.Now())
849 }
850
851 // CanDelete returns true if the user identified by apiToken is
852 // allowed to delete blocks.
853 func CanDelete(apiToken string) bool {
854         if apiToken == "" {
855                 return false
856         }
857         // Blocks may be deleted only when Keep has been configured with a
858         // data manager.
859         if IsSystemAuth(apiToken) {
860                 return true
861         }
862         // TODO(twp): look up apiToken with the API server
863         // return true if is_admin is true and if the token
864         // has unlimited scope
865         return false
866 }
867
868 // IsSystemAuth returns true if the given token is allowed to perform
869 // system level actions like deleting data.
870 func IsSystemAuth(token string) bool {
871         return token != "" && token == theConfig.systemAuthToken
872 }