Merge branch '10211-double-close-crash' closes #10211
[arvados.git] / services / keepstore / keepstore.go
1 package main
2
3 import (
4         "flag"
5         "fmt"
6         "log"
7         "net"
8         "net/http"
9         "os"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15         "git.curoverse.com/arvados.git/sdk/go/config"
16         "git.curoverse.com/arvados.git/sdk/go/httpserver"
17         "git.curoverse.com/arvados.git/sdk/go/keepclient"
18         "github.com/coreos/go-systemd/daemon"
19         "github.com/ghodss/yaml"
20 )
21
22 // A Keep "block" is 64MB.
23 const BlockSize = 64 * 1024 * 1024
24
25 // A Keep volume must have at least MinFreeKilobytes available
26 // in order to permit writes.
27 const MinFreeKilobytes = BlockSize / 1024
28
29 // ProcMounts /proc/mounts
30 var ProcMounts = "/proc/mounts"
31
32 var bufs *bufferPool
33
34 // KeepError types.
35 //
36 type KeepError struct {
37         HTTPCode int
38         ErrMsg   string
39 }
40
41 var (
42         BadRequestError     = &KeepError{400, "Bad Request"}
43         UnauthorizedError   = &KeepError{401, "Unauthorized"}
44         CollisionError      = &KeepError{500, "Collision"}
45         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
46         PermissionError     = &KeepError{403, "Forbidden"}
47         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
48         ExpiredError        = &KeepError{401, "Expired permission signature"}
49         NotFoundError       = &KeepError{404, "Not Found"}
50         GenericError        = &KeepError{500, "Fail"}
51         FullError           = &KeepError{503, "Full"}
52         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
53         TooLongError        = &KeepError{413, "Block is too large"}
54         MethodDisabledError = &KeepError{405, "Method disabled"}
55         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
56         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
57 )
58
59 func (e *KeepError) Error() string {
60         return e.ErrMsg
61 }
62
63 // ========================
64 // Internal data structures
65 //
66 // These global variables are used by multiple parts of the
67 // program. They are good candidates for moving into their own
68 // packages.
69
70 // The Keep VolumeManager maintains a list of available volumes.
71 // Initialized by the --volumes flag (or by FindKeepVolumes).
72 var KeepVM VolumeManager
73
74 // The pull list manager and trash queue are threadsafe queues which
75 // support atomic update operations. The PullHandler and TrashHandler
76 // store results from Data Manager /pull and /trash requests here.
77 //
78 // See the Keep and Data Manager design documents for more details:
79 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
80 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
81 //
82 var pullq *WorkQueue
83 var trashq *WorkQueue
84
85 // TODO(twp): continue moving as much code as possible out of main
86 // so it can be effectively tested. Esp. handling and postprocessing
87 // of command line flags (identifying Keep volumes and initializing
88 // permission arguments).
89
90 func main() {
91         deprecated.beforeFlagParse(theConfig)
92
93         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
94
95         defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
96         var configPath string
97         flag.StringVar(
98                 &configPath,
99                 "config",
100                 defaultConfigPath,
101                 "YAML or JSON configuration file `path`")
102         flag.Usage = usage
103         flag.Parse()
104
105         deprecated.afterFlagParse(theConfig)
106
107         err := config.LoadFile(theConfig, configPath)
108         if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
109                 log.Fatal(err)
110         }
111
112         if *dumpConfig {
113                 y, err := yaml.Marshal(theConfig)
114                 if err != nil {
115                         log.Fatal(err)
116                 }
117                 os.Stdout.Write(y)
118                 os.Exit(0)
119         }
120
121         err = theConfig.Start()
122
123         if pidfile := theConfig.PIDFile; pidfile != "" {
124                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
125                 if err != nil {
126                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
127                 }
128                 defer f.Close()
129                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
130                 if err != nil {
131                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
132                 }
133                 defer os.Remove(pidfile)
134                 err = f.Truncate(0)
135                 if err != nil {
136                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
137                 }
138                 _, err = fmt.Fprint(f, os.Getpid())
139                 if err != nil {
140                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
141                 }
142                 err = f.Sync()
143                 if err != nil {
144                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
145                 }
146         }
147
148         log.Println("keepstore starting, pid", os.Getpid())
149         defer log.Println("keepstore exiting, pid", os.Getpid())
150
151         // Start a round-robin VolumeManager with the volumes we have found.
152         KeepVM = MakeRRVolumeManager(theConfig.Volumes)
153
154         // Middleware stack: logger, MaxRequests limiter, method handlers
155         http.Handle("/", &LoggingRESTRouter{
156                 httpserver.NewRequestLimiter(theConfig.MaxRequests,
157                         MakeRESTRouter()),
158         })
159
160         // Set up a TCP listener.
161         listener, err := net.Listen("tcp", theConfig.Listen)
162         if err != nil {
163                 log.Fatal(err)
164         }
165
166         // Initialize Pull queue and worker
167         keepClient := &keepclient.KeepClient{
168                 Arvados:       &arvadosclient.ArvadosClient{},
169                 Want_replicas: 1,
170                 Client:        &http.Client{},
171         }
172
173         // Initialize the pullq and worker
174         pullq = NewWorkQueue()
175         go RunPullWorker(pullq, keepClient)
176
177         // Initialize the trashq and worker
178         trashq = NewWorkQueue()
179         go RunTrashWorker(trashq)
180
181         // Start emptyTrash goroutine
182         doneEmptyingTrash := make(chan bool)
183         go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
184
185         // Shut down the server gracefully (by closing the listener)
186         // if SIGTERM is received.
187         term := make(chan os.Signal, 1)
188         go func(sig <-chan os.Signal) {
189                 s := <-sig
190                 log.Println("caught signal:", s)
191                 doneEmptyingTrash <- true
192                 listener.Close()
193         }(term)
194         signal.Notify(term, syscall.SIGTERM)
195         signal.Notify(term, syscall.SIGINT)
196
197         if _, err := daemon.SdNotify("READY=1"); err != nil {
198                 log.Printf("Error notifying init daemon: %v", err)
199         }
200         log.Println("listening at", listener.Addr)
201         srv := &http.Server{}
202         srv.Serve(listener)
203 }
204
205 // Periodically (once per interval) invoke EmptyTrash on all volumes.
206 func emptyTrash(done <-chan bool, interval time.Duration) {
207         ticker := time.NewTicker(interval)
208
209         for {
210                 select {
211                 case <-ticker.C:
212                         for _, v := range theConfig.Volumes {
213                                 if v.Writable() {
214                                         v.EmptyTrash()
215                                 }
216                         }
217                 case <-done:
218                         ticker.Stop()
219                         return
220                 }
221         }
222 }