16427: Improve -help / usage message.
[arvados.git] / lib / undelete / cmd.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package undelete
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "github.com/sirupsen/logrus"
22 )
23
24 var Command command
25
26 type command struct{}
27
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         logger := ctxlog.New(stderr, "text", "info")
31         defer func() {
32                 if err != nil {
33                         logger.WithError(err).Error("fatal")
34                 }
35                 logger.Info("exiting")
36         }()
37
38         loader := config.NewLoader(stdin, logger)
39         loader.SkipLegacy = true
40
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         flags.Usage = func() {
44                 fmt.Fprintf(flags.Output(), `Usage:
45         %s [options ...] /path/to/manifest.txt [...]
46
47         This program recovers deleted collections. Recovery is
48         possible when the collection's manifest is still available and
49         all of its data blocks are still available or recoverable
50         (e.g., garbage collection is not enabled, the blocks are too
51         new for garbage collection, the blocks are referenced by other
52         collections, or the blocks have been trashed but not yet
53         deleted).
54
55         For each provided collection manifest, once all data blocks
56         are recovered/protected from garbage collection, a new
57         collection is saved and its UUID is printed on stdout.
58
59         Exit status will be zero if recovery is successful, i.e., a
60         collection is saved for each provided manifest.
61 Options:
62 `, prog)
63                 flags.PrintDefaults()
64         }
65         loader.SetupFlags(flags)
66         loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
67         err = flags.Parse(args)
68         if err == flag.ErrHelp {
69                 err = nil
70                 return 0
71         } else if err != nil {
72                 return 2
73         }
74
75         if len(flags.Args()) == 0 {
76                 flags.Usage()
77                 return 2
78         }
79
80         lvl, err := logrus.ParseLevel(*loglevel)
81         if err != nil {
82                 return 2
83         }
84         logger.SetLevel(lvl)
85
86         cfg, err := loader.Load()
87         if err != nil {
88                 return 1
89         }
90         cluster, err := cfg.GetCluster("")
91         if err != nil {
92                 return 1
93         }
94         client, err := arvados.NewClientFromConfig(cluster)
95         if err != nil {
96                 return 1
97         }
98         client.AuthToken = cluster.SystemRootToken
99         und := undeleter{
100                 client:  client,
101                 cluster: cluster,
102                 logger:  logger,
103         }
104
105         exitcode := 0
106         for _, src := range flags.Args() {
107                 logger := logger.WithField("src", src)
108                 if len(src) == 27 && src[5:12] == "-57u5n-" {
109                         logger.Error("log entry lookup not implemented")
110                         exitcode = 1
111                         continue
112                 } else {
113                         mtxt, err := ioutil.ReadFile(src)
114                         if err != nil {
115                                 logger.WithError(err).Error("error loading manifest data")
116                                 exitcode = 1
117                                 continue
118                         }
119                         uuid, err := und.RecoverManifest(string(mtxt))
120                         if err != nil {
121                                 logger.WithError(err).Error("recovery failed")
122                                 exitcode = 1
123                                 continue
124                         }
125                         logger.WithField("UUID", uuid).Info("recovery succeeded")
126                         fmt.Fprintln(stdout, uuid)
127                 }
128         }
129         return exitcode
130 }
131
132 type undeleter struct {
133         client  *arvados.Client
134         cluster *arvados.Cluster
135         logger  logrus.FieldLogger
136 }
137
138 var errNotFound = errors.New("not found")
139
140 // Return the timestamp of the newest copy of blk on svc. Second
141 // return value is false if blk is not on svc at all, or an error
142 // occurs.
143 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
144         found, err := svc.Index(und.client, blk)
145         if err != nil {
146                 logger.WithError(err).Warn("error getting index")
147                 return time.Time{}, err
148         } else if len(found) == 0 {
149                 return time.Time{}, errNotFound
150         }
151         var latest time.Time
152         for _, ent := range found {
153                 t := time.Unix(0, ent.Mtime)
154                 if t.After(latest) {
155                         latest = t
156                 }
157         }
158         logger.WithField("latest", latest).Debug("found")
159         return latest, nil
160 }
161
162 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
163
164 // Ensure the given block exists on the given server and won't be
165 // eligible for trashing until after our chosen deadline (blobsigexp).
166 // Returns an error if the block doesn't exist on the given server, or
167 // has an old timestamp and can't be updated.  Reports errors via
168 // logger.
169 //
170 // After we decide a block is "safe" (whether or not we had to untrash
171 // it), keep-balance might notice that it's currently unreferenced and
172 // decide to trash it, all before our recovered collection gets
173 // saved. But if the block's timestamp is more recent than blobsigttl,
174 // keepstore will refuse to trash it even if told to by keep-balance.
175 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
176         if latest, err := und.newestMtime(logger, blk, svc); err != nil {
177                 return err
178         } else if latest.Add(blobsigttl).After(blobsigexp) {
179                 return nil
180         }
181         if err := svc.Touch(ctx, und.client, blk); err != nil {
182                 return fmt.Errorf("error updating timestamp: %s", err)
183         }
184         logger.Debug("updated timestamp")
185         if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
186                 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
187         } else if err != nil {
188                 return err
189         } else if latest.Add(blobsigttl).After(blobsigexp) {
190                 return nil
191         } else {
192                 return errTouchIneffective
193         }
194 }
195
196 // Untrash and update GC timestamps (as needed) on blocks referenced
197 // by the given manifest, save a new collection and return the new
198 // collection's UUID.
199 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
200         ctx, cancel := context.WithCancel(context.Background())
201         defer cancel()
202
203         coll := arvados.Collection{ManifestText: mtxt}
204         blks, err := coll.SizedDigests()
205         if err != nil {
206                 return "", err
207         }
208         todo := make(chan int, len(blks))
209         for idx := range blks {
210                 todo <- idx
211         }
212         go close(todo)
213
214         var services []arvados.KeepService
215         err = und.client.EachKeepService(func(svc arvados.KeepService) error {
216                 if svc.ServiceType == "proxy" {
217                         und.logger.WithField("service", svc).Debug("ignore proxy service")
218                 } else {
219                         services = append(services, svc)
220                 }
221                 return nil
222         })
223         if err != nil {
224                 return "", fmt.Errorf("error getting list of keep services: %s", err)
225         }
226         und.logger.WithField("services", services).Debug("got list of services")
227
228         // blobsigexp is our deadline for saving the rescued
229         // collection. This must be less than BlobSigningTTL
230         // (otherwise our rescued blocks could be garbage collected
231         // again before we protect them by saving the collection) but
232         // the exact value is somewhat arbitrary. If it's too soon, it
233         // will arrive before we're ready to save, and save will
234         // fail. If it's too late, we'll needlessly update timestamps
235         // on some blocks that were recently written/touched (e.g., by
236         // a previous attempt to rescue this same collection) and
237         // would have lived long enough anyway if left alone.
238         // BlobSigningTTL/2 (typically around 1 week) is much longer
239         // than than we need to recover even a very large collection.
240         blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
241         blobsigexp := time.Now().Add(blobsigttl / 2)
242         und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
243
244         // We'll start a number of threads, each working on
245         // checking/recovering one block at a time. The threads
246         // themselves don't need much CPU/memory, but to avoid hitting
247         // limits on keepstore connections, backend storage bandwidth,
248         // etc., we limit concurrency to 2 per keepstore node.
249         workerThreads := 2 * len(services)
250
251         blkFound := make([]bool, len(blks))
252         var wg sync.WaitGroup
253         for i := 0; i < workerThreads; i++ {
254                 wg.Add(1)
255                 go func() {
256                         defer wg.Done()
257                 nextblk:
258                         for idx := range todo {
259                                 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
260                                 logger := und.logger.WithField("block", blk)
261                                 for _, untrashing := range []bool{false, true} {
262                                         for _, svc := range services {
263                                                 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
264                                                 if untrashing {
265                                                         if err := svc.Untrash(ctx, und.client, blk); err != nil {
266                                                                 logger.WithError(err).Debug("untrash failed")
267                                                                 continue
268                                                         }
269                                                         logger.Info("untrashed")
270                                                 }
271                                                 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
272                                                 if err == errNotFound {
273                                                         logger.Debug(err)
274                                                 } else if err != nil {
275                                                         logger.Error(err)
276                                                 } else {
277                                                         blkFound[idx] = true
278                                                         continue nextblk
279                                                 }
280                                         }
281                                 }
282                                 logger.Debug("unrecoverable")
283                         }
284                 }()
285         }
286         wg.Wait()
287
288         var have, havenot int
289         for _, ok := range blkFound {
290                 if ok {
291                         have++
292                 } else {
293                         havenot++
294                 }
295         }
296         if havenot > 0 {
297                 if have > 0 {
298                         und.logger.Warn("partial recovery is not implemented")
299                 }
300                 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
301         }
302
303         if und.cluster.Collections.BlobSigning {
304                 key := []byte(und.cluster.Collections.BlobSigningKey)
305                 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
306         }
307         und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
308         err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
309                 "collection": map[string]interface{}{
310                         "manifest_text": coll.ManifestText,
311                 },
312         })
313         if err != nil {
314                 return "", fmt.Errorf("error saving new collection: %s", err)
315         }
316         und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
317         return coll.UUID, nil
318 }