9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
21 // ======================
22 // Configuration settings
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
27 // DEFAULT_ADDR is the default TCP address on which to listen for requests
28 // (no host part, port 25107).
// NOTE(review): this comment used to say the value is "initialized by the
// --listen flag", but a constant cannot be assigned at run time; presumably
// it is the default value of the listen flag -- confirm against the flag
// registration.
29 const DEFAULT_ADDR = ":25107"
31 // BLOCKSIZE is the size in bytes of a Keep "block": 64 MiB (64 * 1024 * 1024).
32 const BLOCKSIZE = 64 * 1024 * 1024
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
// The value is BLOCKSIZE expressed in kilobytes (65536), i.e. room for one
// full-size block.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
// PROC_MOUNTS is the path of the mount table that Discover opens and scans.
// NOTE(review): declared as a var rather than a const, presumably so tests
// can point it at a fixture file -- confirm.
38 var PROC_MOUNTS = "/proc/mounts"
40 // enforce_permissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the --enforce-permissions flag.
// Startup aborts if this is set while no permission key has been loaded.
43 var enforce_permissions bool
45 // permission_ttl is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the --permission-ttl flag.
// The flag value is a whole number of seconds; startup code converts it to a
// time.Duration before storing it here.
48 var permission_ttl time.Duration
50 // data_manager_token represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the --data-manager-token-file flag.
// The token is the content of the named file with surrounding whitespace
// trimmed; it stays empty when no file is given.
53 var data_manager_token string
55 // never_delete can be used to prevent the DELETE handler from
56 // actually deleting anything.
// It defaults to false.
// NOTE(review): a flag help text in this file says valid DELETE requests get
// HTTP 405 when deletion is disabled; presumably that flag sets this
// variable -- confirm against the flag registration.
57 var never_delete = false
// KeepError is the error type of the predefined errors listed below. Each of
// them is built from an integer and a message string.
// NOTE(review): the struct's field names are not visible in this view.
62 type KeepError struct {
// Predefined errors. The integers look like HTTP status codes (400, 401,
// 403, ...) -- presumably they become the response status; confirm against
// the request handlers. Several errors share a code: 401 is used for both
// "Unauthorized" and "Expired permission signature", and 500 for "Collision",
// "Hash mismatch in stored data" and "Fail".
68 BadRequestError = &KeepError{400, "Bad Request"}
69 UnauthorizedError = &KeepError{401, "Unauthorized"}
70 CollisionError = &KeepError{500, "Collision"}
71 RequestHashError = &KeepError{422, "Hash mismatch in request"}
72 PermissionError = &KeepError{403, "Forbidden"}
73 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
74 ExpiredError = &KeepError{401, "Expired permission signature"}
75 NotFoundError = &KeepError{404, "Not Found"}
76 GenericError = &KeepError{500, "Fail"}
77 FullError = &KeepError{503, "Full"}
78 TooLongError = &KeepError{504, "Timeout"}
79 MethodDisabledError = &KeepError{405, "Method disabled"}
// Error gives *KeepError the method required by the built-in error interface.
// NOTE(review): the method body is not visible in this view.
82 func (e *KeepError) Error() string {
86 // ========================
87 // Internal data structures
89 // These global variables are used by multiple parts of the
90 // program. They are good candidates for moving into their own package.
93 // KeepVM is the VolumeManager that maintains the list of available volumes.
94 // It is assigned at startup from MakeRRVolumeManager(volumes), where volumes
// comes from the -volume flag(s) or, when none are given, from Discover.
// NOTE(review): this comment used to mention a "--volumes" flag and
// FindKeepVolumes; neither appears in the code visible here.
95 var KeepVM VolumeManager
97 // The pull list manager and trash queue are threadsafe queues which
98 // support atomic update operations. The PullHandler and TrashHandler
99 // store results from Data Manager /pull and /trash requests here.
101 // See the Keep and Data Manager design documents for more details:
102 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
103 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
// trashq is created with NewWorkQueue at startup and handed to
// RunTrashWorker, which is started in its own goroutine.
106 var trashq *WorkQueue
// volumeSet is a list of Volumes. *volumeSet has Set(string) error and
// String() string methods, the pair required by the standard flag.Value
// interface -- presumably so that volumes can be collected from repeated
// command line arguments; confirm against the flag registration.
112 type volumeSet []Volume
// Set adds the directory named by value to the volume set.
//
// value is normally a single absolute path. A comma-separated list is still
// accepted but deprecated: a deprecation notice is logged and each element is
// passed to Set individually. An error is returned when value is empty or
// does not begin with '/'.
// NOTE(review): what happens after a failed recursive Set or a failed
// os.Stat is not visible in this view -- presumably the error is returned.
114 func (vs *volumeSet) Set(value string) error {
// Deprecated form: more than one comma-separated directory.
115 if dirs := strings.Split(value, ","); len(dirs) > 1 {
116 log.Print("DEPRECATED: using comma-separated volume list.")
117 for _, dir := range dirs {
118 if err := vs.Set(dir); err != nil {
// A volume must be given as an absolute path.
124 if len(value) == 0 || value[0] != '/' {
125 return errors.New("Invalid volume: must begin with '/'.")
// Make sure the path can be stat'ed before registering it.
127 if _, err := os.Stat(value); err != nil {
// The new volume picks up the current values of the package-level
// flagSerializeIO and flagReadonly settings.
130 *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
// String has the signature that flag.Value requires for reporting the
// current value; it iterates over the volumes in the set.
// NOTE(review): how the result is formatted is not visible in this view.
134 func (vs *volumeSet) String() string {
136 for i, v := range *vs {
145 // Discover adds a volume for every directory named "keep" that is
146 // located at the top level of a device- or tmpfs-backed mount point
147 // other than "/". It returns the number of volumes added.
//
// The mount table is read from PROC_MOUNTS; failing to open or read it
// terminates the process via log.Fatalf.
148 func (vs *volumeSet) Discover() int {
// NOTE(review): no Close call for f is visible in this view -- confirm the
// file is closed.
150 f, err := os.Open(PROC_MOUNTS)
152 log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
154 scanner := bufio.NewScanner(f)
// Each line is split on whitespace: field 0 is used as the device, field 1
// as the mount point and field 3 as the comma-separated mount options.
// NOTE(review): args is indexed at 0, 1 and 3 and no length check is visible
// here; a blank or short line would panic with an index-out-of-range error.
156 args := strings.Fields(scanner.Text())
// NOTE(review): scanner.Err reports the error that stopped scanning, so it is
// normally checked once after the Scan loop ends -- confirm this per-line
// check can ever fire.
157 if err := scanner.Err(); err != nil {
158 log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
160 dev, mount := args[0], args[1]
// Only tmpfs and /dev/-backed filesystems are considered.
164 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
// The candidate volume is the "keep" entry directly under the mount point;
// it must exist and be a directory.
167 keepdir := mount + "/keep"
168 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
171 // Set the -readonly flag (but only for this volume)
172 // if the filesystem is mounted readonly.
// The global flag is saved here and restored below, so the override does not
// leak to volumes found later.
173 flagReadonlyWas := flagReadonly
174 for _, fsopt := range strings.Split(args[3], ",") {
184 flagReadonly = flagReadonlyWas
190 // TODO(twp): continue moving as much code as possible out of main
191 // so it can be effectively tested. Esp. handling and postprocessing
192 // of command line flags (identifying Keep volumes and initializing
193 // permission arguments).
// Startup sequence.
// NOTE(review): the enclosing function header and a number of statements are
// not visible in this view; the comments below describe only what is shown.
196 log.Println("Keep started: pid", os.Getpid())
// Locals that receive raw flag values; they are post-processed further down.
199 data_manager_token_file string
201 permission_key_file string
202 permission_ttl_sec int
207 &data_manager_token_file,
208 "data-manager-token-file",
210 "File with the API token used by the Data Manager. All DELETE "+
211 "requests or GET /index requests must carry this token.")
213 &enforce_permissions,
214 "enforce-permissions",
216 "Enforce permission signatures on requests.")
221 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
226 "If set, nothing will be deleted. HTTP 405 will be returned "+
227 "for valid DELETE requests.")
229 &permission_key_file,
230 "permission-key-file",
232 "File containing the secret key for generating and verifying "+
233 "permission signatures.")
238 "Expiration time (in seconds) for newly generated permission "+
244 "Serialize read and write operations on the following volumes.")
249 "Do not write, delete, or touch anything on the following volumes.")
253 "Deprecated synonym for -volume.")
257 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
262 "Path to write pid file")
// With no volumes given on the command line, fall back to auto-discovery;
// finding none at all is fatal.
266 if len(volumes) == 0 {
267 if volumes.Discover() == 0 {
268 log.Fatal("No volumes found.")
272 for _, v := range volumes {
273 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
276 // Initialize data manager token and permission key.
277 // If these tokens are specified but cannot be read,
278 // raise a fatal error.
279 if data_manager_token_file != "" {
280 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
// Surrounding whitespace (e.g. a trailing newline) is not part of the token.
281 data_manager_token = strings.TrimSpace(string(buf))
283 log.Fatalf("reading data manager token: %s\n", err)
286 if permission_key_file != "" {
287 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
288 PermissionSecret = bytes.TrimSpace(buf)
290 log.Fatalf("reading permission key: %s\n", err)
294 // Initialize permission TTL
295 permission_ttl = time.Duration(permission_ttl_sec) * time.Second
297 // If --enforce-permissions is true, we must have a permission key.
// Without a key, startup is fatal when enforcing; otherwise warnings are
// logged explaining that locators will be unsigned.
299 if PermissionSecret == nil {
300 if enforce_permissions {
301 log.Fatal("--enforce-permissions requires a permission key")
303 log.Println("Running without a PermissionSecret. Block locators " +
304 "returned by this server will not be signed, and will be rejected " +
305 "by a server that enforces permissions.")
306 log.Println("To fix this, run Keep with --permission-key-file=<path> " +
307 "to define the location of a file containing the permission key.")
311 // Start a round-robin VolumeManager with the volumes we have found.
312 KeepVM = MakeRRVolumeManager(volumes)
314 // Tell the built-in HTTP server to direct all requests to the REST router.
315 loggingRouter := MakeLoggingRESTRouter()
// The handler function only forwards to loggingRouter.ServeHTTP.
316 http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
317 loggingRouter.ServeHTTP(resp, req)
320 // Set up a TCP listener.
321 listener, err := net.Listen("tcp", listen)
326 // Create the Keep client that is handed to the pull worker below.
327 keepClient := &keepclient.KeepClient{
// NOTE(review): a zero-value http.Client has no Timeout, which net/http
// documents as "no timeout" -- confirm that is intended for pull requests.
331 Client: &http.Client{},
334 // Initialize the pullq and worker
335 pullq = NewWorkQueue()
336 go RunPullWorker(pullq, keepClient)
338 // Initialize the trashq and worker
339 trashq = NewWorkQueue()
340 go RunTrashWorker(trashq)
342 // Shut down the server gracefully (by closing the listener)
343 // if SIGTERM is received.
// The channel is buffered because package signal does not block when
// delivering to it; an unbuffered channel could miss the signal.
344 term := make(chan os.Signal, 1)
345 go func(sig <-chan os.Signal) {
347 log.Println("caught signal:", s)
// Only SIGTERM is registered here.
350 signal.Notify(term, syscall.SIGTERM)
// Write this process's pid to the pid file. A failure is logged.
// NOTE(review): the condition guarding this step and the closing of f are not
// visible in this view.
353 f, err := os.Create(pidfile)
355 fmt.Fprint(f, os.Getpid())
358 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
362 // Start listening for requests.
// srv carries the same address string that the listener was opened on.
363 srv := &http.Server{Addr: listen}
366 log.Println("shutting down")