X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d5ebfb1d35d2458742c545af4ee16c9f14de08bb..6dd8a072ec2e305df687f72dd294d760ae6c8e23:/services/api/app/models/job.rb diff --git a/services/api/app/models/job.rb b/services/api/app/models/job.rb index e7d1b39ce9..fa38ece244 100644 --- a/services/api/app/models/job.rb +++ b/services/api/app/models/job.rb @@ -1,8 +1,12 @@ +require 'log_reuse_info' +require 'safe_json' + class Job < ArvadosModel include HasUuid include KindAndEtag include CommonApiTemplate extend CurrentApiClient + extend LogReuseInfo serialize :components, Hash attr_protected :arvados_sdk_version, :docker_image_locator serialize :script_parameters, Hash @@ -67,6 +71,14 @@ class Job < ArvadosModel (Complete = 'Complete'), ] + after_initialize do + @need_crunch_dispatch_trigger = false + end + + def self.limit_index_columns_read + ["components"] + end + def assert_finished update_attributes(finished_at: finished_at || db_current_time, success: success.nil? ? false : success, @@ -115,6 +127,10 @@ class Job < ArvadosModel super - ["script_parameters_digest"] end + def self.full_text_searchable_columns + super - ["script_parameters_digest"] + end + def self.load_job_specific_filters attrs, orig_filters, read_users # Convert Job-specific @filters entries into general SQL filters. script_info = {"repository" => nil, "script" => nil} @@ -139,7 +155,7 @@ class Job < ArvadosModel image_hashes = Array.wrap(operand).flat_map do |search_term| image_search, image_tag = search_term.split(':', 2) Collection. - find_all_for_docker_image(image_search, image_tag, read_users). + find_all_for_docker_image(image_search, image_tag, read_users, filter_compatible_format: false). map(&:portable_data_hash) end filters << [attr, operator.sub(/ docker$/, ""), image_hashes] @@ -223,45 +239,83 @@ class Job < ArvadosModel end # Search for a reusable Job, and return it if found. - candidates = Job. - readable_by(current_user). - where('state = ? or (owner_uuid = ? 
and state in (?))', - Job::Complete, current_user.uuid, [Job::Queued, Job::Running]). - where('script_parameters_digest = ?', Job.sorted_hash_digest(attrs[:script_parameters])). - where('nondeterministic is distinct from ?', true). - order('state desc, created_at') # prefer Running jobs over Queued + candidates = Job.readable_by(current_user) + log_reuse_info { "starting with #{candidates.count} jobs readable by current user #{current_user.uuid}" } + + candidates = candidates.where( + 'state = ? or (owner_uuid = ? and state in (?))', + Job::Complete, current_user.uuid, [Job::Queued, Job::Running]) + log_reuse_info(candidates) { "after filtering on job state ((state=Complete) or (state=Queued/Running and (submitted by current user)))" } + + digest = Job.sorted_hash_digest(attrs[:script_parameters]) + candidates = candidates.where('script_parameters_digest = ?', digest) + log_reuse_info(candidates) { "after filtering on script_parameters_digest #{digest}" } + + candidates = candidates.where('nondeterministic is distinct from ?', true) + log_reuse_info(candidates) { "after filtering on !nondeterministic" } + + # prefer Running jobs over Queued + candidates = candidates.order('state desc, created_at') + candidates = apply_filters candidates, filters + log_reuse_info(candidates) { "after filtering on repo, script, and custom filters #{filters.inspect}" } + chosen = nil incomplete_job = nil candidates.each do |j| if j.state != Job::Complete - # We'll use this if we don't find a job that has completed - incomplete_job ||= j - next - end - - if chosen == false - # We have already decided not to reuse any completed job - next + if !incomplete_job + # We'll use this if we don't find a job that has completed + log_reuse_info { "job #{j.uuid} is reusable, but unfinished; continuing search for completed jobs" } + incomplete_job = j + else + log_reuse_info { "job #{j.uuid} is unfinished and we already have #{incomplete_job.uuid}; ignoring" } + end + elsif chosen == false + # 
Ignore: we have already decided not to reuse any completed + # job. + log_reuse_info { "job #{j.uuid} with output #{j.output} ignored, see above" } + elsif Rails.configuration.reuse_job_if_outputs_differ + if Collection.readable_by(current_user).find_by_portable_data_hash(j.output) + log_reuse_info { "job #{j.uuid} with output #{j.output} is reusable; decision is final." } + return j + else + # Ignore: keep looking for an incomplete job or one whose + # output is readable. + log_reuse_info { "job #{j.uuid} output #{j.output} unavailable to user; continuing search" } + end elsif chosen if chosen.output != j.output # If two matching jobs produced different outputs, run a new # job (or use one that's already running/queued) instead of # choosing one arbitrarily. + log_reuse_info { "job #{j.uuid} output #{j.output} disagrees; forgetting about #{chosen.uuid} and ignoring any other finished jobs (see reuse_job_if_outputs_differ in application.default.yml)" } chosen = false + else + log_reuse_info { "job #{j.uuid} output #{j.output} agrees with chosen #{chosen.uuid}; continuing search in case other candidates have different outputs" } end # ...and that's the only thing we need to do once we've chosen # a job to reuse. elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output) - # As soon as the output we will end up returning (if any) is - # decided, check whether it will be visible to the user; if - # not, any further investigation of reusable jobs is futile. + # This user cannot read the output of this job. Any other + # completed job will have either the same output (making it + # unusable) or a different output (making it unusable because + # reuse_job_if_outputs_differ is turned off). Therefore, + # any further investigation of reusable jobs is futile. 
+ log_reuse_info { "job #{j.uuid} output #{j.output} is unavailable to user; this means no finished job can be reused (see reuse_job_if_outputs_differ in application.default.yml)" } chosen = false else + log_reuse_info { "job #{j.uuid} with output #{j.output} can be reused; continuing search in case other candidates have different outputs" } chosen = j end end - chosen || incomplete_job + j = chosen || incomplete_job + if j + log_reuse_info { "done, #{j.uuid} was selected" } + else + log_reuse_info { "done, nothing suitable" } + end + return j end def self.default_git_filters(attr_name, repo_name, refspec) @@ -276,24 +330,45 @@ class Job < ArvadosModel end end + def cancel(cascade: false, need_transaction: true) + if need_transaction + ActiveRecord::Base.transaction do + cancel(cascade: cascade, need_transaction: false) + end + return + end + + if self.state.in?([Queued, Running]) + self.state = Cancelled + self.save! + elsif self.state != Cancelled + raise InvalidStateTransitionError + end + + return if !cascade + + # cancel all children; they could be jobs or pipeline instances + children = self.components.andand.collect{|_, u| u}.compact + + return if children.empty? + + # cancel any child jobs + Job.where(uuid: children, state: [Queued, Running]).each do |job| + job.cancel(cascade: cascade, need_transaction: false) + end + + # cancel any child pipelines + PipelineInstance.where(uuid: children, state: [PipelineInstance::RunningOnServer, PipelineInstance::RunningOnClient]).each do |pi| + pi.cancel(cascade: cascade, need_transaction: false) + end + end + protected def self.sorted_hash_digest h Digest::MD5.hexdigest(Oj.dump(deep_sort_hash(h))) end - def self.deep_sort_hash x - if x.is_a? Hash - x.sort.collect do |k, v| - [k, deep_sort_hash(v)] - end.to_h - elsif x.is_a? 
Array - x.collect { |v| deep_sort_hash(v) } - else - x - end - end - def foreign_key_attributes super + %w(output log) end @@ -348,7 +423,7 @@ class Job < ArvadosModel assign_uuid Commit.tag_in_internal_repository repository, script_version, uuid rescue - uuid = uuid_was + self.uuid = uuid_was raise end end @@ -394,10 +469,11 @@ class Job < ArvadosModel end def find_docker_image_locator - runtime_constraints['docker_image'] = - Rails.configuration.default_docker_image_for_jobs if ((runtime_constraints.is_a? Hash) and - (runtime_constraints['docker_image']).nil? and - Rails.configuration.default_docker_image_for_jobs) + if runtime_constraints.is_a? Hash + runtime_constraints['docker_image'] ||= + Rails.configuration.default_docker_image_for_jobs + end + resolve_runtime_constraint("docker_image", :docker_image_locator) do |image_search| image_tag = runtime_constraints['docker_image_tag'] @@ -427,7 +503,7 @@ class Job < ArvadosModel output_changed? or log_changed? or tasks_summary_changed? or - state_changed? or + (state_changed? && state != Cancelled) or components_changed? logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}" return false @@ -577,24 +653,6 @@ class Job < ArvadosModel end def ensure_no_collection_uuids_in_script_params - # recursive_hash_search searches recursively through hashes and - # arrays in 'thing' for string fields matching regular expression - # 'pattern'. Returns true if pattern is found, false otherwise. - def recursive_hash_search thing, pattern - if thing.is_a? Hash - thing.each do |k, v| - return true if recursive_hash_search v, pattern - end - elsif thing.is_a? Array - thing.each do |k| - return true if recursive_hash_search k, pattern - end - elsif thing.is_a? String - return true if thing.match pattern - end - false - end - # Fail validation if any script_parameters field includes a string containing a # collection uuid pattern. 
if self.script_parameters_changed? @@ -605,4 +663,22 @@ class Job < ArvadosModel end true end + + # recursive_hash_search searches recursively through hashes and + # arrays in 'thing' for string fields matching regular expression + # 'pattern'. Returns true if pattern is found, false otherwise. + def recursive_hash_search thing, pattern + if thing.is_a? Hash + thing.each do |k, v| + return true if recursive_hash_search v, pattern + end + elsif thing.is_a? Array + thing.each do |k| + return true if recursive_hash_search k, pattern + end + elsif thing.is_a? String + return true if thing.match pattern + end + false + end end