1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 "github.com/sirupsen/logrus"
// RunCommand parses flags from args, loads the cluster configuration,
// and calls RecoverManifest on the contents of each manifest file named
// on the command line. The UUID of each recovered collection is written
// to stdout; diagnostics go to a text-format logger on stderr. Per the
// usage text, the returned exit status is zero when a collection is
// saved for every provided manifest.
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
// Start with a text-format logger on stderr at info level; the
// log-level flag is parsed further down.
30 logger := ctxlog.New(stderr, "text", "info")
33 logger.WithError(err).Error("fatal")
35 logger.Info("exiting")
// The config loader is given stdin and the logger.
// NOTE(review): SkipLegacy presumably disables legacy config file
// handling -- confirm against lib/config.
38 loader := config.NewLoader(stdin, logger)
39 loader.SkipLegacy = true
// ContinueOnError makes Parse return an error instead of exiting the
// process, and flag package output is directed to stderr.
41 flags := flag.NewFlagSet("", flag.ContinueOnError)
42 flags.SetOutput(stderr)
43 flags.Usage = func() {
44 fmt.Fprintf(flags.Output(), `Usage:
45 %s [options ...] /path/to/manifest.txt [...]
47 This program recovers deleted collections. Recovery is
48 possible when the collection's manifest is still available and
49 all of its data blocks are still available or recoverable
50 (e.g., garbage collection is not enabled, the blocks are too
51 new for garbage collection, the blocks are referenced by other
52 collections, or the blocks have been trashed but not yet
55 For each provided collection manifest, once all data blocks
56 are recovered/protected from garbage collection, a new
57 collection is saved and its UUID is printed on stdout.
59 Exit status will be zero if recovery is successful, i.e., a
60 collection is saved for each provided manifest.
65 loader.SetupFlags(flags)
66 loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
67 err = flags.Parse(args)
68 if err == flag.ErrHelp {
71 } else if err != nil {
75 if len(flags.Args()) == 0 {
80 lvl, err := logrus.ParseLevel(*loglevel)
86 cfg, err := loader.Load()
90 cluster, err := cfg.GetCluster("")
94 client, err := arvados.NewClientFromConfig(cluster)
98 client.AuthToken = cluster.SystemRootToken
106 for _, src := range flags.Args() {
107 logger := logger.WithField("src", src)
108 if len(src) == 27 && src[5:12] == "-57u5n-" {
109 logger.Error("log entry lookup not implemented")
113 mtxt, err := ioutil.ReadFile(src)
115 logger.WithError(err).Error("error loading manifest data")
119 uuid, err := und.RecoverManifest(string(mtxt))
121 logger.WithError(err).Error("recovery failed")
125 logger.WithField("UUID", uuid).Info("recovery succeeded")
126 fmt.Fprintln(stdout, uuid)
// undeleter holds the API client, cluster configuration, and logger
// used by the recovery methods (newestMtime, ensureSafe,
// RecoverManifest).
132 type undeleter struct {
133 client *arvados.Client
134 cluster *arvados.Cluster
135 logger logrus.FieldLogger
// errNotFound is returned by newestMtime when the keep service index
// contains no entries for the requested block.
138 var errNotFound = errors.New("not found")
140 // Return the timestamp of the newest copy of blk on svc. The
141 // returned error is errNotFound if the index lists no copies of blk,
// or the index error itself if the index request fails (that error is
// also logged as a warning).
143 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
// Ask svc for its index entries for blk.
144 found, err := svc.Index(und.client, blk)
146 logger.WithError(err).Warn("error getting index")
147 return time.Time{}, err
148 } else if len(found) == 0 {
149 return time.Time{}, errNotFound
152 for _, ent := range found {
// ent.Mtime is passed as the nanoseconds argument of time.Unix, i.e.,
// it is treated as nanoseconds since the Unix epoch.
153 t := time.Unix(0, ent.Mtime)
158 logger.WithField("latest", latest).Debug("found")
// errTouchIneffective is returned by ensureSafe. Per its message, it
// signals that a touch request succeeded but the timestamp reported
// afterwards was still too old.
162 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
164 // Ensure the given block exists on the given server and won't be
165 // eligible for trashing until after our chosen deadline (blobsigexp).
166 // Returns an error if the block doesn't exist on the given server, or
167 // has an old timestamp and can't be updated. Reports errors via
170 // After we decide a block is "safe" (whether or not we had to untrash
171 // it), keep-balance might notice that it's currently unreferenced and
172 // decide to trash it, all before our recovered collection gets
173 // saved. But if the block's timestamp is more recent than blobsigttl,
174 // keepstore will refuse to trash it even if told to by keep-balance.
175 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
// Initial check: the block is already protected past the deadline when
// its newest mtime plus blobsigttl falls after blobsigexp.
176 if latest, err := und.newestMtime(logger, blk, svc); err != nil {
178 } else if latest.Add(blobsigttl).After(blobsigexp) {
// Refresh the block's timestamp on svc.
181 if err := svc.Touch(ctx, und.client, blk); err != nil {
182 return fmt.Errorf("error updating timestamp: %s", err)
184 logger.Debug("updated timestamp")
// Re-read the index to confirm the touch took effect, applying the
// same mtime-plus-blobsigttl test as the initial check.
185 if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
186 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
187 } else if err != nil {
189 } else if latest.Add(blobsigttl).After(blobsigexp) {
// NOTE(review): presumably reached only when the re-read timestamp is
// still too old -- confirm against the full function body.
192 return errTouchIneffective
196 // Untrash and update GC timestamps (as needed) on blocks referenced
197 // by the given manifest, save a new collection and return the new
198 // collection's UUID.
199 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
// ctx is passed to the untrash, touch, and collection-save requests
// below.
200 ctx, cancel := context.WithCancel(context.Background())
// Wrap the manifest text in a Collection so SizedDigests can list the
// blocks it references.
203 coll := arvados.Collection{ManifestText: mtxt}
204 blks, err := coll.SizedDigests()
// todo is a work queue of indexes into blks, buffered to hold one
// entry per block; the workers below consume it.
208 todo := make(chan int, len(blks))
209 for idx := range blks {
// Collect the keep services to check, leaving out any whose
// ServiceType is proxy.
214 var services []arvados.KeepService
215 err = und.client.EachKeepService(func(svc arvados.KeepService) error {
216 if svc.ServiceType == "proxy" {
217 und.logger.WithField("service", svc).Debug("ignore proxy service")
219 services = append(services, svc)
224 return "", fmt.Errorf("error getting list of keep services: %s", err)
226 und.logger.WithField("services", services).Debug("got list of services")
228 // blobsigexp is our deadline for saving the rescued
229 // collection. This must be less than BlobSigningTTL
230 // (otherwise our rescued blocks could be garbage collected
231 // again before we protect them by saving the collection) but
232 // the exact value is somewhat arbitrary. If it's too soon, it
233 // will arrive before we're ready to save, and save will
234 // fail. If it's too late, we'll needlessly update timestamps
235 // on some blocks that were recently written/touched (e.g., by
236 // a previous attempt to rescue this same collection) and
237 // would have lived long enough anyway if left alone.
238 // BlobSigningTTL/2 (typically around 1 week) is much longer
239 // than we need to recover even a very large collection.
240 blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
241 blobsigexp := time.Now().Add(blobsigttl / 2)
242 und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
244 // We'll start a number of threads, each working on
245 // checking/recovering one block at a time. The threads
246 // themselves don't need much CPU/memory, but to avoid hitting
247 // limits on keepstore connections, backend storage bandwidth,
248 // etc., we limit concurrency to 2 per keepstore node.
249 workerThreads := 2 * len(services)
// blkFound has one entry per block in blks; it is tallied into
// have/havenot after the workers finish.
251 blkFound := make([]bool, len(blks))
252 var wg sync.WaitGroup
253 for i := 0; i < workerThreads; i++ {
258 for idx := range todo {
// Keep only the part of the locator before the first "+".
259 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
260 logger := und.logger.WithField("block", blk)
// Two passes over all services, with untrashing false and then true.
// NOTE(review): presumably Untrash is attempted only on the second
// pass -- confirm against the full loop body.
261 for _, untrashing := range []bool{false, true} {
262 for _, svc := range services {
263 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
265 if err := svc.Untrash(ctx, und.client, blk); err != nil {
266 logger.WithError(err).Debug("untrash failed")
269 logger.Info("untrashed")
271 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
272 if err == errNotFound {
274 } else if err != nil {
282 logger.Debug("unrecoverable")
// Tally recovered vs. unrecovered blocks.
288 var have, havenot int
289 for _, ok := range blkFound {
298 und.logger.Warn("partial recovery is not implemented")
300 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
// When blob signing is enabled, re-sign the manifest using the client
// token, blobsigexp, blobsigttl, and the cluster signing key before
// saving.
303 if und.cluster.Collections.BlobSigning {
304 key := []byte(und.cluster.Collections.BlobSigningKey)
305 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
307 und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
// Create the new collection via POST; the response is decoded into
// coll, whose UUID is returned.
308 err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
309 "collection": map[string]interface{}{
310 "manifest_text": coll.ManifestText,
314 return "", fmt.Errorf("error saving new collection: %s", err)
316 und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
317 return coll.UUID, nil