1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 "git.curoverse.com/arvados.git/sdk/go/config"
19 "git.curoverse.com/arvados.git/sdk/go/keepclient"
20 "github.com/coreos/go-systemd/daemon"
21 "github.com/prometheus/client_golang/prometheus"
26 // A Keep "block" is 64MB.
27 const BlockSize = 64 * 1024 * 1024
29 // A Keep volume must have at least MinFreeKilobytes available
30 // in order to permit writes.
31 const MinFreeKilobytes = BlockSize / 1024
33 // ProcMounts /proc/mounts
34 var ProcMounts = "/proc/mounts"
40 type KeepError struct {
46 BadRequestError = &KeepError{400, "Bad Request"}
47 UnauthorizedError = &KeepError{401, "Unauthorized"}
48 CollisionError = &KeepError{500, "Collision"}
49 RequestHashError = &KeepError{422, "Hash mismatch in request"}
50 PermissionError = &KeepError{403, "Forbidden"}
51 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
52 ExpiredError = &KeepError{401, "Expired permission signature"}
53 NotFoundError = &KeepError{404, "Not Found"}
54 GenericError = &KeepError{500, "Fail"}
55 FullError = &KeepError{503, "Full"}
56 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
57 TooLongError = &KeepError{413, "Block is too large"}
58 MethodDisabledError = &KeepError{405, "Method disabled"}
59 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
60 ErrClientDisconnect = &KeepError{503, "Client disconnected"}
63 func (e *KeepError) Error() string {
67 // ========================
68 // Internal data structures
70 // These global variables are used by multiple parts of the
71 // program. They are good candidates for moving into their own
74 // The Keep VolumeManager maintains a list of available volumes.
75 // Initialized by the --volumes flag (or by FindKeepVolumes).
76 var KeepVM VolumeManager
78 // The pull list manager and trash queue are threadsafe queues which
79 // support atomic update operations. The PullHandler and TrashHandler
80 // store results from Data Manager /pull and /trash requests here.
82 // See the Keep and Data Manager design documents for more details:
83 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
84 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
90 deprecated.beforeFlagParse(theConfig)
92 dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
93 getVersion := flag.Bool("version", false, "Print version information and exit.")
95 defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
101 "YAML or JSON configuration file `path`")
105 // Print version information if requested
107 fmt.Printf("keepstore %s\n", version)
111 deprecated.afterFlagParse(theConfig)
113 err := config.LoadFile(theConfig, configPath)
114 if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
119 log.Fatal(config.DumpAndExit(theConfig))
122 log.Printf("keepstore %s started", version)
124 metricsRegistry := prometheus.NewRegistry()
126 err = theConfig.Start(metricsRegistry)
131 if pidfile := theConfig.PIDFile; pidfile != "" {
132 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
134 log.Fatalf("open pidfile (%s): %s", pidfile, err)
137 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
139 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
141 defer os.Remove(pidfile)
144 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
146 _, err = fmt.Fprint(f, os.Getpid())
148 log.Fatalf("write pidfile (%s): %s", pidfile, err)
152 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
156 var cluster *arvados.Cluster
157 cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
158 if err != nil && os.IsNotExist(err) {
159 log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
160 cluster = &arvados.Cluster{
163 } else if err != nil {
164 log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
166 cluster, err = cfg.GetCluster("")
168 log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
172 log.Println("keepstore starting, pid", os.Getpid())
173 defer log.Println("keepstore exiting, pid", os.Getpid())
175 // Start a round-robin VolumeManager with the volumes we have found.
176 KeepVM = MakeRRVolumeManager(theConfig.Volumes)
178 // Middleware/handler stack
179 router := MakeRESTRouter(cluster, metricsRegistry)
181 // Set up a TCP listener.
182 listener, err := net.Listen("tcp", theConfig.Listen)
187 // Initialize keepclient for pull workers
188 keepClient := &keepclient.KeepClient{
189 Arvados: &arvadosclient.ArvadosClient{},
193 // Initialize the pullq and workers
194 pullq = NewWorkQueue()
195 for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
196 go RunPullWorker(pullq, keepClient)
199 // Initialize the trashq and workers
200 trashq = NewWorkQueue()
201 for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
202 go RunTrashWorker(trashq)
205 // Start emptyTrash goroutine
206 doneEmptyingTrash := make(chan bool)
207 go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
209 // Shut down the server gracefully (by closing the listener)
210 // if SIGTERM is received.
211 term := make(chan os.Signal, 1)
212 go func(sig <-chan os.Signal) {
214 log.Println("caught signal:", s)
215 doneEmptyingTrash <- true
218 signal.Notify(term, syscall.SIGTERM)
219 signal.Notify(term, syscall.SIGINT)
221 if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
222 log.Printf("Error notifying init daemon: %v", err)
224 log.Println("listening at", listener.Addr())
230 // Periodically (once per interval) invoke EmptyTrash on all volumes.
231 func emptyTrash(done <-chan bool, interval time.Duration) {
232 ticker := time.NewTicker(interval)
237 for _, v := range theConfig.Volumes {