5748: Remove unused import. refs #5748
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "strconv"
23         "strings"
24         "syscall"
25         "time"
26 )
27
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
30 //
31 func MakeRESTRouter() *mux.Router {
32         rest := mux.NewRouter()
33
34         rest.HandleFunc(
35                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}+{hints}`,
38                 GetBlockHandler).Methods("GET", "HEAD")
39
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42         // List all blocks stored here. Privileged client only.
43         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44         // List blocks stored here whose hash has the given prefix.
45         // Privileged client only.
46         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47
48         // List volumes: path, device number, bytes used/avail.
49         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50
51         // Replace the current pull queue.
52         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53
54         // Replace the current trash queue.
55         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56
57         // Any request which does not match any of these routes gets
58         // 400 Bad Request.
59         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
60
61         return rest
62 }
63
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
66 }
67
68 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
69         hash := mux.Vars(req)["hash"]
70
71         hints := mux.Vars(req)["hints"]
72
73         // Parse the locator string and hints from the request.
74         // TODO(twp): implement a Locator type.
75         var signature, timestamp string
76         if hints != "" {
77                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
78                 for _, hint := range strings.Split(hints, "+") {
79                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
80                                 // Server ignores size hints
81                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
82                                 signature = m[1]
83                                 timestamp = m[2]
84                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
85                                 // Any unknown hint that starts with an uppercase letter is
86                                 // presumed to be valid and ignored, to permit forward compatibility.
87                         } else {
88                                 // Unknown format; not a valid locator.
89                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
90                                 return
91                         }
92                 }
93         }
94
95         // If permission checking is in effect, verify this
96         // request's permission signature.
97         if enforce_permissions {
98                 if signature == "" || timestamp == "" {
99                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
100                         return
101                 } else if IsExpired(timestamp) {
102                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
103                         return
104                 } else {
105                         req_locator := req.URL.Path[1:] // strip leading slash
106                         if !VerifySignature(req_locator, GetApiToken(req)) {
107                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
108                                 return
109                         }
110                 }
111         }
112
113         block, err := GetBlock(hash, false)
114         if err != nil {
115                 // This type assertion is safe because the only errors
116                 // GetBlock can return are DiskHashError or NotFoundError.
117                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
118                 return
119         }
120         defer bufs.Put(block)
121
122         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
123         resp.Header().Set("Content-Type", "application/octet-stream")
124         resp.Write(block)
125 }
126
127 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
128         hash := mux.Vars(req)["hash"]
129
130         // Detect as many error conditions as possible before reading
131         // the body: avoid transmitting data that will not end up
132         // being written anyway.
133
134         if req.ContentLength == -1 {
135                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
136                 return
137         }
138
139         if req.ContentLength > BLOCKSIZE {
140                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
141                 return
142         }
143
144         if len(KeepVM.AllWritable()) == 0 {
145                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
146                 return
147         }
148
149         buf := bufs.Get(int(req.ContentLength))
150         _, err := io.ReadFull(req.Body, buf)
151         if err != nil {
152                 http.Error(resp, err.Error(), 500)
153                 bufs.Put(buf)
154                 return
155         }
156
157         err = PutBlock(buf, hash)
158         bufs.Put(buf)
159
160         if err != nil {
161                 ke := err.(*KeepError)
162                 http.Error(resp, ke.Error(), ke.HTTPCode)
163                 return
164         }
165
166         // Success; add a size hint, sign the locator if possible, and
167         // return it to the client.
168         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
169         api_token := GetApiToken(req)
170         if PermissionSecret != nil && api_token != "" {
171                 expiry := time.Now().Add(blob_signature_ttl)
172                 return_hash = SignLocator(return_hash, api_token, expiry)
173         }
174         resp.Write([]byte(return_hash + "\n"))
175 }
176
177 // IndexHandler
178 //     A HandleFunc to address /index and /index/{prefix} requests.
179 //
180 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
181         // Reject unauthorized requests.
182         if !IsDataManagerToken(GetApiToken(req)) {
183                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
184                 return
185         }
186
187         prefix := mux.Vars(req)["prefix"]
188
189         for _, vol := range KeepVM.AllReadable() {
190                 if err := vol.IndexTo(prefix, resp); err != nil {
191                         // The only errors returned by IndexTo are
192                         // write errors returned by resp.Write(),
193                         // which probably means the client has
194                         // disconnected and this error will never be
195                         // reported to the client -- but it will
196                         // appear in our own error log.
197                         http.Error(resp, err.Error(), http.StatusInternalServerError)
198                         return
199                 }
200         }
201 }
202
203 // StatusHandler
204 //     Responds to /status.json requests with the current node status,
205 //     described in a JSON structure.
206 //
207 //     The data given in a status.json response includes:
208 //        volumes - a list of Keep volumes currently in use by this server
209 //          each volume is an object with the following fields:
210 //            * mount_point
211 //            * device_num (an integer identifying the underlying filesystem)
212 //            * bytes_free
213 //            * bytes_used
214 //
215 type VolumeStatus struct {
216         MountPoint string `json:"mount_point"`
217         DeviceNum  uint64 `json:"device_num"`
218         BytesFree  uint64 `json:"bytes_free"`
219         BytesUsed  uint64 `json:"bytes_used"`
220 }
221
222 type NodeStatus struct {
223         Volumes []*VolumeStatus `json:"volumes"`
224 }
225
226 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
227         st := GetNodeStatus()
228         if jstat, err := json.Marshal(st); err == nil {
229                 resp.Write(jstat)
230         } else {
231                 log.Printf("json.Marshal: %s\n", err)
232                 log.Printf("NodeStatus = %v\n", st)
233                 http.Error(resp, err.Error(), 500)
234         }
235 }
236
237 // GetNodeStatus
238 //     Returns a NodeStatus struct describing this Keep
239 //     node's current status.
240 //
241 func GetNodeStatus() *NodeStatus {
242         st := new(NodeStatus)
243
244         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
245         for i, vol := range KeepVM.AllReadable() {
246                 st.Volumes[i] = vol.Status()
247         }
248         return st
249 }
250
251 // GetVolumeStatus
252 //     Returns a VolumeStatus describing the requested volume.
253 //
254 func GetVolumeStatus(volume string) *VolumeStatus {
255         var fs syscall.Statfs_t
256         var devnum uint64
257
258         if fi, err := os.Stat(volume); err == nil {
259                 devnum = fi.Sys().(*syscall.Stat_t).Dev
260         } else {
261                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
262                 return nil
263         }
264
265         err := syscall.Statfs(volume, &fs)
266         if err != nil {
267                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
268                 return nil
269         }
270         // These calculations match the way df calculates disk usage:
271         // "free" space is measured by fs.Bavail, but "used" space
272         // uses fs.Blocks - fs.Bfree.
273         free := fs.Bavail * uint64(fs.Bsize)
274         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
275         return &VolumeStatus{volume, devnum, free, used}
276 }
277
278 // DeleteHandler processes DELETE requests.
279 //
280 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
281 // from all connected volumes.
282 //
283 // Only the Data Manager, or an Arvados admin with scope "all", are
284 // allowed to issue DELETE requests.  If a DELETE request is not
285 // authenticated or is issued by a non-admin user, the server returns
286 // a PermissionError.
287 //
288 // Upon receiving a valid request from an authorized user,
289 // DeleteHandler deletes all copies of the specified block on local
290 // writable volumes.
291 //
292 // Response format:
293 //
294 // If the requested blocks was not found on any volume, the response
295 // code is HTTP 404 Not Found.
296 //
297 // Otherwise, the response code is 200 OK, with a response body
298 // consisting of the JSON message
299 //
300 //    {"copies_deleted":d,"copies_failed":f}
301 //
302 // where d and f are integers representing the number of blocks that
303 // were successfully and unsuccessfully deleted.
304 //
305 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
306         hash := mux.Vars(req)["hash"]
307
308         // Confirm that this user is an admin and has a token with unlimited scope.
309         var tok = GetApiToken(req)
310         if tok == "" || !CanDelete(tok) {
311                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
312                 return
313         }
314
315         if never_delete {
316                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
317                 return
318         }
319
320         // Delete copies of this block from all available volumes.
321         // Report how many blocks were successfully deleted, and how
322         // many were found on writable volumes but not deleted.
323         var result struct {
324                 Deleted int `json:"copies_deleted"`
325                 Failed  int `json:"copies_failed"`
326         }
327         for _, vol := range KeepVM.AllWritable() {
328                 if err := vol.Delete(hash); err == nil {
329                         result.Deleted++
330                 } else if os.IsNotExist(err) {
331                         continue
332                 } else {
333                         result.Failed++
334                         log.Println("DeleteHandler:", err)
335                 }
336         }
337
338         var st int
339
340         if result.Deleted == 0 && result.Failed == 0 {
341                 st = http.StatusNotFound
342         } else {
343                 st = http.StatusOK
344         }
345
346         resp.WriteHeader(st)
347
348         if st == http.StatusOK {
349                 if body, err := json.Marshal(result); err == nil {
350                         resp.Write(body)
351                 } else {
352                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
353                         http.Error(resp, err.Error(), 500)
354                 }
355         }
356 }
357
358 /* PullHandler processes "PUT /pull" requests for the data manager.
359    The request body is a JSON message containing a list of pull
360    requests in the following format:
361
362    [
363       {
364          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
365          "servers":[
366                         "keep0.qr1hi.arvadosapi.com:25107",
367                         "keep1.qr1hi.arvadosapi.com:25108"
368                  ]
369           },
370           {
371                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
372                  "servers":[
373                         "10.0.1.5:25107",
374                         "10.0.1.6:25107",
375                         "10.0.1.7:25108"
376                  ]
377           },
378           ...
379    ]
380
381    Each pull request in the list consists of a block locator string
382    and an ordered list of servers.  Keepstore should try to fetch the
383    block from each server in turn.
384
385    If the request has not been sent by the Data Manager, return 401
386    Unauthorized.
387
388    If the JSON unmarshalling fails, return 400 Bad Request.
389 */
390
391 type PullRequest struct {
392         Locator string   `json:"locator"`
393         Servers []string `json:"servers"`
394 }
395
396 func PullHandler(resp http.ResponseWriter, req *http.Request) {
397         // Reject unauthorized requests.
398         if !IsDataManagerToken(GetApiToken(req)) {
399                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
400                 return
401         }
402
403         // Parse the request body.
404         var pr []PullRequest
405         r := json.NewDecoder(req.Body)
406         if err := r.Decode(&pr); err != nil {
407                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
408                 return
409         }
410
411         // We have a properly formatted pull list sent from the data
412         // manager.  Report success and send the list to the pull list
413         // manager for further handling.
414         resp.WriteHeader(http.StatusOK)
415         resp.Write([]byte(
416                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
417
418         plist := list.New()
419         for _, p := range pr {
420                 plist.PushBack(p)
421         }
422         pullq.ReplaceQueue(plist)
423 }
424
425 type TrashRequest struct {
426         Locator    string `json:"locator"`
427         BlockMtime int64  `json:"block_mtime"`
428 }
429
430 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
431         // Reject unauthorized requests.
432         if !IsDataManagerToken(GetApiToken(req)) {
433                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
434                 return
435         }
436
437         // Parse the request body.
438         var trash []TrashRequest
439         r := json.NewDecoder(req.Body)
440         if err := r.Decode(&trash); err != nil {
441                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
442                 return
443         }
444
445         // We have a properly formatted trash list sent from the data
446         // manager.  Report success and send the list to the trash work
447         // queue for further handling.
448         resp.WriteHeader(http.StatusOK)
449         resp.Write([]byte(
450                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
451
452         tlist := list.New()
453         for _, t := range trash {
454                 tlist.PushBack(t)
455         }
456         trashq.ReplaceQueue(tlist)
457 }
458
459 // ==============================
460 // GetBlock and PutBlock implement lower-level code for handling
461 // blocks by rooting through volumes connected to the local machine.
462 // Once the handler has determined that system policy permits the
463 // request, it calls these methods to perform the actual operation.
464 //
465 // TODO(twp): this code would probably be better located in the
466 // VolumeManager interface. As an abstraction, the VolumeManager
467 // should be the only part of the code that cares about which volume a
468 // block is stored on, so it should be responsible for figuring out
469 // which volume to check for fetching blocks, storing blocks, etc.
470
471 // ==============================
472 // GetBlock fetches and returns the block identified by "hash".  If
473 // the update_timestamp argument is true, GetBlock also updates the
474 // block's file modification time (for the sake of PutBlock, which
475 // must update the file's timestamp when the block already exists).
476 //
477 // On success, GetBlock returns a byte slice with the block data, and
478 // a nil error.
479 //
480 // If the block cannot be found on any volume, returns NotFoundError.
481 //
482 // If the block found does not have the correct MD5 hash, returns
483 // DiskHashError.
484 //
485
486 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
487         // Attempt to read the requested hash from a keep volume.
488         error_to_caller := NotFoundError
489
490         var vols []Volume
491         if update_timestamp {
492                 // Pointless to find the block on an unwritable volume
493                 // because Touch() will fail -- this is as good as
494                 // "not found" for purposes of callers who need to
495                 // update_timestamp.
496                 vols = KeepVM.AllWritable()
497         } else {
498                 vols = KeepVM.AllReadable()
499         }
500
501         for _, vol := range vols {
502                 buf, err := vol.Get(hash)
503                 if err != nil {
504                         // IsNotExist is an expected error and may be
505                         // ignored. All other errors are logged. In
506                         // any case we continue trying to read other
507                         // volumes. If all volumes report IsNotExist,
508                         // we return a NotFoundError.
509                         if !os.IsNotExist(err) {
510                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
511                         }
512                         continue
513                 }
514                 // Check the file checksum.
515                 //
516                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
517                 if filehash != hash {
518                         // TODO: Try harder to tell a sysadmin about
519                         // this.
520                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
521                                 vol, hash, filehash)
522                         error_to_caller = DiskHashError
523                         bufs.Put(buf)
524                         continue
525                 }
526                 if error_to_caller == DiskHashError {
527                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
528                                 vol, hash)
529                 }
530                 if update_timestamp {
531                         if err := vol.Touch(hash); err != nil {
532                                 error_to_caller = GenericError
533                                 log.Printf("%s: Touch %s failed: %s",
534                                         vol, hash, error_to_caller)
535                                 bufs.Put(buf)
536                                 continue
537                         }
538                 }
539                 return buf, nil
540         }
541         return nil, error_to_caller
542 }
543
544 /* PutBlock(block, hash)
545    Stores the BLOCK (identified by the content id HASH) in Keep.
546
547    The MD5 checksum of the block must be identical to the content id HASH.
548    If not, an error is returned.
549
550    PutBlock stores the BLOCK on the first Keep volume with free space.
551    A failure code is returned to the user only if all volumes fail.
552
553    On success, PutBlock returns nil.
554    On failure, it returns a KeepError with one of the following codes:
555
556    500 Collision
557           A different block with the same hash already exists on this
558           Keep server.
559    422 MD5Fail
560           The MD5 hash of the BLOCK does not match the argument HASH.
561    503 Full
562           There was not enough space left in any Keep volume to store
563           the object.
564    500 Fail
565           The object could not be stored for some other reason (e.g.
566           all writes failed). The text of the error message should
567           provide as much detail as possible.
568 */
569
570 func PutBlock(block []byte, hash string) error {
571         // Check that BLOCK's checksum matches HASH.
572         blockhash := fmt.Sprintf("%x", md5.Sum(block))
573         if blockhash != hash {
574                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
575                 return RequestHashError
576         }
577
578         // If we already have a block on disk under this identifier, return
579         // success (but check for MD5 collisions).  While fetching the block,
580         // update its timestamp.
581         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
582         // In either case, we want to write our new (good) block to disk,
583         // so there is nothing special to do if err != nil.
584         //
585         if oldblock, err := GetBlock(hash, true); err == nil {
586                 defer bufs.Put(oldblock)
587                 if bytes.Compare(block, oldblock) == 0 {
588                         // The block already exists; return success.
589                         return nil
590                 } else {
591                         return CollisionError
592                 }
593         }
594
595         // Choose a Keep volume to write to.
596         // If this volume fails, try all of the volumes in order.
597         if vol := KeepVM.NextWritable(); vol != nil {
598                 if err := vol.Put(hash, block); err == nil {
599                         return nil // success!
600                 }
601         }
602
603         writables := KeepVM.AllWritable()
604         if len(writables) == 0 {
605                 log.Print("No writable volumes.")
606                 return FullError
607         }
608
609         allFull := true
610         for _, vol := range writables {
611                 err := vol.Put(hash, block)
612                 if err == nil {
613                         return nil // success!
614                 }
615                 if err != FullError {
616                         // The volume is not full but the
617                         // write did not succeed.  Report the
618                         // error and continue trying.
619                         allFull = false
620                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
621                 }
622         }
623
624         if allFull {
625                 log.Print("All volumes are full.")
626                 return FullError
627         } else {
628                 // Already logged the non-full errors.
629                 return GenericError
630         }
631 }
632
633 // IsValidLocator
634 //     Return true if the specified string is a valid Keep locator.
635 //     When Keep is extended to support hash types other than MD5,
636 //     this should be updated to cover those as well.
637 //
638 func IsValidLocator(loc string) bool {
639         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
640         if err == nil {
641                 return match
642         }
643         log.Printf("IsValidLocator: %s\n", err)
644         return false
645 }
646
647 // GetApiToken returns the OAuth2 token from the Authorization
648 // header of a HTTP request, or an empty string if no matching
649 // token is found.
650 func GetApiToken(req *http.Request) string {
651         if auth, ok := req.Header["Authorization"]; ok {
652                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
653                         log.Println(err)
654                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
655                         return match[1]
656                 }
657         }
658         return ""
659 }
660
661 // IsExpired returns true if the given Unix timestamp (expressed as a
662 // hexadecimal string) is in the past, or if timestamp_hex cannot be
663 // parsed as a hexadecimal string.
664 func IsExpired(timestamp_hex string) bool {
665         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
666         if err != nil {
667                 log.Printf("IsExpired: %s\n", err)
668                 return true
669         }
670         return time.Unix(ts, 0).Before(time.Now())
671 }
672
673 // CanDelete returns true if the user identified by api_token is
674 // allowed to delete blocks.
675 func CanDelete(api_token string) bool {
676         if api_token == "" {
677                 return false
678         }
679         // Blocks may be deleted only when Keep has been configured with a
680         // data manager.
681         if IsDataManagerToken(api_token) {
682                 return true
683         }
684         // TODO(twp): look up api_token with the API server
685         // return true if is_admin is true and if the token
686         // has unlimited scope
687         return false
688 }
689
690 // IsDataManagerToken returns true if api_token represents the data
691 // manager's token.
692 func IsDataManagerToken(api_token string) bool {
693         return data_manager_token != "" && api_token == data_manager_token
694 }