Merge branch 'master' into 14259-pysdk-remote-block-copy
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "github.com/gorilla/mux"
24
25         "git.curoverse.com/arvados.git/sdk/go/arvados"
26         "git.curoverse.com/arvados.git/sdk/go/health"
27         "git.curoverse.com/arvados.git/sdk/go/httpserver"
28 )
29
30 type router struct {
31         *mux.Router
32         limiter     httpserver.RequestCounter
33         cluster     *arvados.Cluster
34         remoteProxy remoteProxy
35 }
36
37 // MakeRESTRouter returns a new router that forwards all Keep requests
38 // to the appropriate handlers.
39 func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
40         rtr := &router{
41                 Router:  mux.NewRouter(),
42                 cluster: cluster,
43         }
44
45         rtr.HandleFunc(
46                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
47         rtr.HandleFunc(
48                 `/{hash:[0-9a-f]{32}}+{hints}`,
49                 rtr.handleGET).Methods("GET", "HEAD")
50
51         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
52         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
53         // List all blocks stored here. Privileged client only.
54         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
55         // List blocks stored here whose hash has the given prefix.
56         // Privileged client only.
57         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
58
59         // Internals/debugging info (runtime.MemStats)
60         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
61
62         // List volumes: path, device number, bytes used/avail.
63         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
64
65         // List mounts: UUID, readonly, tier, device ID, ...
66         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
67         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
68         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
69
70         // Replace the current pull queue.
71         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72
73         // Replace the current trash queue.
74         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
75
76         // Untrash moves blocks from trash back into store
77         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
78
79         rtr.Handle("/_health/{check}", &health.Handler{
80                 Token:  theConfig.ManagementToken,
81                 Prefix: "/_health/",
82         }).Methods("GET")
83
84         // Any request which does not match any of these routes gets
85         // 400 Bad Request.
86         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
87
88         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
89
90         instrumented := httpserver.Instrument(nil, nil,
91                 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
92         return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
93 }
94
95 // BadRequestHandler is a HandleFunc to address bad requests.
96 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
97         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
98 }
99
100 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
101         ctx, cancel := contextForResponse(context.TODO(), resp)
102         defer cancel()
103
104         locator := req.URL.Path[1:]
105         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
106                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
107                 return
108         }
109
110         if theConfig.RequireSignatures {
111                 locator := req.URL.Path[1:] // strip leading slash
112                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
113                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
114                         return
115                 }
116         }
117
118         // TODO: Probe volumes to check whether the block _might_
119         // exist. Some volumes/types could support a quick existence
120         // check without causing other operations to suffer. If all
121         // volumes support that, and assure us the block definitely
122         // isn't here, we can return 404 now instead of waiting for a
123         // buffer.
124
125         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
126         if err != nil {
127                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
128                 return
129         }
130         defer bufs.Put(buf)
131
132         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
133         if err != nil {
134                 code := http.StatusInternalServerError
135                 if err, ok := err.(*KeepError); ok {
136                         code = err.HTTPCode
137                 }
138                 http.Error(resp, err.Error(), code)
139                 return
140         }
141
142         resp.Header().Set("Content-Length", strconv.Itoa(size))
143         resp.Header().Set("Content-Type", "application/octet-stream")
144         resp.Write(buf[:size])
145 }
146
147 // Return a new context that gets cancelled by resp's CloseNotifier.
148 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
149         ctx, cancel := context.WithCancel(parent)
150         if cn, ok := resp.(http.CloseNotifier); ok {
151                 go func(c <-chan bool) {
152                         select {
153                         case <-c:
154                                 theConfig.debugLogf("cancel context")
155                                 cancel()
156                         case <-ctx.Done():
157                         }
158                 }(cn.CloseNotify())
159         }
160         return ctx, cancel
161 }
162
163 // Get a buffer from the pool -- but give up and return a non-nil
164 // error if ctx ends before we get a buffer.
165 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
166         bufReady := make(chan []byte)
167         go func() {
168                 bufReady <- bufs.Get(bufSize)
169         }()
170         select {
171         case buf := <-bufReady:
172                 return buf, nil
173         case <-ctx.Done():
174                 go func() {
175                         // Even if closeNotifier happened first, we
176                         // need to keep waiting for our buf so we can
177                         // return it to the pool.
178                         bufs.Put(<-bufReady)
179                 }()
180                 return nil, ErrClientDisconnect
181         }
182 }
183
184 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
185         ctx, cancel := contextForResponse(context.TODO(), resp)
186         defer cancel()
187
188         hash := mux.Vars(req)["hash"]
189
190         // Detect as many error conditions as possible before reading
191         // the body: avoid transmitting data that will not end up
192         // being written anyway.
193
194         if req.ContentLength == -1 {
195                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
196                 return
197         }
198
199         if req.ContentLength > BlockSize {
200                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
201                 return
202         }
203
204         if len(KeepVM.AllWritable()) == 0 {
205                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
206                 return
207         }
208
209         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
210         if err != nil {
211                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
212                 return
213         }
214
215         _, err = io.ReadFull(req.Body, buf)
216         if err != nil {
217                 http.Error(resp, err.Error(), 500)
218                 bufs.Put(buf)
219                 return
220         }
221
222         replication, err := PutBlock(ctx, buf, hash)
223         bufs.Put(buf)
224
225         if err != nil {
226                 code := http.StatusInternalServerError
227                 if err, ok := err.(*KeepError); ok {
228                         code = err.HTTPCode
229                 }
230                 http.Error(resp, err.Error(), code)
231                 return
232         }
233
234         // Success; add a size hint, sign the locator if possible, and
235         // return it to the client.
236         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
237         apiToken := GetAPIToken(req)
238         if theConfig.blobSigningKey != nil && apiToken != "" {
239                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
240                 returnHash = SignLocator(returnHash, apiToken, expiry)
241         }
242         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
243         resp.Write([]byte(returnHash + "\n"))
244 }
245
246 // IndexHandler responds to "/index", "/index/{prefix}", and
247 // "/mounts/{uuid}/blocks" requests.
248 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
249         if !IsSystemAuth(GetAPIToken(req)) {
250                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
251                 return
252         }
253
254         prefix := mux.Vars(req)["prefix"]
255         if prefix == "" {
256                 req.ParseForm()
257                 prefix = req.Form.Get("prefix")
258         }
259
260         uuid := mux.Vars(req)["uuid"]
261
262         var vols []Volume
263         if uuid == "" {
264                 vols = KeepVM.AllReadable()
265         } else if v := KeepVM.Lookup(uuid, false); v == nil {
266                 http.Error(resp, "mount not found", http.StatusNotFound)
267                 return
268         } else {
269                 vols = []Volume{v}
270         }
271
272         for _, v := range vols {
273                 if err := v.IndexTo(prefix, resp); err != nil {
274                         // The only errors returned by IndexTo are
275                         // write errors returned by resp.Write(),
276                         // which probably means the client has
277                         // disconnected and this error will never be
278                         // reported to the client -- but it will
279                         // appear in our own error log.
280                         http.Error(resp, err.Error(), http.StatusInternalServerError)
281                         return
282                 }
283         }
284         // An empty line at EOF is the only way the client can be
285         // assured the entire index was received.
286         resp.Write([]byte{'\n'})
287 }
288
289 // MountsHandler responds to "GET /mounts" requests.
290 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
291         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
292         if err != nil {
293                 http.Error(resp, err.Error(), http.StatusInternalServerError)
294         }
295 }
296
297 // PoolStatus struct
298 type PoolStatus struct {
299         Alloc uint64 `json:"BytesAllocatedCumulative"`
300         Cap   int    `json:"BuffersMax"`
301         Len   int    `json:"BuffersInUse"`
302 }
303
304 type volumeStatusEnt struct {
305         Label         string
306         Status        *VolumeStatus `json:",omitempty"`
307         VolumeStats   *ioStats      `json:",omitempty"`
308         InternalStats interface{}   `json:",omitempty"`
309 }
310
311 // NodeStatus struct
312 type NodeStatus struct {
313         Volumes         []*volumeStatusEnt
314         BufferPool      PoolStatus
315         PullQueue       WorkQueueStatus
316         TrashQueue      WorkQueueStatus
317         RequestsCurrent int
318         RequestsMax     int
319         Version         string
320 }
321
322 var st NodeStatus
323 var stLock sync.Mutex
324
325 // DebugHandler addresses /debug.json requests.
326 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
327         type debugStats struct {
328                 MemStats runtime.MemStats
329         }
330         var ds debugStats
331         runtime.ReadMemStats(&ds.MemStats)
332         err := json.NewEncoder(resp).Encode(&ds)
333         if err != nil {
334                 http.Error(resp, err.Error(), 500)
335         }
336 }
337
338 // StatusHandler addresses /status.json requests.
339 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
340         stLock.Lock()
341         rtr.readNodeStatus(&st)
342         jstat, err := json.Marshal(&st)
343         stLock.Unlock()
344         if err == nil {
345                 resp.Write(jstat)
346         } else {
347                 log.Printf("json.Marshal: %s", err)
348                 log.Printf("NodeStatus = %v", &st)
349                 http.Error(resp, err.Error(), 500)
350         }
351 }
352
353 // populate the given NodeStatus struct with current values.
354 func (rtr *router) readNodeStatus(st *NodeStatus) {
355         st.Version = version
356         vols := KeepVM.AllReadable()
357         if cap(st.Volumes) < len(vols) {
358                 st.Volumes = make([]*volumeStatusEnt, len(vols))
359         }
360         st.Volumes = st.Volumes[:0]
361         for _, vol := range vols {
362                 var internalStats interface{}
363                 if vol, ok := vol.(InternalStatser); ok {
364                         internalStats = vol.InternalStats()
365                 }
366                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
367                         Label:         vol.String(),
368                         Status:        vol.Status(),
369                         InternalStats: internalStats,
370                         //VolumeStats: KeepVM.VolumeStats(vol),
371                 })
372         }
373         st.BufferPool.Alloc = bufs.Alloc()
374         st.BufferPool.Cap = bufs.Cap()
375         st.BufferPool.Len = bufs.Len()
376         st.PullQueue = getWorkQueueStatus(pullq)
377         st.TrashQueue = getWorkQueueStatus(trashq)
378         if rtr.limiter != nil {
379                 st.RequestsCurrent = rtr.limiter.Current()
380                 st.RequestsMax = rtr.limiter.Max()
381         }
382 }
383
384 // return a WorkQueueStatus for the given queue. If q is nil (which
385 // should never happen except in test suites), return a zero status
386 // value instead of crashing.
387 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
388         if q == nil {
389                 // This should only happen during tests.
390                 return WorkQueueStatus{}
391         }
392         return q.Status()
393 }
394
395 // DeleteHandler processes DELETE requests.
396 //
397 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
398 // from all connected volumes.
399 //
400 // Only the Data Manager, or an Arvados admin with scope "all", are
401 // allowed to issue DELETE requests.  If a DELETE request is not
402 // authenticated or is issued by a non-admin user, the server returns
403 // a PermissionError.
404 //
405 // Upon receiving a valid request from an authorized user,
406 // DeleteHandler deletes all copies of the specified block on local
407 // writable volumes.
408 //
409 // Response format:
410 //
411 // If the requested blocks was not found on any volume, the response
412 // code is HTTP 404 Not Found.
413 //
414 // Otherwise, the response code is 200 OK, with a response body
415 // consisting of the JSON message
416 //
417 //    {"copies_deleted":d,"copies_failed":f}
418 //
419 // where d and f are integers representing the number of blocks that
420 // were successfully and unsuccessfully deleted.
421 //
422 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
423         hash := mux.Vars(req)["hash"]
424
425         // Confirm that this user is an admin and has a token with unlimited scope.
426         var tok = GetAPIToken(req)
427         if tok == "" || !CanDelete(tok) {
428                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
429                 return
430         }
431
432         if !theConfig.EnableDelete {
433                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
434                 return
435         }
436
437         // Delete copies of this block from all available volumes.
438         // Report how many blocks were successfully deleted, and how
439         // many were found on writable volumes but not deleted.
440         var result struct {
441                 Deleted int `json:"copies_deleted"`
442                 Failed  int `json:"copies_failed"`
443         }
444         for _, vol := range KeepVM.AllWritable() {
445                 if err := vol.Trash(hash); err == nil {
446                         result.Deleted++
447                 } else if os.IsNotExist(err) {
448                         continue
449                 } else {
450                         result.Failed++
451                         log.Println("DeleteHandler:", err)
452                 }
453         }
454
455         var st int
456
457         if result.Deleted == 0 && result.Failed == 0 {
458                 st = http.StatusNotFound
459         } else {
460                 st = http.StatusOK
461         }
462
463         resp.WriteHeader(st)
464
465         if st == http.StatusOK {
466                 if body, err := json.Marshal(result); err == nil {
467                         resp.Write(body)
468                 } else {
469                         log.Printf("json.Marshal: %s (result = %v)", err, result)
470                         http.Error(resp, err.Error(), 500)
471                 }
472         }
473 }
474
475 /* PullHandler processes "PUT /pull" requests for the data manager.
476    The request body is a JSON message containing a list of pull
477    requests in the following format:
478
479    [
480       {
481          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
482          "servers":[
483                         "keep0.qr1hi.arvadosapi.com:25107",
484                         "keep1.qr1hi.arvadosapi.com:25108"
485                  ]
486           },
487           {
488                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
489                  "servers":[
490                         "10.0.1.5:25107",
491                         "10.0.1.6:25107",
492                         "10.0.1.7:25108"
493                  ]
494           },
495           ...
496    ]
497
498    Each pull request in the list consists of a block locator string
499    and an ordered list of servers.  Keepstore should try to fetch the
500    block from each server in turn.
501
502    If the request has not been sent by the Data Manager, return 401
503    Unauthorized.
504
505    If the JSON unmarshalling fails, return 400 Bad Request.
506 */
507
508 // PullRequest consists of a block locator and an ordered list of servers
509 type PullRequest struct {
510         Locator string   `json:"locator"`
511         Servers []string `json:"servers"`
512
513         // Destination mount, or "" for "anywhere"
514         MountUUID string `json:"mount_uuid"`
515 }
516
517 // PullHandler processes "PUT /pull" requests for the data manager.
518 func PullHandler(resp http.ResponseWriter, req *http.Request) {
519         // Reject unauthorized requests.
520         if !IsSystemAuth(GetAPIToken(req)) {
521                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
522                 return
523         }
524
525         // Parse the request body.
526         var pr []PullRequest
527         r := json.NewDecoder(req.Body)
528         if err := r.Decode(&pr); err != nil {
529                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
530                 return
531         }
532
533         // We have a properly formatted pull list sent from the data
534         // manager.  Report success and send the list to the pull list
535         // manager for further handling.
536         resp.WriteHeader(http.StatusOK)
537         resp.Write([]byte(
538                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
539
540         plist := list.New()
541         for _, p := range pr {
542                 plist.PushBack(p)
543         }
544         pullq.ReplaceQueue(plist)
545 }
546
547 // TrashRequest consists of a block locator and its Mtime
548 type TrashRequest struct {
549         Locator    string `json:"locator"`
550         BlockMtime int64  `json:"block_mtime"`
551
552         // Target mount, or "" for "everywhere"
553         MountUUID string `json:"mount_uuid"`
554 }
555
556 // TrashHandler processes /trash requests.
557 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
558         // Reject unauthorized requests.
559         if !IsSystemAuth(GetAPIToken(req)) {
560                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
561                 return
562         }
563
564         // Parse the request body.
565         var trash []TrashRequest
566         r := json.NewDecoder(req.Body)
567         if err := r.Decode(&trash); err != nil {
568                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
569                 return
570         }
571
572         // We have a properly formatted trash list sent from the data
573         // manager.  Report success and send the list to the trash work
574         // queue for further handling.
575         resp.WriteHeader(http.StatusOK)
576         resp.Write([]byte(
577                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
578
579         tlist := list.New()
580         for _, t := range trash {
581                 tlist.PushBack(t)
582         }
583         trashq.ReplaceQueue(tlist)
584 }
585
586 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
587 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
588         // Reject unauthorized requests.
589         if !IsSystemAuth(GetAPIToken(req)) {
590                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
591                 return
592         }
593
594         hash := mux.Vars(req)["hash"]
595
596         if len(KeepVM.AllWritable()) == 0 {
597                 http.Error(resp, "No writable volumes", http.StatusNotFound)
598                 return
599         }
600
601         var untrashedOn, failedOn []string
602         var numNotFound int
603         for _, vol := range KeepVM.AllWritable() {
604                 err := vol.Untrash(hash)
605
606                 if os.IsNotExist(err) {
607                         numNotFound++
608                 } else if err != nil {
609                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
610                         failedOn = append(failedOn, vol.String())
611                 } else {
612                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
613                         untrashedOn = append(untrashedOn, vol.String())
614                 }
615         }
616
617         if numNotFound == len(KeepVM.AllWritable()) {
618                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
619                 return
620         }
621
622         if len(failedOn) == len(KeepVM.AllWritable()) {
623                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
624         } else {
625                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
626                 if len(failedOn) > 0 {
627                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
628                 }
629                 resp.Write([]byte(respBody))
630         }
631 }
632
633 // GetBlock and PutBlock implement lower-level code for handling
634 // blocks by rooting through volumes connected to the local machine.
635 // Once the handler has determined that system policy permits the
636 // request, it calls these methods to perform the actual operation.
637 //
638 // TODO(twp): this code would probably be better located in the
639 // VolumeManager interface. As an abstraction, the VolumeManager
640 // should be the only part of the code that cares about which volume a
641 // block is stored on, so it should be responsible for figuring out
642 // which volume to check for fetching blocks, storing blocks, etc.
643
644 // GetBlock fetches the block identified by "hash" into the provided
645 // buf, and returns the data size.
646 //
647 // If the block cannot be found on any volume, returns NotFoundError.
648 //
649 // If the block found does not have the correct MD5 hash, returns
650 // DiskHashError.
651 //
652 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
653         // Attempt to read the requested hash from a keep volume.
654         errorToCaller := NotFoundError
655
656         for _, vol := range KeepVM.AllReadable() {
657                 size, err := vol.Get(ctx, hash, buf)
658                 select {
659                 case <-ctx.Done():
660                         return 0, ErrClientDisconnect
661                 default:
662                 }
663                 if err != nil {
664                         // IsNotExist is an expected error and may be
665                         // ignored. All other errors are logged. In
666                         // any case we continue trying to read other
667                         // volumes. If all volumes report IsNotExist,
668                         // we return a NotFoundError.
669                         if !os.IsNotExist(err) {
670                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
671                         }
672                         continue
673                 }
674                 // Check the file checksum.
675                 //
676                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
677                 if filehash != hash {
678                         // TODO: Try harder to tell a sysadmin about
679                         // this.
680                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
681                                 vol, hash, filehash)
682                         errorToCaller = DiskHashError
683                         continue
684                 }
685                 if errorToCaller == DiskHashError {
686                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
687                                 vol, hash)
688                 }
689                 return size, nil
690         }
691         return 0, errorToCaller
692 }
693
694 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
695 //
696 // PutBlock(ctx, block, hash)
697 //   Stores the BLOCK (identified by the content id HASH) in Keep.
698 //
699 //   The MD5 checksum of the block must be identical to the content id HASH.
700 //   If not, an error is returned.
701 //
702 //   PutBlock stores the BLOCK on the first Keep volume with free space.
703 //   A failure code is returned to the user only if all volumes fail.
704 //
705 //   On success, PutBlock returns nil.
706 //   On failure, it returns a KeepError with one of the following codes:
707 //
708 //   500 Collision
709 //          A different block with the same hash already exists on this
710 //          Keep server.
711 //   422 MD5Fail
712 //          The MD5 hash of the BLOCK does not match the argument HASH.
713 //   503 Full
714 //          There was not enough space left in any Keep volume to store
715 //          the object.
716 //   500 Fail
717 //          The object could not be stored for some other reason (e.g.
718 //          all writes failed). The text of the error message should
719 //          provide as much detail as possible.
720 //
721 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
722         // Check that BLOCK's checksum matches HASH.
723         blockhash := fmt.Sprintf("%x", md5.Sum(block))
724         if blockhash != hash {
725                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
726                 return 0, RequestHashError
727         }
728
729         // If we already have this data, it's intact on disk, and we
730         // can update its timestamp, return success. If we have
731         // different data with the same hash, return failure.
732         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
733                 return n, err
734         } else if ctx.Err() != nil {
735                 return 0, ErrClientDisconnect
736         }
737
738         // Choose a Keep volume to write to.
739         // If this volume fails, try all of the volumes in order.
740         if vol := KeepVM.NextWritable(); vol != nil {
741                 if err := vol.Put(ctx, hash, block); err == nil {
742                         return vol.Replication(), nil // success!
743                 }
744                 if ctx.Err() != nil {
745                         return 0, ErrClientDisconnect
746                 }
747         }
748
749         writables := KeepVM.AllWritable()
750         if len(writables) == 0 {
751                 log.Print("No writable volumes.")
752                 return 0, FullError
753         }
754
755         allFull := true
756         for _, vol := range writables {
757                 err := vol.Put(ctx, hash, block)
758                 if ctx.Err() != nil {
759                         return 0, ErrClientDisconnect
760                 }
761                 if err == nil {
762                         return vol.Replication(), nil // success!
763                 }
764                 if err != FullError {
765                         // The volume is not full but the
766                         // write did not succeed.  Report the
767                         // error and continue trying.
768                         allFull = false
769                         log.Printf("%s: Write(%s): %s", vol, hash, err)
770                 }
771         }
772
773         if allFull {
774                 log.Print("All volumes are full.")
775                 return 0, FullError
776         }
777         // Already logged the non-full errors.
778         return 0, GenericError
779 }
780
781 // CompareAndTouch returns the current replication level if one of the
782 // volumes already has the given content and it successfully updates
783 // the relevant block's modification time in order to protect it from
784 // premature garbage collection. Otherwise, it returns a non-nil
785 // error.
786 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
787         var bestErr error = NotFoundError
788         for _, vol := range KeepVM.AllWritable() {
789                 err := vol.Compare(ctx, hash, buf)
790                 if ctx.Err() != nil {
791                         return 0, ctx.Err()
792                 } else if err == CollisionError {
793                         // Stop if we have a block with same hash but
794                         // different content. (It will be impossible
795                         // to tell which one is wanted if we have
796                         // both, so there's no point writing it even
797                         // on a different volume.)
798                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
799                         return 0, err
800                 } else if os.IsNotExist(err) {
801                         // Block does not exist. This is the only
802                         // "normal" error: we don't log anything.
803                         continue
804                 } else if err != nil {
805                         // Couldn't open file, data is corrupt on
806                         // disk, etc.: log this abnormal condition,
807                         // and try the next volume.
808                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
809                         continue
810                 }
811                 if err := vol.Touch(hash); err != nil {
812                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
813                         bestErr = err
814                         continue
815                 }
816                 // Compare and Touch both worked --> done.
817                 return vol.Replication(), nil
818         }
819         return 0, bestErr
820 }
821
822 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
823
824 // IsValidLocator returns true if the specified string is a valid Keep locator.
825 //   When Keep is extended to support hash types other than MD5,
826 //   this should be updated to cover those as well.
827 //
828 func IsValidLocator(loc string) bool {
829         return validLocatorRe.MatchString(loc)
830 }
831
832 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
833
834 // GetAPIToken returns the OAuth2 token from the Authorization
835 // header of a HTTP request, or an empty string if no matching
836 // token is found.
837 func GetAPIToken(req *http.Request) string {
838         if auth, ok := req.Header["Authorization"]; ok {
839                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
840                         return match[2]
841                 }
842         }
843         return ""
844 }
845
846 // IsExpired returns true if the given Unix timestamp (expressed as a
847 // hexadecimal string) is in the past, or if timestampHex cannot be
848 // parsed as a hexadecimal string.
849 func IsExpired(timestampHex string) bool {
850         ts, err := strconv.ParseInt(timestampHex, 16, 0)
851         if err != nil {
852                 log.Printf("IsExpired: %s", err)
853                 return true
854         }
855         return time.Unix(ts, 0).Before(time.Now())
856 }
857
858 // CanDelete returns true if the user identified by apiToken is
859 // allowed to delete blocks.
860 func CanDelete(apiToken string) bool {
861         if apiToken == "" {
862                 return false
863         }
864         // Blocks may be deleted only when Keep has been configured with a
865         // data manager.
866         if IsSystemAuth(apiToken) {
867                 return true
868         }
869         // TODO(twp): look up apiToken with the API server
870         // return true if is_admin is true and if the token
871         // has unlimited scope
872         return false
873 }
874
875 // IsSystemAuth returns true if the given token is allowed to perform
876 // system level actions like deleting data.
877 func IsSystemAuth(token string) bool {
878         return token != "" && token == theConfig.systemAuthToken
879 }