Merge remote-tracking branch 'origin/master' into 14484-collection-record-update
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "strings"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18 )
19
20 type Config struct {
21         Debug  bool
22         Listen string
23
24         LogFormat string
25
26         PIDFile string
27
28         MaxBuffers  int
29         MaxRequests int
30
31         BlobSignatureTTL    arvados.Duration
32         BlobSigningKeyFile  string
33         RequireSignatures   bool
34         SystemAuthTokenFile string
35         EnableDelete        bool
36         TrashLifetime       arvados.Duration
37         TrashCheckInterval  arvados.Duration
38         PullWorkers         int
39         TrashWorkers        int
40         EmptyTrashWorkers   int
41         TLSCertificateFile  string
42         TLSKeyFile          string
43
44         Volumes VolumeList
45
46         blobSigningKey  []byte
47         systemAuthToken string
48         debugLogf       func(string, ...interface{})
49
50         ManagementToken string
51 }
52
53 var (
54         theConfig = DefaultConfig()
55         formatter = map[string]logrus.Formatter{
56                 "text": &logrus.TextFormatter{
57                         FullTimestamp:   true,
58                         TimestampFormat: rfc3339NanoFixed,
59                 },
60                 "json": &logrus.JSONFormatter{
61                         TimestampFormat: rfc3339NanoFixed,
62                 },
63         }
64         log = logrus.StandardLogger()
65 )
66
67 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
68
69 // DefaultConfig returns the default configuration.
70 func DefaultConfig() *Config {
71         return &Config{
72                 Listen:             ":25107",
73                 LogFormat:          "json",
74                 MaxBuffers:         128,
75                 RequireSignatures:  true,
76                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
77                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
78                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
79                 Volumes:            []Volume{},
80         }
81 }
82
83 // Start should be called exactly once: after setting all public
84 // fields, and before using the config.
85 func (cfg *Config) Start(reg *prometheus.Registry) error {
86         if cfg.Debug {
87                 log.Level = logrus.DebugLevel
88                 cfg.debugLogf = log.Printf
89                 cfg.debugLogf("debugging enabled")
90         } else {
91                 log.Level = logrus.InfoLevel
92                 cfg.debugLogf = func(string, ...interface{}) {}
93         }
94
95         f := formatter[strings.ToLower(cfg.LogFormat)]
96         if f == nil {
97                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
98         }
99         log.Formatter = f
100
101         if cfg.MaxBuffers < 0 {
102                 return fmt.Errorf("MaxBuffers must be greater than zero")
103         }
104         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
105
106         if cfg.MaxRequests < 1 {
107                 cfg.MaxRequests = cfg.MaxBuffers * 2
108                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
109         }
110
111         if cfg.BlobSigningKeyFile != "" {
112                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
113                 if err != nil {
114                         return fmt.Errorf("reading blob signing key file: %s", err)
115                 }
116                 cfg.blobSigningKey = bytes.TrimSpace(buf)
117                 if len(cfg.blobSigningKey) == 0 {
118                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
119                 }
120         } else if cfg.RequireSignatures {
121                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
122         } else {
123                 log.Println("Running without a blob signing key. Block locators " +
124                         "returned by this server will not be signed, and will be rejected " +
125                         "by a server that enforces permissions.")
126                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
127         }
128
129         if fn := cfg.SystemAuthTokenFile; fn != "" {
130                 buf, err := ioutil.ReadFile(fn)
131                 if err != nil {
132                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
133                 }
134                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
135         }
136
137         if cfg.EnableDelete {
138                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
139                         "been extensively tested. You should disable this unless you can afford to lose data.")
140         }
141
142         if len(cfg.Volumes) == 0 {
143                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
144                         return fmt.Errorf("no volumes found")
145                 }
146         }
147         vm := newVolumeMetricsVecs(reg)
148         for _, v := range cfg.Volumes {
149                 if err := v.Start(vm); err != nil {
150                         return fmt.Errorf("volume %s: %s", v, err)
151                 }
152                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
153         }
154         return nil
155 }
156
157 // VolumeTypes is built up by init() funcs in the source files that
158 // define the volume types.
159 var VolumeTypes = []func() VolumeWithExamples{}
160
161 type VolumeList []Volume
162
163 // UnmarshalJSON -- given an array of objects -- deserializes each
164 // object as the volume type indicated by the object's Type field.
165 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
166         typeMap := map[string]func() VolumeWithExamples{}
167         for _, factory := range VolumeTypes {
168                 t := factory().Type()
169                 if _, ok := typeMap[t]; ok {
170                         log.Fatalf("volume type %+q is claimed by multiple VolumeTypes", t)
171                 }
172                 typeMap[t] = factory
173         }
174
175         var mapList []map[string]interface{}
176         err := json.Unmarshal(data, &mapList)
177         if err != nil {
178                 return err
179         }
180         for _, mapIn := range mapList {
181                 typeIn, ok := mapIn["Type"].(string)
182                 if !ok {
183                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
184                 }
185                 factory, ok := typeMap[typeIn]
186                 if !ok {
187                         return fmt.Errorf("unsupported volume type %+q", typeIn)
188                 }
189                 data, err := json.Marshal(mapIn)
190                 if err != nil {
191                         return err
192                 }
193                 vol := factory()
194                 err = json.Unmarshal(data, vol)
195                 if err != nil {
196                         return err
197                 }
198                 *vl = append(*vl, vol)
199         }
200         return nil
201 }
202
203 // MarshalJSON adds a "Type" field to each volume corresponding to its
204 // Type().
205 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
206         data := []byte{'['}
207         for _, vs := range *vl {
208                 j, err := json.Marshal(vs)
209                 if err != nil {
210                         return nil, err
211                 }
212                 if len(data) > 1 {
213                         data = append(data, byte(','))
214                 }
215                 t, err := json.Marshal(vs.Type())
216                 if err != nil {
217                         panic(err)
218                 }
219                 data = append(data, j[0])
220                 data = append(data, []byte(`"Type":`)...)
221                 data = append(data, t...)
222                 data = append(data, byte(','))
223                 data = append(data, j[1:]...)
224         }
225         return append(data, byte(']')), nil
226 }