14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 "git.curoverse.com/arvados.git/sdk/go/config"
16 "git.curoverse.com/arvados.git/sdk/go/httpserver"
17 "git.curoverse.com/arvados.git/sdk/go/keepclient"
18 "github.com/coreos/go-systemd/daemon"
19 "github.com/ghodss/yaml"
22 // A Keep "block" is 64MB.
23 const BlockSize = 64 * 1024 * 1024
25 // A Keep volume must have at least MinFreeKilobytes available
26 // in order to permit writes.
27 const MinFreeKilobytes = BlockSize / 1024
29 // ProcMounts /proc/mounts
30 var ProcMounts = "/proc/mounts"
36 type KeepError struct {
42 BadRequestError = &KeepError{400, "Bad Request"}
43 UnauthorizedError = &KeepError{401, "Unauthorized"}
44 CollisionError = &KeepError{500, "Collision"}
45 RequestHashError = &KeepError{422, "Hash mismatch in request"}
46 PermissionError = &KeepError{403, "Forbidden"}
47 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
48 ExpiredError = &KeepError{401, "Expired permission signature"}
49 NotFoundError = &KeepError{404, "Not Found"}
50 GenericError = &KeepError{500, "Fail"}
51 FullError = &KeepError{503, "Full"}
52 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
53 TooLongError = &KeepError{413, "Block is too large"}
54 MethodDisabledError = &KeepError{405, "Method disabled"}
55 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
56 ErrClientDisconnect = &KeepError{503, "Client disconnected"}
59 func (e *KeepError) Error() string {
63 // ========================
64 // Internal data structures
66 // These global variables are used by multiple parts of the
67 // program. They are good candidates for moving into their own
70 // The Keep VolumeManager maintains a list of available volumes.
71 // Initialized by the --volumes flag (or by FindKeepVolumes).
72 var KeepVM VolumeManager
74 // The pull list manager and trash queue are threadsafe queues which
75 // support atomic update operations. The PullHandler and TrashHandler
76 // store results from Data Manager /pull and /trash requests here.
78 // See the Keep and Data Manager design documents for more details:
79 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
80 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
85 // TODO(twp): continue moving as much code as possible out of main
86 // so it can be effectively tested. Esp. handling and postprocessing
87 // of command line flags (identifying Keep volumes and initializing
88 // permission arguments).
91 deprecated.beforeFlagParse(theConfig)
93 dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
95 defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
101 "YAML or JSON configuration file `path`")
105 deprecated.afterFlagParse(theConfig)
107 err := config.LoadFile(theConfig, configPath)
108 if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
113 y, err := yaml.Marshal(theConfig)
121 err = theConfig.Start()
123 if pidfile := theConfig.PIDFile; pidfile != "" {
124 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
126 log.Fatalf("open pidfile (%s): %s", pidfile, err)
129 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
131 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
133 defer os.Remove(pidfile)
136 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
138 _, err = fmt.Fprint(f, os.Getpid())
140 log.Fatalf("write pidfile (%s): %s", pidfile, err)
144 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
148 log.Println("keepstore starting, pid", os.Getpid())
149 defer log.Println("keepstore exiting, pid", os.Getpid())
151 // Start a round-robin VolumeManager with the volumes we have found.
152 KeepVM = MakeRRVolumeManager(theConfig.Volumes)
154 // Middleware stack: logger, MaxRequests limiter, method handlers
155 http.Handle("/", &LoggingRESTRouter{
156 httpserver.NewRequestLimiter(theConfig.MaxRequests,
160 // Set up a TCP listener.
161 listener, err := net.Listen("tcp", theConfig.Listen)
166 // Initialize Pull queue and worker
167 keepClient := &keepclient.KeepClient{
168 Arvados: &arvadosclient.ArvadosClient{},
170 Client: &http.Client{},
173 // Initialize the pullq and worker
174 pullq = NewWorkQueue()
175 go RunPullWorker(pullq, keepClient)
177 // Initialize the trashq and worker
178 trashq = NewWorkQueue()
179 go RunTrashWorker(trashq)
181 // Start emptyTrash goroutine
182 doneEmptyingTrash := make(chan bool)
183 go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
185 // Shut down the server gracefully (by closing the listener)
186 // if SIGTERM is received.
187 term := make(chan os.Signal, 1)
188 go func(sig <-chan os.Signal) {
190 log.Println("caught signal:", s)
191 doneEmptyingTrash <- true
194 signal.Notify(term, syscall.SIGTERM)
195 signal.Notify(term, syscall.SIGINT)
197 if _, err := daemon.SdNotify("READY=1"); err != nil {
198 log.Printf("Error notifying init daemon: %v", err)
200 log.Println("listening at", listener.Addr)
201 srv := &http.Server{}
205 // Periodically (once per interval) invoke EmptyTrash on all volumes.
206 func emptyTrash(done <-chan bool, interval time.Duration) {
207 ticker := time.NewTicker(interval)
212 for _, v := range theConfig.Volumes {