5748: Clean up comments and variable names.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43
44         // For IndexHandler we support:
45         //   /index           - returns all locators
46         //   /index/{prefix}  - returns all locators that begin with {prefix}
47         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48         //      If {prefix} is the empty string, return an index of all locators
49         //      (so /index and /index/ behave identically)
50         //      A client may supply a full 32-digit locator string, in which
51         //      case the server will return an index with either zero or one
52         //      entries. This usage allows a client to check whether a block is
53         //      present, and its size and upload time, without retrieving the
54         //      entire block.
55         //
56         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
57         rest.HandleFunc(
58                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
60
61         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62         // /trash" requests from Data Manager.  These requests instruct
63         // Keep to replicate or delete blocks; see
64         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
65         // for more details.
66         //
67         // Each handler parses the JSON list of block management requests
68         // in the message body, and replaces any existing pull queue or
69         // trash queue with their contentes.
70         //
71         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
73
74         // Any request which does not match any of these routes gets
75         // 400 Bad Request.
76         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
77
78         return rest
79 }
80
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
83 }
84
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86         hash := mux.Vars(req)["hash"]
87
88         hints := mux.Vars(req)["hints"]
89
90         // Parse the locator string and hints from the request.
91         // TODO(twp): implement a Locator type.
92         var signature, timestamp string
93         if hints != "" {
94                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95                 for _, hint := range strings.Split(hints, "+") {
96                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97                                 // Server ignores size hints
98                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
99                                 signature = m[1]
100                                 timestamp = m[2]
101                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102                                 // Any unknown hint that starts with an uppercase letter is
103                                 // presumed to be valid and ignored, to permit forward compatibility.
104                         } else {
105                                 // Unknown format; not a valid locator.
106                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
107                                 return
108                         }
109                 }
110         }
111
112         // If permission checking is in effect, verify this
113         // request's permission signature.
114         if enforce_permissions {
115                 if signature == "" || timestamp == "" {
116                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
117                         return
118                 } else if IsExpired(timestamp) {
119                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
120                         return
121                 } else {
122                         req_locator := req.URL.Path[1:] // strip leading slash
123                         if !VerifySignature(req_locator, GetApiToken(req)) {
124                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
125                                 return
126                         }
127                 }
128         }
129
130         block, err := GetBlock(hash, false)
131
132         // Garbage collect after each GET. Fixes #2865.
133         // TODO(twp): review Keep memory usage and see if there's
134         // a better way to do this than blindly garbage collecting
135         // after every block.
136         defer runtime.GC()
137
138         if err != nil {
139                 // This type assertion is safe because the only errors
140                 // GetBlock can return are DiskHashError or NotFoundError.
141                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
142                 return
143         }
144
145         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
146
147         _, err = resp.Write(block)
148
149         return
150 }
151
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153         // Garbage collect after each PUT. Fixes #2865.
154         // See also GetBlockHandler.
155         defer runtime.GC()
156
157         hash := mux.Vars(req)["hash"]
158
159         // Detect as many error conditions as possible before reading
160         // the body: avoid transmitting data that will not end up
161         // being written anyway.
162
163         if req.ContentLength == -1 {
164                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
165                 return
166         }
167
168         if req.ContentLength > BLOCKSIZE {
169                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
170                 return
171         }
172
173         if len(KeepVM.AllWritable()) == 0 {
174                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
175                 return
176         }
177
178         buf := make([]byte, req.ContentLength)
179         nread, err := io.ReadFull(req.Body, buf)
180         if err != nil {
181                 http.Error(resp, err.Error(), 500)
182                 return
183         } else if int64(nread) < req.ContentLength {
184                 http.Error(resp, "request truncated", 500)
185                 return
186         }
187
188         err = PutBlock(buf, hash)
189         if err != nil {
190                 ke := err.(*KeepError)
191                 http.Error(resp, ke.Error(), ke.HTTPCode)
192                 return
193         }
194
195         // Success; add a size hint, sign the locator if possible, and
196         // return it to the client.
197         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
198         api_token := GetApiToken(req)
199         if PermissionSecret != nil && api_token != "" {
200                 expiry := time.Now().Add(blob_signature_ttl)
201                 return_hash = SignLocator(return_hash, api_token, expiry)
202         }
203         resp.Write([]byte(return_hash + "\n"))
204 }
205
206 // IndexHandler
207 //     A HandleFunc to address /index and /index/{prefix} requests.
208 //
209 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
210         // Reject unauthorized requests.
211         if !IsDataManagerToken(GetApiToken(req)) {
212                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
213                 return
214         }
215
216         prefix := mux.Vars(req)["prefix"]
217
218         for _, vol := range KeepVM.AllReadable() {
219                 if err := vol.IndexTo(prefix, resp); err != nil {
220                         // The only errors returned by IndexTo are
221                         // write errors returned by resp.Write(),
222                         // which probably means the client has
223                         // disconnected and this error will never be
224                         // reported to the client -- but it will
225                         // appear in our own error log.
226                         http.Error(resp, err.Error(), http.StatusInternalServerError)
227                         return
228                 }
229         }
230 }
231
232 // StatusHandler
233 //     Responds to /status.json requests with the current node status,
234 //     described in a JSON structure.
235 //
236 //     The data given in a status.json response includes:
237 //        volumes - a list of Keep volumes currently in use by this server
238 //          each volume is an object with the following fields:
239 //            * mount_point
240 //            * device_num (an integer identifying the underlying filesystem)
241 //            * bytes_free
242 //            * bytes_used
243 //
244 type VolumeStatus struct {
245         MountPoint string `json:"mount_point"`
246         DeviceNum  uint64 `json:"device_num"`
247         BytesFree  uint64 `json:"bytes_free"`
248         BytesUsed  uint64 `json:"bytes_used"`
249 }
250
251 type NodeStatus struct {
252         Volumes []*VolumeStatus `json:"volumes"`
253 }
254
255 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
256         st := GetNodeStatus()
257         if jstat, err := json.Marshal(st); err == nil {
258                 resp.Write(jstat)
259         } else {
260                 log.Printf("json.Marshal: %s\n", err)
261                 log.Printf("NodeStatus = %v\n", st)
262                 http.Error(resp, err.Error(), 500)
263         }
264 }
265
266 // GetNodeStatus
267 //     Returns a NodeStatus struct describing this Keep
268 //     node's current status.
269 //
270 func GetNodeStatus() *NodeStatus {
271         st := new(NodeStatus)
272
273         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
274         for i, vol := range KeepVM.AllReadable() {
275                 st.Volumes[i] = vol.Status()
276         }
277         return st
278 }
279
280 // GetVolumeStatus
281 //     Returns a VolumeStatus describing the requested volume.
282 //
283 func GetVolumeStatus(volume string) *VolumeStatus {
284         var fs syscall.Statfs_t
285         var devnum uint64
286
287         if fi, err := os.Stat(volume); err == nil {
288                 devnum = fi.Sys().(*syscall.Stat_t).Dev
289         } else {
290                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
291                 return nil
292         }
293
294         err := syscall.Statfs(volume, &fs)
295         if err != nil {
296                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
297                 return nil
298         }
299         // These calculations match the way df calculates disk usage:
300         // "free" space is measured by fs.Bavail, but "used" space
301         // uses fs.Blocks - fs.Bfree.
302         free := fs.Bavail * uint64(fs.Bsize)
303         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
304         return &VolumeStatus{volume, devnum, free, used}
305 }
306
307 // DeleteHandler processes DELETE requests.
308 //
309 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
310 // from all connected volumes.
311 //
312 // Only the Data Manager, or an Arvados admin with scope "all", are
313 // allowed to issue DELETE requests.  If a DELETE request is not
314 // authenticated or is issued by a non-admin user, the server returns
315 // a PermissionError.
316 //
317 // Upon receiving a valid request from an authorized user,
318 // DeleteHandler deletes all copies of the specified block on local
319 // writable volumes.
320 //
321 // Response format:
322 //
323 // If the requested blocks was not found on any volume, the response
324 // code is HTTP 404 Not Found.
325 //
326 // Otherwise, the response code is 200 OK, with a response body
327 // consisting of the JSON message
328 //
329 //    {"copies_deleted":d,"copies_failed":f}
330 //
331 // where d and f are integers representing the number of blocks that
332 // were successfully and unsuccessfully deleted.
333 //
334 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
335         hash := mux.Vars(req)["hash"]
336
337         // Confirm that this user is an admin and has a token with unlimited scope.
338         var tok = GetApiToken(req)
339         if tok == "" || !CanDelete(tok) {
340                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
341                 return
342         }
343
344         if never_delete {
345                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
346                 return
347         }
348
349         // Delete copies of this block from all available volumes.
350         // Report how many blocks were successfully deleted, and how
351         // many were found on writable volumes but not deleted.
352         var result struct {
353                 Deleted int `json:"copies_deleted"`
354                 Failed  int `json:"copies_failed"`
355         }
356         for _, vol := range KeepVM.AllWritable() {
357                 if err := vol.Delete(hash); err == nil {
358                         result.Deleted++
359                 } else if os.IsNotExist(err) {
360                         continue
361                 } else {
362                         result.Failed++
363                         log.Println("DeleteHandler:", err)
364                 }
365         }
366
367         var st int
368
369         if result.Deleted == 0 && result.Failed == 0 {
370                 st = http.StatusNotFound
371         } else {
372                 st = http.StatusOK
373         }
374
375         resp.WriteHeader(st)
376
377         if st == http.StatusOK {
378                 if body, err := json.Marshal(result); err == nil {
379                         resp.Write(body)
380                 } else {
381                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
382                         http.Error(resp, err.Error(), 500)
383                 }
384         }
385 }
386
387 /* PullHandler processes "PUT /pull" requests for the data manager.
388    The request body is a JSON message containing a list of pull
389    requests in the following format:
390
391    [
392       {
393          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
394          "servers":[
395                         "keep0.qr1hi.arvadosapi.com:25107",
396                         "keep1.qr1hi.arvadosapi.com:25108"
397                  ]
398           },
399           {
400                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
401                  "servers":[
402                         "10.0.1.5:25107",
403                         "10.0.1.6:25107",
404                         "10.0.1.7:25108"
405                  ]
406           },
407           ...
408    ]
409
410    Each pull request in the list consists of a block locator string
411    and an ordered list of servers.  Keepstore should try to fetch the
412    block from each server in turn.
413
414    If the request has not been sent by the Data Manager, return 401
415    Unauthorized.
416
417    If the JSON unmarshalling fails, return 400 Bad Request.
418 */
419
420 type PullRequest struct {
421         Locator string   `json:"locator"`
422         Servers []string `json:"servers"`
423 }
424
425 func PullHandler(resp http.ResponseWriter, req *http.Request) {
426         // Reject unauthorized requests.
427         if !IsDataManagerToken(GetApiToken(req)) {
428                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
429                 return
430         }
431
432         // Parse the request body.
433         var pr []PullRequest
434         r := json.NewDecoder(req.Body)
435         if err := r.Decode(&pr); err != nil {
436                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
437                 return
438         }
439
440         // We have a properly formatted pull list sent from the data
441         // manager.  Report success and send the list to the pull list
442         // manager for further handling.
443         resp.WriteHeader(http.StatusOK)
444         resp.Write([]byte(
445                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
446
447         plist := list.New()
448         for _, p := range pr {
449                 plist.PushBack(p)
450         }
451         pullq.ReplaceQueue(plist)
452 }
453
454 type TrashRequest struct {
455         Locator    string `json:"locator"`
456         BlockMtime int64  `json:"block_mtime"`
457 }
458
459 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
460         // Reject unauthorized requests.
461         if !IsDataManagerToken(GetApiToken(req)) {
462                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
463                 return
464         }
465
466         // Parse the request body.
467         var trash []TrashRequest
468         r := json.NewDecoder(req.Body)
469         if err := r.Decode(&trash); err != nil {
470                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
471                 return
472         }
473
474         // We have a properly formatted trash list sent from the data
475         // manager.  Report success and send the list to the trash work
476         // queue for further handling.
477         resp.WriteHeader(http.StatusOK)
478         resp.Write([]byte(
479                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
480
481         tlist := list.New()
482         for _, t := range trash {
483                 tlist.PushBack(t)
484         }
485         trashq.ReplaceQueue(tlist)
486 }
487
488 // ==============================
489 // GetBlock and PutBlock implement lower-level code for handling
490 // blocks by rooting through volumes connected to the local machine.
491 // Once the handler has determined that system policy permits the
492 // request, it calls these methods to perform the actual operation.
493 //
494 // TODO(twp): this code would probably be better located in the
495 // VolumeManager interface. As an abstraction, the VolumeManager
496 // should be the only part of the code that cares about which volume a
497 // block is stored on, so it should be responsible for figuring out
498 // which volume to check for fetching blocks, storing blocks, etc.
499
500 // ==============================
501 // GetBlock fetches and returns the block identified by "hash".  If
502 // the update_timestamp argument is true, GetBlock also updates the
503 // block's file modification time (for the sake of PutBlock, which
504 // must update the file's timestamp when the block already exists).
505 //
506 // On success, GetBlock returns a byte slice with the block data, and
507 // a nil error.
508 //
509 // If the block cannot be found on any volume, returns NotFoundError.
510 //
511 // If the block found does not have the correct MD5 hash, returns
512 // DiskHashError.
513 //
514
515 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
516         // Attempt to read the requested hash from a keep volume.
517         error_to_caller := NotFoundError
518
519         var vols []Volume
520         if update_timestamp {
521                 // Pointless to find the block on an unwritable volume
522                 // because Touch() will fail -- this is as good as
523                 // "not found" for purposes of callers who need to
524                 // update_timestamp.
525                 vols = KeepVM.AllWritable()
526         } else {
527                 vols = KeepVM.AllReadable()
528         }
529
530         for _, vol := range vols {
531                 buf, err := vol.Get(hash)
532                 if err != nil {
533                         // IsNotExist is an expected error and may be
534                         // ignored. All other errors are logged. In
535                         // any case we continue trying to read other
536                         // volumes. If all volumes report IsNotExist,
537                         // we return a NotFoundError.
538                         if !os.IsNotExist(err) {
539                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
540                         }
541                         continue
542                 }
543                 // Check the file checksum.
544                 //
545                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
546                 if filehash != hash {
547                         // TODO: Try harder to tell a sysadmin about
548                         // this.
549                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
550                                 vol, hash, filehash)
551                         error_to_caller = DiskHashError
552                         continue
553                 }
554                 if error_to_caller == DiskHashError {
555                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
556                                 vol, hash)
557                 }
558                 if update_timestamp {
559                         if err := vol.Touch(hash); err != nil {
560                                 error_to_caller = GenericError
561                                 log.Printf("%s: Touch %s failed: %s",
562                                         vol, hash, error_to_caller)
563                                 continue
564                         }
565                 }
566                 return buf, nil
567         }
568         return nil, error_to_caller
569 }
570
571 /* PutBlock(block, hash)
572    Stores the BLOCK (identified by the content id HASH) in Keep.
573
574    The MD5 checksum of the block must be identical to the content id HASH.
575    If not, an error is returned.
576
577    PutBlock stores the BLOCK on the first Keep volume with free space.
578    A failure code is returned to the user only if all volumes fail.
579
580    On success, PutBlock returns nil.
581    On failure, it returns a KeepError with one of the following codes:
582
583    500 Collision
584           A different block with the same hash already exists on this
585           Keep server.
586    422 MD5Fail
587           The MD5 hash of the BLOCK does not match the argument HASH.
588    503 Full
589           There was not enough space left in any Keep volume to store
590           the object.
591    500 Fail
592           The object could not be stored for some other reason (e.g.
593           all writes failed). The text of the error message should
594           provide as much detail as possible.
595 */
596
597 func PutBlock(block []byte, hash string) error {
598         // Check that BLOCK's checksum matches HASH.
599         blockhash := fmt.Sprintf("%x", md5.Sum(block))
600         if blockhash != hash {
601                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
602                 return RequestHashError
603         }
604
605         // If we already have a block on disk under this identifier, return
606         // success (but check for MD5 collisions).  While fetching the block,
607         // update its timestamp.
608         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
609         // In either case, we want to write our new (good) block to disk,
610         // so there is nothing special to do if err != nil.
611         //
612         if oldblock, err := GetBlock(hash, true); err == nil {
613                 if bytes.Compare(block, oldblock) == 0 {
614                         // The block already exists; return success.
615                         return nil
616                 } else {
617                         return CollisionError
618                 }
619         }
620
621         // Choose a Keep volume to write to.
622         // If this volume fails, try all of the volumes in order.
623         if vol := KeepVM.NextWritable(); vol != nil {
624                 if err := vol.Put(hash, block); err == nil {
625                         return nil // success!
626                 }
627         }
628
629         writables := KeepVM.AllWritable()
630         if len(writables) == 0 {
631                 log.Print("No writable volumes.")
632                 return FullError
633         }
634
635         allFull := true
636         for _, vol := range writables {
637                 err := vol.Put(hash, block)
638                 if err == nil {
639                         return nil // success!
640                 }
641                 if err != FullError {
642                         // The volume is not full but the
643                         // write did not succeed.  Report the
644                         // error and continue trying.
645                         allFull = false
646                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
647                 }
648         }
649
650         if allFull {
651                 log.Print("All volumes are full.")
652                 return FullError
653         } else {
654                 // Already logged the non-full errors.
655                 return GenericError
656         }
657 }
658
659 // IsValidLocator
660 //     Return true if the specified string is a valid Keep locator.
661 //     When Keep is extended to support hash types other than MD5,
662 //     this should be updated to cover those as well.
663 //
664 func IsValidLocator(loc string) bool {
665         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
666         if err == nil {
667                 return match
668         }
669         log.Printf("IsValidLocator: %s\n", err)
670         return false
671 }
672
673 // GetApiToken returns the OAuth2 token from the Authorization
674 // header of a HTTP request, or an empty string if no matching
675 // token is found.
676 func GetApiToken(req *http.Request) string {
677         if auth, ok := req.Header["Authorization"]; ok {
678                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
679                         log.Println(err)
680                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
681                         return match[1]
682                 }
683         }
684         return ""
685 }
686
687 // IsExpired returns true if the given Unix timestamp (expressed as a
688 // hexadecimal string) is in the past, or if timestamp_hex cannot be
689 // parsed as a hexadecimal string.
690 func IsExpired(timestamp_hex string) bool {
691         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
692         if err != nil {
693                 log.Printf("IsExpired: %s\n", err)
694                 return true
695         }
696         return time.Unix(ts, 0).Before(time.Now())
697 }
698
699 // CanDelete returns true if the user identified by api_token is
700 // allowed to delete blocks.
701 func CanDelete(api_token string) bool {
702         if api_token == "" {
703                 return false
704         }
705         // Blocks may be deleted only when Keep has been configured with a
706         // data manager.
707         if IsDataManagerToken(api_token) {
708                 return true
709         }
710         // TODO(twp): look up api_token with the API server
711         // return true if is_admin is true and if the token
712         // has unlimited scope
713         return false
714 }
715
716 // IsDataManagerToken returns true if api_token represents the data
717 // manager's token.
718 func IsDataManagerToken(api_token string) bool {
719         return data_manager_token != "" && api_token == data_manager_token
720 }