Merge branch '13365-default-secondaryFiles' refs #13365
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "net/http"
13         "strconv"
14         "strings"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/stats"
19         "github.com/Sirupsen/logrus"
20         "github.com/golang/protobuf/jsonpb"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/prometheus/client_golang/prometheus/promhttp"
23 )
24
25 type Config struct {
26         Debug  bool
27         Listen string
28
29         LogFormat string
30
31         PIDFile string
32
33         MaxBuffers  int
34         MaxRequests int
35
36         BlobSignatureTTL    arvados.Duration
37         BlobSigningKeyFile  string
38         RequireSignatures   bool
39         SystemAuthTokenFile string
40         EnableDelete        bool
41         TrashLifetime       arvados.Duration
42         TrashCheckInterval  arvados.Duration
43         PullWorkers         int
44         TrashWorkers        int
45         EmptyTrashWorkers   int
46
47         Volumes VolumeList
48
49         blobSigningKey  []byte
50         systemAuthToken string
51         debugLogf       func(string, ...interface{})
52
53         ManagementToken string
54
55         metrics
56 }
57
58 var (
59         theConfig = DefaultConfig()
60         formatter = map[string]logrus.Formatter{
61                 "text": &logrus.TextFormatter{
62                         FullTimestamp:   true,
63                         TimestampFormat: rfc3339NanoFixed,
64                 },
65                 "json": &logrus.JSONFormatter{
66                         TimestampFormat: rfc3339NanoFixed,
67                 },
68         }
69         log = logrus.StandardLogger()
70 )
71
72 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
73
74 // DefaultConfig returns the default configuration.
75 func DefaultConfig() *Config {
76         return &Config{
77                 Listen:             ":25107",
78                 LogFormat:          "json",
79                 MaxBuffers:         128,
80                 RequireSignatures:  true,
81                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
82                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
83                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
84                 Volumes:            []Volume{},
85         }
86 }
87
88 // Start should be called exactly once: after setting all public
89 // fields, and before using the config.
90 func (cfg *Config) Start() error {
91         if cfg.Debug {
92                 log.Level = logrus.DebugLevel
93                 cfg.debugLogf = log.Printf
94                 cfg.debugLogf("debugging enabled")
95         } else {
96                 log.Level = logrus.InfoLevel
97                 cfg.debugLogf = func(string, ...interface{}) {}
98         }
99
100         f := formatter[strings.ToLower(cfg.LogFormat)]
101         if f == nil {
102                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
103         }
104         log.Formatter = f
105
106         if cfg.MaxBuffers < 0 {
107                 return fmt.Errorf("MaxBuffers must be greater than zero")
108         }
109         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
110
111         if cfg.MaxRequests < 1 {
112                 cfg.MaxRequests = cfg.MaxBuffers * 2
113                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
114         }
115
116         if cfg.BlobSigningKeyFile != "" {
117                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
118                 if err != nil {
119                         return fmt.Errorf("reading blob signing key file: %s", err)
120                 }
121                 cfg.blobSigningKey = bytes.TrimSpace(buf)
122                 if len(cfg.blobSigningKey) == 0 {
123                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
124                 }
125         } else if cfg.RequireSignatures {
126                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
127         } else {
128                 log.Println("Running without a blob signing key. Block locators " +
129                         "returned by this server will not be signed, and will be rejected " +
130                         "by a server that enforces permissions.")
131                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
132         }
133
134         if fn := cfg.SystemAuthTokenFile; fn != "" {
135                 buf, err := ioutil.ReadFile(fn)
136                 if err != nil {
137                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
138                 }
139                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
140         }
141
142         if cfg.EnableDelete {
143                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
144                         "been extensively tested. You should disable this unless you can afford to lose data.")
145         }
146
147         if len(cfg.Volumes) == 0 {
148                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
149                         return fmt.Errorf("no volumes found")
150                 }
151         }
152         for _, v := range cfg.Volumes {
153                 if err := v.Start(); err != nil {
154                         return fmt.Errorf("volume %s: %s", v, err)
155                 }
156                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
157         }
158         return nil
159 }
160
161 type metrics struct {
162         registry     *prometheus.Registry
163         reqDuration  *prometheus.SummaryVec
164         timeToStatus *prometheus.SummaryVec
165         exportProm   http.Handler
166 }
167
168 func (*metrics) Levels() []logrus.Level {
169         return logrus.AllLevels
170 }
171
172 func (m *metrics) Fire(ent *logrus.Entry) error {
173         if tts, ok := ent.Data["timeToStatus"].(stats.Duration); !ok {
174         } else if method, ok := ent.Data["reqMethod"].(string); !ok {
175         } else if code, ok := ent.Data["respStatusCode"].(int); !ok {
176         } else {
177                 m.timeToStatus.WithLabelValues(strconv.Itoa(code), strings.ToLower(method)).Observe(time.Duration(tts).Seconds())
178         }
179         return nil
180 }
181
182 func (m *metrics) setup() {
183         m.registry = prometheus.NewRegistry()
184         m.timeToStatus = prometheus.NewSummaryVec(prometheus.SummaryOpts{
185                 Name: "time_to_status_seconds",
186                 Help: "Summary of request TTFB.",
187         }, []string{"code", "method"})
188         m.reqDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
189                 Name: "request_duration_seconds",
190                 Help: "Summary of request duration.",
191         }, []string{"code", "method"})
192         m.registry.MustRegister(m.timeToStatus)
193         m.registry.MustRegister(m.reqDuration)
194         m.exportProm = promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
195                 ErrorLog: log,
196         })
197         log.AddHook(m)
198 }
199
200 func (m *metrics) exportJSON(w http.ResponseWriter, req *http.Request) {
201         jm := jsonpb.Marshaler{Indent: "  "}
202         mfs, _ := m.registry.Gather()
203         w.Write([]byte{'['})
204         for i, mf := range mfs {
205                 if i > 0 {
206                         w.Write([]byte{','})
207                 }
208                 jm.Marshal(w, mf)
209         }
210         w.Write([]byte{']'})
211 }
212
213 func (m *metrics) Instrument(next http.Handler) http.Handler {
214         return promhttp.InstrumentHandlerDuration(m.reqDuration, next)
215 }
216
217 // VolumeTypes is built up by init() funcs in the source files that
218 // define the volume types.
219 var VolumeTypes = []func() VolumeWithExamples{}
220
221 type VolumeList []Volume
222
223 // UnmarshalJSON -- given an array of objects -- deserializes each
224 // object as the volume type indicated by the object's Type field.
225 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
226         typeMap := map[string]func() VolumeWithExamples{}
227         for _, factory := range VolumeTypes {
228                 t := factory().Type()
229                 if _, ok := typeMap[t]; ok {
230                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
231                 }
232                 typeMap[t] = factory
233         }
234
235         var mapList []map[string]interface{}
236         err := json.Unmarshal(data, &mapList)
237         if err != nil {
238                 return err
239         }
240         for _, mapIn := range mapList {
241                 typeIn, ok := mapIn["Type"].(string)
242                 if !ok {
243                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
244                 }
245                 factory, ok := typeMap[typeIn]
246                 if !ok {
247                         return fmt.Errorf("unsupported volume type %+q", typeIn)
248                 }
249                 data, err := json.Marshal(mapIn)
250                 if err != nil {
251                         return err
252                 }
253                 vol := factory()
254                 err = json.Unmarshal(data, vol)
255                 if err != nil {
256                         return err
257                 }
258                 *vl = append(*vl, vol)
259         }
260         return nil
261 }
262
263 // MarshalJSON adds a "Type" field to each volume corresponding to its
264 // Type().
265 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
266         data := []byte{'['}
267         for _, vs := range *vl {
268                 j, err := json.Marshal(vs)
269                 if err != nil {
270                         return nil, err
271                 }
272                 if len(data) > 1 {
273                         data = append(data, byte(','))
274                 }
275                 t, err := json.Marshal(vs.Type())
276                 if err != nil {
277                         panic(err)
278                 }
279                 data = append(data, j[0])
280                 data = append(data, []byte(`"Type":`)...)
281                 data = append(data, t...)
282                 data = append(data, byte(','))
283                 data = append(data, j[1:]...)
284         }
285         return append(data, byte(']')), nil
286 }