7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
20 // ======================
21 // Configuration settings
23 // TODO(twp): make all of these configurable via command line flags
24 // and/or configuration file settings.
26 // Default TCP address on which to listen for requests.
27 // Initialized by the --listen flag.
28 const DefaultAddr = ":25107"
30 // BlockSize is the size of a Keep "block": 64 MiB (64 * 1024 * 1024 bytes).
31 const BlockSize = 64 * 1024 * 1024
33 // A Keep volume must have at least MinFreeKilobytes available
34 // in order to permit writes.
35 const MinFreeKilobytes = BlockSize / 1024
37 // ProcMounts is the path of the mount table file, /proc/mounts by default.
38 var ProcMounts = "/proc/mounts"
40 // enforcePermissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the -enforce-permissions flag.
43 var enforcePermissions bool
45 // blobSignatureTTL is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized by the -blob-signature-ttl flag (synonym: -permission-ttl).
48 var blobSignatureTTL time.Duration
50 // dataManagerToken represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Initialized by the -data-manager-token-file flag.
53 var dataManagerToken string
55 // neverDelete can be used to prevent the DELETE handler from
56 // actually deleting anything.
57 var neverDelete = true
59 // trashLifetime is the time duration after a block is trashed
60 // during which it can be recovered using an /untrash request.
61 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
62 var trashLifetime time.Duration
64 // trashCheckInterval is the interval at which the emptyTrash goroutine
65 // will check and delete expired trashed blocks. Default is one day.
66 // Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
67 var trashCheckInterval time.Duration
74 type KeepError struct {
80 BadRequestError = &KeepError{400, "Bad Request"}
81 UnauthorizedError = &KeepError{401, "Unauthorized"}
82 CollisionError = &KeepError{500, "Collision"}
83 RequestHashError = &KeepError{422, "Hash mismatch in request"}
84 PermissionError = &KeepError{403, "Forbidden"}
85 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
86 ExpiredError = &KeepError{401, "Expired permission signature"}
87 NotFoundError = &KeepError{404, "Not Found"}
88 GenericError = &KeepError{500, "Fail"}
89 FullError = &KeepError{503, "Full"}
90 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
91 TooLongError = &KeepError{413, "Block is too large"}
92 MethodDisabledError = &KeepError{405, "Method disabled"}
93 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
96 func (e *KeepError) Error() string {
100 // ========================
101 // Internal data structures
103 // These global variables are used by multiple parts of the
104 // program. They are good candidates for moving into their own
107 // The Keep VolumeManager maintains a list of available volumes.
108 // Initialized by the --volumes flag (or by FindKeepVolumes).
109 var KeepVM VolumeManager
111 // The pull list manager and trash queue are threadsafe queues which
112 // support atomic update operations. The PullHandler and TrashHandler
113 // store results from Data Manager /pull and /trash requests here.
115 // See the Keep and Data Manager design documents for more details:
116 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
117 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
120 var trashq *WorkQueue
122 type volumeSet []Volume
130 var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32}).trash.(\d+)$`)
132 func (vs *volumeSet) String() string {
133 return fmt.Sprintf("%+v", (*vs)[:])
136 // TODO(twp): continue moving as much code as possible out of main
137 // so it can be effectively tested. Esp. handling and postprocessing
138 // of command line flags (identifying Keep volumes and initializing
139 // permission arguments).
142 log.Println("keepstore starting, pid", os.Getpid())
143 defer log.Println("keepstore exiting, pid", os.Getpid())
146 dataManagerTokenFile string
148 blobSigningKeyFile string
153 &dataManagerTokenFile,
154 "data-manager-token-file",
156 "File with the API token used by the Data Manager. All DELETE "+
157 "requests or GET /index requests must carry this token.")
160 "enforce-permissions",
162 "Enforce permission signatures on requests.")
167 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
172 "If true, nothing will be deleted. "+
173 "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
174 "You should leave this option alone unless you can afford to lose data.")
177 "permission-key-file",
179 "Synonym for -blob-signing-key-file.")
182 "blob-signing-key-file",
184 "File containing the secret key for generating and verifying "+
185 "blob permission signatures.")
190 "Synonym for -blob-signature-ttl.")
193 "blob-signature-ttl",
194 int(time.Duration(2*7*24*time.Hour).Seconds()),
195 "Lifetime of blob permission signatures. "+
196 "See services/api/config/application.default.yml.")
201 "Serialize read and write operations on the following volumes.")
206 "Do not write, delete, or touch anything on the following volumes.")
211 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
216 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
221 "Time duration after a block is trashed during which it can be recovered using an /untrash request")
224 "trash-check-interval",
226 "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
231 log.Fatal("-max-buffers must be greater than zero.")
233 bufs = newBufferPool(maxBuffers, BlockSize)
236 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
238 log.Fatalf("open pidfile (%s): %s", pidfile, err)
240 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
242 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
246 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
248 _, err = fmt.Fprint(f, os.Getpid())
250 log.Fatalf("write pidfile (%s): %s", pidfile, err)
254 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
257 defer os.Remove(pidfile)
260 if len(volumes) == 0 {
261 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
262 log.Fatal("No volumes found.")
266 for _, v := range volumes {
267 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
270 // Initialize data manager token and permission key.
271 // If these tokens are specified but cannot be read,
272 // raise a fatal error.
273 if dataManagerTokenFile != "" {
274 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
275 dataManagerToken = strings.TrimSpace(string(buf))
277 log.Fatalf("reading data manager token: %s\n", err)
281 if neverDelete != true {
282 log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
283 "been extensively tested. You should leave this option alone unless you can afford to lose data.")
286 if blobSigningKeyFile != "" {
287 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
288 PermissionSecret = bytes.TrimSpace(buf)
290 log.Fatalf("reading permission key: %s\n", err)
294 blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
296 if PermissionSecret == nil {
297 if enforcePermissions {
298 log.Fatal("-enforce-permissions requires a permission key")
300 log.Println("Running without a PermissionSecret. Block locators " +
301 "returned by this server will not be signed, and will be rejected " +
302 "by a server that enforces permissions.")
303 log.Println("To fix this, use the -blob-signing-key-file flag " +
304 "to specify the file containing the permission key.")
308 // Start a round-robin VolumeManager with the volumes we have found.
309 KeepVM = MakeRRVolumeManager(volumes)
311 // Tell the built-in HTTP server to direct all requests to the REST router.
312 loggingRouter := MakeLoggingRESTRouter()
313 http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
314 loggingRouter.ServeHTTP(resp, req)
317 // Set up a TCP listener.
318 listener, err := net.Listen("tcp", listen)
323 // Initialize Pull queue and worker
324 keepClient := &keepclient.KeepClient{
327 Client: &http.Client{},
330 // Initialize the pullq and worker
331 pullq = NewWorkQueue()
332 go RunPullWorker(pullq, keepClient)
334 // Initialize the trashq and worker
335 trashq = NewWorkQueue()
336 go RunTrashWorker(trashq)
338 // Start emptyTrash goroutine
339 doneEmptyingTrash := make(chan bool)
340 go emptyTrash(doneEmptyingTrash, trashCheckInterval)
342 // Shut down the server gracefully (by closing the listener)
343 // if SIGTERM or SIGINT is received.
344 term := make(chan os.Signal, 1)
345 go func(sig <-chan os.Signal) {
347 log.Println("caught signal:", s)
348 doneEmptyingTrash <- true
351 signal.Notify(term, syscall.SIGTERM)
352 signal.Notify(term, syscall.SIGINT)
354 log.Println("listening at", listen)
355 srv := &http.Server{Addr: listen}
359 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
360 func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
361 ticker := time.NewTicker(trashCheckInterval)
366 for _, v := range volumes {
371 case <-doneEmptyingTrash: