1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 "github.com/sirupsen/logrus"
// RunCommand is the CLI entry point for the collection-recovery
// command: it loads the cluster configuration, parses command-line
// flags, and attempts to recover each manifest source named in args,
// printing each newly saved collection's UUID on stdout.
// It returns a process exit status (zero only when every source is
// handled successfully).
//
// NOTE(review): several lines are elided from this view (error-return
// branches, the tail of the usage text, exit-status bookkeeping, and
// the setup of the undeleter value used below); comments describe only
// the visible code.
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
// Log to stderr; the level is raised below after -log-level is parsed.
30 logger := ctxlog.New(stderr, "text", "info")
33 logger.WithError(err).Error("fatal")
35 logger.Info("exiting")
// The config loader reads the cluster config (config-related flags are
// registered via SetupFlags below). Legacy config files are
// deliberately skipped.
38 loader := config.NewLoader(stdin, logger)
39 loader.SkipLegacy = true
41 flags := flag.NewFlagSet("", flag.ContinueOnError)
42 flags.SetOutput(stderr)
43 flags.Usage = func() {
44 fmt.Fprintf(flags.Output(), `Usage:
45 %s [options ...] /path/to/manifest.txt [...]
47 This program recovers deleted collections. Recovery is
48 possible when the collection's manifest is still available and
49 all of its data blocks are still available or recoverable
50 (e.g., garbage collection is not enabled, the blocks are too
51 new for garbage collection, the blocks are referenced by other
52 collections, or the blocks have been trashed but not yet
55 For each provided collection manifest, once all data blocks
56 are recovered/protected from garbage collection, a new
57 collection is saved and its UUID is printed on stdout.
59 Exit status will be zero if recovery is successful, i.e., a
60 collection is saved for each provided manifest.
65 loader.SetupFlags(flags)
66 loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
67 err = flags.Parse(args)
68 if err == flag.ErrHelp {
71 } else if err != nil {
// At least one manifest-file argument is required.
75 if len(flags.Args()) == 0 {
// Apply the requested log level before doing any real work.
80 lvl, err := logrus.ParseLevel(*loglevel)
86 cfg, err := loader.Load()
90 cluster, err := cfg.GetCluster("")
94 client, err := arvados.NewClientFromConfig(cluster)
// Recovery requires admin-level access; use the system root token.
98 client.AuthToken = cluster.SystemRootToken
// Handle each requested source independently so one failure does not
// stop the others.
106 for _, src := range flags.Args() {
107 logger := logger.WithField("src", src)
// A 27-character string with the "-57u5n-" infix is an Arvados log
// entry UUID rather than a manifest file path; that lookup path is
// not implemented.
108 if len(src) == 27 && src[5:12] == "-57u5n-" {
109 logger.Error("log entry lookup not implemented")
// Otherwise treat src as the path of a file containing manifest text.
113 mtxt, err := ioutil.ReadFile(src)
115 logger.WithError(err).Error("error loading manifest data")
// NOTE(review): und is presumably an undeleter constructed in elided
// lines above — confirm against the full file.
119 uuid, err := und.RecoverManifest(string(mtxt))
121 logger.WithError(err).Error("recovery failed")
125 logger.WithField("UUID", uuid).Info("recovery succeeded")
126 fmt.Fprintln(stdout, uuid)
// undeleter holds the state shared by the collection-recovery
// operations below: an API client (authenticated with the system root
// token by the caller), the cluster configuration (source of the
// blob-signing TTL/key used when recovering), and a logger.
132 type undeleter struct {
133 client *arvados.Client
134 cluster *arvados.Cluster
135 logger logrus.FieldLogger
// errNotFound is a sentinel: the requested block has no entry at all
// in the queried keepstore's index.
138 var errNotFound = errors.New("not found")
140 // newestMtime finds the timestamp of the newest copy of blk on svc.
141 // It returns errNotFound if blk is not on svc at all.
142 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
// Ask the service for its index entries for this single block.
143 found, err := svc.Index(und.client, blk)
145 logger.WithError(err).Warn("error getting index")
146 return time.Time{}, err
147 } else if len(found) == 0 {
148 return time.Time{}, errNotFound
// Scan all index entries for the most recent mtime (nanoseconds since
// epoch). NOTE(review): the max-accumulation into "latest" and the
// final return are elided from this view.
151 for _, ent := range found {
152 t := time.Unix(0, ent.Mtime)
157 logger.WithField("latest", latest).Debug("found")
// errTouchIneffective reports that a Touch call succeeded but a
// follow-up index check still shows a timestamp too old to survive
// until the chosen save deadline.
161 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
163 // ensureSafe ensures the given block exists on the given server and won't be
164 // eligible for trashing until after our chosen deadline (blobsigexp).
165 // It returns an error if the block doesn't exist on the given server, or
166 // has an old timestamp and can't be updated.
168 // After we decide a block is "safe" (whether or not we had to untrash
169 // it), keep-balance might notice that it's currently unreferenced and
170 // decide to trash it, all before our recovered collection gets
171 // saved. But if the block's timestamp is more recent than blobsigttl,
172 // keepstore will refuse to trash it even if told to by keep-balance.
173 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
// If the newest copy's mtime plus the signing TTL already extends past
// the deadline, the block is safe as-is. NOTE(review): the early
// success return for that branch is elided from this view.
174 if latest, err := und.newestMtime(logger, blk, svc); err != nil {
176 } else if latest.Add(blobsigttl).After(blobsigexp) {
// Otherwise bump the block's timestamp so keepstore will refuse trash
// requests until well past the deadline.
179 if err := svc.Touch(ctx, und.client, blk); err != nil {
180 return fmt.Errorf("error updating timestamp: %s", err)
182 logger.Debug("updated timestamp")
// Re-check the index to verify the touch actually moved the reported
// timestamp far enough.
183 if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
184 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
185 } else if err != nil {
187 } else if latest.Add(blobsigttl).After(blobsigexp) {
190 return errTouchIneffective
194 // RecoverManifest untrashes and updates GC timestamps (as needed) on
195 // blocks referenced by the given manifest, saves a new collection,
196 // and returns the new collection's UUID.
//
// NOTE(review): a number of lines are elided from this view (the
// deferred cancel(), worker-goroutine start and wg bookkeeping, the
// success/failure tallying, and several error-return branches);
// comments describe only the visible code.
197 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
198 ctx, cancel := context.WithCancel(context.Background())
// Parse the manifest's block locators (hash+size digests).
201 coll := arvados.Collection{ManifestText: mtxt}
202 blks, err := coll.SizedDigests()
// Feed every block index into a buffered channel for the workers to
// consume.
206 todo := make(chan int, len(blks))
207 for idx := range blks {
// Build the list of keep services to query directly, skipping proxies.
212 var services []arvados.KeepService
213 err = und.client.EachKeepService(func(svc arvados.KeepService) error {
214 if svc.ServiceType == "proxy" {
215 und.logger.WithField("service", svc).Debug("ignore proxy service")
217 services = append(services, svc)
222 return "", fmt.Errorf("error getting list of keep services: %s", err)
224 und.logger.WithField("services", services).Debug("got list of services")
226 // blobsigexp is our deadline for saving the rescued
227 // collection. This must be less than BlobSigningTTL
228 // (otherwise our rescued blocks could be garbage collected
229 // again before we protect them by saving the collection) but
230 // the exact value is somewhat arbitrary. If it's too soon, it
231 // will arrive before we're ready to save, and save will
232 // fail. If it's too late, we'll needlessly update timestamps
233 // on some blocks that were recently written/touched (e.g., by
234 // a previous attempt to rescue this same collection) and
235 // would have lived long enough anyway if left alone.
236 // BlobSigningTTL/2 (typically around 1 week) is much longer
237 // than we need to recover even a very large collection.
238 blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
239 blobsigexp := time.Now().Add(blobsigttl / 2)
240 und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
242 // We'll start a number of threads, each working on
243 // checking/recovering one block at a time. The threads
244 // themselves don't need much CPU/memory, but to avoid hitting
245 // limits on keepstore connections, backend storage bandwidth,
246 // etc., we limit concurrency to 2 per keepstore node.
247 workerThreads := 2 * len(services)
// blkFound[idx] records whether block idx was made safe.
249 blkFound := make([]bool, len(blks))
250 var wg sync.WaitGroup
251 for i := 0; i < workerThreads; i++ {
// Each worker pulls block indexes off todo until the channel is
// drained/closed (close and goroutine start are elided from this view).
256 for idx := range todo {
// Strip the "+size[+hints]" suffix; services are addressed by bare hash.
257 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
258 logger := und.logger.WithField("block", blk)
// Two passes over the services: the first without untrashing, the
// second untrashing before checking (for blocks not already safe).
259 for _, untrashing := range []bool{false, true} {
260 for _, svc := range services {
261 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
263 if err := svc.Untrash(ctx, und.client, blk); err != nil {
// Untrash failure is only Debug-level: the block may simply not be
// trashed on this particular service.
264 logger.WithError(err).Debug("untrash failed")
267 logger.Info("untrashed")
// Verify the block exists here and is protected until blobsigexp.
269 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
270 if err == errNotFound {
272 } else if err != nil {
280 logger.Debug("unrecoverable")
// Tally how many blocks were recovered vs. not.
286 var have, havenot int
287 for _, ok := range blkFound {
296 und.logger.Warn("partial recovery is not implemented")
298 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
// If blob signing is enabled, re-sign the manifest's locators with the
// same TTL/deadline chosen above so the save will be accepted.
301 if und.cluster.Collections.BlobSigning {
302 key := []byte(und.cluster.Collections.BlobSigningKey)
303 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
305 und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
// Save the recovered manifest as a brand-new collection.
306 err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
307 "collection": map[string]interface{}{
308 "manifest_text": coll.ManifestText,
312 return "", fmt.Errorf("error saving new collection: %s", err)
314 und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
315 return coll.UUID, nil