13561: Avoid collections.index to include old versions
[arvados.git] / services / api / app / models / collection.rb
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 require 'arvados/keep'
6 require 'sweep_trashed_objects'
7 require 'trashable'
8
9 class Collection < ArvadosModel
10   extend CurrentApiClient
11   extend DbCurrentTime
12   include HasUuid
13   include KindAndEtag
14   include CommonApiTemplate
15   include Trashable
16
17   serialize :properties, Hash
18   serialize :storage_classes_desired, Array
19   serialize :storage_classes_confirmed, Array
20
21   before_validation :default_empty_manifest
22   before_validation :default_storage_classes, on: :create
23   before_validation :check_encoding
24   before_validation :check_manifest_validity
25   before_validation :check_signatures
26   before_validation :strip_signatures_and_update_replication_confirmed
27   validate :ensure_pdh_matches_manifest_text
28   validate :ensure_storage_classes_desired_is_not_empty
29   validate :ensure_storage_classes_contain_non_empty_strings
30   validate :old_versions_cannot_be_updated, on: :update
31   before_save :set_file_names
32   before_create :set_current_version_uuid
33
34   api_accessible :user, extend: :common do |t|
35     t.add :name
36     t.add :description
37     t.add :properties
38     t.add :portable_data_hash
39     t.add :signed_manifest_text, as: :manifest_text
40     t.add :manifest_text, as: :unsigned_manifest_text
41     t.add :replication_desired
42     t.add :replication_confirmed
43     t.add :replication_confirmed_at
44     t.add :storage_classes_desired
45     t.add :storage_classes_confirmed
46     t.add :storage_classes_confirmed_at
47     t.add :delete_at
48     t.add :trash_at
49     t.add :is_trashed
50     t.add :version
51     t.add :current_version_uuid
52     t.add :preserve_version
53   end
54
55   after_initialize do
56     @signatures_checked = false
57     @computed_pdh_for_manifest_text = false
58   end
59
60   def self.attributes_required_columns
61     super.merge(
62                 # If we don't list manifest_text explicitly, the
63                 # params[:select] code gets confused by the way we
64                 # expose signed_manifest_text as manifest_text in the
65                 # API response, and never let clients select the
66                 # manifest_text column.
67                 #
68                 # We need trash_at and is_trashed to determine the
69                 # correct timestamp in signed_manifest_text.
70                 'manifest_text' => ['manifest_text', 'trash_at', 'is_trashed'],
71                 'unsigned_manifest_text' => ['manifest_text'],
72                 )
73   end
74
75   def self.ignored_select_attributes
76     super + ["updated_at", "file_names"]
77   end
78
79   def self.limit_index_columns_read
80     ["manifest_text"]
81   end
82
83   FILE_TOKEN = /^[[:digit:]]+:[[:digit:]]+:/
84   def check_signatures
85     return false if self.manifest_text.nil?
86
87     return true if current_user.andand.is_admin
88
89     # Provided the manifest_text hasn't changed materially since an
90     # earlier validation, it's safe to pass this validation on
91     # subsequent passes without checking any signatures. This is
92     # important because the signatures have probably been stripped off
93     # by the time we get to a second validation pass!
94     if @signatures_checked && @signatures_checked == computed_pdh
95       return true
96     end
97
98     if self.manifest_text_changed?
99       # Check permissions on the collection manifest.
100       # If any signature cannot be verified, raise PermissionDeniedError
101       # which will return 403 Permission denied to the client.
102       api_token = current_api_client_authorization.andand.api_token
103       signing_opts = {
104         api_token: api_token,
105         now: @validation_timestamp.to_i,
106       }
107       self.manifest_text.each_line do |entry|
108         entry.split.each do |tok|
109           if tok == '.' or tok.starts_with? './'
110             # Stream name token.
111           elsif tok =~ FILE_TOKEN
112             # This is a filename token, not a blob locator. Note that we
113             # keep checking tokens after this, even though manifest
114             # format dictates that all subsequent tokens will also be
115             # filenames. Safety first!
116           elsif Blob.verify_signature tok, signing_opts
117             # OK.
118           elsif Keep::Locator.parse(tok).andand.signature
119             # Signature provided, but verify_signature did not like it.
120             logger.warn "Invalid signature on locator #{tok}"
121             raise ArvadosModel::PermissionDeniedError
122           elsif Rails.configuration.permit_create_collection_with_unsigned_manifest
123             # No signature provided, but we are running in insecure mode.
124             logger.debug "Missing signature on locator #{tok} ignored"
125           elsif Blob.new(tok).empty?
126             # No signature provided -- but no data to protect, either.
127           else
128             logger.warn "Missing signature on locator #{tok}"
129             raise ArvadosModel::PermissionDeniedError
130           end
131         end
132       end
133     end
134     @signatures_checked = computed_pdh
135   end
136
137   def strip_signatures_and_update_replication_confirmed
138     if self.manifest_text_changed?
139       in_old_manifest = {}
140       if not self.replication_confirmed.nil?
141         self.class.each_manifest_locator(manifest_text_was) do |match|
142           in_old_manifest[match[1]] = true
143         end
144       end
145
146       stripped_manifest = self.class.munge_manifest_locators(manifest_text) do |match|
147         if not self.replication_confirmed.nil? and not in_old_manifest[match[1]]
148           # If the new manifest_text contains locators whose hashes
149           # weren't in the old manifest_text, storage replication is no
150           # longer confirmed.
151           self.replication_confirmed_at = nil
152           self.replication_confirmed = nil
153         end
154
155         # Return the locator with all permission signatures removed,
156         # but otherwise intact.
157         match[0].gsub(/\+A[^+]*/, '')
158       end
159
160       if @computed_pdh_for_manifest_text == manifest_text
161         # If the cached PDH was valid before stripping, it is still
162         # valid after stripping.
163         @computed_pdh_for_manifest_text = stripped_manifest.dup
164       end
165
166       self[:manifest_text] = stripped_manifest
167     end
168     true
169   end
170
171   def ensure_pdh_matches_manifest_text
172     if not manifest_text_changed? and not portable_data_hash_changed?
173       true
174     elsif portable_data_hash.nil? or not portable_data_hash_changed?
175       self.portable_data_hash = computed_pdh
176     elsif portable_data_hash !~ Keep::Locator::LOCATOR_REGEXP
177       errors.add(:portable_data_hash, "is not a valid locator")
178       false
179     elsif portable_data_hash[0..31] != computed_pdh[0..31]
180       errors.add(:portable_data_hash,
181                  "'#{portable_data_hash}' does not match computed hash '#{computed_pdh}'")
182       false
183     else
184       # Ignore the client-provided size part: always store
185       # computed_pdh in the database.
186       self.portable_data_hash = computed_pdh
187     end
188   end
189
190   def set_file_names
191     if self.manifest_text_changed?
192       self.file_names = manifest_files
193     end
194     true
195   end
196
197   def manifest_files
198     return '' if !self.manifest_text
199
200     names = ''
201     self.manifest_text.scan(/ \d+:\d+:(\S+)/) do |name|
202       names << name.first.gsub('\040',' ') + "\n"
203     end
204     self.manifest_text.scan(/^\.\/(\S+)/m) do |stream_name|
205       names << stream_name.first.gsub('\040',' ') + "\n"
206     end
207     names
208   end
209
210   def default_empty_manifest
211     self.manifest_text ||= ''
212   end
213
214   def skip_uuid_existence_check
215     # Avoid checking the existence of current_version_uuid, as it's
216     # assigned on creation of a new 'current version' collection, so
217     # the collection's UUID only lives on memory when the validation check
218     # is performed.
219     ['current_version_uuid']
220   end
221
222   def set_current_version_uuid
223     self.current_version_uuid ||= self.uuid
224   end
225
226   def save! *args
227     # Skip if feature is disabled or saving a new record
228     if !Rails.configuration.collection_versioning || new_record?
229       return super
230     end
231     # Skip if updating a past version
232     if !self.changes.include?('uuid') && current_version_uuid != uuid
233       return super
234     end
235     # Skip if current version shouldn't (explicitly or implicitly) be preserved
236     if !should_preserve_version?
237       return super
238     end
239
240     changes = self.changes
241     # Updates that will be synced with older versions
242     synced_updates = ['uuid', 'owner_uuid', 'delete_at', 'trash_at', 'is_trashed',
243                       'replication_desired', 'storage_classes_desired'] & changes.keys
244     # Updates that will produce a new version
245     versionable_updates = ['manifest_text', 'description', 'properties', 'name'] & changes.keys
246
247     if versionable_updates.empty?
248       # Keep preserve_version enabled for the next update, if applicable.
249       self.preserve_version ||= self.preserve_version_was
250       if !self.changes.include?('preserve_version')
251         changes.delete('preserve_version')
252       end
253
254       if synced_updates.empty?
255         # Updates don't include interesting attributes, so don't save a new
256         # snapshot nor sync older versions.
257         return super
258       end
259     end
260
261     # Does row locking (transaction is implicit) because 'version'
262     # may be incremented and the older versions synced.
263     # Note that 'with_lock' reloads the object after locking.
264     with_lock do
265       # Sync older versions.
266       if !synced_updates.empty?
267         updates = {}
268         synced_updates.each do |attr|
269           if attr == 'uuid'
270             # Point old versions to current version's new UUID
271             updates['current_version_uuid'] = changes[attr].last
272           else
273             updates[attr] = changes[attr].last
274           end
275         end
276         Collection.where('current_version_uuid = ? AND uuid != ?', uuid, uuid).each do |c|
277           c.attributes = updates
278           # Use a different validation context to skip the 'old_versions_cannot_be_updated'
279           # validator, as on this case it is legal to update some fields.
280           leave_modified_by_user_alone do
281             c.save(context: :update_old_versions)
282           end
283         end
284         # Also update current object just in case a new version will be created,
285         # as it has to receive the same values for the synced attributes.
286         self.attributes = updates
287       end
288       snapshot = nil
289       # Make a new version if applicable.
290       if !versionable_updates.empty?
291         # Create a snapshot of the original collection
292         snapshot = self.dup
293         snapshot.uuid = nil # Reset UUID so it's created as a new record
294         snapshot.created_at = created_at
295         # Update current version number
296         self.version += 1
297         self.preserve_version = false
298       end
299       # Restore requested changes on the current version
300       changes.keys.each do |attr|
301         next if attr == 'version'
302         self.attributes = {attr => changes[attr].last}
303       end
304       # Save current version first to avoid index collision
305       super
306       # Save the snapshot with previous state (if applicable)
307       snapshot.andand.save!
308       return true
309     end
310   end
311
312   def should_preserve_version?
313     idle_threshold = Rails.configuration.preserve_version_if_idle
314     if !self.preserve_version_was &&
315       (idle_threshold < 0 ||
316         (idle_threshold > 0 && self.modified_at_was > db_current_time-idle_threshold.seconds))
317       return false
318     end
319     return true
320   end
321
322   def check_encoding
323     if manifest_text.encoding.name == 'UTF-8' and manifest_text.valid_encoding?
324       true
325     else
326       begin
327         # If Ruby thinks the encoding is something else, like 7-bit
328         # ASCII, but its stored bytes are equal to the (valid) UTF-8
329         # encoding of the same string, we declare it to be a UTF-8
330         # string.
331         utf8 = manifest_text
332         utf8.force_encoding Encoding::UTF_8
333         if utf8.valid_encoding? and utf8 == manifest_text.encode(Encoding::UTF_8)
334           self.manifest_text = utf8
335           return true
336         end
337       rescue
338       end
339       errors.add :manifest_text, "must use UTF-8 encoding"
340       false
341     end
342   end
343
344   def check_manifest_validity
345     begin
346       Keep::Manifest.validate! manifest_text
347       true
348     rescue ArgumentError => e
349       errors.add :manifest_text, e.message
350       false
351     end
352   end
353
354   def signed_manifest_text
355     if !has_attribute? :manifest_text
356       return nil
357     elsif is_trashed
358       return manifest_text
359     else
360       token = current_api_client_authorization.andand.api_token
361       exp = [db_current_time.to_i + Rails.configuration.blob_signature_ttl,
362              trash_at].compact.map(&:to_i).min
363       self.class.sign_manifest manifest_text, token, exp
364     end
365   end
366
367   def self.sign_manifest manifest, token, exp=nil
368     if exp.nil?
369       exp = db_current_time.to_i + Rails.configuration.blob_signature_ttl
370     end
371     signing_opts = {
372       api_token: token,
373       expire: exp,
374     }
375     m = munge_manifest_locators(manifest) do |match|
376       Blob.sign_locator(match[0], signing_opts)
377     end
378     return m
379   end
380
381   def self.munge_manifest_locators manifest
382     # Given a manifest text and a block, yield the regexp MatchData
383     # for each locator. Return a new manifest in which each locator
384     # has been replaced by the block's return value.
385     return nil if !manifest
386     return '' if manifest == ''
387
388     new_lines = []
389     manifest.each_line do |line|
390       line.rstrip!
391       new_words = []
392       line.split(' ').each do |word|
393         if new_words.empty?
394           new_words << word
395         elsif match = Keep::Locator::LOCATOR_REGEXP.match(word)
396           new_words << yield(match)
397         else
398           new_words << word
399         end
400       end
401       new_lines << new_words.join(' ')
402     end
403     new_lines.join("\n") + "\n"
404   end
405
406   def self.each_manifest_locator manifest
407     # Given a manifest text and a block, yield the regexp match object
408     # for each locator.
409     manifest.each_line do |line|
410       # line will have a trailing newline, but the last token is never
411       # a locator, so it's harmless here.
412       line.split(' ').each do |word|
413         if match = Keep::Locator::LOCATOR_REGEXP.match(word)
414           yield(match)
415         end
416       end
417     end
418   end
419
420   def self.normalize_uuid uuid
421     hash_part = nil
422     size_part = nil
423     uuid.split('+').each do |token|
424       if token.match(/^[0-9a-f]{32,}$/)
425         raise "uuid #{uuid} has multiple hash parts" if hash_part
426         hash_part = token
427       elsif token.match(/^\d+$/)
428         raise "uuid #{uuid} has multiple size parts" if size_part
429         size_part = token
430       end
431     end
432     raise "uuid #{uuid} has no hash part" if !hash_part
433     [hash_part, size_part].compact.join '+'
434   end
435
436   def self.get_compatible_images(readers, pattern, collections)
437     if collections.empty?
438       return []
439     end
440
441     migrations = Hash[
442       Link.where('tail_uuid in (?) AND link_class=? AND links.owner_uuid=?',
443                  collections.map(&:portable_data_hash),
444                  'docker_image_migration',
445                  system_user_uuid).
446       order('links.created_at asc').
447       map { |l|
448         [l.tail_uuid, l.head_uuid]
449       }]
450
451     migrated_collections = Hash[
452       Collection.readable_by(*readers).
453       where('portable_data_hash in (?)', migrations.values).
454       map { |c|
455         [c.portable_data_hash, c]
456       }]
457
458     collections.map { |c|
459       # Check if the listed image is compatible first, if not, then try the
460       # migration link.
461       manifest = Keep::Manifest.new(c.manifest_text)
462       if manifest.exact_file_count?(1) and manifest.files[0][1] =~ pattern
463         c
464       elsif m = migrated_collections[migrations[c.portable_data_hash]]
465         manifest = Keep::Manifest.new(m.manifest_text)
466         if manifest.exact_file_count?(1) and manifest.files[0][1] =~ pattern
467           m
468         end
469       end
470     }.compact
471   end
472
473   # Resolve a Docker repo+tag, hash, or collection PDH to an array of
474   # Collection objects, sorted by timestamp starting with the most recent
475   # match.
476   #
477   # If filter_compatible_format is true (the default), only return image
478   # collections which are support by the installation as indicated by
479   # Rails.configuration.docker_image_formats.  Will follow
480   # 'docker_image_migration' links if search_term resolves to an incompatible
481   # image, but an equivalent compatible image is available.
482   def self.find_all_for_docker_image(search_term, search_tag=nil, readers=nil, filter_compatible_format: true)
483     readers ||= [Thread.current[:user]]
484     base_search = Link.
485       readable_by(*readers).
486       readable_by(*readers, table_name: "collections").
487       joins("JOIN collections ON links.head_uuid = collections.uuid").
488       order("links.created_at DESC")
489
490     if (Rails.configuration.docker_image_formats.include? 'v1' and
491         Rails.configuration.docker_image_formats.include? 'v2') or filter_compatible_format == false
492       pattern = /^(sha256:)?[0-9A-Fa-f]{64}\.tar$/
493     elsif Rails.configuration.docker_image_formats.include? 'v2'
494       pattern = /^(sha256:)[0-9A-Fa-f]{64}\.tar$/
495     elsif Rails.configuration.docker_image_formats.include? 'v1'
496       pattern = /^[0-9A-Fa-f]{64}\.tar$/
497     else
498       raise "Unrecognized configuration for docker_image_formats #{Rails.configuration.docker_image_formats}"
499     end
500
501     # If the search term is a Collection locator that contains one file
502     # that looks like a Docker image, return it.
503     if loc = Keep::Locator.parse(search_term)
504       loc.strip_hints!
505       coll_match = readable_by(*readers).where(portable_data_hash: loc.to_s).limit(1)
506       return get_compatible_images(readers, pattern, coll_match)
507     end
508
509     if search_tag.nil? and (n = search_term.index(":"))
510       search_tag = search_term[n+1..-1]
511       search_term = search_term[0..n-1]
512     end
513
514     # Find Collections with matching Docker image repository+tag pairs.
515     matches = base_search.
516       where(link_class: "docker_image_repo+tag",
517             name: "#{search_term}:#{search_tag || 'latest'}")
518
519     # If that didn't work, find Collections with matching Docker image hashes.
520     if matches.empty?
521       matches = base_search.
522         where("link_class = ? and links.name LIKE ?",
523               "docker_image_hash", "#{search_term}%")
524     end
525
526     # Generate an order key for each result.  We want to order the results
527     # so that anything with an image timestamp is considered more recent than
528     # anything without; then we use the link's created_at as a tiebreaker.
529     uuid_timestamps = {}
530     matches.each do |link|
531       uuid_timestamps[link.head_uuid] = [(-link.properties["image_timestamp"].to_datetime.to_i rescue 0),
532        -link.created_at.to_i]
533      end
534
535     sorted = Collection.where('uuid in (?)', uuid_timestamps.keys).sort_by { |c|
536       uuid_timestamps[c.uuid]
537     }
538     compatible = get_compatible_images(readers, pattern, sorted)
539     if sorted.length > 0 and compatible.empty?
540       raise ArvadosModel::UnresolvableContainerError.new "Matching Docker image is incompatible with 'docker_image_formats' configuration."
541     end
542     compatible
543   end
544
545   def self.for_latest_docker_image(search_term, search_tag=nil, readers=nil)
546     find_all_for_docker_image(search_term, search_tag, readers).first
547   end
548
549   def self.searchable_columns operator
550     super - ["manifest_text", "current_version_uuid"]
551   end
552
553   def self.full_text_searchable_columns
554     super - ["manifest_text", "storage_classes_desired", "storage_classes_confirmed", "current_version_uuid"]
555   end
556
557   def self.where *args
558     SweepTrashedObjects.sweep_if_stale
559     super
560   end
561
562   protected
563
564   # Although the defaults for these columns is already set up on the schema,
565   # collection creation from an API client seems to ignore them, making the
566   # validation on empty desired storage classes return an error.
567   def default_storage_classes
568     if self.storage_classes_desired.nil? || self.storage_classes_desired.empty?
569       self.storage_classes_desired = ["default"]
570     end
571     self.storage_classes_confirmed ||= []
572   end
573
574   def portable_manifest_text
575     self.class.munge_manifest_locators(manifest_text) do |match|
576       if match[2] # size
577         match[1] + match[2]
578       else
579         match[1]
580       end
581     end
582   end
583
584   def compute_pdh
585     portable_manifest = portable_manifest_text
586     (Digest::MD5.hexdigest(portable_manifest) +
587      '+' +
588      portable_manifest.bytesize.to_s)
589   end
590
591   def computed_pdh
592     if @computed_pdh_for_manifest_text == manifest_text
593       return @computed_pdh
594     end
595     @computed_pdh = compute_pdh
596     @computed_pdh_for_manifest_text = manifest_text.dup
597     @computed_pdh
598   end
599
600   def ensure_permission_to_save
601     if (not current_user.andand.is_admin)
602       if (replication_confirmed_at_changed? or replication_confirmed_changed?) and
603         not (replication_confirmed_at.nil? and replication_confirmed.nil?)
604         raise ArvadosModel::PermissionDeniedError.new("replication_confirmed and replication_confirmed_at attributes cannot be changed, except by setting both to nil")
605       end
606       if (storage_classes_confirmed_changed? or storage_classes_confirmed_at_changed?) and
607         not (storage_classes_confirmed == [] and storage_classes_confirmed_at.nil?)
608         raise ArvadosModel::PermissionDeniedError.new("storage_classes_confirmed and storage_classes_confirmed_at attributes cannot be changed, except by setting them to [] and nil respectively")
609       end
610     end
611     super
612   end
613
614   def ensure_storage_classes_desired_is_not_empty
615     if self.storage_classes_desired.empty?
616       raise ArvadosModel::InvalidStateTransitionError.new("storage_classes_desired shouldn't be empty")
617     end
618   end
619
620   def ensure_storage_classes_contain_non_empty_strings
621     (self.storage_classes_desired + self.storage_classes_confirmed).each do |c|
622       if !c.is_a?(String) || c == ''
623         raise ArvadosModel::InvalidStateTransitionError.new("storage classes should only be non-empty strings")
624       end
625     end
626   end
627
628   def old_versions_cannot_be_updated
629     # We check for the '_was' values just in case the update operation
630     # includes a change on current_version_uuid or uuid.
631     if current_version_uuid_was != uuid_was
632       raise ArvadosModel::PermissionDeniedError.new("previous versions cannot be updated")
633     end
634   end
635 end