11583: 13100: Fix memory leak in test suite.
[arvados.git] / services / keepstore / keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "flag"
9         "fmt"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "syscall"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18         "git.curoverse.com/arvados.git/sdk/go/config"
19         "git.curoverse.com/arvados.git/sdk/go/keepclient"
20         "github.com/coreos/go-systemd/daemon"
21 )
22
23 var version = "dev"
24
25 // A Keep "block" is 64MB.
26 const BlockSize = 64 * 1024 * 1024
27
28 // A Keep volume must have at least MinFreeKilobytes available
29 // in order to permit writes.
30 const MinFreeKilobytes = BlockSize / 1024
31
32 // ProcMounts /proc/mounts
33 var ProcMounts = "/proc/mounts"
34
35 var bufs *bufferPool
36
37 // KeepError types.
38 //
39 type KeepError struct {
40         HTTPCode int
41         ErrMsg   string
42 }
43
44 var (
45         BadRequestError     = &KeepError{400, "Bad Request"}
46         UnauthorizedError   = &KeepError{401, "Unauthorized"}
47         CollisionError      = &KeepError{500, "Collision"}
48         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
49         PermissionError     = &KeepError{403, "Forbidden"}
50         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
51         ExpiredError        = &KeepError{401, "Expired permission signature"}
52         NotFoundError       = &KeepError{404, "Not Found"}
53         GenericError        = &KeepError{500, "Fail"}
54         FullError           = &KeepError{503, "Full"}
55         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
56         TooLongError        = &KeepError{413, "Block is too large"}
57         MethodDisabledError = &KeepError{405, "Method disabled"}
58         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
59         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
60 )
61
62 func (e *KeepError) Error() string {
63         return e.ErrMsg
64 }
65
66 // ========================
67 // Internal data structures
68 //
69 // These global variables are used by multiple parts of the
70 // program. They are good candidates for moving into their own
71 // packages.
72
73 // The Keep VolumeManager maintains a list of available volumes.
74 // Initialized by the --volumes flag (or by FindKeepVolumes).
75 var KeepVM VolumeManager
76
77 // The pull list manager and trash queue are threadsafe queues which
78 // support atomic update operations. The PullHandler and TrashHandler
79 // store results from Data Manager /pull and /trash requests here.
80 //
81 // See the Keep and Data Manager design documents for more details:
82 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
83 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
84 //
85 var pullq *WorkQueue
86 var trashq *WorkQueue
87
88 func main() {
89         deprecated.beforeFlagParse(theConfig)
90
91         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
92         getVersion := flag.Bool("version", false, "Print version information and exit.")
93
94         defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
95         var configPath string
96         flag.StringVar(
97                 &configPath,
98                 "config",
99                 defaultConfigPath,
100                 "YAML or JSON configuration file `path`")
101         flag.Usage = usage
102         flag.Parse()
103
104         // Print version information if requested
105         if *getVersion {
106                 fmt.Printf("keepstore %s\n", version)
107                 return
108         }
109
110         deprecated.afterFlagParse(theConfig)
111
112         err := config.LoadFile(theConfig, configPath)
113         if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
114                 log.Fatal(err)
115         }
116
117         if *dumpConfig {
118                 log.Fatal(config.DumpAndExit(theConfig))
119         }
120
121         log.Printf("keepstore %s started", version)
122
123         err = theConfig.Start()
124         if err != nil {
125                 log.Fatal(err)
126         }
127
128         if pidfile := theConfig.PIDFile; pidfile != "" {
129                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
130                 if err != nil {
131                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
132                 }
133                 defer f.Close()
134                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
135                 if err != nil {
136                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
137                 }
138                 defer os.Remove(pidfile)
139                 err = f.Truncate(0)
140                 if err != nil {
141                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
142                 }
143                 _, err = fmt.Fprint(f, os.Getpid())
144                 if err != nil {
145                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
146                 }
147                 err = f.Sync()
148                 if err != nil {
149                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
150                 }
151         }
152
153         log.Println("keepstore starting, pid", os.Getpid())
154         defer log.Println("keepstore exiting, pid", os.Getpid())
155
156         // Start a round-robin VolumeManager with the volumes we have found.
157         KeepVM = MakeRRVolumeManager(theConfig.Volumes)
158
159         // Middleware/handler stack
160         router := MakeRESTRouter()
161
162         // Set up a TCP listener.
163         listener, err := net.Listen("tcp", theConfig.Listen)
164         if err != nil {
165                 log.Fatal(err)
166         }
167
168         // Initialize Pull queue and worker
169         keepClient := &keepclient.KeepClient{
170                 Arvados:       &arvadosclient.ArvadosClient{},
171                 Want_replicas: 1,
172         }
173
174         // Initialize the pullq and worker
175         pullq = NewWorkQueue()
176         go RunPullWorker(pullq, keepClient)
177
178         // Initialize the trashq and worker
179         trashq = NewWorkQueue()
180         go RunTrashWorker(trashq)
181
182         // Start emptyTrash goroutine
183         doneEmptyingTrash := make(chan bool)
184         go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
185
186         // Shut down the server gracefully (by closing the listener)
187         // if SIGTERM is received.
188         term := make(chan os.Signal, 1)
189         go func(sig <-chan os.Signal) {
190                 s := <-sig
191                 log.Println("caught signal:", s)
192                 doneEmptyingTrash <- true
193                 listener.Close()
194         }(term)
195         signal.Notify(term, syscall.SIGTERM)
196         signal.Notify(term, syscall.SIGINT)
197
198         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
199                 log.Printf("Error notifying init daemon: %v", err)
200         }
201         log.Println("listening at", listener.Addr())
202         srv := &http.Server{Handler: router}
203         srv.Serve(listener)
204 }
205
206 // Periodically (once per interval) invoke EmptyTrash on all volumes.
207 func emptyTrash(done <-chan bool, interval time.Duration) {
208         ticker := time.NewTicker(interval)
209
210         for {
211                 select {
212                 case <-ticker.C:
213                         for _, v := range theConfig.Volumes {
214                                 if v.Writable() {
215                                         v.EmptyTrash()
216                                 }
217                         }
218                 case <-done:
219                         ticker.Stop()
220                         return
221                 }
222         }
223 }