Merge branch '12737-wb-rails42-upgrade'
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "net/http"
13         "strconv"
14         "strings"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/stats"
19         "github.com/Sirupsen/logrus"
20         "github.com/golang/protobuf/jsonpb"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/prometheus/client_golang/prometheus/promhttp"
23 )
24
25 type Config struct {
26         Debug  bool
27         Listen string
28
29         LogFormat string
30
31         PIDFile string
32
33         MaxBuffers  int
34         MaxRequests int
35
36         BlobSignatureTTL    arvados.Duration
37         BlobSigningKeyFile  string
38         RequireSignatures   bool
39         SystemAuthTokenFile string
40         EnableDelete        bool
41         TrashLifetime       arvados.Duration
42         TrashCheckInterval  arvados.Duration
43         PullWorkers         int
44         TrashWorkers        int
45         EmptyTrashWorkers   int
46         TLSCertificateFile  string
47         TLSKeyFile          string
48
49         Volumes VolumeList
50
51         blobSigningKey  []byte
52         systemAuthToken string
53         debugLogf       func(string, ...interface{})
54
55         ManagementToken string
56
57         metrics
58 }
59
60 var (
61         theConfig = DefaultConfig()
62         formatter = map[string]logrus.Formatter{
63                 "text": &logrus.TextFormatter{
64                         FullTimestamp:   true,
65                         TimestampFormat: rfc3339NanoFixed,
66                 },
67                 "json": &logrus.JSONFormatter{
68                         TimestampFormat: rfc3339NanoFixed,
69                 },
70         }
71         log = logrus.StandardLogger()
72 )
73
74 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
75
76 // DefaultConfig returns the default configuration.
77 func DefaultConfig() *Config {
78         return &Config{
79                 Listen:             ":25107",
80                 LogFormat:          "json",
81                 MaxBuffers:         128,
82                 RequireSignatures:  true,
83                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
84                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
85                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
86                 Volumes:            []Volume{},
87         }
88 }
89
90 // Start should be called exactly once: after setting all public
91 // fields, and before using the config.
92 func (cfg *Config) Start() error {
93         if cfg.Debug {
94                 log.Level = logrus.DebugLevel
95                 cfg.debugLogf = log.Printf
96                 cfg.debugLogf("debugging enabled")
97         } else {
98                 log.Level = logrus.InfoLevel
99                 cfg.debugLogf = func(string, ...interface{}) {}
100         }
101
102         f := formatter[strings.ToLower(cfg.LogFormat)]
103         if f == nil {
104                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
105         }
106         log.Formatter = f
107
108         if cfg.MaxBuffers < 0 {
109                 return fmt.Errorf("MaxBuffers must be greater than zero")
110         }
111         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
112
113         if cfg.MaxRequests < 1 {
114                 cfg.MaxRequests = cfg.MaxBuffers * 2
115                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
116         }
117
118         if cfg.BlobSigningKeyFile != "" {
119                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
120                 if err != nil {
121                         return fmt.Errorf("reading blob signing key file: %s", err)
122                 }
123                 cfg.blobSigningKey = bytes.TrimSpace(buf)
124                 if len(cfg.blobSigningKey) == 0 {
125                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
126                 }
127         } else if cfg.RequireSignatures {
128                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
129         } else {
130                 log.Println("Running without a blob signing key. Block locators " +
131                         "returned by this server will not be signed, and will be rejected " +
132                         "by a server that enforces permissions.")
133                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
134         }
135
136         if fn := cfg.SystemAuthTokenFile; fn != "" {
137                 buf, err := ioutil.ReadFile(fn)
138                 if err != nil {
139                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
140                 }
141                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
142         }
143
144         if cfg.EnableDelete {
145                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
146                         "been extensively tested. You should disable this unless you can afford to lose data.")
147         }
148
149         if len(cfg.Volumes) == 0 {
150                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
151                         return fmt.Errorf("no volumes found")
152                 }
153         }
154         for _, v := range cfg.Volumes {
155                 if err := v.Start(); err != nil {
156                         return fmt.Errorf("volume %s: %s", v, err)
157                 }
158                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
159         }
160         return nil
161 }
162
163 type metrics struct {
164         registry     *prometheus.Registry
165         reqDuration  *prometheus.SummaryVec
166         timeToStatus *prometheus.SummaryVec
167         exportProm   http.Handler
168 }
169
170 func (*metrics) Levels() []logrus.Level {
171         return logrus.AllLevels
172 }
173
174 func (m *metrics) Fire(ent *logrus.Entry) error {
175         if tts, ok := ent.Data["timeToStatus"].(stats.Duration); !ok {
176         } else if method, ok := ent.Data["reqMethod"].(string); !ok {
177         } else if code, ok := ent.Data["respStatusCode"].(int); !ok {
178         } else {
179                 m.timeToStatus.WithLabelValues(strconv.Itoa(code), strings.ToLower(method)).Observe(time.Duration(tts).Seconds())
180         }
181         return nil
182 }
183
184 func (m *metrics) setup() {
185         m.registry = prometheus.NewRegistry()
186         m.timeToStatus = prometheus.NewSummaryVec(prometheus.SummaryOpts{
187                 Name: "time_to_status_seconds",
188                 Help: "Summary of request TTFB.",
189         }, []string{"code", "method"})
190         m.reqDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
191                 Name: "request_duration_seconds",
192                 Help: "Summary of request duration.",
193         }, []string{"code", "method"})
194         m.registry.MustRegister(m.timeToStatus)
195         m.registry.MustRegister(m.reqDuration)
196         m.exportProm = promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
197                 ErrorLog: log,
198         })
199         log.AddHook(m)
200 }
201
202 func (m *metrics) exportJSON(w http.ResponseWriter, req *http.Request) {
203         jm := jsonpb.Marshaler{Indent: "  "}
204         mfs, _ := m.registry.Gather()
205         w.Write([]byte{'['})
206         for i, mf := range mfs {
207                 if i > 0 {
208                         w.Write([]byte{','})
209                 }
210                 jm.Marshal(w, mf)
211         }
212         w.Write([]byte{']'})
213 }
214
215 func (m *metrics) Instrument(next http.Handler) http.Handler {
216         return promhttp.InstrumentHandlerDuration(m.reqDuration, next)
217 }
218
219 // VolumeTypes is built up by init() funcs in the source files that
220 // define the volume types.
221 var VolumeTypes = []func() VolumeWithExamples{}
222
223 type VolumeList []Volume
224
225 // UnmarshalJSON -- given an array of objects -- deserializes each
226 // object as the volume type indicated by the object's Type field.
227 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
228         typeMap := map[string]func() VolumeWithExamples{}
229         for _, factory := range VolumeTypes {
230                 t := factory().Type()
231                 if _, ok := typeMap[t]; ok {
232                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
233                 }
234                 typeMap[t] = factory
235         }
236
237         var mapList []map[string]interface{}
238         err := json.Unmarshal(data, &mapList)
239         if err != nil {
240                 return err
241         }
242         for _, mapIn := range mapList {
243                 typeIn, ok := mapIn["Type"].(string)
244                 if !ok {
245                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
246                 }
247                 factory, ok := typeMap[typeIn]
248                 if !ok {
249                         return fmt.Errorf("unsupported volume type %+q", typeIn)
250                 }
251                 data, err := json.Marshal(mapIn)
252                 if err != nil {
253                         return err
254                 }
255                 vol := factory()
256                 err = json.Unmarshal(data, vol)
257                 if err != nil {
258                         return err
259                 }
260                 *vl = append(*vl, vol)
261         }
262         return nil
263 }
264
265 // MarshalJSON adds a "Type" field to each volume corresponding to its
266 // Type().
267 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
268         data := []byte{'['}
269         for _, vs := range *vl {
270                 j, err := json.Marshal(vs)
271                 if err != nil {
272                         return nil, err
273                 }
274                 if len(data) > 1 {
275                         data = append(data, byte(','))
276                 }
277                 t, err := json.Marshal(vs.Type())
278                 if err != nil {
279                         panic(err)
280                 }
281                 data = append(data, j[0])
282                 data = append(data, []byte(`"Type":`)...)
283                 data = append(data, t...)
284                 data = append(data, byte(','))
285                 data = append(data, j[1:]...)
286         }
287         return append(data, byte(']')), nil
288 }