1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // REST handlers for Keep are implemented here.
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler (GET /index, GET /index/prefix)
12 // StatusHandler (GET /status.json)
30 "github.com/gorilla/mux"
32 "git.curoverse.com/arvados.git/sdk/go/httpserver"
33 log "github.com/Sirupsen/logrus"
38 limiter httpserver.RequestCounter
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() *router {
44 rest := mux.NewRouter()
45 rtr := &router{Router: rest}
48 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
50 `/{hash:[0-9a-f]{32}}+{hints}`,
51 GetBlockHandler).Methods("GET", "HEAD")
53 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
54 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55 // List all blocks stored here. Privileged client only.
56 rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57 // List blocks stored here whose hash has the given prefix.
58 // Privileged client only.
59 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61 // Internals/debugging info (runtime.MemStats)
62 rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64 // List volumes: path, device number, bytes used/avail.
65 rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67 // List mounts: UUID, readonly, tier, device ID, ...
68 rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69 rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70 rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72 // Replace the current pull queue.
73 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75 // Replace the current trash queue.
76 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78 // Untrash moves blocks from trash back into store
79 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
81 // Any request which does not match any of these routes gets
83 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
88 // BadRequestHandler is a HandleFunc to address bad requests.
89 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
90 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
93 // GetBlockHandler is a HandleFunc to address Get block requests.
94 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
95 ctx, cancel := contextForResponse(context.TODO(), resp)
98 if theConfig.RequireSignatures {
99 locator := req.URL.Path[1:] // strip leading slash
100 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
101 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
106 // TODO: Probe volumes to check whether the block _might_
107 // exist. Some volumes/types could support a quick existence
108 // check without causing other operations to suffer. If all
109 // volumes support that, and assure us the block definitely
110 // isn't here, we can return 404 now instead of waiting for a
113 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
115 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
120 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
122 code := http.StatusInternalServerError
123 if err, ok := err.(*KeepError); ok {
126 http.Error(resp, err.Error(), code)
130 resp.Header().Set("Content-Length", strconv.Itoa(size))
131 resp.Header().Set("Content-Type", "application/octet-stream")
132 resp.Write(buf[:size])
135 // Return a new context that gets cancelled by resp's CloseNotifier.
136 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
137 ctx, cancel := context.WithCancel(parent)
138 if cn, ok := resp.(http.CloseNotifier); ok {
139 go func(c <-chan bool) {
142 theConfig.debugLogf("cancel context")
151 // Get a buffer from the pool -- but give up and return a non-nil
152 // error if ctx ends before we get a buffer.
153 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
154 bufReady := make(chan []byte)
156 bufReady <- bufs.Get(bufSize)
159 case buf := <-bufReady:
163 // Even if closeNotifier happened first, we
164 // need to keep waiting for our buf so we can
165 // return it to the pool.
168 return nil, ErrClientDisconnect
172 // PutBlockHandler is a HandleFunc to address Put block requests.
173 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
174 ctx, cancel := contextForResponse(context.TODO(), resp)
177 hash := mux.Vars(req)["hash"]
179 // Detect as many error conditions as possible before reading
180 // the body: avoid transmitting data that will not end up
181 // being written anyway.
183 if req.ContentLength == -1 {
184 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
188 if req.ContentLength > BlockSize {
189 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
193 if len(KeepVM.AllWritable()) == 0 {
194 http.Error(resp, FullError.Error(), FullError.HTTPCode)
198 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
200 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
204 _, err = io.ReadFull(req.Body, buf)
206 http.Error(resp, err.Error(), 500)
211 replication, err := PutBlock(ctx, buf, hash)
215 code := http.StatusInternalServerError
216 if err, ok := err.(*KeepError); ok {
219 http.Error(resp, err.Error(), code)
223 // Success; add a size hint, sign the locator if possible, and
224 // return it to the client.
225 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
226 apiToken := GetAPIToken(req)
227 if theConfig.blobSigningKey != nil && apiToken != "" {
228 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
229 returnHash = SignLocator(returnHash, apiToken, expiry)
231 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
232 resp.Write([]byte(returnHash + "\n"))
235 // IndexHandler responds to "/index", "/index/{prefix}", and
236 // "/mounts/{uuid}/blocks" requests.
237 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
238 if !IsSystemAuth(GetAPIToken(req)) {
239 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
243 prefix := mux.Vars(req)["prefix"]
246 prefix = req.Form.Get("prefix")
249 uuid := mux.Vars(req)["uuid"]
253 vols = KeepVM.AllReadable()
254 } else if v := KeepVM.Lookup(uuid, false); v == nil {
255 http.Error(resp, "mount not found", http.StatusNotFound)
261 for _, v := range vols {
262 if err := v.IndexTo(prefix, resp); err != nil {
263 // The only errors returned by IndexTo are
264 // write errors returned by resp.Write(),
265 // which probably means the client has
266 // disconnected and this error will never be
267 // reported to the client -- but it will
268 // appear in our own error log.
269 http.Error(resp, err.Error(), http.StatusInternalServerError)
273 // An empty line at EOF is the only way the client can be
274 // assured the entire index was received.
275 resp.Write([]byte{'\n'})
278 // MountsHandler responds to "GET /mounts" requests.
279 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
280 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
282 http.Error(resp, err.Error(), http.StatusInternalServerError)
287 type PoolStatus struct {
288 Alloc uint64 `json:"BytesAllocated"`
289 Cap int `json:"BuffersMax"`
290 Len int `json:"BuffersInUse"`
293 type volumeStatusEnt struct {
295 Status *VolumeStatus `json:",omitempty"`
296 VolumeStats *ioStats `json:",omitempty"`
297 InternalStats interface{} `json:",omitempty"`
301 type NodeStatus struct {
302 Volumes []*volumeStatusEnt
303 BufferPool PoolStatus
304 PullQueue WorkQueueStatus
305 TrashQueue WorkQueueStatus
311 var stLock sync.Mutex
313 // DebugHandler addresses /debug.json requests.
314 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
315 type debugStats struct {
316 MemStats runtime.MemStats
319 runtime.ReadMemStats(&ds.MemStats)
320 err := json.NewEncoder(resp).Encode(&ds)
322 http.Error(resp, err.Error(), 500)
326 // StatusHandler addresses /status.json requests.
327 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
329 rtr.readNodeStatus(&st)
330 jstat, err := json.Marshal(&st)
335 log.Printf("json.Marshal: %s", err)
336 log.Printf("NodeStatus = %v", &st)
337 http.Error(resp, err.Error(), 500)
341 // populate the given NodeStatus struct with current values.
342 func (rtr *router) readNodeStatus(st *NodeStatus) {
343 vols := KeepVM.AllReadable()
344 if cap(st.Volumes) < len(vols) {
345 st.Volumes = make([]*volumeStatusEnt, len(vols))
347 st.Volumes = st.Volumes[:0]
348 for _, vol := range vols {
349 var internalStats interface{}
350 if vol, ok := vol.(InternalStatser); ok {
351 internalStats = vol.InternalStats()
353 st.Volumes = append(st.Volumes, &volumeStatusEnt{
355 Status: vol.Status(),
356 InternalStats: internalStats,
357 //VolumeStats: KeepVM.VolumeStats(vol),
360 st.BufferPool.Alloc = bufs.Alloc()
361 st.BufferPool.Cap = bufs.Cap()
362 st.BufferPool.Len = bufs.Len()
363 st.PullQueue = getWorkQueueStatus(pullq)
364 st.TrashQueue = getWorkQueueStatus(trashq)
365 if rtr.limiter != nil {
366 st.RequestsCurrent = rtr.limiter.Current()
367 st.RequestsMax = rtr.limiter.Max()
371 // return a WorkQueueStatus for the given queue. If q is nil (which
372 // should never happen except in test suites), return a zero status
373 // value instead of crashing.
374 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
376 // This should only happen during tests.
377 return WorkQueueStatus{}
382 // DeleteHandler processes DELETE requests.
384 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
385 // from all connected volumes.
387 // Only the Data Manager, or an Arvados admin with scope "all", are
388 // allowed to issue DELETE requests. If a DELETE request is not
389 // authenticated or is issued by a non-admin user, the server returns
390 // a PermissionError.
392 // Upon receiving a valid request from an authorized user,
393 // DeleteHandler deletes all copies of the specified block on local
398 // If the requested blocks was not found on any volume, the response
399 // code is HTTP 404 Not Found.
401 // Otherwise, the response code is 200 OK, with a response body
402 // consisting of the JSON message
404 // {"copies_deleted":d,"copies_failed":f}
406 // where d and f are integers representing the number of blocks that
407 // were successfully and unsuccessfully deleted.
409 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
410 hash := mux.Vars(req)["hash"]
412 // Confirm that this user is an admin and has a token with unlimited scope.
413 var tok = GetAPIToken(req)
414 if tok == "" || !CanDelete(tok) {
415 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
419 if !theConfig.EnableDelete {
420 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
424 // Delete copies of this block from all available volumes.
425 // Report how many blocks were successfully deleted, and how
426 // many were found on writable volumes but not deleted.
428 Deleted int `json:"copies_deleted"`
429 Failed int `json:"copies_failed"`
431 for _, vol := range KeepVM.AllWritable() {
432 if err := vol.Trash(hash); err == nil {
434 } else if os.IsNotExist(err) {
438 log.Println("DeleteHandler:", err)
444 if result.Deleted == 0 && result.Failed == 0 {
445 st = http.StatusNotFound
452 if st == http.StatusOK {
453 if body, err := json.Marshal(result); err == nil {
456 log.Printf("json.Marshal: %s (result = %v)", err, result)
457 http.Error(resp, err.Error(), 500)
462 /* PullHandler processes "PUT /pull" requests for the data manager.
463 The request body is a JSON message containing a list of pull
464 requests in the following format:
468 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
470 "keep0.qr1hi.arvadosapi.com:25107",
471 "keep1.qr1hi.arvadosapi.com:25108"
475 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
485 Each pull request in the list consists of a block locator string
486 and an ordered list of servers. Keepstore should try to fetch the
487 block from each server in turn.
489 If the request has not been sent by the Data Manager, return 401
492 If the JSON unmarshalling fails, return 400 Bad Request.
495 // PullRequest consists of a block locator and an ordered list of servers
496 type PullRequest struct {
497 Locator string `json:"locator"`
498 Servers []string `json:"servers"`
500 // Destination mount, or "" for "anywhere"
504 // PullHandler processes "PUT /pull" requests for the data manager.
505 func PullHandler(resp http.ResponseWriter, req *http.Request) {
506 // Reject unauthorized requests.
507 if !IsSystemAuth(GetAPIToken(req)) {
508 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
512 // Parse the request body.
514 r := json.NewDecoder(req.Body)
515 if err := r.Decode(&pr); err != nil {
516 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
520 // We have a properly formatted pull list sent from the data
521 // manager. Report success and send the list to the pull list
522 // manager for further handling.
523 resp.WriteHeader(http.StatusOK)
525 fmt.Sprintf("Received %d pull requests\n", len(pr))))
528 for _, p := range pr {
531 pullq.ReplaceQueue(plist)
534 // TrashRequest consists of a block locator and it's Mtime
535 type TrashRequest struct {
536 Locator string `json:"locator"`
537 BlockMtime int64 `json:"block_mtime"`
539 // Target mount, or "" for "everywhere"
543 // TrashHandler processes /trash requests.
544 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
545 // Reject unauthorized requests.
546 if !IsSystemAuth(GetAPIToken(req)) {
547 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
551 // Parse the request body.
552 var trash []TrashRequest
553 r := json.NewDecoder(req.Body)
554 if err := r.Decode(&trash); err != nil {
555 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
559 // We have a properly formatted trash list sent from the data
560 // manager. Report success and send the list to the trash work
561 // queue for further handling.
562 resp.WriteHeader(http.StatusOK)
564 fmt.Sprintf("Received %d trash requests\n", len(trash))))
567 for _, t := range trash {
570 trashq.ReplaceQueue(tlist)
573 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
574 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
575 // Reject unauthorized requests.
576 if !IsSystemAuth(GetAPIToken(req)) {
577 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
581 hash := mux.Vars(req)["hash"]
583 if len(KeepVM.AllWritable()) == 0 {
584 http.Error(resp, "No writable volumes", http.StatusNotFound)
588 var untrashedOn, failedOn []string
590 for _, vol := range KeepVM.AllWritable() {
591 err := vol.Untrash(hash)
593 if os.IsNotExist(err) {
595 } else if err != nil {
596 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
597 failedOn = append(failedOn, vol.String())
599 log.Printf("Untrashed %v on volume %v", hash, vol.String())
600 untrashedOn = append(untrashedOn, vol.String())
604 if numNotFound == len(KeepVM.AllWritable()) {
605 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
609 if len(failedOn) == len(KeepVM.AllWritable()) {
610 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
612 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
613 if len(failedOn) > 0 {
614 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
616 resp.Write([]byte(respBody))
620 // GetBlock and PutBlock implement lower-level code for handling
621 // blocks by rooting through volumes connected to the local machine.
622 // Once the handler has determined that system policy permits the
623 // request, it calls these methods to perform the actual operation.
625 // TODO(twp): this code would probably be better located in the
626 // VolumeManager interface. As an abstraction, the VolumeManager
627 // should be the only part of the code that cares about which volume a
628 // block is stored on, so it should be responsible for figuring out
629 // which volume to check for fetching blocks, storing blocks, etc.
631 // GetBlock fetches the block identified by "hash" into the provided
632 // buf, and returns the data size.
634 // If the block cannot be found on any volume, returns NotFoundError.
636 // If the block found does not have the correct MD5 hash, returns
639 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
640 // Attempt to read the requested hash from a keep volume.
641 errorToCaller := NotFoundError
643 for _, vol := range KeepVM.AllReadable() {
644 size, err := vol.Get(ctx, hash, buf)
647 return 0, ErrClientDisconnect
651 // IsNotExist is an expected error and may be
652 // ignored. All other errors are logged. In
653 // any case we continue trying to read other
654 // volumes. If all volumes report IsNotExist,
655 // we return a NotFoundError.
656 if !os.IsNotExist(err) {
657 log.Printf("%s: Get(%s): %s", vol, hash, err)
661 // Check the file checksum.
663 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
664 if filehash != hash {
665 // TODO: Try harder to tell a sysadmin about
667 log.Printf("%s: checksum mismatch for request %s (actual %s)",
669 errorToCaller = DiskHashError
672 if errorToCaller == DiskHashError {
673 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
678 return 0, errorToCaller
681 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
683 // PutBlock(ctx, block, hash)
684 // Stores the BLOCK (identified by the content id HASH) in Keep.
686 // The MD5 checksum of the block must be identical to the content id HASH.
687 // If not, an error is returned.
689 // PutBlock stores the BLOCK on the first Keep volume with free space.
690 // A failure code is returned to the user only if all volumes fail.
692 // On success, PutBlock returns nil.
693 // On failure, it returns a KeepError with one of the following codes:
696 // A different block with the same hash already exists on this
699 // The MD5 hash of the BLOCK does not match the argument HASH.
701 // There was not enough space left in any Keep volume to store
704 // The object could not be stored for some other reason (e.g.
705 // all writes failed). The text of the error message should
706 // provide as much detail as possible.
708 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
709 // Check that BLOCK's checksum matches HASH.
710 blockhash := fmt.Sprintf("%x", md5.Sum(block))
711 if blockhash != hash {
712 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
713 return 0, RequestHashError
716 // If we already have this data, it's intact on disk, and we
717 // can update its timestamp, return success. If we have
718 // different data with the same hash, return failure.
719 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
721 } else if ctx.Err() != nil {
722 return 0, ErrClientDisconnect
725 // Choose a Keep volume to write to.
726 // If this volume fails, try all of the volumes in order.
727 if vol := KeepVM.NextWritable(); vol != nil {
728 if err := vol.Put(ctx, hash, block); err == nil {
729 return vol.Replication(), nil // success!
731 if ctx.Err() != nil {
732 return 0, ErrClientDisconnect
736 writables := KeepVM.AllWritable()
737 if len(writables) == 0 {
738 log.Print("No writable volumes.")
743 for _, vol := range writables {
744 err := vol.Put(ctx, hash, block)
745 if ctx.Err() != nil {
746 return 0, ErrClientDisconnect
749 return vol.Replication(), nil // success!
751 if err != FullError {
752 // The volume is not full but the
753 // write did not succeed. Report the
754 // error and continue trying.
756 log.Printf("%s: Write(%s): %s", vol, hash, err)
761 log.Print("All volumes are full.")
764 // Already logged the non-full errors.
765 return 0, GenericError
768 // CompareAndTouch returns the current replication level if one of the
769 // volumes already has the given content and it successfully updates
770 // the relevant block's modification time in order to protect it from
771 // premature garbage collection. Otherwise, it returns a non-nil
773 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
774 var bestErr error = NotFoundError
775 for _, vol := range KeepVM.AllWritable() {
776 err := vol.Compare(ctx, hash, buf)
777 if ctx.Err() != nil {
779 } else if err == CollisionError {
780 // Stop if we have a block with same hash but
781 // different content. (It will be impossible
782 // to tell which one is wanted if we have
783 // both, so there's no point writing it even
784 // on a different volume.)
785 log.Printf("%s: Compare(%s): %s", vol, hash, err)
787 } else if os.IsNotExist(err) {
788 // Block does not exist. This is the only
789 // "normal" error: we don't log anything.
791 } else if err != nil {
792 // Couldn't open file, data is corrupt on
793 // disk, etc.: log this abnormal condition,
794 // and try the next volume.
795 log.Printf("%s: Compare(%s): %s", vol, hash, err)
798 if err := vol.Touch(hash); err != nil {
799 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
803 // Compare and Touch both worked --> done.
804 return vol.Replication(), nil
809 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
811 // IsValidLocator returns true if the specified string is a valid Keep locator.
812 // When Keep is extended to support hash types other than MD5,
813 // this should be updated to cover those as well.
815 func IsValidLocator(loc string) bool {
816 return validLocatorRe.MatchString(loc)
819 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
821 // GetAPIToken returns the OAuth2 token from the Authorization
822 // header of a HTTP request, or an empty string if no matching
824 func GetAPIToken(req *http.Request) string {
825 if auth, ok := req.Header["Authorization"]; ok {
826 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
833 // IsExpired returns true if the given Unix timestamp (expressed as a
834 // hexadecimal string) is in the past, or if timestampHex cannot be
835 // parsed as a hexadecimal string.
836 func IsExpired(timestampHex string) bool {
837 ts, err := strconv.ParseInt(timestampHex, 16, 0)
839 log.Printf("IsExpired: %s", err)
842 return time.Unix(ts, 0).Before(time.Now())
845 // CanDelete returns true if the user identified by apiToken is
846 // allowed to delete blocks.
847 func CanDelete(apiToken string) bool {
851 // Blocks may be deleted only when Keep has been configured with a
853 if IsSystemAuth(apiToken) {
856 // TODO(twp): look up apiToken with the API server
857 // return true if is_admin is true and if the token
858 // has unlimited scope
862 // IsSystemAuth returns true if the given token is allowed to perform
863 // system level actions like deleting data.
864 func IsSystemAuth(token string) bool {
865 return token != "" && token == theConfig.systemAuthToken