10666: Added version number to go sdk and go tools & services
[arvados.git] / services / keepstore / keepstore.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "flag"
9         "fmt"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "syscall"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18         "git.curoverse.com/arvados.git/sdk/go/config"
19         "git.curoverse.com/arvados.git/sdk/go/httpserver"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21         arvadosVersion "git.curoverse.com/arvados.git/sdk/go/version"
22         log "github.com/Sirupsen/logrus"
23         "github.com/coreos/go-systemd/daemon"
24 )
25
26 // A Keep "block" is 64MB.
27 const BlockSize = 64 * 1024 * 1024
28
29 // A Keep volume must have at least MinFreeKilobytes available
30 // in order to permit writes.
31 const MinFreeKilobytes = BlockSize / 1024
32
33 // ProcMounts /proc/mounts
34 var ProcMounts = "/proc/mounts"
35
36 var bufs *bufferPool
37
38 // KeepError types.
39 //
40 type KeepError struct {
41         HTTPCode int
42         ErrMsg   string
43 }
44
45 var (
46         BadRequestError     = &KeepError{400, "Bad Request"}
47         UnauthorizedError   = &KeepError{401, "Unauthorized"}
48         CollisionError      = &KeepError{500, "Collision"}
49         RequestHashError    = &KeepError{422, "Hash mismatch in request"}
50         PermissionError     = &KeepError{403, "Forbidden"}
51         DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
52         ExpiredError        = &KeepError{401, "Expired permission signature"}
53         NotFoundError       = &KeepError{404, "Not Found"}
54         GenericError        = &KeepError{500, "Fail"}
55         FullError           = &KeepError{503, "Full"}
56         SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
57         TooLongError        = &KeepError{413, "Block is too large"}
58         MethodDisabledError = &KeepError{405, "Method disabled"}
59         ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
60         ErrClientDisconnect = &KeepError{503, "Client disconnected"}
61 )
62
63 func (e *KeepError) Error() string {
64         return e.ErrMsg
65 }
66
67 // ========================
68 // Internal data structures
69 //
70 // These global variables are used by multiple parts of the
71 // program. They are good candidates for moving into their own
72 // packages.
73
74 // The Keep VolumeManager maintains a list of available volumes.
75 // Initialized by the --volumes flag (or by FindKeepVolumes).
76 var KeepVM VolumeManager
77
78 // The pull list manager and trash queue are threadsafe queues which
79 // support atomic update operations. The PullHandler and TrashHandler
80 // store results from Data Manager /pull and /trash requests here.
81 //
82 // See the Keep and Data Manager design documents for more details:
83 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
84 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
85 //
86 var pullq *WorkQueue
87 var trashq *WorkQueue
88
89 func main() {
90         deprecated.beforeFlagParse(theConfig)
91
92         dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
93         getVersion := flag.Bool("version", false, "Print version information and exit.")
94
95         defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
96         var configPath string
97         flag.StringVar(
98                 &configPath,
99                 "config",
100                 defaultConfigPath,
101                 "YAML or JSON configuration file `path`")
102         flag.Usage = usage
103         flag.Parse()
104
105         // Print version information if requested
106         if *getVersion {
107                 fmt.Printf("Version: %s\n", arvadosVersion.GetVersion())
108                 os.Exit(0)
109         }
110
111         deprecated.afterFlagParse(theConfig)
112
113         err := config.LoadFile(theConfig, configPath)
114         if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
115                 log.Fatal(err)
116         }
117
118         if *dumpConfig {
119                 log.Fatal(config.DumpAndExit(theConfig))
120         }
121
122         log.Printf("keepstore %q started", arvadosVersion.GetVersion())
123
124         err = theConfig.Start()
125         if err != nil {
126                 log.Fatal(err)
127         }
128
129         if pidfile := theConfig.PIDFile; pidfile != "" {
130                 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
131                 if err != nil {
132                         log.Fatalf("open pidfile (%s): %s", pidfile, err)
133                 }
134                 defer f.Close()
135                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
136                 if err != nil {
137                         log.Fatalf("flock pidfile (%s): %s", pidfile, err)
138                 }
139                 defer os.Remove(pidfile)
140                 err = f.Truncate(0)
141                 if err != nil {
142                         log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
143                 }
144                 _, err = fmt.Fprint(f, os.Getpid())
145                 if err != nil {
146                         log.Fatalf("write pidfile (%s): %s", pidfile, err)
147                 }
148                 err = f.Sync()
149                 if err != nil {
150                         log.Fatalf("sync pidfile (%s): %s", pidfile, err)
151                 }
152         }
153
154         log.Println("keepstore starting, pid", os.Getpid())
155         defer log.Println("keepstore exiting, pid", os.Getpid())
156
157         // Start a round-robin VolumeManager with the volumes we have found.
158         KeepVM = MakeRRVolumeManager(theConfig.Volumes)
159
160         // Middleware stack: logger, MaxRequests limiter, method handlers
161         router := MakeRESTRouter()
162         limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
163         router.limiter = limiter
164         http.Handle("/", &LoggingRESTRouter{router: limiter})
165
166         // Set up a TCP listener.
167         listener, err := net.Listen("tcp", theConfig.Listen)
168         if err != nil {
169                 log.Fatal(err)
170         }
171
172         // Initialize Pull queue and worker
173         keepClient := &keepclient.KeepClient{
174                 Arvados:       &arvadosclient.ArvadosClient{},
175                 Want_replicas: 1,
176         }
177
178         // Initialize the pullq and worker
179         pullq = NewWorkQueue()
180         go RunPullWorker(pullq, keepClient)
181
182         // Initialize the trashq and worker
183         trashq = NewWorkQueue()
184         go RunTrashWorker(trashq)
185
186         // Start emptyTrash goroutine
187         doneEmptyingTrash := make(chan bool)
188         go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
189
190         // Shut down the server gracefully (by closing the listener)
191         // if SIGTERM is received.
192         term := make(chan os.Signal, 1)
193         go func(sig <-chan os.Signal) {
194                 s := <-sig
195                 log.Println("caught signal:", s)
196                 doneEmptyingTrash <- true
197                 listener.Close()
198         }(term)
199         signal.Notify(term, syscall.SIGTERM)
200         signal.Notify(term, syscall.SIGINT)
201
202         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
203                 log.Printf("Error notifying init daemon: %v", err)
204         }
205         log.Println("listening at", listener.Addr())
206         srv := &http.Server{}
207         srv.Serve(listener)
208 }
209
210 // Periodically (once per interval) invoke EmptyTrash on all volumes.
211 func emptyTrash(done <-chan bool, interval time.Duration) {
212         ticker := time.NewTicker(interval)
213
214         for {
215                 select {
216                 case <-ticker.C:
217                         for _, v := range theConfig.Volumes {
218                                 if v.Writable() {
219                                         v.EmptyTrash()
220                                 }
221                         }
222                 case <-done:
223                         ticker.Stop()
224                         return
225                 }
226         }
227 }