7179: Tighten Put requirements when overwriting existing data.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "sync"
24         "time"
25 )
26
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
29 //
30 func MakeRESTRouter() *mux.Router {
31         rest := mux.NewRouter()
32
33         rest.HandleFunc(
34                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}+{hints}`,
37                 GetBlockHandler).Methods("GET", "HEAD")
38
39         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41         // List all blocks stored here. Privileged client only.
42         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43         // List blocks stored here whose hash has the given prefix.
44         // Privileged client only.
45         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
46
47         // List volumes: path, device number, bytes used/avail.
48         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
49
50         // Replace the current pull queue.
51         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
52
53         // Replace the current trash queue.
54         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
55
56         // Any request which does not match any of these routes gets
57         // 400 Bad Request.
58         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
59
60         return rest
61 }
62
63 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
64         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
65 }
66
67 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
68         if enforce_permissions {
69                 locator := req.URL.Path[1:] // strip leading slash
70                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
71                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
72                         return
73                 }
74         }
75
76         block, err := GetBlock(mux.Vars(req)["hash"])
77         if err != nil {
78                 // This type assertion is safe because the only errors
79                 // GetBlock can return are DiskHashError or NotFoundError.
80                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
81                 return
82         }
83         defer bufs.Put(block)
84
85         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
86         resp.Header().Set("Content-Type", "application/octet-stream")
87         resp.Write(block)
88 }
89
90 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
91         hash := mux.Vars(req)["hash"]
92
93         // Detect as many error conditions as possible before reading
94         // the body: avoid transmitting data that will not end up
95         // being written anyway.
96
97         if req.ContentLength == -1 {
98                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
99                 return
100         }
101
102         if req.ContentLength > BLOCKSIZE {
103                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
104                 return
105         }
106
107         if len(KeepVM.AllWritable()) == 0 {
108                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
109                 return
110         }
111
112         buf := bufs.Get(int(req.ContentLength))
113         _, err := io.ReadFull(req.Body, buf)
114         if err != nil {
115                 http.Error(resp, err.Error(), 500)
116                 bufs.Put(buf)
117                 return
118         }
119
120         err = PutBlock(buf, hash)
121         bufs.Put(buf)
122
123         if err != nil {
124                 ke := err.(*KeepError)
125                 http.Error(resp, ke.Error(), ke.HTTPCode)
126                 return
127         }
128
129         // Success; add a size hint, sign the locator if possible, and
130         // return it to the client.
131         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
132         api_token := GetApiToken(req)
133         if PermissionSecret != nil && api_token != "" {
134                 expiry := time.Now().Add(blob_signature_ttl)
135                 return_hash = SignLocator(return_hash, api_token, expiry)
136         }
137         resp.Write([]byte(return_hash + "\n"))
138 }
139
140 // IndexHandler
141 //     A HandleFunc to address /index and /index/{prefix} requests.
142 //
143 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
144         // Reject unauthorized requests.
145         if !IsDataManagerToken(GetApiToken(req)) {
146                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
147                 return
148         }
149
150         prefix := mux.Vars(req)["prefix"]
151
152         for _, vol := range KeepVM.AllReadable() {
153                 if err := vol.IndexTo(prefix, resp); err != nil {
154                         // The only errors returned by IndexTo are
155                         // write errors returned by resp.Write(),
156                         // which probably means the client has
157                         // disconnected and this error will never be
158                         // reported to the client -- but it will
159                         // appear in our own error log.
160                         http.Error(resp, err.Error(), http.StatusInternalServerError)
161                         return
162                 }
163         }
164         // An empty line at EOF is the only way the client can be
165         // assured the entire index was received.
166         resp.Write([]byte{'\n'})
167 }
168
169 // StatusHandler
170 //     Responds to /status.json requests with the current node status,
171 //     described in a JSON structure.
172 //
173 //     The data given in a status.json response includes:
174 //        volumes - a list of Keep volumes currently in use by this server
175 //          each volume is an object with the following fields:
176 //            * mount_point
177 //            * device_num (an integer identifying the underlying filesystem)
178 //            * bytes_free
179 //            * bytes_used
180
181 type PoolStatus struct {
182         Alloc uint64 `json:"BytesAllocated"`
183         Cap   int    `json:"BuffersMax"`
184         Len   int    `json:"BuffersInUse"`
185 }
186
187 type NodeStatus struct {
188         Volumes    []*VolumeStatus `json:"volumes"`
189         BufferPool PoolStatus
190         PullQueue  WorkQueueStatus
191         TrashQueue WorkQueueStatus
192         Memory     runtime.MemStats
193 }
194
195 var st NodeStatus
196 var stLock sync.Mutex
197
198 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
199         stLock.Lock()
200         readNodeStatus(&st)
201         jstat, err := json.Marshal(&st)
202         stLock.Unlock()
203         if err == nil {
204                 resp.Write(jstat)
205         } else {
206                 log.Printf("json.Marshal: %s", err)
207                 log.Printf("NodeStatus = %v", &st)
208                 http.Error(resp, err.Error(), 500)
209         }
210 }
211
212 // populate the given NodeStatus struct with current values.
213 func readNodeStatus(st *NodeStatus) {
214         vols := KeepVM.AllReadable()
215         if cap(st.Volumes) < len(vols) {
216                 st.Volumes = make([]*VolumeStatus, len(vols))
217         }
218         st.Volumes = st.Volumes[:0]
219         for _, vol := range vols {
220                 if s := vol.Status(); s != nil {
221                         st.Volumes = append(st.Volumes, s)
222                 }
223         }
224         st.BufferPool.Alloc = bufs.Alloc()
225         st.BufferPool.Cap = bufs.Cap()
226         st.BufferPool.Len = bufs.Len()
227         st.PullQueue = getWorkQueueStatus(pullq)
228         st.TrashQueue = getWorkQueueStatus(trashq)
229         runtime.ReadMemStats(&st.Memory)
230 }
231
232 // return a WorkQueueStatus for the given queue. If q is nil (which
233 // should never happen except in test suites), return a zero status
234 // value instead of crashing.
235 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
236         if q == nil {
237                 // This should only happen during tests.
238                 return WorkQueueStatus{}
239         }
240         return q.Status()
241 }
242
243 // DeleteHandler processes DELETE requests.
244 //
245 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
246 // from all connected volumes.
247 //
248 // Only the Data Manager, or an Arvados admin with scope "all", are
249 // allowed to issue DELETE requests.  If a DELETE request is not
250 // authenticated or is issued by a non-admin user, the server returns
251 // a PermissionError.
252 //
253 // Upon receiving a valid request from an authorized user,
254 // DeleteHandler deletes all copies of the specified block on local
255 // writable volumes.
256 //
257 // Response format:
258 //
259 // If the requested blocks was not found on any volume, the response
260 // code is HTTP 404 Not Found.
261 //
262 // Otherwise, the response code is 200 OK, with a response body
263 // consisting of the JSON message
264 //
265 //    {"copies_deleted":d,"copies_failed":f}
266 //
267 // where d and f are integers representing the number of blocks that
268 // were successfully and unsuccessfully deleted.
269 //
270 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
271         hash := mux.Vars(req)["hash"]
272
273         // Confirm that this user is an admin and has a token with unlimited scope.
274         var tok = GetApiToken(req)
275         if tok == "" || !CanDelete(tok) {
276                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
277                 return
278         }
279
280         if never_delete {
281                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
282                 return
283         }
284
285         // Delete copies of this block from all available volumes.
286         // Report how many blocks were successfully deleted, and how
287         // many were found on writable volumes but not deleted.
288         var result struct {
289                 Deleted int `json:"copies_deleted"`
290                 Failed  int `json:"copies_failed"`
291         }
292         for _, vol := range KeepVM.AllWritable() {
293                 if err := vol.Delete(hash); err == nil {
294                         result.Deleted++
295                 } else if os.IsNotExist(err) {
296                         continue
297                 } else {
298                         result.Failed++
299                         log.Println("DeleteHandler:", err)
300                 }
301         }
302
303         var st int
304
305         if result.Deleted == 0 && result.Failed == 0 {
306                 st = http.StatusNotFound
307         } else {
308                 st = http.StatusOK
309         }
310
311         resp.WriteHeader(st)
312
313         if st == http.StatusOK {
314                 if body, err := json.Marshal(result); err == nil {
315                         resp.Write(body)
316                 } else {
317                         log.Printf("json.Marshal: %s (result = %v)", err, result)
318                         http.Error(resp, err.Error(), 500)
319                 }
320         }
321 }
322
323 /* PullHandler processes "PUT /pull" requests for the data manager.
324    The request body is a JSON message containing a list of pull
325    requests in the following format:
326
327    [
328       {
329          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
330          "servers":[
331                         "keep0.qr1hi.arvadosapi.com:25107",
332                         "keep1.qr1hi.arvadosapi.com:25108"
333                  ]
334           },
335           {
336                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
337                  "servers":[
338                         "10.0.1.5:25107",
339                         "10.0.1.6:25107",
340                         "10.0.1.7:25108"
341                  ]
342           },
343           ...
344    ]
345
346    Each pull request in the list consists of a block locator string
347    and an ordered list of servers.  Keepstore should try to fetch the
348    block from each server in turn.
349
350    If the request has not been sent by the Data Manager, return 401
351    Unauthorized.
352
353    If the JSON unmarshalling fails, return 400 Bad Request.
354 */
355
356 type PullRequest struct {
357         Locator string   `json:"locator"`
358         Servers []string `json:"servers"`
359 }
360
361 func PullHandler(resp http.ResponseWriter, req *http.Request) {
362         // Reject unauthorized requests.
363         if !IsDataManagerToken(GetApiToken(req)) {
364                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
365                 return
366         }
367
368         // Parse the request body.
369         var pr []PullRequest
370         r := json.NewDecoder(req.Body)
371         if err := r.Decode(&pr); err != nil {
372                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
373                 return
374         }
375
376         // We have a properly formatted pull list sent from the data
377         // manager.  Report success and send the list to the pull list
378         // manager for further handling.
379         resp.WriteHeader(http.StatusOK)
380         resp.Write([]byte(
381                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
382
383         plist := list.New()
384         for _, p := range pr {
385                 plist.PushBack(p)
386         }
387         pullq.ReplaceQueue(plist)
388 }
389
390 type TrashRequest struct {
391         Locator    string `json:"locator"`
392         BlockMtime int64  `json:"block_mtime"`
393 }
394
395 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
396         // Reject unauthorized requests.
397         if !IsDataManagerToken(GetApiToken(req)) {
398                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
399                 return
400         }
401
402         // Parse the request body.
403         var trash []TrashRequest
404         r := json.NewDecoder(req.Body)
405         if err := r.Decode(&trash); err != nil {
406                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
407                 return
408         }
409
410         // We have a properly formatted trash list sent from the data
411         // manager.  Report success and send the list to the trash work
412         // queue for further handling.
413         resp.WriteHeader(http.StatusOK)
414         resp.Write([]byte(
415                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
416
417         tlist := list.New()
418         for _, t := range trash {
419                 tlist.PushBack(t)
420         }
421         trashq.ReplaceQueue(tlist)
422 }
423
424 // ==============================
425 // GetBlock and PutBlock implement lower-level code for handling
426 // blocks by rooting through volumes connected to the local machine.
427 // Once the handler has determined that system policy permits the
428 // request, it calls these methods to perform the actual operation.
429 //
430 // TODO(twp): this code would probably be better located in the
431 // VolumeManager interface. As an abstraction, the VolumeManager
432 // should be the only part of the code that cares about which volume a
433 // block is stored on, so it should be responsible for figuring out
434 // which volume to check for fetching blocks, storing blocks, etc.
435
436 // ==============================
437 // GetBlock fetches and returns the block identified by "hash".
438 //
439 // On success, GetBlock returns a byte slice with the block data, and
440 // a nil error.
441 //
442 // If the block cannot be found on any volume, returns NotFoundError.
443 //
444 // If the block found does not have the correct MD5 hash, returns
445 // DiskHashError.
446 //
447
448 func GetBlock(hash string) ([]byte, error) {
449         // Attempt to read the requested hash from a keep volume.
450         error_to_caller := NotFoundError
451
452         for _, vol := range KeepVM.AllReadable() {
453                 buf, err := vol.Get(hash)
454                 if err != nil {
455                         // IsNotExist is an expected error and may be
456                         // ignored. All other errors are logged. In
457                         // any case we continue trying to read other
458                         // volumes. If all volumes report IsNotExist,
459                         // we return a NotFoundError.
460                         if !os.IsNotExist(err) {
461                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
462                         }
463                         continue
464                 }
465                 // Check the file checksum.
466                 //
467                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
468                 if filehash != hash {
469                         // TODO: Try harder to tell a sysadmin about
470                         // this.
471                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
472                                 vol, hash, filehash)
473                         error_to_caller = DiskHashError
474                         bufs.Put(buf)
475                         continue
476                 }
477                 if error_to_caller == DiskHashError {
478                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
479                                 vol, hash)
480                 }
481                 return buf, nil
482         }
483         return nil, error_to_caller
484 }
485
486 /* PutBlock(block, hash)
487    Stores the BLOCK (identified by the content id HASH) in Keep.
488
489    The MD5 checksum of the block must be identical to the content id HASH.
490    If not, an error is returned.
491
492    PutBlock stores the BLOCK on the first Keep volume with free space.
493    A failure code is returned to the user only if all volumes fail.
494
495    On success, PutBlock returns nil.
496    On failure, it returns a KeepError with one of the following codes:
497
498    500 Collision
499           A different block with the same hash already exists on this
500           Keep server.
501    422 MD5Fail
502           The MD5 hash of the BLOCK does not match the argument HASH.
503    503 Full
504           There was not enough space left in any Keep volume to store
505           the object.
506    500 Fail
507           The object could not be stored for some other reason (e.g.
508           all writes failed). The text of the error message should
509           provide as much detail as possible.
510 */
511
512 func PutBlock(block []byte, hash string) error {
513         // Check that BLOCK's checksum matches HASH.
514         blockhash := fmt.Sprintf("%x", md5.Sum(block))
515         if blockhash != hash {
516                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
517                 return RequestHashError
518         }
519
520         // If we already have this data, it's intact on disk, and we
521         // can update its timestamp, return success. If we have
522         // different data with the same hash, return failure.
523         if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
524                 return err
525         }
526
527         // Choose a Keep volume to write to.
528         // If this volume fails, try all of the volumes in order.
529         if vol := KeepVM.NextWritable(); vol != nil {
530                 if err := vol.Put(hash, block); err == nil {
531                         return nil // success!
532                 }
533         }
534
535         writables := KeepVM.AllWritable()
536         if len(writables) == 0 {
537                 log.Print("No writable volumes.")
538                 return FullError
539         }
540
541         allFull := true
542         for _, vol := range writables {
543                 err := vol.Put(hash, block)
544                 if err == nil {
545                         return nil // success!
546                 }
547                 if err != FullError {
548                         // The volume is not full but the
549                         // write did not succeed.  Report the
550                         // error and continue trying.
551                         allFull = false
552                         log.Printf("%s: Write(%s): %s", vol, hash, err)
553                 }
554         }
555
556         if allFull {
557                 log.Print("All volumes are full.")
558                 return FullError
559         } else {
560                 // Already logged the non-full errors.
561                 return GenericError
562         }
563 }
564
565 // CompareAndTouch returns nil if one of the volumes already has the
566 // given content and it successfully updates the relevant block's
567 // modification time in order to protect it from premature garbage
568 // collection.
569 func CompareAndTouch(hash string, buf []byte) error {
570         var bestErr error = NotFoundError
571         for _, vol := range KeepVM.AllWritable() {
572                 if err := vol.Compare(hash, buf); err == CollisionError {
573                         // Stop if we have a block with same hash but
574                         // different content. (It will be impossible
575                         // to tell which one is wanted if we have
576                         // both, so there's no point writing it even
577                         // on a different volume.)
578                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
579                         return err
580                 } else if os.IsNotExist(err) {
581                         // Block does not exist. This is the only
582                         // "normal" error: we don't log anything.
583                         continue
584                 } else if err != nil {
585                         // Couldn't open file, data is corrupt on
586                         // disk, etc.: log this abnormal condition,
587                         // and try the next volume.
588                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
589                         continue
590                 }
591                 if err := vol.Touch(hash); err != nil {
592                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
593                         bestErr = err
594                         continue
595                 }
596                 // Compare and Touch both worked --> done.
597                 return nil
598         }
599         return bestErr
600 }
601
602 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
603
604 // IsValidLocator
605 //     Return true if the specified string is a valid Keep locator.
606 //     When Keep is extended to support hash types other than MD5,
607 //     this should be updated to cover those as well.
608 //
609 func IsValidLocator(loc string) bool {
610         return validLocatorRe.MatchString(loc)
611 }
612
613 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
614
615 // GetApiToken returns the OAuth2 token from the Authorization
616 // header of a HTTP request, or an empty string if no matching
617 // token is found.
618 func GetApiToken(req *http.Request) string {
619         if auth, ok := req.Header["Authorization"]; ok {
620                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
621                         return match[1]
622                 }
623         }
624         return ""
625 }
626
627 // IsExpired returns true if the given Unix timestamp (expressed as a
628 // hexadecimal string) is in the past, or if timestamp_hex cannot be
629 // parsed as a hexadecimal string.
630 func IsExpired(timestamp_hex string) bool {
631         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
632         if err != nil {
633                 log.Printf("IsExpired: %s", err)
634                 return true
635         }
636         return time.Unix(ts, 0).Before(time.Now())
637 }
638
639 // CanDelete returns true if the user identified by api_token is
640 // allowed to delete blocks.
641 func CanDelete(api_token string) bool {
642         if api_token == "" {
643                 return false
644         }
645         // Blocks may be deleted only when Keep has been configured with a
646         // data manager.
647         if IsDataManagerToken(api_token) {
648                 return true
649         }
650         // TODO(twp): look up api_token with the API server
651         // return true if is_admin is true and if the token
652         // has unlimited scope
653         return false
654 }
655
656 // IsDataManagerToken returns true if api_token represents the data
657 // manager's token.
658 func IsDataManagerToken(api_token string) bool {
659         return data_manager_token != "" && api_token == data_manager_token
660 }