1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 "git.curoverse.com/arvados.git/sdk/go/config"
19 "git.curoverse.com/arvados.git/sdk/go/httpserver"
20 "git.curoverse.com/arvados.git/sdk/go/keepclient"
21 log "github.com/Sirupsen/logrus"
22 "github.com/coreos/go-systemd/daemon"
27 // BlockSize is the size of a Keep "block" in bytes: 64 MiB (64 * 1024 * 1024 = 67108864).
28 const BlockSize = 64 * 1024 * 1024
30 // MinFreeKilobytes is the amount of free space, in kilobytes, that a Keep
31 // volume must have available in order to permit writes: one block's worth (BlockSize / 1024 = 65536).
32 const MinFreeKilobytes = BlockSize / 1024
34 // ProcMounts holds the path "/proc/mounts". NOTE(review): it is a var rather than a const, presumably so it can be overridden (e.g. in tests) — confirm against callers.
35 var ProcMounts = "/proc/mounts"
41 type KeepError struct {
47 BadRequestError = &KeepError{400, "Bad Request"}
48 UnauthorizedError = &KeepError{401, "Unauthorized"}
49 CollisionError = &KeepError{500, "Collision"}
50 RequestHashError = &KeepError{422, "Hash mismatch in request"}
51 PermissionError = &KeepError{403, "Forbidden"}
52 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
53 ExpiredError = &KeepError{401, "Expired permission signature"}
54 NotFoundError = &KeepError{404, "Not Found"}
55 GenericError = &KeepError{500, "Fail"}
56 FullError = &KeepError{503, "Full"}
57 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
58 TooLongError = &KeepError{413, "Block is too large"}
59 MethodDisabledError = &KeepError{405, "Method disabled"}
60 ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
61 ErrClientDisconnect = &KeepError{503, "Client disconnected"}
64 func (e *KeepError) Error() string {
68 // ========================
69 // Internal data structures
71 // These global variables are used by multiple parts of the
72 // program. They are good candidates for moving into their own
75 // KeepVM is the Keep VolumeManager, which maintains a list of available volumes.
76 // It is assigned MakeRRVolumeManager(theConfig.Volumes) elsewhere in this file. NOTE(review): the earlier wording, "initialized by the --volumes flag (or by FindKeepVolumes)", could not be confirmed from here — verify.
77 var KeepVM VolumeManager
79 // The pull list manager and trash queue are threadsafe queues which
80 // support atomic update operations. The PullHandler and TrashHandler
81 // store results from Data Manager /pull and /trash requests here.
83 // See the Keep and Data Manager design documents for more details:
84 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
85 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
91 deprecated.beforeFlagParse(theConfig)
93 dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
94 getVersion := flag.Bool("version", false, "Print version information and exit.")
96 defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
102 "YAML or JSON configuration file `path`")
106 // Print version information if requested
108 fmt.Printf("keepstore %s\n", version)
112 deprecated.afterFlagParse(theConfig)
114 err := config.LoadFile(theConfig, configPath)
115 if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
120 log.Fatal(config.DumpAndExit(theConfig))
123 log.Printf("keepstore %s started", version)
125 err = theConfig.Start()
130 if pidfile := theConfig.PIDFile; pidfile != "" {
131 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
133 log.Fatalf("open pidfile (%s): %s", pidfile, err)
136 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
138 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
140 defer os.Remove(pidfile)
143 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
145 _, err = fmt.Fprint(f, os.Getpid())
147 log.Fatalf("write pidfile (%s): %s", pidfile, err)
151 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
155 log.Println("keepstore starting, pid", os.Getpid())
156 defer log.Println("keepstore exiting, pid", os.Getpid())
158 // Start a round-robin VolumeManager with the volumes we have found.
159 KeepVM = MakeRRVolumeManager(theConfig.Volumes)
161 // Middleware/handler stack
162 router := MakeRESTRouter()
163 limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
164 router.limiter = limiter
165 http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
167 // Set up a TCP listener.
168 listener, err := net.Listen("tcp", theConfig.Listen)
173 // Initialize Pull queue and worker
174 keepClient := &keepclient.KeepClient{
175 Arvados: &arvadosclient.ArvadosClient{},
179 // Initialize the pullq and worker
180 pullq = NewWorkQueue()
181 go RunPullWorker(pullq, keepClient)
183 // Initialize the trashq and worker
184 trashq = NewWorkQueue()
185 go RunTrashWorker(trashq)
187 // Start emptyTrash goroutine
188 doneEmptyingTrash := make(chan bool)
189 go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
191 // Shut down the server gracefully (by closing the listener)
192 // if SIGTERM is received.
193 term := make(chan os.Signal, 1)
194 go func(sig <-chan os.Signal) {
196 log.Println("caught signal:", s)
197 doneEmptyingTrash <- true
200 signal.Notify(term, syscall.SIGTERM)
201 signal.Notify(term, syscall.SIGINT)
203 if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
204 log.Printf("Error notifying init daemon: %v", err)
206 log.Println("listening at", listener.Addr())
207 srv := &http.Server{}
211 // Periodically (once per interval) invoke EmptyTrash on all volumes.
212 func emptyTrash(done <-chan bool, interval time.Duration) {
213 ticker := time.NewTicker(interval)
218 for _, v := range theConfig.Volumes {