7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
19 // ======================
20 // Configuration settings
22 // TODO(twp): make all of these configurable via command line flags
23 // and/or configuration file settings.
const (
	// DefaultAddr is the default TCP address on which to listen for
	// requests: port 25107 on all interfaces. The address actually used
	// comes from the -listen flag.
	DefaultAddr = ":25107"

	// BlockSize is the size of a Keep "block": 64 MiB.
	BlockSize = 64 * 1024 * 1024

	// MinFreeKilobytes is the amount of free space, in KiB, that a Keep
	// volume must have available before writes are permitted. It equals
	// one full block.
	MinFreeKilobytes = BlockSize / 1024
)

var (
	// ProcMounts is the path of the file that lists currently mounted
	// filesystems. It is a variable rather than a constant, presumably so
	// that tests can point it at a fixture (confirm against callers).
	ProcMounts = "/proc/mounts"

	// enforcePermissions controls whether permission signatures are
	// enforced (affecting GET and DELETE requests). It is set from the
	// -enforce-permissions flag.
	enforcePermissions bool

	// blobSignatureTTL is how long new permission signatures (returned by
	// PUT requests) remain valid. main sets it from the -blob-signature-ttl
	// flag (or its synonym), which is given in seconds.
	blobSignatureTTL time.Duration

	// dataManagerToken is the API token used by the Data Manager; certain
	// privileged operations require it. main reads it from the file named
	// by the -data-manager-token-file flag.
	dataManagerToken string

	// neverDelete, which starts out true, prevents the DELETE handler from
	// actually deleting anything.
	neverDelete = true

	// trashLifetime is how long a trashed block remains recoverable through
	// an /untrash request. Values use Go duration syntax, e.g. 10s, 10m, or
	// 10h for ten seconds, minutes, or hours.
	trashLifetime time.Duration

	// trashCheckInterval is how often the emptyTrash goroutine checks for
	// and deletes expired trashed blocks. Default is one day. Values use Go
	// duration syntax, e.g. 10s, 10m, or 10h for ten seconds, minutes, or
	// hours.
	trashCheckInterval time.Duration
)
73 type KeepError struct {
79 BadRequestError = &KeepError{400, "Bad Request"}
80 UnauthorizedError = &KeepError{401, "Unauthorized"}
81 CollisionError = &KeepError{500, "Collision"}
82 RequestHashError = &KeepError{422, "Hash mismatch in request"}
83 PermissionError = &KeepError{403, "Forbidden"}
84 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
85 ExpiredError = &KeepError{401, "Expired permission signature"}
86 NotFoundError = &KeepError{404, "Not Found"}
87 GenericError = &KeepError{500, "Fail"}
88 FullError = &KeepError{503, "Full"}
89 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
90 TooLongError = &KeepError{413, "Block is too large"}
91 MethodDisabledError = &KeepError{405, "Method disabled"}
92 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
95 func (e *KeepError) Error() string {
99 // ========================
100 // Internal data structures
102 // These global variables are used by multiple parts of the
103 // program. They are good candidates for moving into their own
106 // The Keep VolumeManager maintains a list of available volumes.
107 // Initialized by the --volumes flag (or by FindKeepVolumes).
108 var KeepVM VolumeManager
110 // The pull list manager and trash queue are threadsafe queues which
111 // support atomic update operations. The PullHandler and TrashHandler
112 // store results from Data Manager /pull and /trash requests here.
114 // See the Keep and Data Manager design documents for more details:
115 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
116 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
119 var trashq *WorkQueue
121 type volumeSet []Volume
129 func (vs *volumeSet) String() string {
130 return fmt.Sprintf("%+v", (*vs)[:])
133 // TODO(twp): continue moving as much code as possible out of main
134 // so it can be effectively tested. Esp. handling and postprocessing
135 // of command line flags (identifying Keep volumes and initializing
136 // permission arguments).
139 log.Println("keepstore starting, pid", os.Getpid())
140 defer log.Println("keepstore exiting, pid", os.Getpid())
143 dataManagerTokenFile string
145 blobSigningKeyFile string
150 &dataManagerTokenFile,
151 "data-manager-token-file",
153 "File with the API token used by the Data Manager. All DELETE "+
154 "requests or GET /index requests must carry this token.")
157 "enforce-permissions",
159 "Enforce permission signatures on requests.")
164 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
169 "If true, nothing will be deleted. "+
170 "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
171 "You should leave this option alone unless you can afford to lose data.")
174 "permission-key-file",
176 "Synonym for -blob-signing-key-file.")
179 "blob-signing-key-file",
181 "File containing the secret key for generating and verifying "+
182 "blob permission signatures.")
187 "Synonym for -blob-signature-ttl.")
190 "blob-signature-ttl",
191 int(time.Duration(2*7*24*time.Hour).Seconds()),
192 "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures"+
193 "See services/api/config/application.default.yml.")
198 "Serialize read and write operations on the following volumes.")
203 "Do not write, delete, or touch anything on the following volumes.")
208 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
213 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
218 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
221 "trash-check-interval",
223 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
228 log.Fatal("-max-buffers must be greater than zero.")
230 bufs = newBufferPool(maxBuffers, BlockSize)
233 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
235 log.Fatalf("open pidfile (%s): %s", pidfile, err)
237 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
239 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
243 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
245 _, err = fmt.Fprint(f, os.Getpid())
247 log.Fatalf("write pidfile (%s): %s", pidfile, err)
251 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
254 defer os.Remove(pidfile)
257 if len(volumes) == 0 {
258 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
259 log.Fatal("No volumes found.")
263 for _, v := range volumes {
264 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
267 // Initialize data manager token and permission key.
268 // If these tokens are specified but cannot be read,
269 // raise a fatal error.
270 if dataManagerTokenFile != "" {
271 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
272 dataManagerToken = strings.TrimSpace(string(buf))
274 log.Fatalf("reading data manager token: %s\n", err)
278 if neverDelete != true {
279 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
280 "been extensively tested. You should leave this option alone unless you can afford to lose data.")
283 if blobSigningKeyFile != "" {
284 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
285 PermissionSecret = bytes.TrimSpace(buf)
287 log.Fatalf("reading permission key: %s\n", err)
291 blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
293 if PermissionSecret == nil {
294 if enforcePermissions {
295 log.Fatal("-enforce-permissions requires a permission key")
297 log.Println("Running without a PermissionSecret. Block locators " +
298 "returned by this server will not be signed, and will be rejected " +
299 "by a server that enforces permissions.")
300 log.Println("To fix this, use the -blob-signing-key-file flag " +
301 "to specify the file containing the permission key.")
305 // Start a round-robin VolumeManager with the volumes we have found.
306 KeepVM = MakeRRVolumeManager(volumes)
308 // Tell the built-in HTTP server to direct all requests to the REST router.
309 loggingRouter := MakeLoggingRESTRouter()
310 http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
311 loggingRouter.ServeHTTP(resp, req)
314 // Set up a TCP listener.
315 listener, err := net.Listen("tcp", listen)
320 // Initialize Pull queue and worker
321 keepClient := &keepclient.KeepClient{
324 Client: &http.Client{},
327 // Initialize the pullq and worker
328 pullq = NewWorkQueue()
329 go RunPullWorker(pullq, keepClient)
331 // Initialize the trashq and worker
332 trashq = NewWorkQueue()
333 go RunTrashWorker(trashq)
335 // Start emptyTrash goroutine
336 doneEmptyingTrash := make(chan bool)
337 go emptyTrash(doneEmptyingTrash, trashCheckInterval)
339 // Shut down the server gracefully (by closing the listener)
340 // if SIGTERM is received.
341 term := make(chan os.Signal, 1)
342 go func(sig <-chan os.Signal) {
344 log.Println("caught signal:", s)
345 doneEmptyingTrash <- true
348 signal.Notify(term, syscall.SIGTERM)
349 signal.Notify(term, syscall.SIGINT)
351 log.Println("listening at", listen)
352 srv := &http.Server{Addr: listen}
356 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
357 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
358 ticker := time.NewTicker(trashCheckInterval)
363 for _, v := range volumes {
368 case <-doneEmptyingTrash: