16425: Don't scan dirs that we don't write in.
[arvados.git] / lib / undelete / cmd.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package undelete
6
7 import (
8         "context"
9         "errors"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "github.com/sirupsen/logrus"
22 )
23
24 var Command command
25
26 type command struct{}
27
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         logger := ctxlog.New(stderr, "text", "info")
31         defer func() {
32                 if err != nil {
33                         logger.WithError(err).Error("fatal")
34                 }
35                 logger.Info("exiting")
36         }()
37
38         loader := config.NewLoader(stdin, logger)
39         loader.SkipLegacy = true
40
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         flags.Usage = func() {
44                 fmt.Fprintf(flags.Output(), `Usage:
45         %s [options ...] /path/to/manifest.txt [...]
46
47         This program recovers deleted collections. Recovery is
48         possible when the collection's manifest is still available and
49         all of its data blocks are still available or recoverable
50         (e.g., garbage collection is not enabled, the blocks are too
51         new for garbage collection, the blocks are referenced by other
52         collections, or the blocks have been trashed but not yet
53         deleted).
54
55         For each provided collection manifest, once all data blocks
56         are recovered/protected from garbage collection, a new
57         collection is saved and its UUID is printed on stdout.
58
59         Exit status will be zero if recovery is successful, i.e., a
60         collection is saved for each provided manifest.
61 Options:
62 `, prog)
63                 flags.PrintDefaults()
64         }
65         loader.SetupFlags(flags)
66         loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
67         err = flags.Parse(args)
68         if err == flag.ErrHelp {
69                 err = nil
70                 return 0
71         } else if err != nil {
72                 return 2
73         }
74
75         if len(flags.Args()) == 0 {
76                 flags.Usage()
77                 return 2
78         }
79
80         lvl, err := logrus.ParseLevel(*loglevel)
81         if err != nil {
82                 return 2
83         }
84         logger.SetLevel(lvl)
85
86         cfg, err := loader.Load()
87         if err != nil {
88                 return 1
89         }
90         cluster, err := cfg.GetCluster("")
91         if err != nil {
92                 return 1
93         }
94         client, err := arvados.NewClientFromConfig(cluster)
95         if err != nil {
96                 return 1
97         }
98         client.AuthToken = cluster.SystemRootToken
99         und := undeleter{
100                 client:  client,
101                 cluster: cluster,
102                 logger:  logger,
103         }
104
105         exitcode := 0
106         for _, src := range flags.Args() {
107                 logger := logger.WithField("src", src)
108                 if len(src) == 27 && src[5:12] == "-57u5n-" {
109                         logger.Error("log entry lookup not implemented")
110                         exitcode = 1
111                         continue
112                 } else {
113                         mtxt, err := ioutil.ReadFile(src)
114                         if err != nil {
115                                 logger.WithError(err).Error("error loading manifest data")
116                                 exitcode = 1
117                                 continue
118                         }
119                         uuid, err := und.RecoverManifest(string(mtxt))
120                         if err != nil {
121                                 logger.WithError(err).Error("recovery failed")
122                                 exitcode = 1
123                                 continue
124                         }
125                         logger.WithField("UUID", uuid).Info("recovery succeeded")
126                         fmt.Fprintln(stdout, uuid)
127                 }
128         }
129         return exitcode
130 }
131
132 type undeleter struct {
133         client  *arvados.Client
134         cluster *arvados.Cluster
135         logger  logrus.FieldLogger
136 }
137
138 var errNotFound = errors.New("not found")
139
140 // Finds the timestamp of the newest copy of blk on svc. Returns
141 // errNotFound if blk is not on svc at all.
142 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
143         found, err := svc.Index(und.client, blk)
144         if err != nil {
145                 logger.WithError(err).Warn("error getting index")
146                 return time.Time{}, err
147         } else if len(found) == 0 {
148                 return time.Time{}, errNotFound
149         }
150         var latest time.Time
151         for _, ent := range found {
152                 t := time.Unix(0, ent.Mtime)
153                 if t.After(latest) {
154                         latest = t
155                 }
156         }
157         logger.WithField("latest", latest).Debug("found")
158         return latest, nil
159 }
160
161 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
162
163 // Ensures the given block exists on the given server and won't be
164 // eligible for trashing until after our chosen deadline (blobsigexp).
165 // Returns an error if the block doesn't exist on the given server, or
166 // has an old timestamp and can't be updated.
167 //
168 // After we decide a block is "safe" (whether or not we had to untrash
169 // it), keep-balance might notice that it's currently unreferenced and
170 // decide to trash it, all before our recovered collection gets
171 // saved. But if the block's timestamp is more recent than blobsigttl,
172 // keepstore will refuse to trash it even if told to by keep-balance.
173 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
174         if latest, err := und.newestMtime(logger, blk, svc); err != nil {
175                 return err
176         } else if latest.Add(blobsigttl).After(blobsigexp) {
177                 return nil
178         }
179         if err := svc.Touch(ctx, und.client, blk); err != nil {
180                 return fmt.Errorf("error updating timestamp: %s", err)
181         }
182         logger.Debug("updated timestamp")
183         if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
184                 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
185         } else if err != nil {
186                 return err
187         } else if latest.Add(blobsigttl).After(blobsigexp) {
188                 return nil
189         } else {
190                 return errTouchIneffective
191         }
192 }
193
194 // Untrash and update GC timestamps (as needed) on blocks referenced
195 // by the given manifest, save a new collection and return the new
196 // collection's UUID.
197 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
198         ctx, cancel := context.WithCancel(context.Background())
199         defer cancel()
200
201         coll := arvados.Collection{ManifestText: mtxt}
202         blks, err := coll.SizedDigests()
203         if err != nil {
204                 return "", err
205         }
206         todo := make(chan int, len(blks))
207         for idx := range blks {
208                 todo <- idx
209         }
210         go close(todo)
211
212         var services []arvados.KeepService
213         err = und.client.EachKeepService(func(svc arvados.KeepService) error {
214                 if svc.ServiceType == "proxy" {
215                         und.logger.WithField("service", svc).Debug("ignore proxy service")
216                 } else {
217                         services = append(services, svc)
218                 }
219                 return nil
220         })
221         if err != nil {
222                 return "", fmt.Errorf("error getting list of keep services: %s", err)
223         }
224         und.logger.WithField("services", services).Debug("got list of services")
225
226         // blobsigexp is our deadline for saving the rescued
227         // collection. This must be less than BlobSigningTTL
228         // (otherwise our rescued blocks could be garbage collected
229         // again before we protect them by saving the collection) but
230         // the exact value is somewhat arbitrary. If it's too soon, it
231         // will arrive before we're ready to save, and save will
232         // fail. If it's too late, we'll needlessly update timestamps
233         // on some blocks that were recently written/touched (e.g., by
234         // a previous attempt to rescue this same collection) and
235         // would have lived long enough anyway if left alone.
236         // BlobSigningTTL/2 (typically around 1 week) is much longer
237         // than than we need to recover even a very large collection.
238         blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
239         blobsigexp := time.Now().Add(blobsigttl / 2)
240         und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
241
242         // We'll start a number of threads, each working on
243         // checking/recovering one block at a time. The threads
244         // themselves don't need much CPU/memory, but to avoid hitting
245         // limits on keepstore connections, backend storage bandwidth,
246         // etc., we limit concurrency to 2 per keepstore node.
247         workerThreads := 2 * len(services)
248
249         blkFound := make([]bool, len(blks))
250         var wg sync.WaitGroup
251         for i := 0; i < workerThreads; i++ {
252                 wg.Add(1)
253                 go func() {
254                         defer wg.Done()
255                 nextblk:
256                         for idx := range todo {
257                                 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
258                                 logger := und.logger.WithField("block", blk)
259                                 for _, untrashing := range []bool{false, true} {
260                                         for _, svc := range services {
261                                                 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
262                                                 if untrashing {
263                                                         if err := svc.Untrash(ctx, und.client, blk); err != nil {
264                                                                 logger.WithError(err).Debug("untrash failed")
265                                                                 continue
266                                                         }
267                                                         logger.Info("untrashed")
268                                                 }
269                                                 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
270                                                 if err == errNotFound {
271                                                         logger.Debug(err)
272                                                 } else if err != nil {
273                                                         logger.Error(err)
274                                                 } else {
275                                                         blkFound[idx] = true
276                                                         continue nextblk
277                                                 }
278                                         }
279                                 }
280                                 logger.Debug("unrecoverable")
281                         }
282                 }()
283         }
284         wg.Wait()
285
286         var have, havenot int
287         for _, ok := range blkFound {
288                 if ok {
289                         have++
290                 } else {
291                         havenot++
292                 }
293         }
294         if havenot > 0 {
295                 if have > 0 {
296                         und.logger.Warn("partial recovery is not implemented")
297                 }
298                 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
299         }
300
301         if und.cluster.Collections.BlobSigning {
302                 key := []byte(und.cluster.Collections.BlobSigningKey)
303                 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
304         }
305         und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
306         err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
307                 "collection": map[string]interface{}{
308                         "manifest_text": coll.ManifestText,
309                 },
310         })
311         if err != nil {
312                 return "", fmt.Errorf("error saving new collection: %s", err)
313         }
314         und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
315         return coll.UUID, nil
316 }