Merge branch '14199-copy-from-remote'
[arvados.git] / services / keepstore / config.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io/ioutil"
12         "strings"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/Sirupsen/logrus"
17 )
18
19 type Config struct {
20         Debug  bool
21         Listen string
22
23         LogFormat string
24
25         PIDFile string
26
27         MaxBuffers  int
28         MaxRequests int
29
30         BlobSignatureTTL    arvados.Duration
31         BlobSigningKeyFile  string
32         RequireSignatures   bool
33         SystemAuthTokenFile string
34         EnableDelete        bool
35         TrashLifetime       arvados.Duration
36         TrashCheckInterval  arvados.Duration
37         PullWorkers         int
38         TrashWorkers        int
39         EmptyTrashWorkers   int
40         TLSCertificateFile  string
41         TLSKeyFile          string
42
43         Volumes VolumeList
44
45         blobSigningKey  []byte
46         systemAuthToken string
47         debugLogf       func(string, ...interface{})
48
49         ManagementToken string `doc: The secret key that must be provided by monitoring services
50 wishing to access the health check endpoint (/_health).`
51 }
52
53 var (
54         theConfig = DefaultConfig()
55         formatter = map[string]logrus.Formatter{
56                 "text": &logrus.TextFormatter{
57                         FullTimestamp:   true,
58                         TimestampFormat: rfc3339NanoFixed,
59                 },
60                 "json": &logrus.JSONFormatter{
61                         TimestampFormat: rfc3339NanoFixed,
62                 },
63         }
64         log = logrus.StandardLogger()
65 )
66
67 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
68
69 // DefaultConfig returns the default configuration.
70 func DefaultConfig() *Config {
71         return &Config{
72                 Listen:             ":25107",
73                 LogFormat:          "json",
74                 MaxBuffers:         128,
75                 RequireSignatures:  true,
76                 BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
77                 TrashLifetime:      arvados.Duration(14 * 24 * time.Hour),
78                 TrashCheckInterval: arvados.Duration(24 * time.Hour),
79                 Volumes:            []Volume{},
80         }
81 }
82
83 // Start should be called exactly once: after setting all public
84 // fields, and before using the config.
85 func (cfg *Config) Start() error {
86         if cfg.Debug {
87                 log.Level = logrus.DebugLevel
88                 cfg.debugLogf = log.Printf
89                 cfg.debugLogf("debugging enabled")
90         } else {
91                 log.Level = logrus.InfoLevel
92                 cfg.debugLogf = func(string, ...interface{}) {}
93         }
94
95         f := formatter[strings.ToLower(cfg.LogFormat)]
96         if f == nil {
97                 return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
98         }
99         log.Formatter = f
100
101         if cfg.MaxBuffers < 0 {
102                 return fmt.Errorf("MaxBuffers must be greater than zero")
103         }
104         bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
105
106         if cfg.MaxRequests < 1 {
107                 cfg.MaxRequests = cfg.MaxBuffers * 2
108                 log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
109         }
110
111         if cfg.BlobSigningKeyFile != "" {
112                 buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
113                 if err != nil {
114                         return fmt.Errorf("reading blob signing key file: %s", err)
115                 }
116                 cfg.blobSigningKey = bytes.TrimSpace(buf)
117                 if len(cfg.blobSigningKey) == 0 {
118                         return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
119                 }
120         } else if cfg.RequireSignatures {
121                 return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
122         } else {
123                 log.Println("Running without a blob signing key. Block locators " +
124                         "returned by this server will not be signed, and will be rejected " +
125                         "by a server that enforces permissions.")
126                 log.Println("To fix this, use the BlobSigningKeyFile config entry.")
127         }
128
129         if fn := cfg.SystemAuthTokenFile; fn != "" {
130                 buf, err := ioutil.ReadFile(fn)
131                 if err != nil {
132                         return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
133                 }
134                 cfg.systemAuthToken = strings.TrimSpace(string(buf))
135         }
136
137         if cfg.EnableDelete {
138                 log.Print("Trash/delete features are enabled. WARNING: this has not " +
139                         "been extensively tested. You should disable this unless you can afford to lose data.")
140         }
141
142         if len(cfg.Volumes) == 0 {
143                 if (&unixVolumeAdder{cfg}).Discover() == 0 {
144                         return fmt.Errorf("no volumes found")
145                 }
146         }
147         for _, v := range cfg.Volumes {
148                 if err := v.Start(); err != nil {
149                         return fmt.Errorf("volume %s: %s", v, err)
150                 }
151                 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
152         }
153         return nil
154 }
155
156 // VolumeTypes is built up by init() funcs in the source files that
157 // define the volume types.
158 var VolumeTypes = []func() VolumeWithExamples{}
159
160 type VolumeList []Volume
161
162 // UnmarshalJSON -- given an array of objects -- deserializes each
163 // object as the volume type indicated by the object's Type field.
164 func (vl *VolumeList) UnmarshalJSON(data []byte) error {
165         typeMap := map[string]func() VolumeWithExamples{}
166         for _, factory := range VolumeTypes {
167                 t := factory().Type()
168                 if _, ok := typeMap[t]; ok {
169                         log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
170                 }
171                 typeMap[t] = factory
172         }
173
174         var mapList []map[string]interface{}
175         err := json.Unmarshal(data, &mapList)
176         if err != nil {
177                 return err
178         }
179         for _, mapIn := range mapList {
180                 typeIn, ok := mapIn["Type"].(string)
181                 if !ok {
182                         return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
183                 }
184                 factory, ok := typeMap[typeIn]
185                 if !ok {
186                         return fmt.Errorf("unsupported volume type %+q", typeIn)
187                 }
188                 data, err := json.Marshal(mapIn)
189                 if err != nil {
190                         return err
191                 }
192                 vol := factory()
193                 err = json.Unmarshal(data, vol)
194                 if err != nil {
195                         return err
196                 }
197                 *vl = append(*vl, vol)
198         }
199         return nil
200 }
201
202 // MarshalJSON adds a "Type" field to each volume corresponding to its
203 // Type().
204 func (vl *VolumeList) MarshalJSON() ([]byte, error) {
205         data := []byte{'['}
206         for _, vs := range *vl {
207                 j, err := json.Marshal(vs)
208                 if err != nil {
209                         return nil, err
210                 }
211                 if len(data) > 1 {
212                         data = append(data, byte(','))
213                 }
214                 t, err := json.Marshal(vs.Type())
215                 if err != nil {
216                         panic(err)
217                 }
218                 data = append(data, j[0])
219                 data = append(data, []byte(`"Type":`)...)
220                 data = append(data, t...)
221                 data = append(data, byte(','))
222                 data = append(data, j[1:]...)
223         }
224         return append(data, byte(']')), nil
225 }