1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/ctxlog"
21 "github.com/sirupsen/logrus"
// RunCommand implements the collection-recovery command.
//
// Setup: it registers the config loader's flags plus -log-level, loads
// the cluster configuration, and creates an API client that
// authenticates with the cluster's SystemRootToken. Because of that
// token, restored collections belong to the system root user, as the
// usage text says.
//
// Each positional argument is then recovered independently, and log
// messages for it carry a "src" field:
//
//   - A 27-character argument with "-57u5n-" at offset 5 is treated as
//     an Arvados log entry UUID. The entry is fetched from
//     arvados/v1/logs/, and the manifest recorded in
//     properties.old_attributes.manifest_text is used.
//   - Any other argument is read as a local file containing a manifest.
//
// Each manifest is passed to undeleter.RecoverManifest, and each new
// collection UUID is printed on stdout, one per line. Per the usage
// text, the exit status is zero only if a collection is saved for every
// provided manifest.
//
// NOTE(review): in the log-entry branch mtxt is assigned a string field
// (ManifestText), so mtxt is a string and the string(mtxt) conversion
// passed to RecoverManifest appears redundant.
28 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
30 logger := ctxlog.New(stderr, "text", "info")
33 logger.WithError(err).Error("fatal")
35 logger.Info("exiting")
38 loader := config.NewLoader(stdin, logger)
39 loader.SkipLegacy = true
41 flags := flag.NewFlagSet("", flag.ContinueOnError)
42 flags.SetOutput(stderr)
43 flags.Usage = func() {
44 fmt.Fprintf(flags.Output(), `Usage:
45 %s [options ...] { /path/to/manifest.txt | log-entry-uuid } [...]
47 This program recovers deleted collections. Recovery is
48 possible when the collection's manifest is still available and
49 all of its data blocks are still available or recoverable
50 (e.g., garbage collection is not enabled, the blocks are too
51 new for garbage collection, the blocks are referenced by other
52 collections, or the blocks have been trashed but not yet
55 Collections can be specified either by filename (a local file
56 containing a manifest with the desired data) or by log UUID
57 (an Arvados log entry, typically a "delete" or "update" event,
58 whose "old attributes" have a manifest with the desired data).
60 For each provided collection manifest, once all data blocks
61 are recovered/protected from garbage collection, a new
62 collection is saved and its UUID is printed on stdout.
64 Restored collections will belong to the system (root) user.
66 Exit status will be zero if recovery is successful, i.e., a
67 collection is saved for each provided manifest.
72 loader.SetupFlags(flags)
73 loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
74 err = flags.Parse(args)
75 if err == flag.ErrHelp {
78 } else if err != nil {
82 if len(flags.Args()) == 0 {
87 lvl, err := logrus.ParseLevel(*loglevel)
93 cfg, err := loader.Load()
97 cluster, err := cfg.GetCluster("")
101 client, err := arvados.NewClientFromConfig(cluster)
105 client.AuthToken = cluster.SystemRootToken
113 for _, src := range flags.Args() {
114 logger := logger.WithField("src", src)
116 if len(src) == 27 && src[5:12] == "-57u5n-" {
118 EventType string `json:"event_type"`
119 EventAt time.Time `json:"event_at"`
120 ObjectUUID string `json:"object_uuid"`
122 OldAttributes struct {
123 ManifestText string `json:"manifest_text"`
124 } `json:"old_attributes"`
125 } `json:"properties"`
127 err = client.RequestAndDecode(&logent, "GET", "arvados/v1/logs/"+src, nil, nil)
129 logger.WithError(err).Error("failed to load log entry")
133 logger.WithFields(logrus.Fields{
134 "old_collection_uuid": logent.ObjectUUID,
135 "logged_event_type": logent.EventType,
136 "logged_event_time": logent.EventAt,
137 }).Info("loaded log entry")
138 mtxt = logent.Properties.OldAttributes.ManifestText
140 buf, err := ioutil.ReadFile(src)
142 logger.WithError(err).Error("failed to load manifest data from file")
148 uuid, err := und.RecoverManifest(string(mtxt))
150 logger.WithError(err).Error("recovery failed")
154 logger.WithField("UUID", uuid).Info("recovery succeeded")
155 fmt.Fprintln(stdout, uuid)
// undeleter holds what is needed to recover collections from manifests.
160 type undeleter struct {
// client is used for keep service requests (index, touch, untrash) and
// for creating the recovered collection through the API server.
161 client *arvados.Client
// cluster supplies the BlobSigningTTL, BlobSigning and BlobSigningKey
// settings.
162 cluster *arvados.Cluster
// logger is the base logger; per-block and per-service fields are added
// to it during recovery.
163 logger logrus.FieldLogger
// errNotFound is returned by newestMtime when a keep service's index
// has no entry for the requested block. Callers compare against it with
// == to distinguish "block absent" from other failures.
166 var errNotFound = errors.New("not found")
168 // newestMtime finds the timestamp of the newest copy of blk on svc. Returns
169 // errNotFound if blk is not on svc at all.
//
// Index entries' Mtime values are converted with time.Unix(0, Mtime),
// i.e. they are treated as nanoseconds since the Unix epoch. If the
// index request itself fails, the error is logged as a warning and
// returned unchanged.
170 func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
171 found, err := svc.Index(und.client, blk)
173 logger.WithError(err).Warn("error getting index")
174 return time.Time{}, err
175 } else if len(found) == 0 {
176 return time.Time{}, errNotFound
// Scan every index entry for the block and keep the most recent mtime.
179 for _, ent := range found {
180 t := time.Unix(0, ent.Mtime)
185 logger.WithField("latest", latest).Debug("found")
// errTouchIneffective is returned by ensureSafe in one specific case:
// Touch reported success, but the block's newest timestamp in the index
// is still too old to keep it from being trashed before the save
// deadline.
189 var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -- reported timestamp is still too old")
191 // ensureSafe ensures the given block exists on the given server and won't be
192 // eligible for trashing until after our chosen deadline (blobsigexp).
193 // Returns an error if the block doesn't exist on the given server, or
194 // has an old timestamp and can't be updated.
196 // After we decide a block is "safe" (whether or not we had to untrash
197 // it), keep-balance might notice that it's currently unreferenced and
198 // decide to trash it, all before our recovered collection gets
199 // saved. But if the block's timestamp is more recent than blobsigttl,
200 // keepstore will refuse to trash it even if told to by keep-balance.
//
// The block is already safe when its newest mtime plus blobsigttl falls
// after blobsigexp. Otherwise it is touched, and the index is re-read to
// confirm the refreshed timestamp now satisfies the same condition.
//
// NOTE(review): errors from Touch are formatted with %s rather than %w,
// so callers cannot inspect the underlying error with errors.Is/As.
201 func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
202 if latest, err := und.newestMtime(logger, blk, svc); err != nil {
204 } else if latest.Add(blobsigttl).After(blobsigexp) {
// Timestamp too old to outlive blobsigexp: refresh it, then verify.
207 if err := svc.Touch(ctx, und.client, blk); err != nil {
208 return fmt.Errorf("error updating timestamp: %s", err)
210 logger.Debug("updated timestamp")
211 if latest, err := und.newestMtime(logger, blk, svc); err == errNotFound {
212 return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
213 } else if err != nil {
215 } else if latest.Add(blobsigttl).After(blobsigexp) {
218 return errTouchIneffective
222 // RecoverManifest untrashes and updates GC timestamps (as needed) on blocks referenced
223 // by the given manifest, saves a new collection and returns the new
224 // collection's UUID.
//
// Each block referenced by the manifest is checked directly on the
// cluster's non-proxy keep services. It is made safe from garbage
// collection until blobsigexp, which is half of BlobSigningTTL from now.
// If any block cannot be recovered, no collection is saved, and an error
// reporting how many blocks were unrecoverable is returned (partial
// recovery is not implemented). When BlobSigning is enabled, the
// manifest's locators are re-signed before the collection is created
// with a POST to arvados/v1/collections.
//
// NOTE(review): the context is derived from context.Background(), so a
// caller has no way to cancel or time out a recovery in progress.
225 func (und undeleter) RecoverManifest(mtxt string) (string, error) {
226 ctx, cancel := context.WithCancel(context.Background())
229 coll := arvados.Collection{ManifestText: mtxt}
230 blks, err := coll.SizedDigests()
// Queue the index of every block for the worker goroutines started below.
234 todo := make(chan int, len(blks))
235 for idx := range blks {
// Collect the non-proxy keep services. Blocks are checked, untrashed
// and touched on these servers directly.
240 var services []arvados.KeepService
241 err = und.client.EachKeepService(func(svc arvados.KeepService) error {
242 if svc.ServiceType == "proxy" {
243 und.logger.WithField("service", svc).Debug("ignore proxy service")
245 services = append(services, svc)
// NOTE(review): %s (rather than %w) drops the wrapped error chain.
250 return "", fmt.Errorf("error getting list of keep services: %s", err)
252 und.logger.WithField("services", services).Debug("got list of services")
254 // blobsigexp is our deadline for saving the rescued
255 // collection. This must be less than BlobSigningTTL
256 // (otherwise our rescued blocks could be garbage collected
257 // again before we protect them by saving the collection) but
258 // the exact value is somewhat arbitrary. If it's too soon, it
259 // will arrive before we're ready to save, and save will
260 // fail. If it's too late, we'll needlessly update timestamps
261 // on some blocks that were recently written/touched (e.g., by
262 // a previous attempt to rescue this same collection) and
263 // would have lived long enough anyway if left alone.
264 // BlobSigningTTL/2 (typically around 1 week) is much longer
265 // than we need to recover even a very large collection.
266 blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
267 blobsigexp := time.Now().Add(blobsigttl / 2)
268 und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
270 // We'll start a number of threads, each working on
271 // checking/recovering one block at a time. The threads
272 // themselves don't need much CPU/memory, but to avoid hitting
273 // limits on keepstore connections, backend storage bandwidth,
274 // etc., we limit concurrency to 2 per keepstore node.
275 workerThreads := 2 * len(services)
277 blkFound := make([]bool, len(blks))
278 var wg sync.WaitGroup
279 for i := 0; i < workerThreads; i++ {
284 for idx := range todo {
// Strip the "+size" (and any hints) from the locator, keeping only the hash.
285 blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
286 logger := und.logger.WithField("block", blk)
// Two passes over all services. Presumably the first pass
// (untrashing == false) only looks for an existing copy, and the second
// attempts Untrash before calling ensureSafe -- confirm.
287 for _, untrashing := range []bool{false, true} {
288 for _, svc := range services {
289 logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
291 if err := svc.Untrash(ctx, und.client, blk); err != nil {
292 logger.WithError(err).Debug("untrash failed")
295 logger.Info("untrashed")
297 err := und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp)
298 if err == errNotFound {
300 } else if err != nil {
308 logger.Debug("unrecoverable")
// Tally recovered vs. unrecoverable blocks. A missing block aborts the
// whole recovery.
314 var have, havenot int
315 for _, ok := range blkFound {
324 und.logger.Warn("partial recovery is not implemented")
326 return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
// Re-sign the locators with the client's token, blobsigexp, blobsigttl
// and the cluster's BlobSigningKey, presumably so the API server accepts
// them when the collection is saved.
329 if und.cluster.Collections.BlobSigning {
330 key := []byte(und.cluster.Collections.BlobSigningKey)
331 coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
333 und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
334 err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
335 "collection": map[string]interface{}{
336 "manifest_text": coll.ManifestText,
340 return "", fmt.Errorf("error saving new collection: %s", err)
342 und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
343 return coll.UUID, nil