Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // REST handlers for Keep are implemented here.
8 //
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler    (GET /index, GET /index/prefix)
12 // StatusHandler   (GET /status.json)
13
14 import (
15         "container/list"
16         "context"
17         "crypto/md5"
18         "encoding/json"
19         "fmt"
20         "io"
21         "net/http"
22         "os"
23         "regexp"
24         "runtime"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gorilla/mux"
31
32         "git.curoverse.com/arvados.git/sdk/go/httpserver"
33         log "github.com/Sirupsen/logrus"
34 )
35
36 type router struct {
37         *mux.Router
38         limiter httpserver.RequestCounter
39 }
40
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() *router {
44         rest := mux.NewRouter()
45         rtr := &router{Router: rest}
46
47         rest.HandleFunc(
48                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
49         rest.HandleFunc(
50                 `/{hash:[0-9a-f]{32}}+{hints}`,
51                 GetBlockHandler).Methods("GET", "HEAD")
52
53         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
54         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55         // List all blocks stored here. Privileged client only.
56         rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57         // List blocks stored here whose hash has the given prefix.
58         // Privileged client only.
59         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
60
61         // Internals/debugging info (runtime.MemStats)
62         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
63
64         // List volumes: path, device number, bytes used/avail.
65         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
66
67         // List mounts: UUID, readonly, tier, device ID, ...
68         rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69         rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70         rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
71
72         // Replace the current pull queue.
73         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
74
75         // Replace the current trash queue.
76         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
77
78         // Untrash moves blocks from trash back into store
79         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
80
81         // Any request which does not match any of these routes gets
82         // 400 Bad Request.
83         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
84
85         return rtr
86 }
87
88 // BadRequestHandler is a HandleFunc to address bad requests.
89 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
90         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
91 }
92
93 // GetBlockHandler is a HandleFunc to address Get block requests.
94 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
95         ctx, cancel := contextForResponse(context.TODO(), resp)
96         defer cancel()
97
98         if theConfig.RequireSignatures {
99                 locator := req.URL.Path[1:] // strip leading slash
100                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
101                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
102                         return
103                 }
104         }
105
106         // TODO: Probe volumes to check whether the block _might_
107         // exist. Some volumes/types could support a quick existence
108         // check without causing other operations to suffer. If all
109         // volumes support that, and assure us the block definitely
110         // isn't here, we can return 404 now instead of waiting for a
111         // buffer.
112
113         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
114         if err != nil {
115                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
116                 return
117         }
118         defer bufs.Put(buf)
119
120         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
121         if err != nil {
122                 code := http.StatusInternalServerError
123                 if err, ok := err.(*KeepError); ok {
124                         code = err.HTTPCode
125                 }
126                 http.Error(resp, err.Error(), code)
127                 return
128         }
129
130         resp.Header().Set("Content-Length", strconv.Itoa(size))
131         resp.Header().Set("Content-Type", "application/octet-stream")
132         resp.Write(buf[:size])
133 }
134
135 // Return a new context that gets cancelled by resp's CloseNotifier.
136 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
137         ctx, cancel := context.WithCancel(parent)
138         if cn, ok := resp.(http.CloseNotifier); ok {
139                 go func(c <-chan bool) {
140                         select {
141                         case <-c:
142                                 theConfig.debugLogf("cancel context")
143                                 cancel()
144                         case <-ctx.Done():
145                         }
146                 }(cn.CloseNotify())
147         }
148         return ctx, cancel
149 }
150
151 // Get a buffer from the pool -- but give up and return a non-nil
152 // error if ctx ends before we get a buffer.
153 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
154         bufReady := make(chan []byte)
155         go func() {
156                 bufReady <- bufs.Get(bufSize)
157         }()
158         select {
159         case buf := <-bufReady:
160                 return buf, nil
161         case <-ctx.Done():
162                 go func() {
163                         // Even if closeNotifier happened first, we
164                         // need to keep waiting for our buf so we can
165                         // return it to the pool.
166                         bufs.Put(<-bufReady)
167                 }()
168                 return nil, ErrClientDisconnect
169         }
170 }
171
172 // PutBlockHandler is a HandleFunc to address Put block requests.
173 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
174         ctx, cancel := contextForResponse(context.TODO(), resp)
175         defer cancel()
176
177         hash := mux.Vars(req)["hash"]
178
179         // Detect as many error conditions as possible before reading
180         // the body: avoid transmitting data that will not end up
181         // being written anyway.
182
183         if req.ContentLength == -1 {
184                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
185                 return
186         }
187
188         if req.ContentLength > BlockSize {
189                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
190                 return
191         }
192
193         if len(KeepVM.AllWritable()) == 0 {
194                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
195                 return
196         }
197
198         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
199         if err != nil {
200                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
201                 return
202         }
203
204         _, err = io.ReadFull(req.Body, buf)
205         if err != nil {
206                 http.Error(resp, err.Error(), 500)
207                 bufs.Put(buf)
208                 return
209         }
210
211         replication, err := PutBlock(ctx, buf, hash)
212         bufs.Put(buf)
213
214         if err != nil {
215                 code := http.StatusInternalServerError
216                 if err, ok := err.(*KeepError); ok {
217                         code = err.HTTPCode
218                 }
219                 http.Error(resp, err.Error(), code)
220                 return
221         }
222
223         // Success; add a size hint, sign the locator if possible, and
224         // return it to the client.
225         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
226         apiToken := GetAPIToken(req)
227         if theConfig.blobSigningKey != nil && apiToken != "" {
228                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
229                 returnHash = SignLocator(returnHash, apiToken, expiry)
230         }
231         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
232         resp.Write([]byte(returnHash + "\n"))
233 }
234
235 // IndexHandler responds to "/index", "/index/{prefix}", and
236 // "/mounts/{uuid}/blocks" requests.
237 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
238         if !IsSystemAuth(GetAPIToken(req)) {
239                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
240                 return
241         }
242
243         prefix := mux.Vars(req)["prefix"]
244         if prefix == "" {
245                 req.ParseForm()
246                 prefix = req.Form.Get("prefix")
247         }
248
249         uuid := mux.Vars(req)["uuid"]
250
251         var vols []Volume
252         if uuid == "" {
253                 vols = KeepVM.AllReadable()
254         } else if v := KeepVM.Lookup(uuid, false); v == nil {
255                 http.Error(resp, "mount not found", http.StatusNotFound)
256                 return
257         } else {
258                 vols = []Volume{v}
259         }
260
261         for _, v := range vols {
262                 if err := v.IndexTo(prefix, resp); err != nil {
263                         // The only errors returned by IndexTo are
264                         // write errors returned by resp.Write(),
265                         // which probably means the client has
266                         // disconnected and this error will never be
267                         // reported to the client -- but it will
268                         // appear in our own error log.
269                         http.Error(resp, err.Error(), http.StatusInternalServerError)
270                         return
271                 }
272         }
273         // An empty line at EOF is the only way the client can be
274         // assured the entire index was received.
275         resp.Write([]byte{'\n'})
276 }
277
278 // MountsHandler responds to "GET /mounts" requests.
279 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
280         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
281         if err != nil {
282                 http.Error(resp, err.Error(), http.StatusInternalServerError)
283         }
284 }
285
286 // PoolStatus struct
287 type PoolStatus struct {
288         Alloc uint64 `json:"BytesAllocated"`
289         Cap   int    `json:"BuffersMax"`
290         Len   int    `json:"BuffersInUse"`
291 }
292
293 type volumeStatusEnt struct {
294         Label         string
295         Status        *VolumeStatus `json:",omitempty"`
296         VolumeStats   *ioStats      `json:",omitempty"`
297         InternalStats interface{}   `json:",omitempty"`
298 }
299
300 // NodeStatus struct
301 type NodeStatus struct {
302         Volumes         []*volumeStatusEnt
303         BufferPool      PoolStatus
304         PullQueue       WorkQueueStatus
305         TrashQueue      WorkQueueStatus
306         RequestsCurrent int
307         RequestsMax     int
308 }
309
310 var st NodeStatus
311 var stLock sync.Mutex
312
313 // DebugHandler addresses /debug.json requests.
314 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
315         type debugStats struct {
316                 MemStats runtime.MemStats
317         }
318         var ds debugStats
319         runtime.ReadMemStats(&ds.MemStats)
320         err := json.NewEncoder(resp).Encode(&ds)
321         if err != nil {
322                 http.Error(resp, err.Error(), 500)
323         }
324 }
325
326 // StatusHandler addresses /status.json requests.
327 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
328         stLock.Lock()
329         rtr.readNodeStatus(&st)
330         jstat, err := json.Marshal(&st)
331         stLock.Unlock()
332         if err == nil {
333                 resp.Write(jstat)
334         } else {
335                 log.Printf("json.Marshal: %s", err)
336                 log.Printf("NodeStatus = %v", &st)
337                 http.Error(resp, err.Error(), 500)
338         }
339 }
340
341 // populate the given NodeStatus struct with current values.
342 func (rtr *router) readNodeStatus(st *NodeStatus) {
343         vols := KeepVM.AllReadable()
344         if cap(st.Volumes) < len(vols) {
345                 st.Volumes = make([]*volumeStatusEnt, len(vols))
346         }
347         st.Volumes = st.Volumes[:0]
348         for _, vol := range vols {
349                 var internalStats interface{}
350                 if vol, ok := vol.(InternalStatser); ok {
351                         internalStats = vol.InternalStats()
352                 }
353                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
354                         Label:         vol.String(),
355                         Status:        vol.Status(),
356                         InternalStats: internalStats,
357                         //VolumeStats: KeepVM.VolumeStats(vol),
358                 })
359         }
360         st.BufferPool.Alloc = bufs.Alloc()
361         st.BufferPool.Cap = bufs.Cap()
362         st.BufferPool.Len = bufs.Len()
363         st.PullQueue = getWorkQueueStatus(pullq)
364         st.TrashQueue = getWorkQueueStatus(trashq)
365         if rtr.limiter != nil {
366                 st.RequestsCurrent = rtr.limiter.Current()
367                 st.RequestsMax = rtr.limiter.Max()
368         }
369 }
370
371 // return a WorkQueueStatus for the given queue. If q is nil (which
372 // should never happen except in test suites), return a zero status
373 // value instead of crashing.
374 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
375         if q == nil {
376                 // This should only happen during tests.
377                 return WorkQueueStatus{}
378         }
379         return q.Status()
380 }
381
382 // DeleteHandler processes DELETE requests.
383 //
384 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
385 // from all connected volumes.
386 //
387 // Only the Data Manager, or an Arvados admin with scope "all", are
388 // allowed to issue DELETE requests.  If a DELETE request is not
389 // authenticated or is issued by a non-admin user, the server returns
390 // a PermissionError.
391 //
392 // Upon receiving a valid request from an authorized user,
393 // DeleteHandler deletes all copies of the specified block on local
394 // writable volumes.
395 //
396 // Response format:
397 //
398 // If the requested blocks was not found on any volume, the response
399 // code is HTTP 404 Not Found.
400 //
401 // Otherwise, the response code is 200 OK, with a response body
402 // consisting of the JSON message
403 //
404 //    {"copies_deleted":d,"copies_failed":f}
405 //
406 // where d and f are integers representing the number of blocks that
407 // were successfully and unsuccessfully deleted.
408 //
409 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
410         hash := mux.Vars(req)["hash"]
411
412         // Confirm that this user is an admin and has a token with unlimited scope.
413         var tok = GetAPIToken(req)
414         if tok == "" || !CanDelete(tok) {
415                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
416                 return
417         }
418
419         if !theConfig.EnableDelete {
420                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
421                 return
422         }
423
424         // Delete copies of this block from all available volumes.
425         // Report how many blocks were successfully deleted, and how
426         // many were found on writable volumes but not deleted.
427         var result struct {
428                 Deleted int `json:"copies_deleted"`
429                 Failed  int `json:"copies_failed"`
430         }
431         for _, vol := range KeepVM.AllWritable() {
432                 if err := vol.Trash(hash); err == nil {
433                         result.Deleted++
434                 } else if os.IsNotExist(err) {
435                         continue
436                 } else {
437                         result.Failed++
438                         log.Println("DeleteHandler:", err)
439                 }
440         }
441
442         var st int
443
444         if result.Deleted == 0 && result.Failed == 0 {
445                 st = http.StatusNotFound
446         } else {
447                 st = http.StatusOK
448         }
449
450         resp.WriteHeader(st)
451
452         if st == http.StatusOK {
453                 if body, err := json.Marshal(result); err == nil {
454                         resp.Write(body)
455                 } else {
456                         log.Printf("json.Marshal: %s (result = %v)", err, result)
457                         http.Error(resp, err.Error(), 500)
458                 }
459         }
460 }
461
462 /* PullHandler processes "PUT /pull" requests for the data manager.
463    The request body is a JSON message containing a list of pull
464    requests in the following format:
465
466    [
467       {
468          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
469          "servers":[
470                         "keep0.qr1hi.arvadosapi.com:25107",
471                         "keep1.qr1hi.arvadosapi.com:25108"
472                  ]
473           },
474           {
475                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
476                  "servers":[
477                         "10.0.1.5:25107",
478                         "10.0.1.6:25107",
479                         "10.0.1.7:25108"
480                  ]
481           },
482           ...
483    ]
484
485    Each pull request in the list consists of a block locator string
486    and an ordered list of servers.  Keepstore should try to fetch the
487    block from each server in turn.
488
489    If the request has not been sent by the Data Manager, return 401
490    Unauthorized.
491
492    If the JSON unmarshalling fails, return 400 Bad Request.
493 */
494
495 // PullRequest consists of a block locator and an ordered list of servers
496 type PullRequest struct {
497         Locator string   `json:"locator"`
498         Servers []string `json:"servers"`
499
500         // Destination mount, or "" for "anywhere"
501         MountUUID string
502 }
503
504 // PullHandler processes "PUT /pull" requests for the data manager.
505 func PullHandler(resp http.ResponseWriter, req *http.Request) {
506         // Reject unauthorized requests.
507         if !IsSystemAuth(GetAPIToken(req)) {
508                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
509                 return
510         }
511
512         // Parse the request body.
513         var pr []PullRequest
514         r := json.NewDecoder(req.Body)
515         if err := r.Decode(&pr); err != nil {
516                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
517                 return
518         }
519
520         // We have a properly formatted pull list sent from the data
521         // manager.  Report success and send the list to the pull list
522         // manager for further handling.
523         resp.WriteHeader(http.StatusOK)
524         resp.Write([]byte(
525                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
526
527         plist := list.New()
528         for _, p := range pr {
529                 plist.PushBack(p)
530         }
531         pullq.ReplaceQueue(plist)
532 }
533
534 // TrashRequest consists of a block locator and it's Mtime
535 type TrashRequest struct {
536         Locator    string `json:"locator"`
537         BlockMtime int64  `json:"block_mtime"`
538
539         // Target mount, or "" for "everywhere"
540         MountUUID string
541 }
542
543 // TrashHandler processes /trash requests.
544 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
545         // Reject unauthorized requests.
546         if !IsSystemAuth(GetAPIToken(req)) {
547                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
548                 return
549         }
550
551         // Parse the request body.
552         var trash []TrashRequest
553         r := json.NewDecoder(req.Body)
554         if err := r.Decode(&trash); err != nil {
555                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
556                 return
557         }
558
559         // We have a properly formatted trash list sent from the data
560         // manager.  Report success and send the list to the trash work
561         // queue for further handling.
562         resp.WriteHeader(http.StatusOK)
563         resp.Write([]byte(
564                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
565
566         tlist := list.New()
567         for _, t := range trash {
568                 tlist.PushBack(t)
569         }
570         trashq.ReplaceQueue(tlist)
571 }
572
573 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
574 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
575         // Reject unauthorized requests.
576         if !IsSystemAuth(GetAPIToken(req)) {
577                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
578                 return
579         }
580
581         hash := mux.Vars(req)["hash"]
582
583         if len(KeepVM.AllWritable()) == 0 {
584                 http.Error(resp, "No writable volumes", http.StatusNotFound)
585                 return
586         }
587
588         var untrashedOn, failedOn []string
589         var numNotFound int
590         for _, vol := range KeepVM.AllWritable() {
591                 err := vol.Untrash(hash)
592
593                 if os.IsNotExist(err) {
594                         numNotFound++
595                 } else if err != nil {
596                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
597                         failedOn = append(failedOn, vol.String())
598                 } else {
599                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
600                         untrashedOn = append(untrashedOn, vol.String())
601                 }
602         }
603
604         if numNotFound == len(KeepVM.AllWritable()) {
605                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
606                 return
607         }
608
609         if len(failedOn) == len(KeepVM.AllWritable()) {
610                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
611         } else {
612                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
613                 if len(failedOn) > 0 {
614                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
615                 }
616                 resp.Write([]byte(respBody))
617         }
618 }
619
620 // GetBlock and PutBlock implement lower-level code for handling
621 // blocks by rooting through volumes connected to the local machine.
622 // Once the handler has determined that system policy permits the
623 // request, it calls these methods to perform the actual operation.
624 //
625 // TODO(twp): this code would probably be better located in the
626 // VolumeManager interface. As an abstraction, the VolumeManager
627 // should be the only part of the code that cares about which volume a
628 // block is stored on, so it should be responsible for figuring out
629 // which volume to check for fetching blocks, storing blocks, etc.
630
631 // GetBlock fetches the block identified by "hash" into the provided
632 // buf, and returns the data size.
633 //
634 // If the block cannot be found on any volume, returns NotFoundError.
635 //
636 // If the block found does not have the correct MD5 hash, returns
637 // DiskHashError.
638 //
639 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
640         // Attempt to read the requested hash from a keep volume.
641         errorToCaller := NotFoundError
642
643         for _, vol := range KeepVM.AllReadable() {
644                 size, err := vol.Get(ctx, hash, buf)
645                 select {
646                 case <-ctx.Done():
647                         return 0, ErrClientDisconnect
648                 default:
649                 }
650                 if err != nil {
651                         // IsNotExist is an expected error and may be
652                         // ignored. All other errors are logged. In
653                         // any case we continue trying to read other
654                         // volumes. If all volumes report IsNotExist,
655                         // we return a NotFoundError.
656                         if !os.IsNotExist(err) {
657                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
658                         }
659                         continue
660                 }
661                 // Check the file checksum.
662                 //
663                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
664                 if filehash != hash {
665                         // TODO: Try harder to tell a sysadmin about
666                         // this.
667                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
668                                 vol, hash, filehash)
669                         errorToCaller = DiskHashError
670                         continue
671                 }
672                 if errorToCaller == DiskHashError {
673                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
674                                 vol, hash)
675                 }
676                 return size, nil
677         }
678         return 0, errorToCaller
679 }
680
681 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
682 //
683 // PutBlock(ctx, block, hash)
684 //   Stores the BLOCK (identified by the content id HASH) in Keep.
685 //
686 //   The MD5 checksum of the block must be identical to the content id HASH.
687 //   If not, an error is returned.
688 //
689 //   PutBlock stores the BLOCK on the first Keep volume with free space.
690 //   A failure code is returned to the user only if all volumes fail.
691 //
692 //   On success, PutBlock returns nil.
693 //   On failure, it returns a KeepError with one of the following codes:
694 //
695 //   500 Collision
696 //          A different block with the same hash already exists on this
697 //          Keep server.
698 //   422 MD5Fail
699 //          The MD5 hash of the BLOCK does not match the argument HASH.
700 //   503 Full
701 //          There was not enough space left in any Keep volume to store
702 //          the object.
703 //   500 Fail
704 //          The object could not be stored for some other reason (e.g.
705 //          all writes failed). The text of the error message should
706 //          provide as much detail as possible.
707 //
708 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
709         // Check that BLOCK's checksum matches HASH.
710         blockhash := fmt.Sprintf("%x", md5.Sum(block))
711         if blockhash != hash {
712                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
713                 return 0, RequestHashError
714         }
715
716         // If we already have this data, it's intact on disk, and we
717         // can update its timestamp, return success. If we have
718         // different data with the same hash, return failure.
719         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
720                 return n, err
721         } else if ctx.Err() != nil {
722                 return 0, ErrClientDisconnect
723         }
724
725         // Choose a Keep volume to write to.
726         // If this volume fails, try all of the volumes in order.
727         if vol := KeepVM.NextWritable(); vol != nil {
728                 if err := vol.Put(ctx, hash, block); err == nil {
729                         return vol.Replication(), nil // success!
730                 }
731                 if ctx.Err() != nil {
732                         return 0, ErrClientDisconnect
733                 }
734         }
735
736         writables := KeepVM.AllWritable()
737         if len(writables) == 0 {
738                 log.Print("No writable volumes.")
739                 return 0, FullError
740         }
741
742         allFull := true
743         for _, vol := range writables {
744                 err := vol.Put(ctx, hash, block)
745                 if ctx.Err() != nil {
746                         return 0, ErrClientDisconnect
747                 }
748                 if err == nil {
749                         return vol.Replication(), nil // success!
750                 }
751                 if err != FullError {
752                         // The volume is not full but the
753                         // write did not succeed.  Report the
754                         // error and continue trying.
755                         allFull = false
756                         log.Printf("%s: Write(%s): %s", vol, hash, err)
757                 }
758         }
759
760         if allFull {
761                 log.Print("All volumes are full.")
762                 return 0, FullError
763         }
764         // Already logged the non-full errors.
765         return 0, GenericError
766 }
767
768 // CompareAndTouch returns the current replication level if one of the
769 // volumes already has the given content and it successfully updates
770 // the relevant block's modification time in order to protect it from
771 // premature garbage collection. Otherwise, it returns a non-nil
772 // error.
773 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
774         var bestErr error = NotFoundError
775         for _, vol := range KeepVM.AllWritable() {
776                 err := vol.Compare(ctx, hash, buf)
777                 if ctx.Err() != nil {
778                         return 0, ctx.Err()
779                 } else if err == CollisionError {
780                         // Stop if we have a block with same hash but
781                         // different content. (It will be impossible
782                         // to tell which one is wanted if we have
783                         // both, so there's no point writing it even
784                         // on a different volume.)
785                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
786                         return 0, err
787                 } else if os.IsNotExist(err) {
788                         // Block does not exist. This is the only
789                         // "normal" error: we don't log anything.
790                         continue
791                 } else if err != nil {
792                         // Couldn't open file, data is corrupt on
793                         // disk, etc.: log this abnormal condition,
794                         // and try the next volume.
795                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
796                         continue
797                 }
798                 if err := vol.Touch(hash); err != nil {
799                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
800                         bestErr = err
801                         continue
802                 }
803                 // Compare and Touch both worked --> done.
804                 return vol.Replication(), nil
805         }
806         return 0, bestErr
807 }
808
809 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
810
811 // IsValidLocator returns true if the specified string is a valid Keep locator.
812 //   When Keep is extended to support hash types other than MD5,
813 //   this should be updated to cover those as well.
814 //
815 func IsValidLocator(loc string) bool {
816         return validLocatorRe.MatchString(loc)
817 }
818
819 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
820
821 // GetAPIToken returns the OAuth2 token from the Authorization
822 // header of a HTTP request, or an empty string if no matching
823 // token is found.
824 func GetAPIToken(req *http.Request) string {
825         if auth, ok := req.Header["Authorization"]; ok {
826                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
827                         return match[1]
828                 }
829         }
830         return ""
831 }
832
833 // IsExpired returns true if the given Unix timestamp (expressed as a
834 // hexadecimal string) is in the past, or if timestampHex cannot be
835 // parsed as a hexadecimal string.
836 func IsExpired(timestampHex string) bool {
837         ts, err := strconv.ParseInt(timestampHex, 16, 0)
838         if err != nil {
839                 log.Printf("IsExpired: %s", err)
840                 return true
841         }
842         return time.Unix(ts, 0).Before(time.Now())
843 }
844
845 // CanDelete returns true if the user identified by apiToken is
846 // allowed to delete blocks.
847 func CanDelete(apiToken string) bool {
848         if apiToken == "" {
849                 return false
850         }
851         // Blocks may be deleted only when Keep has been configured with a
852         // data manager.
853         if IsSystemAuth(apiToken) {
854                 return true
855         }
856         // TODO(twp): look up apiToken with the API server
857         // return true if is_admin is true and if the token
858         // has unlimited scope
859         return false
860 }
861
862 // IsSystemAuth returns true if the given token is allowed to perform
863 // system level actions like deleting data.
864 func IsSystemAuth(token string) bool {
865         return token != "" && token == theConfig.systemAuthToken
866 }