7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/httpserver"
9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
21 // ======================
22 // Configuration settings
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
27 // Default TCP address on which to listen for requests.
28 // Initialized by the --listen flag.
29 const DefaultAddr = ":25107"
31 // A Keep "block" is 64MB.
32 const BlockSize = 64 * 1024 * 1024
34 // A Keep volume must have at least MinFreeKilobytes available
35 // in order to permit writes.
36 const MinFreeKilobytes = BlockSize / 1024
38 // ProcMounts /proc/mounts
39 var ProcMounts = "/proc/mounts"
41 // enforcePermissions controls whether permission signatures
42 // should be enforced (affecting GET and DELETE requests).
43 // Initialized by the -enforce-permissions flag.
44 var enforcePermissions bool
46 // blobSignatureTTL is the time duration for which new permission
47 // signatures (returned by PUT requests) will be valid.
48 // Initialized by the -permission-ttl flag.
49 var blobSignatureTTL time.Duration
51 // dataManagerToken represents the API token used by the
52 // Data Manager, and is required on certain privileged operations.
53 // Initialized by the -data-manager-token-file flag.
54 var dataManagerToken string
56 // neverDelete can be used to prevent the DELETE handler from
57 // actually deleting anything.
58 var neverDelete = true
60 // trashLifetime is the time duration after a block is trashed
61 // during which it can be recovered using an /untrash request
62 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
63 var trashLifetime time.Duration
65 // trashCheckInterval is the time duration at which the emptyTrash goroutine
66 // will check and delete expired trashed blocks. Default is one day.
67 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
68 var trashCheckInterval time.Duration
75 type KeepError struct {
81 BadRequestError = &KeepError{400, "Bad Request"}
82 UnauthorizedError = &KeepError{401, "Unauthorized"}
83 CollisionError = &KeepError{500, "Collision"}
84 RequestHashError = &KeepError{422, "Hash mismatch in request"}
85 PermissionError = &KeepError{403, "Forbidden"}
86 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
87 ExpiredError = &KeepError{401, "Expired permission signature"}
88 NotFoundError = &KeepError{404, "Not Found"}
89 GenericError = &KeepError{500, "Fail"}
90 FullError = &KeepError{503, "Full"}
91 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
92 TooLongError = &KeepError{413, "Block is too large"}
93 MethodDisabledError = &KeepError{405, "Method disabled"}
94 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
95 ErrClientDisconnect = &KeepError{503, "Client disconnected"}
98 func (e *KeepError) Error() string {
102 // ========================
103 // Internal data structures
105 // These global variables are used by multiple parts of the
106 // program. They are good candidates for moving into their own
// KeepVM is the package-level VolumeManager, which maintains the list of
// available volumes. main sets it to a round-robin manager via
// MakeRRVolumeManager(volumes), after the volume list has been built from
// the command-line volume flags or, when that list is empty, by
// unixVolumeAdder discovery.
// NOTE(review): an earlier comment also mentioned FindKeepVolumes, which is
// not referenced in the code visible here — confirm whether it still exists.
var KeepVM VolumeManager
// The pull list manager and trash queue are threadsafe queues which
// support atomic update operations. The PullHandler and TrashHandler
// store results from Data Manager /pull and /trash requests here.
// See the Keep and Data Manager design documents for more details:
// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
//
// trashq holds the pending trash list. main creates it with NewWorkQueue
// and passes it to a RunTrashWorker goroutine that consumes it.
var trashq *WorkQueue
// volumeSet is a list of Volumes. Its String method formats the whole list
// with %+v.
// NOTE(review): presumably it also implements flag.Value so that volume
// flags can append to it — confirm against its Set method.
type volumeSet []Volume
132 func (vs *volumeSet) String() string {
133 return fmt.Sprintf("%+v", (*vs)[:])
136 // TODO(twp): continue moving as much code as possible out of main
137 // so it can be effectively tested. Esp. handling and postprocessing
138 // of command line flags (identifying Keep volumes and initializing
139 // permission arguments).
142 log.Println("keepstore starting, pid", os.Getpid())
143 defer log.Println("keepstore exiting, pid", os.Getpid())
146 dataManagerTokenFile string
148 blobSigningKeyFile string
154 &dataManagerTokenFile,
155 "data-manager-token-file",
157 "File with the API token used by the Data Manager. All DELETE "+
158 "requests or GET /index requests must carry this token.")
161 "enforce-permissions",
163 "Enforce permission signatures on requests.")
168 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
173 "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
178 "If true, nothing will be deleted. "+
179 "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
180 "You should leave this option alone unless you can afford to lose data.")
183 "permission-key-file",
185 "Synonym for -blob-signing-key-file.")
188 "blob-signing-key-file",
190 "File containing the secret key for generating and verifying "+
191 "blob permission signatures.")
196 "Synonym for -blob-signature-ttl.")
199 "blob-signature-ttl",
200 int(time.Duration(2*7*24*time.Hour).Seconds()),
201 "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
202 "See services/api/config/application.default.yml.")
207 "Serialize read and write operations on the following volumes.")
212 "Do not write, delete, or touch anything on the following volumes.")
217 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
222 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
227 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
230 "trash-check-interval",
232 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
237 log.Fatal("-max-buffers must be greater than zero.")
239 bufs = newBufferPool(maxBuffers, BlockSize)
242 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
244 log.Fatalf("open pidfile (%s): %s", pidfile, err)
246 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
248 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
252 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
254 _, err = fmt.Fprint(f, os.Getpid())
256 log.Fatalf("write pidfile (%s): %s", pidfile, err)
260 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
263 defer os.Remove(pidfile)
266 if len(volumes) == 0 {
267 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
268 log.Fatal("No volumes found.")
272 for _, v := range volumes {
273 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
276 // Initialize data manager token and permission key.
277 // If these tokens are specified but cannot be read,
278 // raise a fatal error.
279 if dataManagerTokenFile != "" {
280 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
281 dataManagerToken = strings.TrimSpace(string(buf))
283 log.Fatalf("reading data manager token: %s\n", err)
287 if neverDelete != true {
288 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
289 "been extensively tested. You should leave this option alone unless you can afford to lose data.")
292 if blobSigningKeyFile != "" {
293 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
294 PermissionSecret = bytes.TrimSpace(buf)
296 log.Fatalf("reading permission key: %s\n", err)
300 blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
302 if PermissionSecret == nil {
303 if enforcePermissions {
304 log.Fatal("-enforce-permissions requires a permission key")
306 log.Println("Running without a PermissionSecret. Block locators " +
307 "returned by this server will not be signed, and will be rejected " +
308 "by a server that enforces permissions.")
309 log.Println("To fix this, use the -blob-signing-key-file flag " +
310 "to specify the file containing the permission key.")
314 if maxRequests <= 0 {
315 maxRequests = maxBuffers * 2
316 log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
319 // Start a round-robin VolumeManager with the volumes we have found.
320 KeepVM = MakeRRVolumeManager(volumes)
322 // Middleware stack: logger, maxRequests limiter, method handlers
323 http.Handle("/", &LoggingRESTRouter{
324 httpserver.NewRequestLimiter(maxRequests,
328 // Set up a TCP listener.
329 listener, err := net.Listen("tcp", listen)
334 // Initialize Pull queue and worker
335 keepClient := &keepclient.KeepClient{
336 Arvados: &arvadosclient.ArvadosClient{},
338 Client: &http.Client{},
341 // Initialize the pullq and worker
342 pullq = NewWorkQueue()
343 go RunPullWorker(pullq, keepClient)
345 // Initialize the trashq and worker
346 trashq = NewWorkQueue()
347 go RunTrashWorker(trashq)
349 // Start emptyTrash goroutine
350 doneEmptyingTrash := make(chan bool)
351 go emptyTrash(doneEmptyingTrash, trashCheckInterval)
353 // Shut down the server gracefully (by closing the listener)
354 // if SIGTERM is received.
355 term := make(chan os.Signal, 1)
356 go func(sig <-chan os.Signal) {
358 log.Println("caught signal:", s)
359 doneEmptyingTrash <- true
362 signal.Notify(term, syscall.SIGTERM)
363 signal.Notify(term, syscall.SIGINT)
365 log.Println("listening at", listen)
366 srv := &http.Server{Addr: listen}
370 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
371 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
372 ticker := time.NewTicker(trashCheckInterval)
377 for _, v := range volumes {
382 case <-doneEmptyingTrash: