Merge branch '3411-expire-collections'
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "container/list"
14         "crypto/md5"
15         "encoding/json"
16         "fmt"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler processes "PUT /pull" commands from Data Manager.
63         // It parses the JSON list of pull requests in the request body, and
64         // delivers them to the pull list manager for replication.
65         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
66
67         // Any request which does not match any of these routes gets
68         // 400 Bad Request.
69         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
70
71         return rest
72 }
73
74 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
75         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
76 }
77
78 // FindKeepVolumes scans all mounted volumes on the system for Keep
79 // volumes, and returns a list of matching paths.
80 //
81 // A device is assumed to be a Keep volume if it is a normal or tmpfs
82 // volume and has a "/keep" directory directly underneath the mount
83 // point.
84 //
85 func FindKeepVolumes() []string {
86         vols := make([]string, 0)
87
88         if f, err := os.Open(PROC_MOUNTS); err != nil {
89                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
90         } else {
91                 scanner := bufio.NewScanner(f)
92                 for scanner.Scan() {
93                         args := strings.Fields(scanner.Text())
94                         dev, mount := args[0], args[1]
95                         if mount != "/" &&
96                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
97                                 keep := mount + "/keep"
98                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
99                                         vols = append(vols, keep)
100                                 }
101                         }
102                 }
103                 if err := scanner.Err(); err != nil {
104                         log.Fatal(err)
105                 }
106         }
107         return vols
108 }
109
110 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
111         hash := mux.Vars(req)["hash"]
112
113         log.Printf("%s %s", req.Method, hash)
114
115         hints := mux.Vars(req)["hints"]
116
117         // Parse the locator string and hints from the request.
118         // TODO(twp): implement a Locator type.
119         var signature, timestamp string
120         if hints != "" {
121                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
122                 for _, hint := range strings.Split(hints, "+") {
123                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
124                                 // Server ignores size hints
125                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
126                                 signature = m[1]
127                                 timestamp = m[2]
128                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
129                                 // Any unknown hint that starts with an uppercase letter is
130                                 // presumed to be valid and ignored, to permit forward compatibility.
131                         } else {
132                                 // Unknown format; not a valid locator.
133                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
134                                 return
135                         }
136                 }
137         }
138
139         // If permission checking is in effect, verify this
140         // request's permission signature.
141         if enforce_permissions {
142                 if signature == "" || timestamp == "" {
143                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
144                         return
145                 } else if IsExpired(timestamp) {
146                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
147                         return
148                 } else {
149                         req_locator := req.URL.Path[1:] // strip leading slash
150                         if !VerifySignature(req_locator, GetApiToken(req)) {
151                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
152                                 return
153                         }
154                 }
155         }
156
157         block, err := GetBlock(hash, false)
158
159         // Garbage collect after each GET. Fixes #2865.
160         // TODO(twp): review Keep memory usage and see if there's
161         // a better way to do this than blindly garbage collecting
162         // after every block.
163         defer runtime.GC()
164
165         if err != nil {
166                 // This type assertion is safe because the only errors
167                 // GetBlock can return are DiskHashError or NotFoundError.
168                 if err == NotFoundError {
169                         log.Printf("%s: not found, giving up\n", hash)
170                 }
171                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
172                 return
173         }
174
175         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
176
177         _, err = resp.Write(block)
178         if err != nil {
179                 log.Printf("GetBlockHandler: writing response: %s", err)
180         }
181
182         return
183 }
184
185 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
186         // Garbage collect after each PUT. Fixes #2865.
187         // See also GetBlockHandler.
188         defer runtime.GC()
189
190         hash := mux.Vars(req)["hash"]
191
192         log.Printf("%s %s", req.Method, hash)
193
194         // Read the block data to be stored.
195         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
196         //
197         if req.ContentLength > BLOCKSIZE {
198                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
199                 return
200         }
201
202         buf := make([]byte, req.ContentLength)
203         nread, err := io.ReadFull(req.Body, buf)
204         if err != nil {
205                 http.Error(resp, err.Error(), 500)
206         } else if int64(nread) < req.ContentLength {
207                 http.Error(resp, "request truncated", 500)
208         } else {
209                 if err := PutBlock(buf, hash); err == nil {
210                         // Success; add a size hint, sign the locator if
211                         // possible, and return it to the client.
212                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
213                         api_token := GetApiToken(req)
214                         if PermissionSecret != nil && api_token != "" {
215                                 expiry := time.Now().Add(permission_ttl)
216                                 return_hash = SignLocator(return_hash, api_token, expiry)
217                         }
218                         resp.Write([]byte(return_hash + "\n"))
219                 } else {
220                         ke := err.(*KeepError)
221                         http.Error(resp, ke.Error(), ke.HTTPCode)
222                 }
223         }
224         return
225 }
226
227 // IndexHandler
228 //     A HandleFunc to address /index and /index/{prefix} requests.
229 //
230 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
231         prefix := mux.Vars(req)["prefix"]
232
233         // Only the data manager may issue /index requests,
234         // and only if enforce_permissions is enabled.
235         // All other requests return 403 Forbidden.
236         api_token := GetApiToken(req)
237         if !enforce_permissions ||
238                 api_token == "" ||
239                 data_manager_token != api_token {
240                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
241                 return
242         }
243         var index string
244         for _, vol := range KeepVM.Volumes() {
245                 index = index + vol.Index(prefix)
246         }
247         resp.Write([]byte(index))
248 }
249
250 // StatusHandler
251 //     Responds to /status.json requests with the current node status,
252 //     described in a JSON structure.
253 //
254 //     The data given in a status.json response includes:
255 //        volumes - a list of Keep volumes currently in use by this server
256 //          each volume is an object with the following fields:
257 //            * mount_point
258 //            * device_num (an integer identifying the underlying filesystem)
259 //            * bytes_free
260 //            * bytes_used
261 //
262 type VolumeStatus struct {
263         MountPoint string `json:"mount_point"`
264         DeviceNum  uint64 `json:"device_num"`
265         BytesFree  uint64 `json:"bytes_free"`
266         BytesUsed  uint64 `json:"bytes_used"`
267 }
268
269 type NodeStatus struct {
270         Volumes []*VolumeStatus `json:"volumes"`
271 }
272
273 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
274         st := GetNodeStatus()
275         if jstat, err := json.Marshal(st); err == nil {
276                 resp.Write(jstat)
277         } else {
278                 log.Printf("json.Marshal: %s\n", err)
279                 log.Printf("NodeStatus = %v\n", st)
280                 http.Error(resp, err.Error(), 500)
281         }
282 }
283
284 // GetNodeStatus
285 //     Returns a NodeStatus struct describing this Keep
286 //     node's current status.
287 //
288 func GetNodeStatus() *NodeStatus {
289         st := new(NodeStatus)
290
291         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
292         for i, vol := range KeepVM.Volumes() {
293                 st.Volumes[i] = vol.Status()
294         }
295         return st
296 }
297
298 // GetVolumeStatus
299 //     Returns a VolumeStatus describing the requested volume.
300 //
301 func GetVolumeStatus(volume string) *VolumeStatus {
302         var fs syscall.Statfs_t
303         var devnum uint64
304
305         if fi, err := os.Stat(volume); err == nil {
306                 devnum = fi.Sys().(*syscall.Stat_t).Dev
307         } else {
308                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
309                 return nil
310         }
311
312         err := syscall.Statfs(volume, &fs)
313         if err != nil {
314                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
315                 return nil
316         }
317         // These calculations match the way df calculates disk usage:
318         // "free" space is measured by fs.Bavail, but "used" space
319         // uses fs.Blocks - fs.Bfree.
320         free := fs.Bavail * uint64(fs.Bsize)
321         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
322         return &VolumeStatus{volume, devnum, free, used}
323 }
324
325 // DeleteHandler processes DELETE requests.
326 //
327 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
328 // from all connected volumes.
329 //
330 // Only the Data Manager, or an Arvados admin with scope "all", are
331 // allowed to issue DELETE requests.  If a DELETE request is not
332 // authenticated or is issued by a non-admin user, the server returns
333 // a PermissionError.
334 //
335 // Upon receiving a valid request from an authorized user,
336 // DeleteHandler deletes all copies of the specified block on local
337 // writable volumes.
338 //
339 // Response format:
340 //
341 // If the requested blocks was not found on any volume, the response
342 // code is HTTP 404 Not Found.
343 //
344 // Otherwise, the response code is 200 OK, with a response body
345 // consisting of the JSON message
346 //
347 //    {"copies_deleted":d,"copies_failed":f}
348 //
349 // where d and f are integers representing the number of blocks that
350 // were successfully and unsuccessfully deleted.
351 //
352 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
353         hash := mux.Vars(req)["hash"]
354         log.Printf("%s %s", req.Method, hash)
355
356         // Confirm that this user is an admin and has a token with unlimited scope.
357         var tok = GetApiToken(req)
358         if tok == "" || !CanDelete(tok) {
359                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
360                 return
361         }
362
363         if never_delete {
364                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
365                 return
366         }
367
368         // Delete copies of this block from all available volumes.  Report
369         // how many blocks were successfully and unsuccessfully
370         // deleted.
371         var result struct {
372                 Deleted int `json:"copies_deleted"`
373                 Failed  int `json:"copies_failed"`
374         }
375         for _, vol := range KeepVM.Volumes() {
376                 if err := vol.Delete(hash); err == nil {
377                         result.Deleted++
378                 } else if os.IsNotExist(err) {
379                         continue
380                 } else {
381                         result.Failed++
382                         log.Println("DeleteHandler:", err)
383                 }
384         }
385
386         var st int
387
388         if result.Deleted == 0 && result.Failed == 0 {
389                 st = http.StatusNotFound
390         } else {
391                 st = http.StatusOK
392         }
393
394         resp.WriteHeader(st)
395
396         if st == http.StatusOK {
397                 if body, err := json.Marshal(result); err == nil {
398                         resp.Write(body)
399                 } else {
400                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
401                         http.Error(resp, err.Error(), 500)
402                 }
403         }
404 }
405
406 /* PullHandler processes "PUT /pull" requests for the data manager.
407    The request body is a JSON message containing a list of pull
408    requests in the following format:
409
410    [
411       {
412          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
413          "servers":[
414                         "keep0.qr1hi.arvadosapi.com:25107",
415                         "keep1.qr1hi.arvadosapi.com:25108"
416                  ]
417           },
418           {
419                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
420                  "servers":[
421                         "10.0.1.5:25107",
422                         "10.0.1.6:25107",
423                         "10.0.1.7:25108"
424                  ]
425           },
426           ...
427    ]
428
429    Each pull request in the list consists of a block locator string
430    and an ordered list of servers.  Keepstore should try to fetch the
431    block from each server in turn.
432
433    If the request has not been sent by the Data Manager, return 401
434    Unauthorized.
435
436    If the JSON unmarshalling fails, return 400 Bad Request.
437 */
438
439 type PullRequest struct {
440         Locator string   `json:"locator"`
441         Servers []string `json:"servers"`
442 }
443
444 func PullHandler(resp http.ResponseWriter, req *http.Request) {
445         // Reject unauthorized requests.
446         api_token := GetApiToken(req)
447         if !IsDataManagerToken(api_token) {
448                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
449                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
450                 return
451         }
452
453         // Parse the request body.
454         var pr []PullRequest
455         r := json.NewDecoder(req.Body)
456         if err := r.Decode(&pr); err != nil {
457                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
458                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
459                 return
460         }
461
462         // We have a properly formatted pull list sent from the data
463         // manager.  Report success and send the list to the pull list
464         // manager for further handling.
465         log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
466         resp.WriteHeader(http.StatusOK)
467         resp.Write([]byte(
468                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
469
470         plist := list.New()
471         for _, p := range pr {
472                 plist.PushBack(p)
473         }
474
475         if pullmgr == nil {
476                 pullmgr = NewBlockWorkList()
477         }
478         pullmgr.ReplaceList(plist)
479 }
480
481 // ==============================
482 // GetBlock and PutBlock implement lower-level code for handling
483 // blocks by rooting through volumes connected to the local machine.
484 // Once the handler has determined that system policy permits the
485 // request, it calls these methods to perform the actual operation.
486 //
487 // TODO(twp): this code would probably be better located in the
488 // VolumeManager interface. As an abstraction, the VolumeManager
489 // should be the only part of the code that cares about which volume a
490 // block is stored on, so it should be responsible for figuring out
491 // which volume to check for fetching blocks, storing blocks, etc.
492
493 // ==============================
494 // GetBlock fetches and returns the block identified by "hash".  If
495 // the update_timestamp argument is true, GetBlock also updates the
496 // block's file modification time (for the sake of PutBlock, which
497 // must update the file's timestamp when the block already exists).
498 //
499 // On success, GetBlock returns a byte slice with the block data, and
500 // a nil error.
501 //
502 // If the block cannot be found on any volume, returns NotFoundError.
503 //
504 // If the block found does not have the correct MD5 hash, returns
505 // DiskHashError.
506 //
507
508 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
509         // Attempt to read the requested hash from a keep volume.
510         error_to_caller := NotFoundError
511
512         for _, vol := range KeepVM.Volumes() {
513                 if buf, err := vol.Get(hash); err != nil {
514                         // IsNotExist is an expected error and may be ignored.
515                         // (If all volumes report IsNotExist, we return a NotFoundError)
516                         // All other errors should be logged but we continue trying to
517                         // read.
518                         switch {
519                         case os.IsNotExist(err):
520                                 continue
521                         default:
522                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
523                         }
524                 } else {
525                         // Double check the file checksum.
526                         //
527                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
528                         if filehash != hash {
529                                 // TODO(twp): this condition probably represents a bad disk and
530                                 // should raise major alarm bells for an administrator: e.g.
531                                 // they should be sent directly to an event manager at high
532                                 // priority or logged as urgent problems.
533                                 //
534                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
535                                         vol, hash, filehash)
536                                 error_to_caller = DiskHashError
537                         } else {
538                                 // Success!
539                                 if error_to_caller != NotFoundError {
540                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
541                                                 vol, hash)
542                                 }
543                                 // Update the timestamp if the caller requested.
544                                 // If we could not update the timestamp, continue looking on
545                                 // other volumes.
546                                 if update_timestamp {
547                                         if vol.Touch(hash) != nil {
548                                                 continue
549                                         }
550                                 }
551                                 return buf, nil
552                         }
553                 }
554         }
555
556         if error_to_caller != NotFoundError {
557                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
558         }
559         return nil, error_to_caller
560 }
561
562 /* PutBlock(block, hash)
563    Stores the BLOCK (identified by the content id HASH) in Keep.
564
565    The MD5 checksum of the block must be identical to the content id HASH.
566    If not, an error is returned.
567
568    PutBlock stores the BLOCK on the first Keep volume with free space.
569    A failure code is returned to the user only if all volumes fail.
570
571    On success, PutBlock returns nil.
572    On failure, it returns a KeepError with one of the following codes:
573
574    500 Collision
575           A different block with the same hash already exists on this
576           Keep server.
577    422 MD5Fail
578           The MD5 hash of the BLOCK does not match the argument HASH.
579    503 Full
580           There was not enough space left in any Keep volume to store
581           the object.
582    500 Fail
583           The object could not be stored for some other reason (e.g.
584           all writes failed). The text of the error message should
585           provide as much detail as possible.
586 */
587
588 func PutBlock(block []byte, hash string) error {
589         // Check that BLOCK's checksum matches HASH.
590         blockhash := fmt.Sprintf("%x", md5.Sum(block))
591         if blockhash != hash {
592                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
593                 return RequestHashError
594         }
595
596         // If we already have a block on disk under this identifier, return
597         // success (but check for MD5 collisions).  While fetching the block,
598         // update its timestamp.
599         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
600         // In either case, we want to write our new (good) block to disk,
601         // so there is nothing special to do if err != nil.
602         //
603         if oldblock, err := GetBlock(hash, true); err == nil {
604                 if bytes.Compare(block, oldblock) == 0 {
605                         // The block already exists; return success.
606                         return nil
607                 } else {
608                         return CollisionError
609                 }
610         }
611
612         // Choose a Keep volume to write to.
613         // If this volume fails, try all of the volumes in order.
614         vol := KeepVM.Choose()
615         if err := vol.Put(hash, block); err == nil {
616                 return nil // success!
617         } else {
618                 allFull := true
619                 for _, vol := range KeepVM.Volumes() {
620                         err := vol.Put(hash, block)
621                         if err == nil {
622                                 return nil // success!
623                         }
624                         if err != FullError {
625                                 // The volume is not full but the write did not succeed.
626                                 // Report the error and continue trying.
627                                 allFull = false
628                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
629                         }
630                 }
631
632                 if allFull {
633                         log.Printf("all Keep volumes full")
634                         return FullError
635                 } else {
636                         log.Printf("all Keep volumes failed")
637                         return GenericError
638                 }
639         }
640 }
641
642 // IsValidLocator
643 //     Return true if the specified string is a valid Keep locator.
644 //     When Keep is extended to support hash types other than MD5,
645 //     this should be updated to cover those as well.
646 //
647 func IsValidLocator(loc string) bool {
648         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
649         if err == nil {
650                 return match
651         }
652         log.Printf("IsValidLocator: %s\n", err)
653         return false
654 }
655
656 // GetApiToken returns the OAuth2 token from the Authorization
657 // header of a HTTP request, or an empty string if no matching
658 // token is found.
659 func GetApiToken(req *http.Request) string {
660         if auth, ok := req.Header["Authorization"]; ok {
661                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
662                         log.Println(err)
663                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
664                         return match[1]
665                 }
666         }
667         return ""
668 }
669
670 // IsExpired returns true if the given Unix timestamp (expressed as a
671 // hexadecimal string) is in the past, or if timestamp_hex cannot be
672 // parsed as a hexadecimal string.
673 func IsExpired(timestamp_hex string) bool {
674         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
675         if err != nil {
676                 log.Printf("IsExpired: %s\n", err)
677                 return true
678         }
679         return time.Unix(ts, 0).Before(time.Now())
680 }
681
682 // CanDelete returns true if the user identified by api_token is
683 // allowed to delete blocks.
684 func CanDelete(api_token string) bool {
685         if api_token == "" {
686                 return false
687         }
688         // Blocks may be deleted only when Keep has been configured with a
689         // data manager.
690         if IsDataManagerToken(api_token) {
691                 return true
692         }
693         // TODO(twp): look up api_token with the API server
694         // return true if is_admin is true and if the token
695         // has unlimited scope
696         return false
697 }
698
699 // IsDataManagerToken returns true if api_token represents the data
700 // manager's token.
701 func IsDataManagerToken(api_token string) bool {
702         return data_manager_token != "" && api_token == data_manager_token
703 }