9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
22 // ======================
23 // Configuration settings
25 // TODO(twp): make all of these configurable via command line flags
26 // and/or configuration file settings.
28 // Default TCP address on which to listen for requests.
29 // Initialized by the --listen flag.
30 const DefaultAddr = ":25107"
32 // A Keep "block" is 64MB.
33 const BlockSize = 64 * 1024 * 1024
35 // A Keep volume must have at least MinFreeKilobytes available
36 // in order to permit writes.
37 const MinFreeKilobytes = BlockSize / 1024
39 // Until #6221 is resolved, never_delete must be true.
40 // However, allow it to be false in testing with TEST_DATA_MANAGER_TOKEN
41 const TEST_DATA_MANAGER_TOKEN = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
43 // ProcMounts /proc/mounts
44 var ProcMounts = "/proc/mounts"
46 // enforcePermissions controls whether permission signatures
47 // should be enforced (affecting GET and DELETE requests).
48 // Initialized by the -enforce-permissions flag.
49 var enforcePermissions bool
51 // blobSignatureTTL is the time duration for which new permission
52 // signatures (returned by PUT requests) will be valid.
53 // Initialized by the -permission-ttl flag.
54 var blobSignatureTTL time.Duration
56 // dataManagerToken represents the API token used by the
57 // Data Manager, and is required on certain privileged operations.
58 // Initialized by the -data-manager-token-file flag.
59 var dataManagerToken string
61 // neverDelete can be used to prevent the DELETE handler from
62 // actually deleting anything.
63 var neverDelete = true
70 type KeepError struct {
76 BadRequestError = &KeepError{400, "Bad Request"}
77 UnauthorizedError = &KeepError{401, "Unauthorized"}
78 CollisionError = &KeepError{500, "Collision"}
79 RequestHashError = &KeepError{422, "Hash mismatch in request"}
80 PermissionError = &KeepError{403, "Forbidden"}
81 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
82 ExpiredError = &KeepError{401, "Expired permission signature"}
83 NotFoundError = &KeepError{404, "Not Found"}
84 GenericError = &KeepError{500, "Fail"}
85 FullError = &KeepError{503, "Full"}
86 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
87 TooLongError = &KeepError{413, "Block is too large"}
88 MethodDisabledError = &KeepError{405, "Method disabled"}
91 func (e *KeepError) Error() string {
95 // ========================
96 // Internal data structures
98 // These global variables are used by multiple parts of the
99 // program. They are good candidates for moving into their own
102 // The Keep VolumeManager maintains a list of available volumes.
103 // Initialized by the --volumes flag (or by FindKeepVolumes).
104 var KeepVM VolumeManager
// The pull list manager and trash queue are threadsafe queues which
// support atomic update operations. The PullHandler and TrashHandler
// store results from Data Manager /pull and /trash requests here.
//
// See the Keep and Data Manager design documents for more details:
// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc

// trashq is the trash queue. main creates it with NewWorkQueue and
// starts a RunTrashWorker goroutine that consumes it.
var trashq *WorkQueue
// volumeSet is a list of Volumes.
//
// *volumeSet has Set and String methods, i.e. the method set of
// flag.Value. Set appends a UnixVolume for each directory it is
// given, and Discover appends volumes found in the mount table
// (ProcMounts).
// NOTE(review): presumably registered with flag.Var for the -volume
// flags in main; confirm against the flag definitions.
type volumeSet []Volume
124 func (vs *volumeSet) Set(value string) error {
125 if dirs := strings.Split(value, ","); len(dirs) > 1 {
126 log.Print("DEPRECATED: using comma-separated volume list.")
127 for _, dir := range dirs {
128 if err := vs.Set(dir); err != nil {
134 if len(value) == 0 || value[0] != '/' {
135 return errors.New("Invalid volume: must begin with '/'.")
137 if _, err := os.Stat(value); err != nil {
140 var locker sync.Locker
142 locker = &sync.Mutex{}
144 *vs = append(*vs, &UnixVolume{
147 readonly: flagReadonly,
152 func (vs *volumeSet) String() string {
154 for i, v := range *vs {
163 // Discover adds a volume for every directory named "keep" that is
164 // located at the top level of a device- or tmpfs-backed mount point
165 // other than "/". It returns the number of volumes added.
166 func (vs *volumeSet) Discover() int {
168 f, err := os.Open(ProcMounts)
170 log.Fatalf("opening %s: %s", ProcMounts, err)
172 scanner := bufio.NewScanner(f)
174 args := strings.Fields(scanner.Text())
175 if err := scanner.Err(); err != nil {
176 log.Fatalf("reading %s: %s", ProcMounts, err)
178 dev, mount := args[0], args[1]
182 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
185 keepdir := mount + "/keep"
186 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
189 // Set the -readonly flag (but only for this volume)
190 // if the filesystem is mounted readonly.
191 flagReadonlyWas := flagReadonly
192 for _, fsopt := range strings.Split(args[3], ",") {
202 flagReadonly = flagReadonlyWas
208 // TODO(twp): continue moving as much code as possible out of main
209 // so it can be effectively tested. Esp. handling and postprocessing
210 // of command line flags (identifying Keep volumes and initializing
211 // permission arguments).
214 log.Println("keepstore starting, pid", os.Getpid())
215 defer log.Println("keepstore exiting, pid", os.Getpid())
218 dataManagerTokenFile string
220 blobSigningKeyFile string
226 &dataManagerTokenFile,
227 "data-manager-token-file",
229 "File with the API token used by the Data Manager. All DELETE "+
230 "requests or GET /index requests must carry this token.")
233 "enforce-permissions",
235 "Enforce permission signatures on requests.")
240 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
245 "If set, nothing will be deleted. HTTP 405 will be returned "+
246 "for valid DELETE requests.")
249 "permission-key-file",
251 "Synonym for -blob-signing-key-file.")
254 "blob-signing-key-file",
256 "File containing the secret key for generating and verifying "+
257 "blob permission signatures.")
262 "Synonym for -blob-signature-ttl.")
265 "blob-signature-ttl",
266 int(time.Duration(2*7*24*time.Hour).Seconds()),
267 "Lifetime of blob permission signatures. "+
268 "See services/api/config/application.default.yml.")
273 "Serialize read and write operations on the following volumes.")
278 "Do not write, delete, or touch anything on the following volumes.")
282 "Deprecated synonym for -volume.")
286 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
291 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
296 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
301 log.Fatal("-max-buffers must be greater than zero.")
303 bufs = newBufferPool(maxBuffers, BlockSize)
306 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
308 log.Fatalf("open pidfile (%s): %s", pidfile, err)
310 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
312 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
316 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
318 _, err = fmt.Fprint(f, os.Getpid())
320 log.Fatalf("write pidfile (%s): %s", pidfile, err)
324 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
327 defer os.Remove(pidfile)
330 if len(volumes) == 0 {
331 if volumes.Discover() == 0 {
332 log.Fatal("No volumes found.")
336 for _, v := range volumes {
337 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
340 // Initialize data manager token and permission key.
341 // If these tokens are specified but cannot be read,
342 // raise a fatal error.
343 if dataManagerTokenFile != "" {
344 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
345 dataManagerToken = strings.TrimSpace(string(buf))
347 log.Fatalf("reading data manager token: %s\n", err)
351 if neverDelete != true && dataManagerToken != TEST_DATA_MANAGER_TOKEN {
352 log.Fatal("never_delete must be true, see #6221")
355 if blobSigningKeyFile != "" {
356 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
357 PermissionSecret = bytes.TrimSpace(buf)
359 log.Fatalf("reading permission key: %s\n", err)
363 blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
365 if PermissionSecret == nil {
366 if enforcePermissions {
367 log.Fatal("-enforce-permissions requires a permission key")
369 log.Println("Running without a PermissionSecret. Block locators " +
370 "returned by this server will not be signed, and will be rejected " +
371 "by a server that enforces permissions.")
372 log.Println("To fix this, use the -blob-signing-key-file flag " +
373 "to specify the file containing the permission key.")
377 // Start a round-robin VolumeManager with the volumes we have found.
378 KeepVM = MakeRRVolumeManager(volumes)
380 // Tell the built-in HTTP server to direct all requests to the REST router.
381 loggingRouter := MakeLoggingRESTRouter()
382 http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
383 loggingRouter.ServeHTTP(resp, req)
386 // Set up a TCP listener.
387 listener, err := net.Listen("tcp", listen)
392 // Initialize Pull queue and worker
393 keepClient := &keepclient.KeepClient{
397 Client: &http.Client{},
400 // Initialize the pullq and worker
401 pullq = NewWorkQueue()
402 go RunPullWorker(pullq, keepClient)
404 // Initialize the trashq and worker
405 trashq = NewWorkQueue()
406 go RunTrashWorker(trashq)
408 // Shut down the server gracefully (by closing the listener)
409 // if SIGTERM is received.
410 term := make(chan os.Signal, 1)
411 go func(sig <-chan os.Signal) {
413 log.Println("caught signal:", s)
416 signal.Notify(term, syscall.SIGTERM)
417 signal.Notify(term, syscall.SIGINT)
419 log.Println("listening at", listen)
420 srv := &http.Server{Addr: listen}