9956: Add systemd unit file keepstore.service
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "flag"
5         "fmt"
6         "log"
7         "net"
8         "net/http"
9         "os"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16         "git.curoverse.com/arvados.git/sdk/go/config"
17         "git.curoverse.com/arvados.git/sdk/go/httpserver"
18         "git.curoverse.com/arvados.git/sdk/go/keepclient"
19         "github.com/coreos/go-systemd/daemon"
20         "github.com/ghodss/yaml"
21 )
22
const (
	// BlockSize is the size of a Keep "block": 64 MiB (1<<26 bytes).
	BlockSize = 1 << 26

	// MinFreeKilobytes is the amount of free space, in kilobytes, that a
	// Keep volume must have available in order to permit writes. It is
	// exactly one block's worth of space.
	MinFreeKilobytes = BlockSize >> 10
)
29
30 // ProcMounts /proc/mounts
31 var ProcMounts = "/proc/mounts"
32
33 var bufs *bufferPool
34
// KeepError is an error that pairs a short human-readable message with
// the HTTP status code associated with that failure.
type KeepError struct {
	HTTPCode int    // HTTP status code associated with the error
	ErrMsg   string // message returned by Error
}

// Predefined KeepError values, grouped by HTTP status code.
var (
	// 4xx: problems with the request itself.
	BadRequestError     = &KeepError{HTTPCode: 400, ErrMsg: "Bad Request"}
	UnauthorizedError   = &KeepError{HTTPCode: 401, ErrMsg: "Unauthorized"}
	ExpiredError        = &KeepError{HTTPCode: 401, ErrMsg: "Expired permission signature"}
	PermissionError     = &KeepError{HTTPCode: 403, ErrMsg: "Forbidden"}
	NotFoundError       = &KeepError{HTTPCode: 404, ErrMsg: "Not Found"}
	MethodDisabledError = &KeepError{HTTPCode: 405, ErrMsg: "Method disabled"}
	SizeRequiredError   = &KeepError{HTTPCode: 411, ErrMsg: "Missing Content-Length"}
	TooLongError        = &KeepError{HTTPCode: 413, ErrMsg: "Block is too large"}
	RequestHashError    = &KeepError{HTTPCode: 422, ErrMsg: "Hash mismatch in request"}

	// 5xx: server-side or storage-side failures.
	CollisionError      = &KeepError{HTTPCode: 500, ErrMsg: "Collision"}
	DiskHashError       = &KeepError{HTTPCode: 500, ErrMsg: "Hash mismatch in stored data"}
	GenericError        = &KeepError{HTTPCode: 500, ErrMsg: "Fail"}
	ErrNotImplemented   = &KeepError{HTTPCode: 500, ErrMsg: "Unsupported configuration"}
	FullError           = &KeepError{HTTPCode: 503, ErrMsg: "Full"}
	ErrClientDisconnect = &KeepError{HTTPCode: 503, ErrMsg: "Client disconnected"}
)

// Error implements the error interface; it reports only the message,
// not the status code.
func (e *KeepError) Error() string {
	return e.ErrMsg
}
63
64 // ========================
65 // Internal data structures
66 //
67 // These global variables are used by multiple parts of the
68 // program. They are good candidates for moving into their own
69 // packages.
70
71 // The Keep VolumeManager maintains a list of available volumes.
72 // Initialized by the --volumes flag (or by FindKeepVolumes).
73 var KeepVM VolumeManager
74
75 // The pull list manager and trash queue are threadsafe queues which
76 // support atomic update operations. The PullHandler and TrashHandler
77 // store results from Data Manager /pull and /trash requests here.
78 //
79 // See the Keep and Data Manager design documents for more details:
80 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
81 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
82 //
83 var pullq *WorkQueue
84 var trashq *WorkQueue
85
86 var (
87         flagSerializeIO bool
88         flagReadonly    bool
89 )
90
91 // TODO(twp): continue moving as much code as possible out of main
92 // so it can be effectively tested. Esp. handling and postprocessing
93 // of command line flags (identifying Keep volumes and initializing
94 // permission arguments).
95
96 func main() {
97         neverDelete := !theConfig.EnableDelete
98         signatureTTLSeconds := int(theConfig.BlobSignatureTTL.Duration() / time.Second)
99         flag.StringVar(&theConfig.Listen, "listen", theConfig.Listen, "see Listen configuration")
100         flag.IntVar(&theConfig.MaxBuffers, "max-buffers", theConfig.MaxBuffers, "see MaxBuffers configuration")
101         flag.IntVar(&theConfig.MaxRequests, "max-requests", theConfig.MaxRequests, "see MaxRequests configuration")
102         flag.BoolVar(&neverDelete, "never-delete", neverDelete, "see EnableDelete configuration")
103         flag.BoolVar(&theConfig.RequireSignatures, "enforce-permissions", theConfig.RequireSignatures, "see RequireSignatures configuration")
104         flag.StringVar(&theConfig.BlobSigningKeyFile, "permission-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
105         flag.StringVar(&theConfig.BlobSigningKeyFile, "blob-signing-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
106         flag.StringVar(&theConfig.SystemAuthTokenFile, "data-manager-token-file", theConfig.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
107         flag.IntVar(&signatureTTLSeconds, "permission-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
108         flag.IntVar(&signatureTTLSeconds, "blob-signature-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
109         flag.Var(&theConfig.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
110         flag.BoolVar(&flagSerializeIO, "serialize", false, "serialize read and write operations on the following volumes.")
111         flag.BoolVar(&flagReadonly, "readonly", false, "do not write, delete, or touch anything on the following volumes.")
112         flag.StringVar(&theConfig.PIDFile, "pid", theConfig.PIDFile, "see `PIDFile` configuration")
113         flag.Var(&theConfig.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
114
115         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
116
117         defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
118         var configPath string
119         flag.StringVar(
120                 &configPath,
121                 "config",
122                 defaultConfigPath,
123                 "YAML or JSON configuration file `path`")
124         flag.Usage = usage
125         flag.Parse()
126
127         theConfig.BlobSignatureTTL = arvados.Duration(signatureTTLSeconds) * arvados.Duration(time.Second)
128         theConfig.EnableDelete = !neverDelete
129
130         // TODO: Load config
131         err := config.LoadFile(theConfig, configPath)
132         if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
133                 log.Fatal(err)
134         }
135
136         if *dumpConfig {
137                 y, err := yaml.Marshal(theConfig)
138                 if err != nil {
139                         log.Fatal(err)
140                 }
141                 os.Stdout.Write(y)
142                 os.Exit(0)
143         }
144
145         err = theConfig.Start()
146
147         if pidfile := theConfig.PIDFile; pidfile != "" {
148                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
149                 if err != nil {
150                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
151                 }
152                 defer f.Close()
153                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
154                 if err != nil {
155                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
156                 }
157                 defer os.Remove(pidfile)
158                 err = f.Truncate(0)
159                 if err != nil {
160                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
161                 }
162                 _, err = fmt.Fprint(f, os.Getpid())
163                 if err != nil {
164                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
165                 }
166                 err = f.Sync()
167                 if err != nil {
168                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
169                 }
170         }
171
172         log.Println("keepstore starting, pid", os.Getpid())
173         defer log.Println("keepstore exiting, pid", os.Getpid())
174
175         // Start a round-robin VolumeManager with the volumes we have found.
176         KeepVM = MakeRRVolumeManager(theConfig.Volumes)
177
178         // Middleware stack: logger, MaxRequests limiter, method handlers
179         http.Handle("/", &LoggingRESTRouter{
180                 httpserver.NewRequestLimiter(theConfig.MaxRequests,
181                         MakeRESTRouter()),
182         })
183
184         // Set up a TCP listener.
185         listener, err := net.Listen("tcp", theConfig.Listen)
186         if err != nil {
187                 log.Fatal(err)
188         }
189
190         // Initialize Pull queue and worker
191         keepClient := &keepclient.KeepClient{
192                 Arvados:       &arvadosclient.ArvadosClient{},
193                 Want_replicas: 1,
194                 Client:        &http.Client{},
195         }
196
197         // Initialize the pullq and worker
198         pullq = NewWorkQueue()
199         go RunPullWorker(pullq, keepClient)
200
201         // Initialize the trashq and worker
202         trashq = NewWorkQueue()
203         go RunTrashWorker(trashq)
204
205         // Start emptyTrash goroutine
206         doneEmptyingTrash := make(chan bool)
207         go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
208
209         // Shut down the server gracefully (by closing the listener)
210         // if SIGTERM is received.
211         term := make(chan os.Signal, 1)
212         go func(sig <-chan os.Signal) {
213                 s := <-sig
214                 log.Println("caught signal:", s)
215                 doneEmptyingTrash <- true
216                 listener.Close()
217         }(term)
218         signal.Notify(term, syscall.SIGTERM)
219         signal.Notify(term, syscall.SIGINT)
220
221         if _, err := daemon.SdNotify("READY=1"); err != nil {
222                 log.Printf("Error notifying init daemon: %v", err)
223         }
224         log.Println("listening at", listener.Addr)
225         srv := &http.Server{}
226         srv.Serve(listener)
227 }
228
229 // Periodically (once per interval) invoke EmptyTrash on all volumes.
230 func emptyTrash(done <-chan bool, interval time.Duration) {
231         ticker := time.NewTicker(interval)
232
233         for {
234                 select {
235                 case <-ticker.C:
236                         for _, v := range theConfig.Volumes {
237                                 if v.Writable() {
238                                         v.EmptyTrash()
239                                 }
240                         }
241                 case <-done:
242                         ticker.Stop()
243                         return
244                 }
245         }
246 }