14 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16 "git.curoverse.com/arvados.git/sdk/go/config"
17 "git.curoverse.com/arvados.git/sdk/go/httpserver"
18 "git.curoverse.com/arvados.git/sdk/go/keepclient"
19 "github.com/coreos/go-systemd/daemon"
20 "github.com/ghodss/yaml"
const (
	// BlockSize is the size, in bytes, of a full Keep block: 64 MiB.
	BlockSize = 64 << 20

	// MinFreeKilobytes is the amount of free space, in kilobytes, that a
	// Keep volume must have available in order to permit writes. It is
	// exactly one full block (BlockSize expressed in KiB).
	MinFreeKilobytes = BlockSize >> 10
)

// ProcMounts is the path of the file listing currently mounted
// filesystems. It is declared as a variable rather than a constant,
// presumably so tests can point it at a fixture file (TODO confirm).
var ProcMounts = "/proc/mounts"
37 type KeepError struct {
43 BadRequestError = &KeepError{400, "Bad Request"}
44 UnauthorizedError = &KeepError{401, "Unauthorized"}
45 CollisionError = &KeepError{500, "Collision"}
46 RequestHashError = &KeepError{422, "Hash mismatch in request"}
47 PermissionError = &KeepError{403, "Forbidden"}
48 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
49 ExpiredError = &KeepError{401, "Expired permission signature"}
50 NotFoundError = &KeepError{404, "Not Found"}
51 GenericError = &KeepError{500, "Fail"}
52 FullError = &KeepError{503, "Full"}
53 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
54 TooLongError = &KeepError{413, "Block is too large"}
55 MethodDisabledError = &KeepError{405, "Method disabled"}
56 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
57 ErrClientDisconnect = &KeepError{503, "Client disconnected"}
60 func (e *KeepError) Error() string {
64 // ========================
65 // Internal data structures
67 // These global variables are used by multiple parts of the
68 // program. They are good candidates for moving into their own
71 // The Keep VolumeManager maintains a list of available volumes.
72 // Initialized by the --volumes flag (or by FindKeepVolumes).
73 var KeepVM VolumeManager
75 // The pull list manager and trash queue are threadsafe queues which
76 // support atomic update operations. The PullHandler and TrashHandler
77 // store results from Data Manager /pull and /trash requests here.
79 // See the Keep and Data Manager design documents for more details:
80 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
81 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
91 // TODO(twp): continue moving as much code as possible out of main
92 // so it can be effectively tested. Esp. handling and postprocessing
93 // of command line flags (identifying Keep volumes and initializing
94 // permission arguments).
97 neverDelete := !theConfig.EnableDelete
98 signatureTTLSeconds := int(theConfig.BlobSignatureTTL.Duration() / time.Second)
99 flag.StringVar(&theConfig.Listen, "listen", theConfig.Listen, "see Listen configuration")
100 flag.IntVar(&theConfig.MaxBuffers, "max-buffers", theConfig.MaxBuffers, "see MaxBuffers configuration")
101 flag.IntVar(&theConfig.MaxRequests, "max-requests", theConfig.MaxRequests, "see MaxRequests configuration")
102 flag.BoolVar(&neverDelete, "never-delete", neverDelete, "see EnableDelete configuration")
103 flag.BoolVar(&theConfig.RequireSignatures, "enforce-permissions", theConfig.RequireSignatures, "see RequireSignatures configuration")
104 flag.StringVar(&theConfig.BlobSigningKeyFile, "permission-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
105 flag.StringVar(&theConfig.BlobSigningKeyFile, "blob-signing-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
106 flag.StringVar(&theConfig.SystemAuthTokenFile, "data-manager-token-file", theConfig.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
107 flag.IntVar(&signatureTTLSeconds, "permission-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
108 flag.IntVar(&signatureTTLSeconds, "blob-signature-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
109 flag.Var(&theConfig.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
110 flag.BoolVar(&flagSerializeIO, "serialize", false, "serialize read and write operations on the following volumes.")
111 flag.BoolVar(&flagReadonly, "readonly", false, "do not write, delete, or touch anything on the following volumes.")
112 flag.StringVar(&theConfig.PIDFile, "pid", theConfig.PIDFile, "see `PIDFile` configuration")
113 flag.Var(&theConfig.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
115 dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
117 defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
118 var configPath string
123 "YAML or JSON configuration file `path`")
127 theConfig.BlobSignatureTTL = arvados.Duration(signatureTTLSeconds) * arvados.Duration(time.Second)
128 theConfig.EnableDelete = !neverDelete
131 err := config.LoadFile(theConfig, configPath)
132 if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
137 y, err := yaml.Marshal(theConfig)
145 err = theConfig.Start()
147 if pidfile := theConfig.PIDFile; pidfile != "" {
148 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
150 log.Fatalf("open pidfile (%s): %s", pidfile, err)
153 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
155 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
157 defer os.Remove(pidfile)
160 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
162 _, err = fmt.Fprint(f, os.Getpid())
164 log.Fatalf("write pidfile (%s): %s", pidfile, err)
168 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
172 log.Println("keepstore starting, pid", os.Getpid())
173 defer log.Println("keepstore exiting, pid", os.Getpid())
175 // Start a round-robin VolumeManager with the volumes we have found.
176 KeepVM = MakeRRVolumeManager(theConfig.Volumes)
178 // Middleware stack: logger, MaxRequests limiter, method handlers
179 http.Handle("/", &LoggingRESTRouter{
180 httpserver.NewRequestLimiter(theConfig.MaxRequests,
184 // Set up a TCP listener.
185 listener, err := net.Listen("tcp", theConfig.Listen)
190 // Initialize Pull queue and worker
191 keepClient := &keepclient.KeepClient{
192 Arvados: &arvadosclient.ArvadosClient{},
194 Client: &http.Client{},
197 // Initialize the pullq and worker
198 pullq = NewWorkQueue()
199 go RunPullWorker(pullq, keepClient)
201 // Initialize the trashq and worker
202 trashq = NewWorkQueue()
203 go RunTrashWorker(trashq)
205 // Start emptyTrash goroutine
206 doneEmptyingTrash := make(chan bool)
207 go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
209 // Shut down the server gracefully (by closing the listener)
210 // if SIGTERM is received.
211 term := make(chan os.Signal, 1)
212 go func(sig <-chan os.Signal) {
214 log.Println("caught signal:", s)
215 doneEmptyingTrash <- true
218 signal.Notify(term, syscall.SIGTERM)
219 signal.Notify(term, syscall.SIGINT)
221 if _, err := daemon.SdNotify("READY=1"); err != nil {
222 log.Printf("Error notifying init daemon: %v", err)
224 log.Println("listening at", listener.Addr)
225 srv := &http.Server{}
229 // Periodically (once per interval) invoke EmptyTrash on all volumes.
230 func emptyTrash(done <-chan bool, interval time.Duration) {
231 ticker := time.NewTicker(interval)
236 for _, v := range theConfig.Volumes {