Merge branch 'thehyve/fix-crunch-documentation' Fix a typo in Crunch Dispatch install...
[arvados.git] / services / keepstore / keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "flag"
9         "fmt"
10         "net"
11         "os"
12         "os/signal"
13         "syscall"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17         "git.curoverse.com/arvados.git/sdk/go/config"
18         "git.curoverse.com/arvados.git/sdk/go/keepclient"
19         "github.com/coreos/go-systemd/daemon"
20 )
21
22 var version = "dev"
23
24 // A Keep "block" is 64MB.
25 const BlockSize = 64 * 1024 * 1024
26
27 // A Keep volume must have at least MinFreeKilobytes available
28 // in order to permit writes.
29 const MinFreeKilobytes = BlockSize / 1024
30
31 // ProcMounts /proc/mounts
32 var ProcMounts = "/proc/mounts"
33
34 var bufs *bufferPool
35
36 // KeepError types.
37 //
38 type KeepError struct {
39         HTTPCode int
40         ErrMsg   string
41 }
42
43 var (
44         BadRequestError     = &KeepError{400, "Bad Request"}
45         UnauthorizedError   = &KeepError{401, "Unauthorized"}
46         CollisionError      = &KeepError{500, "Collision"}
47         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
48         PermissionError     = &KeepError{403, "Forbidden"}
49         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
50         ExpiredError        = &KeepError{401, "Expired permission signature"}
51         NotFoundError       = &KeepError{404, "Not Found"}
52         GenericError        = &KeepError{500, "Fail"}
53         FullError           = &KeepError{503, "Full"}
54         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
55         TooLongError        = &KeepError{413, "Block is too large"}
56         MethodDisabledError = &KeepError{405, "Method disabled"}
57         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
58         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
59 )
60
61 func (e *KeepError) Error() string {
62         return e.ErrMsg
63 }
64
65 // ========================
66 // Internal data structures
67 //
68 // These global variables are used by multiple parts of the
69 // program. They are good candidates for moving into their own
70 // packages.
71
72 // The Keep VolumeManager maintains a list of available volumes.
73 // Initialized by the --volumes flag (or by FindKeepVolumes).
74 var KeepVM VolumeManager
75
76 // The pull list manager and trash queue are threadsafe queues which
77 // support atomic update operations. The PullHandler and TrashHandler
78 // store results from Data Manager /pull and /trash requests here.
79 //
80 // See the Keep and Data Manager design documents for more details:
81 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
82 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
83 //
84 var pullq *WorkQueue
85 var trashq *WorkQueue
86
87 func main() {
88         deprecated.beforeFlagParse(theConfig)
89
90         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
91         getVersion := flag.Bool("version", false, "Print version information and exit.")
92
93         defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
94         var configPath string
95         flag.StringVar(
96                 &configPath,
97                 "config",
98                 defaultConfigPath,
99                 "YAML or JSON configuration file `path`")
100         flag.Usage = usage
101         flag.Parse()
102
103         // Print version information if requested
104         if *getVersion {
105                 fmt.Printf("keepstore %s\n", version)
106                 return
107         }
108
109         deprecated.afterFlagParse(theConfig)
110
111         err := config.LoadFile(theConfig, configPath)
112         if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
113                 log.Fatal(err)
114         }
115
116         if *dumpConfig {
117                 log.Fatal(config.DumpAndExit(theConfig))
118         }
119
120         log.Printf("keepstore %s started", version)
121
122         err = theConfig.Start()
123         if err != nil {
124                 log.Fatal(err)
125         }
126
127         if pidfile := theConfig.PIDFile; pidfile != "" {
128                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
129                 if err != nil {
130                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
131                 }
132                 defer f.Close()
133                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
134                 if err != nil {
135                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
136                 }
137                 defer os.Remove(pidfile)
138                 err = f.Truncate(0)
139                 if err != nil {
140                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
141                 }
142                 _, err = fmt.Fprint(f, os.Getpid())
143                 if err != nil {
144                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
145                 }
146                 err = f.Sync()
147                 if err != nil {
148                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
149                 }
150         }
151
152         log.Println("keepstore starting, pid", os.Getpid())
153         defer log.Println("keepstore exiting, pid", os.Getpid())
154
155         // Start a round-robin VolumeManager with the volumes we have found.
156         KeepVM = MakeRRVolumeManager(theConfig.Volumes)
157
158         // Middleware/handler stack
159         router := MakeRESTRouter()
160
161         // Set up a TCP listener.
162         listener, err := net.Listen("tcp", theConfig.Listen)
163         if err != nil {
164                 log.Fatal(err)
165         }
166
167         // Initialize keepclient for pull workers
168         keepClient := &keepclient.KeepClient{
169                 Arvados:       &arvadosclient.ArvadosClient{},
170                 Want_replicas: 1,
171         }
172
173         // Initialize the pullq and workers
174         pullq = NewWorkQueue()
175         for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
176                 go RunPullWorker(pullq, keepClient)
177         }
178
179         // Initialize the trashq and workers
180         trashq = NewWorkQueue()
181         for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
182                 go RunTrashWorker(trashq)
183         }
184
185         // Start emptyTrash goroutine
186         doneEmptyingTrash := make(chan bool)
187         go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
188
189         // Shut down the server gracefully (by closing the listener)
190         // if SIGTERM is received.
191         term := make(chan os.Signal, 1)
192         go func(sig <-chan os.Signal) {
193                 s := <-sig
194                 log.Println("caught signal:", s)
195                 doneEmptyingTrash <- true
196                 listener.Close()
197         }(term)
198         signal.Notify(term, syscall.SIGTERM)
199         signal.Notify(term, syscall.SIGINT)
200
201         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
202                 log.Printf("Error notifying init daemon: %v", err)
203         }
204         log.Println("listening at", listener.Addr())
205         srv := &server{}
206         srv.Handler = router
207         srv.Serve(listener)
208 }
209
210 // Periodically (once per interval) invoke EmptyTrash on all volumes.
211 func emptyTrash(done <-chan bool, interval time.Duration) {
212         ticker := time.NewTicker(interval)
213
214         for {
215                 select {
216                 case <-ticker.C:
217                         for _, v := range theConfig.Volumes {
218                                 if v.Writable() {
219                                         v.EmptyTrash()
220                                 }
221                         }
222                 case <-done:
223                         ticker.Stop()
224                         return
225                 }
226         }
227 }