Merge branch '3149-filter-any' refs #3149
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43
44         // For IndexHandler we support:
45         //   /index           - returns all locators
46         //   /index/{prefix}  - returns all locators that begin with {prefix}
47         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48         //      If {prefix} is the empty string, return an index of all locators
49         //      (so /index and /index/ behave identically)
50         //      A client may supply a full 32-digit locator string, in which
51         //      case the server will return an index with either zero or one
52         //      entries. This usage allows a client to check whether a block is
53         //      present, and its size and upload time, without retrieving the
54         //      entire block.
55         //
56         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
57         rest.HandleFunc(
58                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
60
61         // Any request which does not match any of these routes gets
62         // 400 Bad Request.
63         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
64
65         return rest
66 }
67
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
70 }
71
72 // FindKeepVolumes scans all mounted volumes on the system for Keep
73 // volumes, and returns a list of matching paths.
74 //
75 // A device is assumed to be a Keep volume if it is a normal or tmpfs
76 // volume and has a "/keep" directory directly underneath the mount
77 // point.
78 //
79 func FindKeepVolumes() []string {
80         vols := make([]string, 0)
81
82         if f, err := os.Open(PROC_MOUNTS); err != nil {
83                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
84         } else {
85                 scanner := bufio.NewScanner(f)
86                 for scanner.Scan() {
87                         args := strings.Fields(scanner.Text())
88                         dev, mount := args[0], args[1]
89                         if mount != "/" &&
90                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
91                                 keep := mount + "/keep"
92                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
93                                         vols = append(vols, keep)
94                                 }
95                         }
96                 }
97                 if err := scanner.Err(); err != nil {
98                         log.Fatal(err)
99                 }
100         }
101         return vols
102 }
103
104 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
105         hash := mux.Vars(req)["hash"]
106
107         log.Printf("%s %s", req.Method, hash)
108
109         hints := mux.Vars(req)["hints"]
110
111         // Parse the locator string and hints from the request.
112         // TODO(twp): implement a Locator type.
113         var signature, timestamp string
114         if hints != "" {
115                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
116                 for _, hint := range strings.Split(hints, "+") {
117                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
118                                 // Server ignores size hints
119                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
120                                 signature = m[1]
121                                 timestamp = m[2]
122                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
123                                 // Any unknown hint that starts with an uppercase letter is
124                                 // presumed to be valid and ignored, to permit forward compatibility.
125                         } else {
126                                 // Unknown format; not a valid locator.
127                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
128                                 return
129                         }
130                 }
131         }
132
133         // If permission checking is in effect, verify this
134         // request's permission signature.
135         if enforce_permissions {
136                 if signature == "" || timestamp == "" {
137                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
138                         return
139                 } else if IsExpired(timestamp) {
140                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
141                         return
142                 } else {
143                         req_locator := req.URL.Path[1:] // strip leading slash
144                         if !VerifySignature(req_locator, GetApiToken(req)) {
145                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
146                                 return
147                         }
148                 }
149         }
150
151         block, err := GetBlock(hash, false)
152
153         // Garbage collect after each GET. Fixes #2865.
154         // TODO(twp): review Keep memory usage and see if there's
155         // a better way to do this than blindly garbage collecting
156         // after every block.
157         defer runtime.GC()
158
159         if err != nil {
160                 // This type assertion is safe because the only errors
161                 // GetBlock can return are DiskHashError or NotFoundError.
162                 if err == NotFoundError {
163                         log.Printf("%s: not found, giving up\n", hash)
164                 }
165                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
166                 return
167         }
168
169         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
170
171         _, err = resp.Write(block)
172         if err != nil {
173                 log.Printf("GetBlockHandler: writing response: %s", err)
174         }
175
176         return
177 }
178
179 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
180         // Garbage collect after each PUT. Fixes #2865.
181         // See also GetBlockHandler.
182         defer runtime.GC()
183
184         hash := mux.Vars(req)["hash"]
185
186         log.Printf("%s %s", req.Method, hash)
187
188         // Read the block data to be stored.
189         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
190         //
191         if req.ContentLength > BLOCKSIZE {
192                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
193                 return
194         }
195
196         buf := make([]byte, req.ContentLength)
197         nread, err := io.ReadFull(req.Body, buf)
198         if err != nil {
199                 http.Error(resp, err.Error(), 500)
200         } else if int64(nread) < req.ContentLength {
201                 http.Error(resp, "request truncated", 500)
202         } else {
203                 if err := PutBlock(buf, hash); err == nil {
204                         // Success; add a size hint, sign the locator if
205                         // possible, and return it to the client.
206                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
207                         api_token := GetApiToken(req)
208                         if PermissionSecret != nil && api_token != "" {
209                                 expiry := time.Now().Add(permission_ttl)
210                                 return_hash = SignLocator(return_hash, api_token, expiry)
211                         }
212                         resp.Write([]byte(return_hash + "\n"))
213                 } else {
214                         ke := err.(*KeepError)
215                         http.Error(resp, ke.Error(), ke.HTTPCode)
216                 }
217         }
218         return
219 }
220
221 // IndexHandler
222 //     A HandleFunc to address /index and /index/{prefix} requests.
223 //
224 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
225         prefix := mux.Vars(req)["prefix"]
226
227         // Only the data manager may issue /index requests,
228         // and only if enforce_permissions is enabled.
229         // All other requests return 403 Forbidden.
230         api_token := GetApiToken(req)
231         if !enforce_permissions ||
232                 api_token == "" ||
233                 data_manager_token != api_token {
234                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
235                 return
236         }
237         var index string
238         for _, vol := range KeepVM.Volumes() {
239                 index = index + vol.Index(prefix)
240         }
241         resp.Write([]byte(index))
242 }
243
244 // StatusHandler
245 //     Responds to /status.json requests with the current node status,
246 //     described in a JSON structure.
247 //
248 //     The data given in a status.json response includes:
249 //        volumes - a list of Keep volumes currently in use by this server
250 //          each volume is an object with the following fields:
251 //            * mount_point
252 //            * device_num (an integer identifying the underlying filesystem)
253 //            * bytes_free
254 //            * bytes_used
255 //
256 type VolumeStatus struct {
257         MountPoint string `json:"mount_point"`
258         DeviceNum  uint64 `json:"device_num"`
259         BytesFree  uint64 `json:"bytes_free"`
260         BytesUsed  uint64 `json:"bytes_used"`
261 }
262
263 type NodeStatus struct {
264         Volumes []*VolumeStatus `json:"volumes"`
265 }
266
267 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
268         st := GetNodeStatus()
269         if jstat, err := json.Marshal(st); err == nil {
270                 resp.Write(jstat)
271         } else {
272                 log.Printf("json.Marshal: %s\n", err)
273                 log.Printf("NodeStatus = %v\n", st)
274                 http.Error(resp, err.Error(), 500)
275         }
276 }
277
278 // GetNodeStatus
279 //     Returns a NodeStatus struct describing this Keep
280 //     node's current status.
281 //
282 func GetNodeStatus() *NodeStatus {
283         st := new(NodeStatus)
284
285         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
286         for i, vol := range KeepVM.Volumes() {
287                 st.Volumes[i] = vol.Status()
288         }
289         return st
290 }
291
292 // GetVolumeStatus
293 //     Returns a VolumeStatus describing the requested volume.
294 //
295 func GetVolumeStatus(volume string) *VolumeStatus {
296         var fs syscall.Statfs_t
297         var devnum uint64
298
299         if fi, err := os.Stat(volume); err == nil {
300                 devnum = fi.Sys().(*syscall.Stat_t).Dev
301         } else {
302                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
303                 return nil
304         }
305
306         err := syscall.Statfs(volume, &fs)
307         if err != nil {
308                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
309                 return nil
310         }
311         // These calculations match the way df calculates disk usage:
312         // "free" space is measured by fs.Bavail, but "used" space
313         // uses fs.Blocks - fs.Bfree.
314         free := fs.Bavail * uint64(fs.Bsize)
315         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
316         return &VolumeStatus{volume, devnum, free, used}
317 }
318
319 // DeleteHandler processes DELETE requests.
320 //
321 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
322 // from all connected volumes.
323 //
324 // Only the Data Manager, or an Arvados admin with scope "all", are
325 // allowed to issue DELETE requests.  If a DELETE request is not
326 // authenticated or is issued by a non-admin user, the server returns
327 // a PermissionError.
328 //
329 // Upon receiving a valid request from an authorized user,
330 // DeleteHandler deletes all copies of the specified block on local
331 // writable volumes.
332 //
333 // Response format:
334 //
335 // If the requested blocks was not found on any volume, the response
336 // code is HTTP 404 Not Found.
337 //
338 // Otherwise, the response code is 200 OK, with a response body
339 // consisting of the JSON message
340 //
341 //    {"copies_deleted":d,"copies_failed":f}
342 //
343 // where d and f are integers representing the number of blocks that
344 // were successfully and unsuccessfully deleted.
345 //
346 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
347         hash := mux.Vars(req)["hash"]
348         log.Printf("%s %s", req.Method, hash)
349
350         // Confirm that this user is an admin and has a token with unlimited scope.
351         var tok = GetApiToken(req)
352         if tok == "" || !CanDelete(tok) {
353                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
354                 return
355         }
356
357         if never_delete {
358                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
359                 return
360         }
361
362         // Delete copies of this block from all available volumes.  Report
363         // how many blocks were successfully and unsuccessfully
364         // deleted.
365         var result struct {
366                 Deleted int `json:"copies_deleted"`
367                 Failed  int `json:"copies_failed"`
368         }
369         for _, vol := range KeepVM.Volumes() {
370                 if err := vol.Delete(hash); err == nil {
371                         result.Deleted++
372                 } else if os.IsNotExist(err) {
373                         continue
374                 } else {
375                         result.Failed++
376                         log.Println("DeleteHandler:", err)
377                 }
378         }
379
380         var st int
381
382         if result.Deleted == 0 && result.Failed == 0 {
383                 st = http.StatusNotFound
384         } else {
385                 st = http.StatusOK
386         }
387
388         resp.WriteHeader(st)
389
390         if st == http.StatusOK {
391                 if body, err := json.Marshal(result); err == nil {
392                         resp.Write(body)
393                 } else {
394                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
395                         http.Error(resp, err.Error(), 500)
396                 }
397         }
398 }
399
400 // ==============================
401 // GetBlock and PutBlock implement lower-level code for handling
402 // blocks by rooting through volumes connected to the local machine.
403 // Once the handler has determined that system policy permits the
404 // request, it calls these methods to perform the actual operation.
405 //
406 // TODO(twp): this code would probably be better located in the
407 // VolumeManager interface. As an abstraction, the VolumeManager
408 // should be the only part of the code that cares about which volume a
409 // block is stored on, so it should be responsible for figuring out
410 // which volume to check for fetching blocks, storing blocks, etc.
411
412 // ==============================
413 // GetBlock fetches and returns the block identified by "hash".  If
414 // the update_timestamp argument is true, GetBlock also updates the
415 // block's file modification time (for the sake of PutBlock, which
416 // must update the file's timestamp when the block already exists).
417 //
418 // On success, GetBlock returns a byte slice with the block data, and
419 // a nil error.
420 //
421 // If the block cannot be found on any volume, returns NotFoundError.
422 //
423 // If the block found does not have the correct MD5 hash, returns
424 // DiskHashError.
425 //
426
427 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
428         // Attempt to read the requested hash from a keep volume.
429         error_to_caller := NotFoundError
430
431         for _, vol := range KeepVM.Volumes() {
432                 if buf, err := vol.Get(hash); err != nil {
433                         // IsNotExist is an expected error and may be ignored.
434                         // (If all volumes report IsNotExist, we return a NotFoundError)
435                         // All other errors should be logged but we continue trying to
436                         // read.
437                         switch {
438                         case os.IsNotExist(err):
439                                 continue
440                         default:
441                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
442                         }
443                 } else {
444                         // Double check the file checksum.
445                         //
446                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
447                         if filehash != hash {
448                                 // TODO(twp): this condition probably represents a bad disk and
449                                 // should raise major alarm bells for an administrator: e.g.
450                                 // they should be sent directly to an event manager at high
451                                 // priority or logged as urgent problems.
452                                 //
453                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
454                                         vol, hash, filehash)
455                                 error_to_caller = DiskHashError
456                         } else {
457                                 // Success!
458                                 if error_to_caller != NotFoundError {
459                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
460                                                 vol, hash)
461                                 }
462                                 // Update the timestamp if the caller requested.
463                                 // If we could not update the timestamp, continue looking on
464                                 // other volumes.
465                                 if update_timestamp {
466                                         if vol.Touch(hash) != nil {
467                                                 continue
468                                         }
469                                 }
470                                 return buf, nil
471                         }
472                 }
473         }
474
475         if error_to_caller != NotFoundError {
476                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
477         }
478         return nil, error_to_caller
479 }
480
481 /* PutBlock(block, hash)
482    Stores the BLOCK (identified by the content id HASH) in Keep.
483
484    The MD5 checksum of the block must be identical to the content id HASH.
485    If not, an error is returned.
486
487    PutBlock stores the BLOCK on the first Keep volume with free space.
488    A failure code is returned to the user only if all volumes fail.
489
490    On success, PutBlock returns nil.
491    On failure, it returns a KeepError with one of the following codes:
492
493    500 Collision
494           A different block with the same hash already exists on this
495           Keep server.
496    422 MD5Fail
497           The MD5 hash of the BLOCK does not match the argument HASH.
498    503 Full
499           There was not enough space left in any Keep volume to store
500           the object.
501    500 Fail
502           The object could not be stored for some other reason (e.g.
503           all writes failed). The text of the error message should
504           provide as much detail as possible.
505 */
506
507 func PutBlock(block []byte, hash string) error {
508         // Check that BLOCK's checksum matches HASH.
509         blockhash := fmt.Sprintf("%x", md5.Sum(block))
510         if blockhash != hash {
511                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
512                 return RequestHashError
513         }
514
515         // If we already have a block on disk under this identifier, return
516         // success (but check for MD5 collisions).  While fetching the block,
517         // update its timestamp.
518         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
519         // In either case, we want to write our new (good) block to disk,
520         // so there is nothing special to do if err != nil.
521         //
522         if oldblock, err := GetBlock(hash, true); err == nil {
523                 if bytes.Compare(block, oldblock) == 0 {
524                         // The block already exists; return success.
525                         return nil
526                 } else {
527                         return CollisionError
528                 }
529         }
530
531         // Choose a Keep volume to write to.
532         // If this volume fails, try all of the volumes in order.
533         vol := KeepVM.Choose()
534         if err := vol.Put(hash, block); err == nil {
535                 return nil // success!
536         } else {
537                 allFull := true
538                 for _, vol := range KeepVM.Volumes() {
539                         err := vol.Put(hash, block)
540                         if err == nil {
541                                 return nil // success!
542                         }
543                         if err != FullError {
544                                 // The volume is not full but the write did not succeed.
545                                 // Report the error and continue trying.
546                                 allFull = false
547                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
548                         }
549                 }
550
551                 if allFull {
552                         log.Printf("all Keep volumes full")
553                         return FullError
554                 } else {
555                         log.Printf("all Keep volumes failed")
556                         return GenericError
557                 }
558         }
559 }
560
561 // IsValidLocator
562 //     Return true if the specified string is a valid Keep locator.
563 //     When Keep is extended to support hash types other than MD5,
564 //     this should be updated to cover those as well.
565 //
566 func IsValidLocator(loc string) bool {
567         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
568         if err == nil {
569                 return match
570         }
571         log.Printf("IsValidLocator: %s\n", err)
572         return false
573 }
574
575 // GetApiToken returns the OAuth2 token from the Authorization
576 // header of a HTTP request, or an empty string if no matching
577 // token is found.
578 func GetApiToken(req *http.Request) string {
579         if auth, ok := req.Header["Authorization"]; ok {
580                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
581                         log.Println(err)
582                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
583                         return match[1]
584                 }
585         }
586         return ""
587 }
588
589 // IsExpired returns true if the given Unix timestamp (expressed as a
590 // hexadecimal string) is in the past, or if timestamp_hex cannot be
591 // parsed as a hexadecimal string.
592 func IsExpired(timestamp_hex string) bool {
593         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
594         if err != nil {
595                 log.Printf("IsExpired: %s\n", err)
596                 return true
597         }
598         return time.Unix(ts, 0).Before(time.Now())
599 }
600
601 // CanDelete returns true if the user identified by api_token is
602 // allowed to delete blocks.
603 func CanDelete(api_token string) bool {
604         if api_token == "" {
605                 return false
606         }
607         // Blocks may be deleted only when Keep has been configured with a
608         // data manager.
609         if data_manager_token == "" {
610                 return false
611         }
612         if api_token == data_manager_token {
613                 return true
614         }
615         // TODO(twp): look up api_token with the API server
616         // return true if is_admin is true and if the token
617         // has unlimited scope
618         return false
619 }