3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
26 "github.com/gorilla/mux"
28 "git.curoverse.com/arvados.git/sdk/go/httpserver"
29 log "github.com/Sirupsen/logrus"
34 limiter httpserver.RequestCounter
37 // MakeRESTRouter returns a new router that forwards all Keep requests
38 // to the appropriate handlers.
39 func MakeRESTRouter() *router {
40 rest := mux.NewRouter()
41 rtr := &router{Router: rest}
44 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
46 `/{hash:[0-9a-f]{32}}+{hints}`,
47 GetBlockHandler).Methods("GET", "HEAD")
49 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
50 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
51 // List all blocks stored here. Privileged client only.
52 rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
53 // List blocks stored here whose hash has the given prefix.
54 // Privileged client only.
55 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
57 // Internals/debugging info (runtime.MemStats)
58 rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
60 // List volumes: path, device number, bytes used/avail.
61 rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
63 // List mounts: UUID, readonly, tier, device ID, ...
64 rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
65 rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
66 rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
68 // Replace the current pull queue.
69 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
71 // Replace the current trash queue.
72 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74 // Untrash moves blocks from trash back into store
75 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
77 // Any request which does not match any of these routes gets
79 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
84 // BadRequestHandler is a HandleFunc to address bad requests.
85 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
86 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
89 // GetBlockHandler is a HandleFunc to address Get block requests.
90 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
91 ctx, cancel := contextForResponse(context.TODO(), resp)
94 if theConfig.RequireSignatures {
95 locator := req.URL.Path[1:] // strip leading slash
96 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
97 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
102 // TODO: Probe volumes to check whether the block _might_
103 // exist. Some volumes/types could support a quick existence
104 // check without causing other operations to suffer. If all
105 // volumes support that, and assure us the block definitely
106 // isn't here, we can return 404 now instead of waiting for a
109 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
111 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
116 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
118 code := http.StatusInternalServerError
119 if err, ok := err.(*KeepError); ok {
122 http.Error(resp, err.Error(), code)
126 resp.Header().Set("Content-Length", strconv.Itoa(size))
127 resp.Header().Set("Content-Type", "application/octet-stream")
128 resp.Write(buf[:size])
131 // Return a new context that gets cancelled by resp's CloseNotifier.
132 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
133 ctx, cancel := context.WithCancel(parent)
134 if cn, ok := resp.(http.CloseNotifier); ok {
135 go func(c <-chan bool) {
138 theConfig.debugLogf("cancel context")
147 // Get a buffer from the pool -- but give up and return a non-nil
148 // error if ctx ends before we get a buffer.
149 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
150 bufReady := make(chan []byte)
152 bufReady <- bufs.Get(bufSize)
155 case buf := <-bufReady:
159 // Even if closeNotifier happened first, we
160 // need to keep waiting for our buf so we can
161 // return it to the pool.
164 return nil, ErrClientDisconnect
168 // PutBlockHandler is a HandleFunc to address Put block requests.
169 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
170 ctx, cancel := contextForResponse(context.TODO(), resp)
173 hash := mux.Vars(req)["hash"]
175 // Detect as many error conditions as possible before reading
176 // the body: avoid transmitting data that will not end up
177 // being written anyway.
179 if req.ContentLength == -1 {
180 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
184 if req.ContentLength > BlockSize {
185 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
189 if len(KeepVM.AllWritable()) == 0 {
190 http.Error(resp, FullError.Error(), FullError.HTTPCode)
194 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
196 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
200 _, err = io.ReadFull(req.Body, buf)
202 http.Error(resp, err.Error(), 500)
207 replication, err := PutBlock(ctx, buf, hash)
211 code := http.StatusInternalServerError
212 if err, ok := err.(*KeepError); ok {
215 http.Error(resp, err.Error(), code)
219 // Success; add a size hint, sign the locator if possible, and
220 // return it to the client.
221 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
222 apiToken := GetAPIToken(req)
223 if theConfig.blobSigningKey != nil && apiToken != "" {
224 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
225 returnHash = SignLocator(returnHash, apiToken, expiry)
227 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
228 resp.Write([]byte(returnHash + "\n"))
231 // IndexHandler responds to "/index", "/index/{prefix}", and
232 // "/mounts/{uuid}/blocks" requests.
233 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
234 if !IsSystemAuth(GetAPIToken(req)) {
235 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
239 prefix := mux.Vars(req)["prefix"]
242 prefix = req.Form.Get("prefix")
245 uuid := mux.Vars(req)["uuid"]
249 vols = KeepVM.AllReadable()
250 } else if v := KeepVM.Lookup(uuid, false); v == nil {
251 http.Error(resp, "mount not found", http.StatusNotFound)
257 for _, v := range vols {
258 if err := v.IndexTo(prefix, resp); err != nil {
259 // The only errors returned by IndexTo are
260 // write errors returned by resp.Write(),
261 // which probably means the client has
262 // disconnected and this error will never be
263 // reported to the client -- but it will
264 // appear in our own error log.
265 http.Error(resp, err.Error(), http.StatusInternalServerError)
269 // An empty line at EOF is the only way the client can be
270 // assured the entire index was received.
271 resp.Write([]byte{'\n'})
274 // MountsHandler responds to "GET /mounts" requests.
275 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
276 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
278 http.Error(resp, err.Error(), http.StatusInternalServerError)
283 type PoolStatus struct {
284 Alloc uint64 `json:"BytesAllocated"`
285 Cap int `json:"BuffersMax"`
286 Len int `json:"BuffersInUse"`
289 type volumeStatusEnt struct {
291 Status *VolumeStatus `json:",omitempty"`
292 VolumeStats *ioStats `json:",omitempty"`
293 InternalStats interface{} `json:",omitempty"`
297 type NodeStatus struct {
298 Volumes []*volumeStatusEnt
299 BufferPool PoolStatus
300 PullQueue WorkQueueStatus
301 TrashQueue WorkQueueStatus
307 var stLock sync.Mutex
309 // DebugHandler addresses /debug.json requests.
310 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
311 type debugStats struct {
312 MemStats runtime.MemStats
315 runtime.ReadMemStats(&ds.MemStats)
316 err := json.NewEncoder(resp).Encode(&ds)
318 http.Error(resp, err.Error(), 500)
322 // StatusHandler addresses /status.json requests.
323 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
325 rtr.readNodeStatus(&st)
326 jstat, err := json.Marshal(&st)
331 log.Printf("json.Marshal: %s", err)
332 log.Printf("NodeStatus = %v", &st)
333 http.Error(resp, err.Error(), 500)
337 // populate the given NodeStatus struct with current values.
338 func (rtr *router) readNodeStatus(st *NodeStatus) {
339 vols := KeepVM.AllReadable()
340 if cap(st.Volumes) < len(vols) {
341 st.Volumes = make([]*volumeStatusEnt, len(vols))
343 st.Volumes = st.Volumes[:0]
344 for _, vol := range vols {
345 var internalStats interface{}
346 if vol, ok := vol.(InternalStatser); ok {
347 internalStats = vol.InternalStats()
349 st.Volumes = append(st.Volumes, &volumeStatusEnt{
351 Status: vol.Status(),
352 InternalStats: internalStats,
353 //VolumeStats: KeepVM.VolumeStats(vol),
356 st.BufferPool.Alloc = bufs.Alloc()
357 st.BufferPool.Cap = bufs.Cap()
358 st.BufferPool.Len = bufs.Len()
359 st.PullQueue = getWorkQueueStatus(pullq)
360 st.TrashQueue = getWorkQueueStatus(trashq)
361 if rtr.limiter != nil {
362 st.RequestsCurrent = rtr.limiter.Current()
363 st.RequestsMax = rtr.limiter.Max()
367 // return a WorkQueueStatus for the given queue. If q is nil (which
368 // should never happen except in test suites), return a zero status
369 // value instead of crashing.
370 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
372 // This should only happen during tests.
373 return WorkQueueStatus{}
378 // DeleteHandler processes DELETE requests.
380 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
381 // from all connected volumes.
383 // Only the Data Manager, or an Arvados admin with scope "all", are
384 // allowed to issue DELETE requests. If a DELETE request is not
385 // authenticated or is issued by a non-admin user, the server returns
386 // a PermissionError.
388 // Upon receiving a valid request from an authorized user,
389 // DeleteHandler deletes all copies of the specified block on local
394 // If the requested blocks was not found on any volume, the response
395 // code is HTTP 404 Not Found.
397 // Otherwise, the response code is 200 OK, with a response body
398 // consisting of the JSON message
400 // {"copies_deleted":d,"copies_failed":f}
402 // where d and f are integers representing the number of blocks that
403 // were successfully and unsuccessfully deleted.
405 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
406 hash := mux.Vars(req)["hash"]
408 // Confirm that this user is an admin and has a token with unlimited scope.
409 var tok = GetAPIToken(req)
410 if tok == "" || !CanDelete(tok) {
411 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
415 if !theConfig.EnableDelete {
416 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
420 // Delete copies of this block from all available volumes.
421 // Report how many blocks were successfully deleted, and how
422 // many were found on writable volumes but not deleted.
424 Deleted int `json:"copies_deleted"`
425 Failed int `json:"copies_failed"`
427 for _, vol := range KeepVM.AllWritable() {
428 if err := vol.Trash(hash); err == nil {
430 } else if os.IsNotExist(err) {
434 log.Println("DeleteHandler:", err)
440 if result.Deleted == 0 && result.Failed == 0 {
441 st = http.StatusNotFound
448 if st == http.StatusOK {
449 if body, err := json.Marshal(result); err == nil {
452 log.Printf("json.Marshal: %s (result = %v)", err, result)
453 http.Error(resp, err.Error(), 500)
458 /* PullHandler processes "PUT /pull" requests for the data manager.
459 The request body is a JSON message containing a list of pull
460 requests in the following format:
464 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
466 "keep0.qr1hi.arvadosapi.com:25107",
467 "keep1.qr1hi.arvadosapi.com:25108"
471 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
481 Each pull request in the list consists of a block locator string
482 and an ordered list of servers. Keepstore should try to fetch the
483 block from each server in turn.
485 If the request has not been sent by the Data Manager, return 401
488 If the JSON unmarshalling fails, return 400 Bad Request.
491 // PullRequest consists of a block locator and an ordered list of servers
492 type PullRequest struct {
493 Locator string `json:"locator"`
494 Servers []string `json:"servers"`
496 // Destination mount, or "" for "anywhere"
500 // PullHandler processes "PUT /pull" requests for the data manager.
501 func PullHandler(resp http.ResponseWriter, req *http.Request) {
502 // Reject unauthorized requests.
503 if !IsSystemAuth(GetAPIToken(req)) {
504 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
508 // Parse the request body.
510 r := json.NewDecoder(req.Body)
511 if err := r.Decode(&pr); err != nil {
512 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
516 // We have a properly formatted pull list sent from the data
517 // manager. Report success and send the list to the pull list
518 // manager for further handling.
519 resp.WriteHeader(http.StatusOK)
521 fmt.Sprintf("Received %d pull requests\n", len(pr))))
524 for _, p := range pr {
527 pullq.ReplaceQueue(plist)
530 // TrashRequest consists of a block locator and it's Mtime
531 type TrashRequest struct {
532 Locator string `json:"locator"`
533 BlockMtime int64 `json:"block_mtime"`
535 // Target mount, or "" for "everywhere"
539 // TrashHandler processes /trash requests.
540 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
541 // Reject unauthorized requests.
542 if !IsSystemAuth(GetAPIToken(req)) {
543 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
547 // Parse the request body.
548 var trash []TrashRequest
549 r := json.NewDecoder(req.Body)
550 if err := r.Decode(&trash); err != nil {
551 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
555 // We have a properly formatted trash list sent from the data
556 // manager. Report success and send the list to the trash work
557 // queue for further handling.
558 resp.WriteHeader(http.StatusOK)
560 fmt.Sprintf("Received %d trash requests\n", len(trash))))
563 for _, t := range trash {
566 trashq.ReplaceQueue(tlist)
569 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
570 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
571 // Reject unauthorized requests.
572 if !IsSystemAuth(GetAPIToken(req)) {
573 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
577 hash := mux.Vars(req)["hash"]
579 if len(KeepVM.AllWritable()) == 0 {
580 http.Error(resp, "No writable volumes", http.StatusNotFound)
584 var untrashedOn, failedOn []string
586 for _, vol := range KeepVM.AllWritable() {
587 err := vol.Untrash(hash)
589 if os.IsNotExist(err) {
591 } else if err != nil {
592 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
593 failedOn = append(failedOn, vol.String())
595 log.Printf("Untrashed %v on volume %v", hash, vol.String())
596 untrashedOn = append(untrashedOn, vol.String())
600 if numNotFound == len(KeepVM.AllWritable()) {
601 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
605 if len(failedOn) == len(KeepVM.AllWritable()) {
606 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
608 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
609 if len(failedOn) > 0 {
610 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
612 resp.Write([]byte(respBody))
616 // GetBlock and PutBlock implement lower-level code for handling
617 // blocks by rooting through volumes connected to the local machine.
618 // Once the handler has determined that system policy permits the
619 // request, it calls these methods to perform the actual operation.
621 // TODO(twp): this code would probably be better located in the
622 // VolumeManager interface. As an abstraction, the VolumeManager
623 // should be the only part of the code that cares about which volume a
624 // block is stored on, so it should be responsible for figuring out
625 // which volume to check for fetching blocks, storing blocks, etc.
627 // GetBlock fetches the block identified by "hash" into the provided
628 // buf, and returns the data size.
630 // If the block cannot be found on any volume, returns NotFoundError.
632 // If the block found does not have the correct MD5 hash, returns
635 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
636 // Attempt to read the requested hash from a keep volume.
637 errorToCaller := NotFoundError
639 for _, vol := range KeepVM.AllReadable() {
640 size, err := vol.Get(ctx, hash, buf)
643 return 0, ErrClientDisconnect
647 // IsNotExist is an expected error and may be
648 // ignored. All other errors are logged. In
649 // any case we continue trying to read other
650 // volumes. If all volumes report IsNotExist,
651 // we return a NotFoundError.
652 if !os.IsNotExist(err) {
653 log.Printf("%s: Get(%s): %s", vol, hash, err)
657 // Check the file checksum.
659 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
660 if filehash != hash {
661 // TODO: Try harder to tell a sysadmin about
663 log.Printf("%s: checksum mismatch for request %s (actual %s)",
665 errorToCaller = DiskHashError
668 if errorToCaller == DiskHashError {
669 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
674 return 0, errorToCaller
677 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
679 // PutBlock(ctx, block, hash)
680 // Stores the BLOCK (identified by the content id HASH) in Keep.
682 // The MD5 checksum of the block must be identical to the content id HASH.
683 // If not, an error is returned.
685 // PutBlock stores the BLOCK on the first Keep volume with free space.
686 // A failure code is returned to the user only if all volumes fail.
688 // On success, PutBlock returns nil.
689 // On failure, it returns a KeepError with one of the following codes:
692 // A different block with the same hash already exists on this
695 // The MD5 hash of the BLOCK does not match the argument HASH.
697 // There was not enough space left in any Keep volume to store
700 // The object could not be stored for some other reason (e.g.
701 // all writes failed). The text of the error message should
702 // provide as much detail as possible.
704 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
705 // Check that BLOCK's checksum matches HASH.
706 blockhash := fmt.Sprintf("%x", md5.Sum(block))
707 if blockhash != hash {
708 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
709 return 0, RequestHashError
712 // If we already have this data, it's intact on disk, and we
713 // can update its timestamp, return success. If we have
714 // different data with the same hash, return failure.
715 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
717 } else if ctx.Err() != nil {
718 return 0, ErrClientDisconnect
721 // Choose a Keep volume to write to.
722 // If this volume fails, try all of the volumes in order.
723 if vol := KeepVM.NextWritable(); vol != nil {
724 if err := vol.Put(ctx, hash, block); err == nil {
725 return vol.Replication(), nil // success!
727 if ctx.Err() != nil {
728 return 0, ErrClientDisconnect
732 writables := KeepVM.AllWritable()
733 if len(writables) == 0 {
734 log.Print("No writable volumes.")
739 for _, vol := range writables {
740 err := vol.Put(ctx, hash, block)
741 if ctx.Err() != nil {
742 return 0, ErrClientDisconnect
745 return vol.Replication(), nil // success!
747 if err != FullError {
748 // The volume is not full but the
749 // write did not succeed. Report the
750 // error and continue trying.
752 log.Printf("%s: Write(%s): %s", vol, hash, err)
757 log.Print("All volumes are full.")
760 // Already logged the non-full errors.
761 return 0, GenericError
764 // CompareAndTouch returns the current replication level if one of the
765 // volumes already has the given content and it successfully updates
766 // the relevant block's modification time in order to protect it from
767 // premature garbage collection. Otherwise, it returns a non-nil
769 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
770 var bestErr error = NotFoundError
771 for _, vol := range KeepVM.AllWritable() {
772 err := vol.Compare(ctx, hash, buf)
773 if ctx.Err() != nil {
775 } else if err == CollisionError {
776 // Stop if we have a block with same hash but
777 // different content. (It will be impossible
778 // to tell which one is wanted if we have
779 // both, so there's no point writing it even
780 // on a different volume.)
781 log.Printf("%s: Compare(%s): %s", vol, hash, err)
783 } else if os.IsNotExist(err) {
784 // Block does not exist. This is the only
785 // "normal" error: we don't log anything.
787 } else if err != nil {
788 // Couldn't open file, data is corrupt on
789 // disk, etc.: log this abnormal condition,
790 // and try the next volume.
791 log.Printf("%s: Compare(%s): %s", vol, hash, err)
794 if err := vol.Touch(hash); err != nil {
795 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
799 // Compare and Touch both worked --> done.
800 return vol.Replication(), nil
805 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
807 // IsValidLocator returns true if the specified string is a valid Keep locator.
808 // When Keep is extended to support hash types other than MD5,
809 // this should be updated to cover those as well.
811 func IsValidLocator(loc string) bool {
812 return validLocatorRe.MatchString(loc)
815 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
817 // GetAPIToken returns the OAuth2 token from the Authorization
818 // header of a HTTP request, or an empty string if no matching
820 func GetAPIToken(req *http.Request) string {
821 if auth, ok := req.Header["Authorization"]; ok {
822 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
829 // IsExpired returns true if the given Unix timestamp (expressed as a
830 // hexadecimal string) is in the past, or if timestampHex cannot be
831 // parsed as a hexadecimal string.
832 func IsExpired(timestampHex string) bool {
833 ts, err := strconv.ParseInt(timestampHex, 16, 0)
835 log.Printf("IsExpired: %s", err)
838 return time.Unix(ts, 0).Before(time.Now())
841 // CanDelete returns true if the user identified by apiToken is
842 // allowed to delete blocks.
843 func CanDelete(apiToken string) bool {
847 // Blocks may be deleted only when Keep has been configured with a
849 if IsSystemAuth(apiToken) {
852 // TODO(twp): look up apiToken with the API server
853 // return true if is_admin is true and if the token
854 // has unlimited scope
858 // IsSystemAuth returns true if the given token is allowed to perform
859 // system level actions like deleting data.
860 func IsSystemAuth(token string) bool {
861 return token != "" && token == theConfig.systemAuthToken