1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // REST handlers for Keep are implemented here.
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler (GET /index, GET /index/prefix)
12 // StatusHandler (GET /status.json)
30 "github.com/gorilla/mux"
32 "git.curoverse.com/arvados.git/sdk/go/health"
33 "git.curoverse.com/arvados.git/sdk/go/httpserver"
34 log "github.com/Sirupsen/logrus"
39 limiter httpserver.RequestCounter
42 // MakeRESTRouter returns a new router that forwards all Keep requests
43 // to the appropriate handlers.
44 func MakeRESTRouter() *router {
45 rest := mux.NewRouter()
46 rtr := &router{Router: rest}
49 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
51 `/{hash:[0-9a-f]{32}}+{hints}`,
52 GetBlockHandler).Methods("GET", "HEAD")
54 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
55 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
56 // List all blocks stored here. Privileged client only.
57 rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
58 // List blocks stored here whose hash has the given prefix.
59 // Privileged client only.
60 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
62 // Internals/debugging info (runtime.MemStats)
63 rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
65 // List volumes: path, device number, bytes used/avail.
66 rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
68 // List mounts: UUID, readonly, tier, device ID, ...
69 rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
70 rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
71 rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
73 // Replace the current pull queue.
74 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
76 // Replace the current trash queue.
77 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
79 // Untrash moves blocks from trash back into store
80 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
82 rest.Handle("/_health/{check}", &health.Handler{
83 Token: theConfig.ManagementToken,
87 // Any request which does not match any of these routes gets
89 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
94 // BadRequestHandler is a HandleFunc to address bad requests.
95 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
96 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
99 // GetBlockHandler is a HandleFunc to address Get block requests.
100 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
101 ctx, cancel := contextForResponse(context.TODO(), resp)
104 if theConfig.RequireSignatures {
105 locator := req.URL.Path[1:] // strip leading slash
106 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
107 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
112 // TODO: Probe volumes to check whether the block _might_
113 // exist. Some volumes/types could support a quick existence
114 // check without causing other operations to suffer. If all
115 // volumes support that, and assure us the block definitely
116 // isn't here, we can return 404 now instead of waiting for a
119 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
121 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
126 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
128 code := http.StatusInternalServerError
129 if err, ok := err.(*KeepError); ok {
132 http.Error(resp, err.Error(), code)
136 resp.Header().Set("Content-Length", strconv.Itoa(size))
137 resp.Header().Set("Content-Type", "application/octet-stream")
138 resp.Write(buf[:size])
141 // Return a new context that gets cancelled by resp's CloseNotifier.
142 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
143 ctx, cancel := context.WithCancel(parent)
144 if cn, ok := resp.(http.CloseNotifier); ok {
145 go func(c <-chan bool) {
148 theConfig.debugLogf("cancel context")
157 // Get a buffer from the pool -- but give up and return a non-nil
158 // error if ctx ends before we get a buffer.
159 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
160 bufReady := make(chan []byte)
162 bufReady <- bufs.Get(bufSize)
165 case buf := <-bufReady:
169 // Even if closeNotifier happened first, we
170 // need to keep waiting for our buf so we can
171 // return it to the pool.
174 return nil, ErrClientDisconnect
178 // PutBlockHandler is a HandleFunc to address Put block requests.
179 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
180 ctx, cancel := contextForResponse(context.TODO(), resp)
183 hash := mux.Vars(req)["hash"]
185 // Detect as many error conditions as possible before reading
186 // the body: avoid transmitting data that will not end up
187 // being written anyway.
189 if req.ContentLength == -1 {
190 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
194 if req.ContentLength > BlockSize {
195 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
199 if len(KeepVM.AllWritable()) == 0 {
200 http.Error(resp, FullError.Error(), FullError.HTTPCode)
204 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
206 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
210 _, err = io.ReadFull(req.Body, buf)
212 http.Error(resp, err.Error(), 500)
217 replication, err := PutBlock(ctx, buf, hash)
221 code := http.StatusInternalServerError
222 if err, ok := err.(*KeepError); ok {
225 http.Error(resp, err.Error(), code)
229 // Success; add a size hint, sign the locator if possible, and
230 // return it to the client.
231 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
232 apiToken := GetAPIToken(req)
233 if theConfig.blobSigningKey != nil && apiToken != "" {
234 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
235 returnHash = SignLocator(returnHash, apiToken, expiry)
237 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
238 resp.Write([]byte(returnHash + "\n"))
241 // IndexHandler responds to "/index", "/index/{prefix}", and
242 // "/mounts/{uuid}/blocks" requests.
243 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
244 if !IsSystemAuth(GetAPIToken(req)) {
245 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
249 prefix := mux.Vars(req)["prefix"]
252 prefix = req.Form.Get("prefix")
255 uuid := mux.Vars(req)["uuid"]
259 vols = KeepVM.AllReadable()
260 } else if v := KeepVM.Lookup(uuid, false); v == nil {
261 http.Error(resp, "mount not found", http.StatusNotFound)
267 for _, v := range vols {
268 if err := v.IndexTo(prefix, resp); err != nil {
269 // The only errors returned by IndexTo are
270 // write errors returned by resp.Write(),
271 // which probably means the client has
272 // disconnected and this error will never be
273 // reported to the client -- but it will
274 // appear in our own error log.
275 http.Error(resp, err.Error(), http.StatusInternalServerError)
279 // An empty line at EOF is the only way the client can be
280 // assured the entire index was received.
281 resp.Write([]byte{'\n'})
284 // MountsHandler responds to "GET /mounts" requests.
285 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
286 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
288 http.Error(resp, err.Error(), http.StatusInternalServerError)
293 type PoolStatus struct {
294 Alloc uint64 `json:"BytesAllocatedCumulative"`
295 Cap int `json:"BuffersMax"`
296 Len int `json:"BuffersInUse"`
299 type volumeStatusEnt struct {
301 Status *VolumeStatus `json:",omitempty"`
302 VolumeStats *ioStats `json:",omitempty"`
303 InternalStats interface{} `json:",omitempty"`
307 type NodeStatus struct {
308 Volumes []*volumeStatusEnt
309 BufferPool PoolStatus
310 PullQueue WorkQueueStatus
311 TrashQueue WorkQueueStatus
318 var stLock sync.Mutex
320 // DebugHandler addresses /debug.json requests.
321 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
322 type debugStats struct {
323 MemStats runtime.MemStats
326 runtime.ReadMemStats(&ds.MemStats)
327 err := json.NewEncoder(resp).Encode(&ds)
329 http.Error(resp, err.Error(), 500)
333 // StatusHandler addresses /status.json requests.
334 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
336 rtr.readNodeStatus(&st)
337 jstat, err := json.Marshal(&st)
342 log.Printf("json.Marshal: %s", err)
343 log.Printf("NodeStatus = %v", &st)
344 http.Error(resp, err.Error(), 500)
348 // populate the given NodeStatus struct with current values.
349 func (rtr *router) readNodeStatus(st *NodeStatus) {
351 vols := KeepVM.AllReadable()
352 if cap(st.Volumes) < len(vols) {
353 st.Volumes = make([]*volumeStatusEnt, len(vols))
355 st.Volumes = st.Volumes[:0]
356 for _, vol := range vols {
357 var internalStats interface{}
358 if vol, ok := vol.(InternalStatser); ok {
359 internalStats = vol.InternalStats()
361 st.Volumes = append(st.Volumes, &volumeStatusEnt{
363 Status: vol.Status(),
364 InternalStats: internalStats,
365 //VolumeStats: KeepVM.VolumeStats(vol),
368 st.BufferPool.Alloc = bufs.Alloc()
369 st.BufferPool.Cap = bufs.Cap()
370 st.BufferPool.Len = bufs.Len()
371 st.PullQueue = getWorkQueueStatus(pullq)
372 st.TrashQueue = getWorkQueueStatus(trashq)
373 if rtr.limiter != nil {
374 st.RequestsCurrent = rtr.limiter.Current()
375 st.RequestsMax = rtr.limiter.Max()
379 // return a WorkQueueStatus for the given queue. If q is nil (which
380 // should never happen except in test suites), return a zero status
381 // value instead of crashing.
382 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
384 // This should only happen during tests.
385 return WorkQueueStatus{}
390 // DeleteHandler processes DELETE requests.
392 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
393 // from all connected volumes.
395 // Only the Data Manager, or an Arvados admin with scope "all", is
396 // allowed to issue DELETE requests. If a DELETE request is not
397 // authenticated or is issued by a non-admin user, the server returns
398 // a PermissionError.
400 // Upon receiving a valid request from an authorized user,
401 // DeleteHandler deletes all copies of the specified block on local
406 // If the requested block was not found on any volume, the response
407 // code is HTTP 404 Not Found.
409 // Otherwise, the response code is 200 OK, with a response body
410 // consisting of the JSON message
412 // {"copies_deleted":d,"copies_failed":f}
414 // where d and f are integers representing the number of blocks that
415 // were successfully and unsuccessfully deleted.
417 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
418 hash := mux.Vars(req)["hash"]
420 // Confirm that this user is an admin and has a token with unlimited scope.
421 var tok = GetAPIToken(req)
422 if tok == "" || !CanDelete(tok) {
423 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
427 if !theConfig.EnableDelete {
428 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
432 // Delete copies of this block from all available volumes.
433 // Report how many blocks were successfully deleted, and how
434 // many were found on writable volumes but not deleted.
436 Deleted int `json:"copies_deleted"`
437 Failed int `json:"copies_failed"`
439 for _, vol := range KeepVM.AllWritable() {
440 if err := vol.Trash(hash); err == nil {
442 } else if os.IsNotExist(err) {
446 log.Println("DeleteHandler:", err)
452 if result.Deleted == 0 && result.Failed == 0 {
453 st = http.StatusNotFound
460 if st == http.StatusOK {
461 if body, err := json.Marshal(result); err == nil {
464 log.Printf("json.Marshal: %s (result = %v)", err, result)
465 http.Error(resp, err.Error(), 500)
470 /* PullHandler processes "PUT /pull" requests for the data manager.
471 The request body is a JSON message containing a list of pull
472 requests in the following format:
476 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
478 "keep0.qr1hi.arvadosapi.com:25107",
479 "keep1.qr1hi.arvadosapi.com:25108"
483 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
493 Each pull request in the list consists of a block locator string
494 and an ordered list of servers. Keepstore should try to fetch the
495 block from each server in turn.
497 If the request has not been sent by the Data Manager, return 401
500 If the JSON unmarshalling fails, return 400 Bad Request.
503 // PullRequest consists of a block locator and an ordered list of servers
504 type PullRequest struct {
505 Locator string `json:"locator"`
506 Servers []string `json:"servers"`
508 // Destination mount, or "" for "anywhere"
512 // PullHandler processes "PUT /pull" requests for the data manager.
513 func PullHandler(resp http.ResponseWriter, req *http.Request) {
514 // Reject unauthorized requests.
515 if !IsSystemAuth(GetAPIToken(req)) {
516 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
520 // Parse the request body.
522 r := json.NewDecoder(req.Body)
523 if err := r.Decode(&pr); err != nil {
524 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
528 // We have a properly formatted pull list sent from the data
529 // manager. Report success and send the list to the pull list
530 // manager for further handling.
531 resp.WriteHeader(http.StatusOK)
533 fmt.Sprintf("Received %d pull requests\n", len(pr))))
536 for _, p := range pr {
539 pullq.ReplaceQueue(plist)
542 // TrashRequest consists of a block locator and its Mtime
543 type TrashRequest struct {
544 Locator string `json:"locator"`
545 BlockMtime int64 `json:"block_mtime"`
547 // Target mount, or "" for "everywhere"
551 // TrashHandler processes /trash requests.
552 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
553 // Reject unauthorized requests.
554 if !IsSystemAuth(GetAPIToken(req)) {
555 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
559 // Parse the request body.
560 var trash []TrashRequest
561 r := json.NewDecoder(req.Body)
562 if err := r.Decode(&trash); err != nil {
563 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
567 // We have a properly formatted trash list sent from the data
568 // manager. Report success and send the list to the trash work
569 // queue for further handling.
570 resp.WriteHeader(http.StatusOK)
572 fmt.Sprintf("Received %d trash requests\n", len(trash))))
575 for _, t := range trash {
578 trashq.ReplaceQueue(tlist)
581 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
582 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
583 // Reject unauthorized requests.
584 if !IsSystemAuth(GetAPIToken(req)) {
585 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
589 hash := mux.Vars(req)["hash"]
591 if len(KeepVM.AllWritable()) == 0 {
592 http.Error(resp, "No writable volumes", http.StatusNotFound)
596 var untrashedOn, failedOn []string
598 for _, vol := range KeepVM.AllWritable() {
599 err := vol.Untrash(hash)
601 if os.IsNotExist(err) {
603 } else if err != nil {
604 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
605 failedOn = append(failedOn, vol.String())
607 log.Printf("Untrashed %v on volume %v", hash, vol.String())
608 untrashedOn = append(untrashedOn, vol.String())
612 if numNotFound == len(KeepVM.AllWritable()) {
613 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
617 if len(failedOn) == len(KeepVM.AllWritable()) {
618 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
620 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
621 if len(failedOn) > 0 {
622 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
624 resp.Write([]byte(respBody))
628 // GetBlock and PutBlock implement lower-level code for handling
629 // blocks by rooting through volumes connected to the local machine.
630 // Once the handler has determined that system policy permits the
631 // request, it calls these methods to perform the actual operation.
633 // TODO(twp): this code would probably be better located in the
634 // VolumeManager interface. As an abstraction, the VolumeManager
635 // should be the only part of the code that cares about which volume a
636 // block is stored on, so it should be responsible for figuring out
637 // which volume to check for fetching blocks, storing blocks, etc.
639 // GetBlock fetches the block identified by "hash" into the provided
640 // buf, and returns the data size.
642 // If the block cannot be found on any volume, returns NotFoundError.
644 // If the block found does not have the correct MD5 hash, returns
647 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
648 // Attempt to read the requested hash from a keep volume.
649 errorToCaller := NotFoundError
651 for _, vol := range KeepVM.AllReadable() {
652 size, err := vol.Get(ctx, hash, buf)
655 return 0, ErrClientDisconnect
659 // IsNotExist is an expected error and may be
660 // ignored. All other errors are logged. In
661 // any case we continue trying to read other
662 // volumes. If all volumes report IsNotExist,
663 // we return a NotFoundError.
664 if !os.IsNotExist(err) {
665 log.Printf("%s: Get(%s): %s", vol, hash, err)
669 // Check the file checksum.
671 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
672 if filehash != hash {
673 // TODO: Try harder to tell a sysadmin about
675 log.Printf("%s: checksum mismatch for request %s (actual %s)",
677 errorToCaller = DiskHashError
680 if errorToCaller == DiskHashError {
681 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
686 return 0, errorToCaller
689 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
691 // PutBlock(ctx, block, hash)
692 // Stores the BLOCK (identified by the content id HASH) in Keep.
694 // The MD5 checksum of the block must be identical to the content id HASH.
695 // If not, an error is returned.
697 // PutBlock stores the BLOCK on the first Keep volume with free space.
698 // A failure code is returned to the user only if all volumes fail.
700 // On success, PutBlock returns a replication level and a nil error.
701 // On failure, it returns a KeepError with one of the following codes:
704 // A different block with the same hash already exists on this
707 // The MD5 hash of the BLOCK does not match the argument HASH.
709 // There was not enough space left in any Keep volume to store
712 // The object could not be stored for some other reason (e.g.
713 // all writes failed). The text of the error message should
714 // provide as much detail as possible.
716 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
717 // Check that BLOCK's checksum matches HASH.
718 blockhash := fmt.Sprintf("%x", md5.Sum(block))
719 if blockhash != hash {
720 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
721 return 0, RequestHashError
724 // If we already have this data, it's intact on disk, and we
725 // can update its timestamp, return success. If we have
726 // different data with the same hash, return failure.
727 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
729 } else if ctx.Err() != nil {
730 return 0, ErrClientDisconnect
733 // Choose a Keep volume to write to.
734 // If this volume fails, try all of the volumes in order.
735 if vol := KeepVM.NextWritable(); vol != nil {
736 if err := vol.Put(ctx, hash, block); err == nil {
737 return vol.Replication(), nil // success!
739 if ctx.Err() != nil {
740 return 0, ErrClientDisconnect
744 writables := KeepVM.AllWritable()
745 if len(writables) == 0 {
746 log.Print("No writable volumes.")
751 for _, vol := range writables {
752 err := vol.Put(ctx, hash, block)
753 if ctx.Err() != nil {
754 return 0, ErrClientDisconnect
757 return vol.Replication(), nil // success!
759 if err != FullError {
760 // The volume is not full but the
761 // write did not succeed. Report the
762 // error and continue trying.
764 log.Printf("%s: Write(%s): %s", vol, hash, err)
769 log.Print("All volumes are full.")
772 // Already logged the non-full errors.
773 return 0, GenericError
776 // CompareAndTouch returns the current replication level if one of the
777 // volumes already has the given content and it successfully updates
778 // the relevant block's modification time in order to protect it from
779 // premature garbage collection. Otherwise, it returns a non-nil
781 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
782 var bestErr error = NotFoundError
783 for _, vol := range KeepVM.AllWritable() {
784 err := vol.Compare(ctx, hash, buf)
785 if ctx.Err() != nil {
787 } else if err == CollisionError {
788 // Stop if we have a block with same hash but
789 // different content. (It will be impossible
790 // to tell which one is wanted if we have
791 // both, so there's no point writing it even
792 // on a different volume.)
793 log.Printf("%s: Compare(%s): %s", vol, hash, err)
795 } else if os.IsNotExist(err) {
796 // Block does not exist. This is the only
797 // "normal" error: we don't log anything.
799 } else if err != nil {
800 // Couldn't open file, data is corrupt on
801 // disk, etc.: log this abnormal condition,
802 // and try the next volume.
803 log.Printf("%s: Compare(%s): %s", vol, hash, err)
806 if err := vol.Touch(hash); err != nil {
807 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
811 // Compare and Touch both worked --> done.
812 return vol.Replication(), nil
817 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
819 // IsValidLocator returns true if the specified string is a valid Keep locator.
820 // When Keep is extended to support hash types other than MD5,
821 // this should be updated to cover those as well.
823 func IsValidLocator(loc string) bool {
824 return validLocatorRe.MatchString(loc)
827 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
829 // GetAPIToken returns the OAuth2 token from the Authorization
830 // header of an HTTP request, or an empty string if no matching
832 func GetAPIToken(req *http.Request) string {
833 if auth, ok := req.Header["Authorization"]; ok {
834 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
841 // IsExpired returns true if the given Unix timestamp (expressed as a
842 // hexadecimal string) is in the past, or if timestampHex cannot be
843 // parsed as a hexadecimal string.
844 func IsExpired(timestampHex string) bool {
845 ts, err := strconv.ParseInt(timestampHex, 16, 0)
847 log.Printf("IsExpired: %s", err)
850 return time.Unix(ts, 0).Before(time.Now())
853 // CanDelete returns true if the user identified by apiToken is
854 // allowed to delete blocks.
855 func CanDelete(apiToken string) bool {
859 // Blocks may be deleted only when Keep has been configured with a
861 if IsSystemAuth(apiToken) {
864 // TODO(twp): look up apiToken with the API server
865 // return true if is_admin is true and if the token
866 // has unlimited scope
870 // IsSystemAuth returns true if the given token is allowed to perform
871 // system level actions like deleting data.
872 func IsSystemAuth(token string) bool {
873 return token != "" && token == theConfig.systemAuthToken