7 "git.curoverse.com/arvados.git/sdk/go/httpserver"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
20 // ======================
21 // Configuration settings
23 // TODO(twp): make all of these configurable via command line flags
24 // and/or configuration file settings.
// Compile-time configuration constants.
const (
	// DefaultAddr is the default TCP address ("host:port"; an empty host
	// means all interfaces) on which to listen for requests. Being a
	// constant, it is never changed at run time: the address actually
	// used comes from the -listen flag.
	// NOTE(review): presumably this is that flag's default — confirm in main.
	DefaultAddr = ":25107"

	// BlockSize is the size of a Keep "block": 64 MiB (64 * 1024 * 1024
	// bytes). Data buffers are allocated in multiples of this size.
	BlockSize = 64 * 1024 * 1024

	// MinFreeKilobytes is the amount of free space, in KiB, that a Keep
	// volume must have available in order to permit writes. It equals
	// one block (BlockSize / 1024 = 65536 KiB).
	MinFreeKilobytes = BlockSize / 1024
)

// Run-time configuration variables. Most are set from command-line
// flags during startup.
var (
	// ProcMounts is the path of the kernel mount table, /proc/mounts.
	// It is a variable rather than a constant, presumably so that tests
	// can substitute a different file.
	ProcMounts = "/proc/mounts"

	// enforcePermissions controls whether permission signatures should
	// be enforced (affecting GET and DELETE requests).
	// Set by the -enforce-permissions flag.
	enforcePermissions bool

	// blobSignatureTTL is the time duration for which new permission
	// signatures (returned by PUT requests) will be valid. Computed at
	// startup from the -blob-signature-ttl flag (or its synonym
	// -permission-ttl), which is given in seconds.
	blobSignatureTTL time.Duration

	// dataManagerToken is the API token used by the Data Manager, and is
	// required on certain privileged operations. It is read, with
	// surrounding whitespace trimmed, from the file named by the
	// -data-manager-token-file flag.
	dataManagerToken string

	// neverDelete, when true, prevents the DELETE handler from actually
	// deleting anything. It defaults to true; startup logs a warning
	// when it has been turned off.
	neverDelete = true

	// trashLifetime is the time duration after a block is trashed
	// during which it can still be recovered using an /untrash request.
	// Written as a Go duration, e.g. 10s, 10m or 10h for ten seconds,
	// minutes or hours.
	trashLifetime time.Duration

	// trashCheckInterval is how often the emptyTrash goroutine checks
	// for, and deletes, expired trashed blocks. Written as a Go
	// duration (e.g. 10s, 10m, 10h). Its zero value is not used
	// directly: it is set from the -trash-check-interval flag, whose
	// help text describes the default as one day.
	trashCheckInterval time.Duration
)
74 type KeepError struct {
80 BadRequestError = &KeepError{400, "Bad Request"}
81 UnauthorizedError = &KeepError{401, "Unauthorized"}
82 CollisionError = &KeepError{500, "Collision"}
83 RequestHashError = &KeepError{422, "Hash mismatch in request"}
84 PermissionError = &KeepError{403, "Forbidden"}
85 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
86 ExpiredError = &KeepError{401, "Expired permission signature"}
87 NotFoundError = &KeepError{404, "Not Found"}
88 GenericError = &KeepError{500, "Fail"}
89 FullError = &KeepError{503, "Full"}
90 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
91 TooLongError = &KeepError{413, "Block is too large"}
92 MethodDisabledError = &KeepError{405, "Method disabled"}
93 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
96 func (e *KeepError) Error() string {
100 // ========================
101 // Internal data structures
103 // These global variables are used by multiple parts of the
104 // program. They are good candidates for moving into their own
107 // The Keep VolumeManager maintains a list of available volumes.
108 // Initialized by the --volumes flag (or by FindKeepVolumes).
109 var KeepVM VolumeManager
111 // The pull list manager and trash queue are threadsafe queues which
112 // support atomic update operations. The PullHandler and TrashHandler
113 // store results from Data Manager /pull and /trash requests here.
115 // See the Keep and Data Manager design documents for more details:
116 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
117 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
120 var trashq *WorkQueue
122 type volumeSet []Volume
130 func (vs *volumeSet) String() string {
131 return fmt.Sprintf("%+v", (*vs)[:])
// TODO(twp): continue moving as much code as possible out of main so
// that it can be tested effectively, especially the handling and
// postprocessing of command-line flags (identifying Keep volumes and
// initializing permission arguments).
140 log.Println("keepstore starting, pid", os.Getpid())
141 defer log.Println("keepstore exiting, pid", os.Getpid())
144 dataManagerTokenFile string
146 blobSigningKeyFile string
152 &dataManagerTokenFile,
153 "data-manager-token-file",
155 "File with the API token used by the Data Manager. All DELETE "+
156 "requests or GET /index requests must carry this token.")
159 "enforce-permissions",
161 "Enforce permission signatures on requests.")
166 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
171 "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
176 "If true, nothing will be deleted. "+
177 "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
178 "You should leave this option alone unless you can afford to lose data.")
181 "permission-key-file",
183 "Synonym for -blob-signing-key-file.")
186 "blob-signing-key-file",
188 "File containing the secret key for generating and verifying "+
189 "blob permission signatures.")
194 "Synonym for -blob-signature-ttl.")
197 "blob-signature-ttl",
198 int(time.Duration(2*7*24*time.Hour).Seconds()),
199 "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
200 "See services/api/config/application.default.yml.")
205 "Serialize read and write operations on the following volumes.")
210 "Do not write, delete, or touch anything on the following volumes.")
215 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
220 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
225 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
228 "trash-check-interval",
230 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
235 log.Fatal("-max-buffers must be greater than zero.")
237 bufs = newBufferPool(maxBuffers, BlockSize)
240 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
242 log.Fatalf("open pidfile (%s): %s", pidfile, err)
244 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
246 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
250 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
252 _, err = fmt.Fprint(f, os.Getpid())
254 log.Fatalf("write pidfile (%s): %s", pidfile, err)
258 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
261 defer os.Remove(pidfile)
264 if len(volumes) == 0 {
265 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
266 log.Fatal("No volumes found.")
270 for _, v := range volumes {
271 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
274 // Initialize data manager token and permission key.
275 // If these tokens are specified but cannot be read,
276 // raise a fatal error.
277 if dataManagerTokenFile != "" {
278 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
279 dataManagerToken = strings.TrimSpace(string(buf))
281 log.Fatalf("reading data manager token: %s\n", err)
285 if neverDelete != true {
286 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
287 "been extensively tested. You should leave this option alone unless you can afford to lose data.")
290 if blobSigningKeyFile != "" {
291 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
292 PermissionSecret = bytes.TrimSpace(buf)
294 log.Fatalf("reading permission key: %s\n", err)
298 blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
300 if PermissionSecret == nil {
301 if enforcePermissions {
302 log.Fatal("-enforce-permissions requires a permission key")
304 log.Println("Running without a PermissionSecret. Block locators " +
305 "returned by this server will not be signed, and will be rejected " +
306 "by a server that enforces permissions.")
307 log.Println("To fix this, use the -blob-signing-key-file flag " +
308 "to specify the file containing the permission key.")
312 if maxRequests <= 0 {
313 maxRequests = maxBuffers * 2
314 log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
317 // Start a round-robin VolumeManager with the volumes we have found.
318 KeepVM = MakeRRVolumeManager(volumes)
320 // Middleware stack: logger, maxRequests limiter, method handlers
321 http.Handle("/", &LoggingRESTRouter{
322 httpserver.NewRequestLimiter(maxRequests,
326 // Set up a TCP listener.
327 listener, err := net.Listen("tcp", listen)
332 // Initialize Pull queue and worker
333 keepClient := &keepclient.KeepClient{
336 Client: &http.Client{},
339 // Initialize the pullq and worker
340 pullq = NewWorkQueue()
341 go RunPullWorker(pullq, keepClient)
343 // Initialize the trashq and worker
344 trashq = NewWorkQueue()
345 go RunTrashWorker(trashq)
347 // Start emptyTrash goroutine
348 doneEmptyingTrash := make(chan bool)
349 go emptyTrash(doneEmptyingTrash, trashCheckInterval)
351 // Shut down the server gracefully (by closing the listener)
352 // if SIGTERM is received.
353 term := make(chan os.Signal, 1)
354 go func(sig <-chan os.Signal) {
356 log.Println("caught signal:", s)
357 doneEmptyingTrash <- true
360 signal.Notify(term, syscall.SIGTERM)
361 signal.Notify(term, syscall.SIGINT)
363 log.Println("listening at", listen)
364 srv := &http.Server{Addr: listen}
368 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
369 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
370 ticker := time.NewTicker(trashCheckInterval)
375 for _, v := range volumes {
380 case <-doneEmptyingTrash: