Merge branch '3198-writable-fuse' of git.curoverse.com:arvados into 3198-writable...
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "strconv"
23         "syscall"
24         "time"
25 )
26
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
29 //
30 func MakeRESTRouter() *mux.Router {
31         rest := mux.NewRouter()
32
33         rest.HandleFunc(
34                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}+{hints}`,
37                 GetBlockHandler).Methods("GET", "HEAD")
38
39         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41         // List all blocks stored here. Privileged client only.
42         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43         // List blocks stored here whose hash has the given prefix.
44         // Privileged client only.
45         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
46
47         // List volumes: path, device number, bytes used/avail.
48         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
49
50         // Replace the current pull queue.
51         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
52
53         // Replace the current trash queue.
54         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
55
56         // Any request which does not match any of these routes gets
57         // 400 Bad Request.
58         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
59
60         return rest
61 }
62
63 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
64         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
65 }
66
67 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
68         if enforce_permissions {
69                 locator := req.URL.Path[1:] // strip leading slash
70                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
71                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
72                         return
73                 }
74         }
75
76         block, err := GetBlock(mux.Vars(req)["hash"], false)
77         if err != nil {
78                 // This type assertion is safe because the only errors
79                 // GetBlock can return are DiskHashError or NotFoundError.
80                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
81                 return
82         }
83         defer bufs.Put(block)
84
85         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
86         resp.Header().Set("Content-Type", "application/octet-stream")
87         resp.Write(block)
88 }
89
90 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
91         hash := mux.Vars(req)["hash"]
92
93         // Detect as many error conditions as possible before reading
94         // the body: avoid transmitting data that will not end up
95         // being written anyway.
96
97         if req.ContentLength == -1 {
98                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
99                 return
100         }
101
102         if req.ContentLength > BLOCKSIZE {
103                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
104                 return
105         }
106
107         if len(KeepVM.AllWritable()) == 0 {
108                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
109                 return
110         }
111
112         buf := bufs.Get(int(req.ContentLength))
113         _, err := io.ReadFull(req.Body, buf)
114         if err != nil {
115                 http.Error(resp, err.Error(), 500)
116                 bufs.Put(buf)
117                 return
118         }
119
120         err = PutBlock(buf, hash)
121         bufs.Put(buf)
122
123         if err != nil {
124                 ke := err.(*KeepError)
125                 http.Error(resp, ke.Error(), ke.HTTPCode)
126                 return
127         }
128
129         // Success; add a size hint, sign the locator if possible, and
130         // return it to the client.
131         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
132         api_token := GetApiToken(req)
133         if PermissionSecret != nil && api_token != "" {
134                 expiry := time.Now().Add(blob_signature_ttl)
135                 return_hash = SignLocator(return_hash, api_token, expiry)
136         }
137         resp.Write([]byte(return_hash + "\n"))
138 }
139
140 // IndexHandler
141 //     A HandleFunc to address /index and /index/{prefix} requests.
142 //
143 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
144         // Reject unauthorized requests.
145         if !IsDataManagerToken(GetApiToken(req)) {
146                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
147                 return
148         }
149
150         prefix := mux.Vars(req)["prefix"]
151
152         for _, vol := range KeepVM.AllReadable() {
153                 if err := vol.IndexTo(prefix, resp); err != nil {
154                         // The only errors returned by IndexTo are
155                         // write errors returned by resp.Write(),
156                         // which probably means the client has
157                         // disconnected and this error will never be
158                         // reported to the client -- but it will
159                         // appear in our own error log.
160                         http.Error(resp, err.Error(), http.StatusInternalServerError)
161                         return
162                 }
163         }
164         // An empty line at EOF is the only way the client can be
165         // assured the entire index was received.
166         resp.Write([]byte{'\n'})
167 }
168
169 // StatusHandler
170 //     Responds to /status.json requests with the current node status,
171 //     described in a JSON structure.
172 //
173 //     The data given in a status.json response includes:
174 //        volumes - a list of Keep volumes currently in use by this server
175 //          each volume is an object with the following fields:
176 //            * mount_point
177 //            * device_num (an integer identifying the underlying filesystem)
178 //            * bytes_free
179 //            * bytes_used
180 //
181 type VolumeStatus struct {
182         MountPoint string `json:"mount_point"`
183         DeviceNum  uint64 `json:"device_num"`
184         BytesFree  uint64 `json:"bytes_free"`
185         BytesUsed  uint64 `json:"bytes_used"`
186 }
187
188 type NodeStatus struct {
189         Volumes []*VolumeStatus `json:"volumes"`
190 }
191
192 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
193         st := GetNodeStatus()
194         if jstat, err := json.Marshal(st); err == nil {
195                 resp.Write(jstat)
196         } else {
197                 log.Printf("json.Marshal: %s\n", err)
198                 log.Printf("NodeStatus = %v\n", st)
199                 http.Error(resp, err.Error(), 500)
200         }
201 }
202
203 // GetNodeStatus
204 //     Returns a NodeStatus struct describing this Keep
205 //     node's current status.
206 //
207 func GetNodeStatus() *NodeStatus {
208         st := new(NodeStatus)
209
210         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
211         for i, vol := range KeepVM.AllReadable() {
212                 st.Volumes[i] = vol.Status()
213         }
214         return st
215 }
216
217 // GetVolumeStatus
218 //     Returns a VolumeStatus describing the requested volume.
219 //
220 func GetVolumeStatus(volume string) *VolumeStatus {
221         var fs syscall.Statfs_t
222         var devnum uint64
223
224         if fi, err := os.Stat(volume); err == nil {
225                 devnum = fi.Sys().(*syscall.Stat_t).Dev
226         } else {
227                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
228                 return nil
229         }
230
231         err := syscall.Statfs(volume, &fs)
232         if err != nil {
233                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
234                 return nil
235         }
236         // These calculations match the way df calculates disk usage:
237         // "free" space is measured by fs.Bavail, but "used" space
238         // uses fs.Blocks - fs.Bfree.
239         free := fs.Bavail * uint64(fs.Bsize)
240         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
241         return &VolumeStatus{volume, devnum, free, used}
242 }
243
244 // DeleteHandler processes DELETE requests.
245 //
246 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
247 // from all connected volumes.
248 //
249 // Only the Data Manager, or an Arvados admin with scope "all", are
250 // allowed to issue DELETE requests.  If a DELETE request is not
251 // authenticated or is issued by a non-admin user, the server returns
252 // a PermissionError.
253 //
254 // Upon receiving a valid request from an authorized user,
255 // DeleteHandler deletes all copies of the specified block on local
256 // writable volumes.
257 //
258 // Response format:
259 //
260 // If the requested blocks was not found on any volume, the response
261 // code is HTTP 404 Not Found.
262 //
263 // Otherwise, the response code is 200 OK, with a response body
264 // consisting of the JSON message
265 //
266 //    {"copies_deleted":d,"copies_failed":f}
267 //
268 // where d and f are integers representing the number of blocks that
269 // were successfully and unsuccessfully deleted.
270 //
271 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
272         hash := mux.Vars(req)["hash"]
273
274         // Confirm that this user is an admin and has a token with unlimited scope.
275         var tok = GetApiToken(req)
276         if tok == "" || !CanDelete(tok) {
277                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
278                 return
279         }
280
281         if never_delete {
282                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
283                 return
284         }
285
286         // Delete copies of this block from all available volumes.
287         // Report how many blocks were successfully deleted, and how
288         // many were found on writable volumes but not deleted.
289         var result struct {
290                 Deleted int `json:"copies_deleted"`
291                 Failed  int `json:"copies_failed"`
292         }
293         for _, vol := range KeepVM.AllWritable() {
294                 if err := vol.Delete(hash); err == nil {
295                         result.Deleted++
296                 } else if os.IsNotExist(err) {
297                         continue
298                 } else {
299                         result.Failed++
300                         log.Println("DeleteHandler:", err)
301                 }
302         }
303
304         var st int
305
306         if result.Deleted == 0 && result.Failed == 0 {
307                 st = http.StatusNotFound
308         } else {
309                 st = http.StatusOK
310         }
311
312         resp.WriteHeader(st)
313
314         if st == http.StatusOK {
315                 if body, err := json.Marshal(result); err == nil {
316                         resp.Write(body)
317                 } else {
318                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
319                         http.Error(resp, err.Error(), 500)
320                 }
321         }
322 }
323
324 /* PullHandler processes "PUT /pull" requests for the data manager.
325    The request body is a JSON message containing a list of pull
326    requests in the following format:
327
328    [
329       {
330          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
331          "servers":[
332                         "keep0.qr1hi.arvadosapi.com:25107",
333                         "keep1.qr1hi.arvadosapi.com:25108"
334                  ]
335           },
336           {
337                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
338                  "servers":[
339                         "10.0.1.5:25107",
340                         "10.0.1.6:25107",
341                         "10.0.1.7:25108"
342                  ]
343           },
344           ...
345    ]
346
347    Each pull request in the list consists of a block locator string
348    and an ordered list of servers.  Keepstore should try to fetch the
349    block from each server in turn.
350
351    If the request has not been sent by the Data Manager, return 401
352    Unauthorized.
353
354    If the JSON unmarshalling fails, return 400 Bad Request.
355 */
356
357 type PullRequest struct {
358         Locator string   `json:"locator"`
359         Servers []string `json:"servers"`
360 }
361
362 func PullHandler(resp http.ResponseWriter, req *http.Request) {
363         // Reject unauthorized requests.
364         if !IsDataManagerToken(GetApiToken(req)) {
365                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
366                 return
367         }
368
369         // Parse the request body.
370         var pr []PullRequest
371         r := json.NewDecoder(req.Body)
372         if err := r.Decode(&pr); err != nil {
373                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
374                 return
375         }
376
377         // We have a properly formatted pull list sent from the data
378         // manager.  Report success and send the list to the pull list
379         // manager for further handling.
380         resp.WriteHeader(http.StatusOK)
381         resp.Write([]byte(
382                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
383
384         plist := list.New()
385         for _, p := range pr {
386                 plist.PushBack(p)
387         }
388         pullq.ReplaceQueue(plist)
389 }
390
391 type TrashRequest struct {
392         Locator    string `json:"locator"`
393         BlockMtime int64  `json:"block_mtime"`
394 }
395
396 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
397         // Reject unauthorized requests.
398         if !IsDataManagerToken(GetApiToken(req)) {
399                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
400                 return
401         }
402
403         // Parse the request body.
404         var trash []TrashRequest
405         r := json.NewDecoder(req.Body)
406         if err := r.Decode(&trash); err != nil {
407                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
408                 return
409         }
410
411         // We have a properly formatted trash list sent from the data
412         // manager.  Report success and send the list to the trash work
413         // queue for further handling.
414         resp.WriteHeader(http.StatusOK)
415         resp.Write([]byte(
416                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
417
418         tlist := list.New()
419         for _, t := range trash {
420                 tlist.PushBack(t)
421         }
422         trashq.ReplaceQueue(tlist)
423 }
424
425 // ==============================
426 // GetBlock and PutBlock implement lower-level code for handling
427 // blocks by rooting through volumes connected to the local machine.
428 // Once the handler has determined that system policy permits the
429 // request, it calls these methods to perform the actual operation.
430 //
431 // TODO(twp): this code would probably be better located in the
432 // VolumeManager interface. As an abstraction, the VolumeManager
433 // should be the only part of the code that cares about which volume a
434 // block is stored on, so it should be responsible for figuring out
435 // which volume to check for fetching blocks, storing blocks, etc.
436
437 // ==============================
438 // GetBlock fetches and returns the block identified by "hash".  If
439 // the update_timestamp argument is true, GetBlock also updates the
440 // block's file modification time (for the sake of PutBlock, which
441 // must update the file's timestamp when the block already exists).
442 //
443 // On success, GetBlock returns a byte slice with the block data, and
444 // a nil error.
445 //
446 // If the block cannot be found on any volume, returns NotFoundError.
447 //
448 // If the block found does not have the correct MD5 hash, returns
449 // DiskHashError.
450 //
451
452 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
453         // Attempt to read the requested hash from a keep volume.
454         error_to_caller := NotFoundError
455
456         var vols []Volume
457         if update_timestamp {
458                 // Pointless to find the block on an unwritable volume
459                 // because Touch() will fail -- this is as good as
460                 // "not found" for purposes of callers who need to
461                 // update_timestamp.
462                 vols = KeepVM.AllWritable()
463         } else {
464                 vols = KeepVM.AllReadable()
465         }
466
467         for _, vol := range vols {
468                 buf, err := vol.Get(hash)
469                 if err != nil {
470                         // IsNotExist is an expected error and may be
471                         // ignored. All other errors are logged. In
472                         // any case we continue trying to read other
473                         // volumes. If all volumes report IsNotExist,
474                         // we return a NotFoundError.
475                         if !os.IsNotExist(err) {
476                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
477                         }
478                         continue
479                 }
480                 // Check the file checksum.
481                 //
482                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
483                 if filehash != hash {
484                         // TODO: Try harder to tell a sysadmin about
485                         // this.
486                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
487                                 vol, hash, filehash)
488                         error_to_caller = DiskHashError
489                         bufs.Put(buf)
490                         continue
491                 }
492                 if error_to_caller == DiskHashError {
493                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
494                                 vol, hash)
495                 }
496                 if update_timestamp {
497                         if err := vol.Touch(hash); err != nil {
498                                 error_to_caller = GenericError
499                                 log.Printf("%s: Touch %s failed: %s",
500                                         vol, hash, error_to_caller)
501                                 bufs.Put(buf)
502                                 continue
503                         }
504                 }
505                 return buf, nil
506         }
507         return nil, error_to_caller
508 }
509
510 /* PutBlock(block, hash)
511    Stores the BLOCK (identified by the content id HASH) in Keep.
512
513    The MD5 checksum of the block must be identical to the content id HASH.
514    If not, an error is returned.
515
516    PutBlock stores the BLOCK on the first Keep volume with free space.
517    A failure code is returned to the user only if all volumes fail.
518
519    On success, PutBlock returns nil.
520    On failure, it returns a KeepError with one of the following codes:
521
522    500 Collision
523           A different block with the same hash already exists on this
524           Keep server.
525    422 MD5Fail
526           The MD5 hash of the BLOCK does not match the argument HASH.
527    503 Full
528           There was not enough space left in any Keep volume to store
529           the object.
530    500 Fail
531           The object could not be stored for some other reason (e.g.
532           all writes failed). The text of the error message should
533           provide as much detail as possible.
534 */
535
536 func PutBlock(block []byte, hash string) error {
537         // Check that BLOCK's checksum matches HASH.
538         blockhash := fmt.Sprintf("%x", md5.Sum(block))
539         if blockhash != hash {
540                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
541                 return RequestHashError
542         }
543
544         // If we already have a block on disk under this identifier, return
545         // success (but check for MD5 collisions).  While fetching the block,
546         // update its timestamp.
547         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
548         // In either case, we want to write our new (good) block to disk,
549         // so there is nothing special to do if err != nil.
550         //
551         if oldblock, err := GetBlock(hash, true); err == nil {
552                 defer bufs.Put(oldblock)
553                 if bytes.Compare(block, oldblock) == 0 {
554                         // The block already exists; return success.
555                         return nil
556                 } else {
557                         return CollisionError
558                 }
559         }
560
561         // Choose a Keep volume to write to.
562         // If this volume fails, try all of the volumes in order.
563         if vol := KeepVM.NextWritable(); vol != nil {
564                 if err := vol.Put(hash, block); err == nil {
565                         return nil // success!
566                 }
567         }
568
569         writables := KeepVM.AllWritable()
570         if len(writables) == 0 {
571                 log.Print("No writable volumes.")
572                 return FullError
573         }
574
575         allFull := true
576         for _, vol := range writables {
577                 err := vol.Put(hash, block)
578                 if err == nil {
579                         return nil // success!
580                 }
581                 if err != FullError {
582                         // The volume is not full but the
583                         // write did not succeed.  Report the
584                         // error and continue trying.
585                         allFull = false
586                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
587                 }
588         }
589
590         if allFull {
591                 log.Print("All volumes are full.")
592                 return FullError
593         } else {
594                 // Already logged the non-full errors.
595                 return GenericError
596         }
597 }
598
599 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
600
601 // IsValidLocator
602 //     Return true if the specified string is a valid Keep locator.
603 //     When Keep is extended to support hash types other than MD5,
604 //     this should be updated to cover those as well.
605 //
606 func IsValidLocator(loc string) bool {
607         return validLocatorRe.MatchString(loc)
608 }
609
610 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
611
612 // GetApiToken returns the OAuth2 token from the Authorization
613 // header of a HTTP request, or an empty string if no matching
614 // token is found.
615 func GetApiToken(req *http.Request) string {
616         if auth, ok := req.Header["Authorization"]; ok {
617                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
618                         return match[1]
619                 }
620         }
621         return ""
622 }
623
624 // IsExpired returns true if the given Unix timestamp (expressed as a
625 // hexadecimal string) is in the past, or if timestamp_hex cannot be
626 // parsed as a hexadecimal string.
627 func IsExpired(timestamp_hex string) bool {
628         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
629         if err != nil {
630                 log.Printf("IsExpired: %s\n", err)
631                 return true
632         }
633         return time.Unix(ts, 0).Before(time.Now())
634 }
635
636 // CanDelete returns true if the user identified by api_token is
637 // allowed to delete blocks.
638 func CanDelete(api_token string) bool {
639         if api_token == "" {
640                 return false
641         }
642         // Blocks may be deleted only when Keep has been configured with a
643         // data manager.
644         if IsDataManagerToken(api_token) {
645                 return true
646         }
647         // TODO(twp): look up api_token with the API server
648         // return true if is_admin is true and if the token
649         // has unlimited scope
650         return false
651 }
652
653 // IsDataManagerToken returns true if api_token represents the data
654 // manager's token.
655 func IsDataManagerToken(api_token string) bool {
656         return data_manager_token != "" && api_token == data_manager_token
657 }