06a26984d002ad3cbe9372cdaaad2a651b998990
[arvados.git] / services / keepstore / keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "flag"
9         "fmt"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "syscall"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18         "git.curoverse.com/arvados.git/sdk/go/config"
19         "git.curoverse.com/arvados.git/sdk/go/httpserver"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21         log "github.com/Sirupsen/logrus"
22         "github.com/coreos/go-systemd/daemon"
23 )
24
25 var version = "dev"
26
27 // A Keep "block" is 64MB.
28 const BlockSize = 64 * 1024 * 1024
29
30 // A Keep volume must have at least MinFreeKilobytes available
31 // in order to permit writes.
32 const MinFreeKilobytes = BlockSize / 1024
33
34 // ProcMounts /proc/mounts
35 var ProcMounts = "/proc/mounts"
36
37 var bufs *bufferPool
38
39 // KeepError types.
40 //
41 type KeepError struct {
42         HTTPCode int
43         ErrMsg   string
44 }
45
46 var (
47         BadRequestError     = &KeepError{400, "Bad Request"}
48         UnauthorizedError   = &KeepError{401, "Unauthorized"}
49         CollisionError      = &KeepError{500, "Collision"}
50         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
51         PermissionError     = &KeepError{403, "Forbidden"}
52         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
53         ExpiredError        = &KeepError{401, "Expired permission signature"}
54         NotFoundError       = &KeepError{404, "Not Found"}
55         GenericError        = &KeepError{500, "Fail"}
56         FullError           = &KeepError{503, "Full"}
57         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
58         TooLongError        = &KeepError{413, "Block is too large"}
59         MethodDisabledError = &KeepError{405, "Method disabled"}
60         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
61         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
62 )
63
64 func (e *KeepError) Error() string {
65         return e.ErrMsg
66 }
67
68 // ========================
69 // Internal data structures
70 //
71 // These global variables are used by multiple parts of the
72 // program. They are good candidates for moving into their own
73 // packages.
74
75 // The Keep VolumeManager maintains a list of available volumes.
76 // Initialized by the --volumes flag (or by FindKeepVolumes).
77 var KeepVM VolumeManager
78
79 // The pull list manager and trash queue are threadsafe queues which
80 // support atomic update operations. The PullHandler and TrashHandler
81 // store results from Data Manager /pull and /trash requests here.
82 //
83 // See the Keep and Data Manager design documents for more details:
84 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
85 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
86 //
87 var pullq *WorkQueue
88 var trashq *WorkQueue
89
90 func main() {
91         deprecated.beforeFlagParse(theConfig)
92
93         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
94         getVersion := flag.Bool("version", false, "Print version information and exit.")
95
96         defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
97         var configPath string
98         flag.StringVar(
99                 &configPath,
100                 "config",
101                 defaultConfigPath,
102                 "YAML or JSON configuration file `path`")
103         flag.Usage = usage
104         flag.Parse()
105
106         // Print version information if requested
107         if *getVersion {
108                 fmt.Printf("Version: %s\n", version)
109                 os.Exit(0)
110         }
111
112         deprecated.afterFlagParse(theConfig)
113
114         err := config.LoadFile(theConfig, configPath)
115         if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
116                 log.Fatal(err)
117         }
118
119         if *dumpConfig {
120                 log.Fatal(config.DumpAndExit(theConfig))
121         }
122
123         log.Printf("keepstore %q started", version)
124
125         err = theConfig.Start()
126         if err != nil {
127                 log.Fatal(err)
128         }
129
130         if pidfile := theConfig.PIDFile; pidfile != "" {
131                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
132                 if err != nil {
133                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
134                 }
135                 defer f.Close()
136                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
137                 if err != nil {
138                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
139                 }
140                 defer os.Remove(pidfile)
141                 err = f.Truncate(0)
142                 if err != nil {
143                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
144                 }
145                 _, err = fmt.Fprint(f, os.Getpid())
146                 if err != nil {
147                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
148                 }
149                 err = f.Sync()
150                 if err != nil {
151                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
152                 }
153         }
154
155         log.Println("keepstore starting, pid", os.Getpid())
156         defer log.Println("keepstore exiting, pid", os.Getpid())
157
158         // Start a round-robin VolumeManager with the volumes we have found.
159         KeepVM = MakeRRVolumeManager(theConfig.Volumes)
160
161         // Middleware stack: logger, MaxRequests limiter, method handlers
162         router := MakeRESTRouter()
163         limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
164         router.limiter = limiter
165         http.Handle("/", &LoggingRESTRouter{router: limiter})
166
167         // Set up a TCP listener.
168         listener, err := net.Listen("tcp", theConfig.Listen)
169         if err != nil {
170                 log.Fatal(err)
171         }
172
173         // Initialize Pull queue and worker
174         keepClient := &keepclient.KeepClient{
175                 Arvados:       &arvadosclient.ArvadosClient{},
176                 Want_replicas: 1,
177         }
178
179         // Initialize the pullq and worker
180         pullq = NewWorkQueue()
181         go RunPullWorker(pullq, keepClient)
182
183         // Initialize the trashq and worker
184         trashq = NewWorkQueue()
185         go RunTrashWorker(trashq)
186
187         // Start emptyTrash goroutine
188         doneEmptyingTrash := make(chan bool)
189         go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
190
191         // Shut down the server gracefully (by closing the listener)
192         // if SIGTERM is received.
193         term := make(chan os.Signal, 1)
194         go func(sig <-chan os.Signal) {
195                 s := <-sig
196                 log.Println("caught signal:", s)
197                 doneEmptyingTrash <- true
198                 listener.Close()
199         }(term)
200         signal.Notify(term, syscall.SIGTERM)
201         signal.Notify(term, syscall.SIGINT)
202
203         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
204                 log.Printf("Error notifying init daemon: %v", err)
205         }
206         log.Println("listening at", listener.Addr())
207         srv := &http.Server{}
208         srv.Serve(listener)
209 }
210
211 // Periodically (once per interval) invoke EmptyTrash on all volumes.
212 func emptyTrash(done <-chan bool, interval time.Duration) {
213         ticker := time.NewTicker(interval)
214
215         for {
216                 select {
217                 case <-ticker.C:
218                         for _, v := range theConfig.Volumes {
219                                 if v.Writable() {
220                                         v.EmptyTrash()
221                                 }
222                         }
223                 case <-done:
224                         ticker.Stop()
225                         return
226                 }
227         }
228 }