7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
19 // ======================
20 // Configuration settings
22 // TODO(twp): make all of these configurable via command line flags
23 // and/or configuration file settings.
25 // DefaultAddr is the default TCP address on which to listen for requests.
26 // NOTE(review): a constant cannot itself be set by a flag; presumably this is the default value of the -listen flag. TODO confirm.
27 const DefaultAddr = ":25107"
29 // BlockSize is the size of a Keep "block" in bytes: 64 MiB.
30 const BlockSize = 64 * 1024 * 1024
32 // MinFreeKilobytes is the free space, in kilobytes, that a Keep volume must
33 // have available in order to permit writes: the size of one block.
34 const MinFreeKilobytes = BlockSize / 1024
36 // TestDataManagerToken is the data manager token used in testing. Until #6221 is
37 // resolved, never_delete must be true unless the data manager token equals this value.
38 const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
40 // ProcMounts is the path of the mounts file; it defaults to /proc/mounts.
41 var ProcMounts = "/proc/mounts"
43 // enforcePermissions controls whether permission signatures
44 // should be enforced (affecting GET and DELETE requests).
45 // Initialized by the -enforce-permissions flag. Startup fails if it is set without a permission key.
46 var enforcePermissions bool
48 // blobSignatureTTL is the time duration for which new permission
49 // signatures (returned by PUT requests) will be valid.
50 // Initialized from the -blob-signature-ttl flag (synonym: -permission-ttl), which is given in seconds.
51 var blobSignatureTTL time.Duration
53 // dataManagerToken represents the API token used by the
54 // Data Manager, and is required on certain privileged operations.
55 // Initialized with the whitespace-trimmed contents of the file named by the -data-manager-token-file flag.
56 var dataManagerToken string
58 // neverDelete can be used to prevent the DELETE handler from actually
59 // deleting anything. It defaults to true; startup fails if it is false, unless the data manager token is TestDataManagerToken (see #6221).
60 var neverDelete = true
67 type KeepError struct {
73 BadRequestError = &KeepError{400, "Bad Request"}
74 UnauthorizedError = &KeepError{401, "Unauthorized"}
75 CollisionError = &KeepError{500, "Collision"}
76 RequestHashError = &KeepError{422, "Hash mismatch in request"}
77 PermissionError = &KeepError{403, "Forbidden"}
78 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
79 ExpiredError = &KeepError{401, "Expired permission signature"}
80 NotFoundError = &KeepError{404, "Not Found"}
81 GenericError = &KeepError{500, "Fail"}
82 FullError = &KeepError{503, "Full"}
83 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
84 TooLongError = &KeepError{413, "Block is too large"}
85 MethodDisabledError = &KeepError{405, "Method disabled"}
88 func (e *KeepError) Error() string {
92 // ========================
93 // Internal data structures
95 // These global variables are used by multiple parts of the
96 // program. They are good candidates for moving into their own
99 // KeepVM is the Keep VolumeManager, which maintains a list of available volumes.
100 // Set at startup by MakeRRVolumeManager from the configured volumes (or, if none are given, from those found by discovery).
101 var KeepVM VolumeManager
103 // The pull list manager and trash queue are threadsafe queues which
104 // support atomic update operations. The PullHandler and TrashHandler
105 // store results from Data Manager /pull and /trash requests here; the RunPullWorker and RunTrashWorker goroutines are each started with one of these queues.
107 // See the Keep and Data Manager design documents for more details:
108 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
109 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
112 var trashq *WorkQueue
114 type volumeSet []Volume
122 func (vs *volumeSet) String() string {
123 return fmt.Sprintf("%+v", (*vs)[:])
126 // TODO(twp): continue moving as much code as possible out of main
127 // so it can be effectively tested. Esp. handling and postprocessing
128 // of command line flags (identifying Keep volumes and initializing
129 // permission arguments).
132 log.Println("keepstore starting, pid", os.Getpid())
133 defer log.Println("keepstore exiting, pid", os.Getpid())
136 dataManagerTokenFile string
138 blobSigningKeyFile string
143 &dataManagerTokenFile,
144 "data-manager-token-file",
146 "File with the API token used by the Data Manager. All DELETE "+
147 "requests or GET /index requests must carry this token.")
150 "enforce-permissions",
152 "Enforce permission signatures on requests.")
157 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
162 "If set, nothing will be deleted. HTTP 405 will be returned "+
163 "for valid DELETE requests.")
166 "permission-key-file",
168 "Synonym for -blob-signing-key-file.")
171 "blob-signing-key-file",
173 "File containing the secret key for generating and verifying "+
174 "blob permission signatures.")
179 "Synonym for -blob-signature-ttl.")
182 "blob-signature-ttl",
183 int(time.Duration(2*7*24*time.Hour).Seconds()),
184 "Lifetime of blob permission signatures. "+
185 "See services/api/config/application.default.yml.")
190 "Serialize read and write operations on the following volumes.")
195 "Do not write, delete, or touch anything on the following volumes.")
200 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
205 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
210 log.Fatal("-max-buffers must be greater than zero.")
212 bufs = newBufferPool(maxBuffers, BlockSize)
215 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
217 log.Fatalf("open pidfile (%s): %s", pidfile, err)
219 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
221 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
225 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
227 _, err = fmt.Fprint(f, os.Getpid())
229 log.Fatalf("write pidfile (%s): %s", pidfile, err)
233 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
236 defer os.Remove(pidfile)
239 if len(volumes) == 0 {
240 if (&unixVolumeAdder{&volumes}).Discover() == 0 {
241 log.Fatal("No volumes found.")
245 for _, v := range volumes {
246 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
249 // Initialize data manager token and permission key from their files.
250 // If either file is specified but cannot be read,
251 // raise a fatal error.
252 if dataManagerTokenFile != "" {
253 if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
254 dataManagerToken = strings.TrimSpace(string(buf))
256 log.Fatalf("reading data manager token: %s\n", err)
260 if neverDelete != true && dataManagerToken != TestDataManagerToken {
261 log.Fatal("never_delete must be true, see #6221")
264 if blobSigningKeyFile != "" {
265 if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
266 PermissionSecret = bytes.TrimSpace(buf)
268 log.Fatalf("reading permission key: %s\n", err)
272 blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
274 if PermissionSecret == nil {
275 if enforcePermissions {
276 log.Fatal("-enforce-permissions requires a permission key")
278 log.Println("Running without a PermissionSecret. Block locators " +
279 "returned by this server will not be signed, and will be rejected " +
280 "by a server that enforces permissions.")
281 log.Println("To fix this, use the -blob-signing-key-file flag " +
282 "to specify the file containing the permission key.")
286 // Start a round-robin VolumeManager with the volumes we have found.
287 KeepVM = MakeRRVolumeManager(volumes)
289 // Tell the built-in HTTP server to direct all requests to the REST router.
290 loggingRouter := MakeLoggingRESTRouter()
291 http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
292 loggingRouter.ServeHTTP(resp, req)
295 // Set up a TCP listener.
296 listener, err := net.Listen("tcp", listen)
301 // Create the Keep client that is passed to the pull worker.
302 keepClient := &keepclient.KeepClient{
306 Client: &http.Client{},
309 // Initialize the pullq and worker
310 pullq = NewWorkQueue()
311 go RunPullWorker(pullq, keepClient)
313 // Initialize the trashq and worker
314 trashq = NewWorkQueue()
315 go RunTrashWorker(trashq)
317 // Shut down the server gracefully (by closing the listener)
318 // if SIGTERM or SIGINT is received.
319 term := make(chan os.Signal, 1)
320 go func(sig <-chan os.Signal) {
322 log.Println("caught signal:", s)
325 signal.Notify(term, syscall.SIGTERM)
326 signal.Notify(term, syscall.SIGINT)
328 log.Println("listening at", listen)
329 srv := &http.Server{Addr: listen}