X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cbba74fcd57b7b81337d44c2e663ba317e6538de..6dd8a072ec2e305df687f72dd294d760ae6c8e23:/services/api/app/models/job.rb diff --git a/services/api/app/models/job.rb b/services/api/app/models/job.rb index 0aaa0bd3f9..fa38ece244 100644 --- a/services/api/app/models/job.rb +++ b/services/api/app/models/job.rb @@ -1,7 +1,12 @@ +require 'log_reuse_info' +require 'safe_json' + class Job < ArvadosModel include HasUuid include KindAndEtag include CommonApiTemplate + extend CurrentApiClient + extend LogReuseInfo serialize :components, Hash attr_protected :arvados_sdk_version, :docker_image_locator serialize :script_parameters, Hash @@ -66,6 +71,14 @@ class Job < ArvadosModel (Complete = 'Complete'), ] + after_initialize do + @need_crunch_dispatch_trigger = false + end + + def self.limit_index_columns_read + ["components"] + end + def assert_finished update_attributes(finished_at: finished_at || db_current_time, success: success.nil? ? false : success, @@ -114,19 +127,248 @@ class Job < ArvadosModel super - ["script_parameters_digest"] end + def self.full_text_searchable_columns + super - ["script_parameters_digest"] + end + + def self.load_job_specific_filters attrs, orig_filters, read_users + # Convert Job-specific @filters entries into general SQL filters. + script_info = {"repository" => nil, "script" => nil} + git_filters = Hash.new do |hash, key| + hash[key] = {"max_version" => "HEAD", "exclude_versions" => []} + end + filters = [] + orig_filters.each do |attr, operator, operand| + if (script_info.has_key? attr) and (operator == "=") + if script_info[attr].nil? + script_info[attr] = operand + elsif script_info[attr] != operand + raise ArgumentError.new("incompatible #{attr} filters") + end + end + case operator + when "in git" + git_filters[attr]["min_version"] = operand + when "not in git" + git_filters[attr]["exclude_versions"] += Array.wrap(operand) + when "in docker", "not in docker" + image_hashes = Array.wrap(operand).flat_map do |search_term| + image_search, image_tag = search_term.split(':', 2) + Collection. + find_all_for_docker_image(image_search, image_tag, read_users, filter_compatible_format: false). + map(&:portable_data_hash) + end + filters << [attr, operator.sub(/ docker$/, ""), image_hashes] + else + filters << [attr, operator, operand] + end + end + + # Build a real script_version filter from any "not? in git" filters. + git_filters.each_pair do |attr, filter| + case attr + when "script_version" + script_info.each_pair do |key, value| + if value.nil? + raise ArgumentError.new("script_version filter needs #{key} filter") + end + end + filter["repository"] = script_info["repository"] + if attrs[:script_version] + filter["max_version"] = attrs[:script_version] + else + # Using HEAD, set earlier by the hash default, is fine. + end + when "arvados_sdk_version" + filter["repository"] = "arvados" + else + raise ArgumentError.new("unknown attribute for git filter: #{attr}") + end + revisions = Commit.find_commit_range(filter["repository"], + filter["min_version"], + filter["max_version"], + filter["exclude_versions"]) + if revisions.empty? + raise ArgumentError. + new("error searching #{filter['repository']} from " + + "'#{filter['min_version']}' to '#{filter['max_version']}', " + + "excluding #{filter['exclude_versions']}") + end + filters.append([attr, "in", revisions]) + end + + filters + end + + def self.find_reusable attrs, params, filters, read_users + if filters.empty? # Translate older creation parameters into filters. + filters = + [["repository", "=", attrs[:repository]], + ["script", "=", attrs[:script]], + ["script_version", "not in git", params[:exclude_script_versions]], + ].reject { |filter| filter.last.nil? or filter.last.empty? } + if !params[:minimum_script_version].blank? + filters << ["script_version", "in git", + params[:minimum_script_version]] + else + filters += default_git_filters("script_version", attrs[:repository], + attrs[:script_version]) + end + if image_search = attrs[:runtime_constraints].andand["docker_image"] + if image_tag = attrs[:runtime_constraints]["docker_image_tag"] + image_search += ":#{image_tag}" + end + image_locator = Collection. + for_latest_docker_image(image_search).andand.portable_data_hash + else + image_locator = nil + end + filters << ["docker_image_locator", "=", image_locator] + if sdk_version = attrs[:runtime_constraints].andand["arvados_sdk_version"] + filters += default_git_filters("arvados_sdk_version", "arvados", sdk_version) + end + filters = load_job_specific_filters(attrs, filters, read_users) + end + + # Check specified filters for some reasonableness. + filter_names = filters.map { |f| f.first }.uniq + ["repository", "script"].each do |req_filter| + if not filter_names.include?(req_filter) + return send_error("#{req_filter} filter required") + end + end + + # Search for a reusable Job, and return it if found. + candidates = Job.readable_by(current_user) + log_reuse_info { "starting with #{candidates.count} jobs readable by current user #{current_user.uuid}" } + + candidates = candidates.where( + 'state = ? or (owner_uuid = ? and state in (?))', + Job::Complete, current_user.uuid, [Job::Queued, Job::Running]) + log_reuse_info(candidates) { "after filtering on job state ((state=Complete) or (state=Queued/Running and (submitted by current user)))" } + + digest = Job.sorted_hash_digest(attrs[:script_parameters]) + candidates = candidates.where('script_parameters_digest = ?', digest) + log_reuse_info(candidates) { "after filtering on script_parameters_digest #{digest}" } + + candidates = candidates.where('nondeterministic is distinct from ?', true) + log_reuse_info(candidates) { "after filtering on !nondeterministic" } + + # prefer Running jobs over Queued + candidates = candidates.order('state desc, created_at') + + candidates = apply_filters candidates, filters + log_reuse_info(candidates) { "after filtering on repo, script, and custom filters #{filters.inspect}" } + + chosen = nil + incomplete_job = nil + candidates.each do |j| + if j.state != Job::Complete + if !incomplete_job + # We'll use this if we don't find a job that has completed + log_reuse_info { "job #{j.uuid} is reusable, but unfinished; continuing search for completed jobs" } + incomplete_job = j + else + log_reuse_info { "job #{j.uuid} is unfinished and we already have #{incomplete_job.uuid}; ignoring" } + end + elsif chosen == false + # Ignore: we have already decided not to reuse any completed + # job. + log_reuse_info { "job #{j.uuid} with output #{j.output} ignored, see above" } + elsif Rails.configuration.reuse_job_if_outputs_differ + if Collection.readable_by(current_user).find_by_portable_data_hash(j.output) + log_reuse_info { "job #{j.uuid} with output #{j.output} is reusable; decision is final." } + return j + else + # Ignore: keep locking for an incomplete job or one whose + # output is readable. + log_reuse_info { "job #{j.uuid} output #{j.output} unavailable to user; continuing search" } + end + elsif chosen + if chosen.output != j.output + # If two matching jobs produced different outputs, run a new + # job (or use one that's already running/queued) instead of + # choosing one arbitrarily. + log_reuse_info { "job #{j.uuid} output #{j.output} disagrees; forgetting about #{chosen.uuid} and ignoring any other finished jobs (see reuse_job_if_outputs_differ in application.default.yml)" } + chosen = false + else + log_reuse_info { "job #{j.uuid} output #{j.output} agrees with chosen #{chosen.uuid}; continuing search in case other candidates have different outputs" } + end + # ...and that's the only thing we need to do once we've chosen + # a job to reuse. + elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output) + # This user cannot read the output of this job. Any other + # completed job will have either the same output (making it + # unusable) or a different output (making it unusable because + # reuse_job_if_outputs_different is turned off). Therefore, + # any further investigation of reusable jobs is futile. + log_reuse_info { "job #{j.uuid} output #{j.output} is unavailable to user; this means no finished job can be reused (see reuse_job_if_outputs_differ in application.default.yml)" } + chosen = false + else + log_reuse_info { "job #{j.uuid} with output #{j.output} can be reused; continuing search in case other candidates have different outputs" } + chosen = j + end + end + j = chosen || incomplete_job + if j + log_reuse_info { "done, #{j.uuid} was selected" } + else + log_reuse_info { "done, nothing suitable" } + end + return j + end + + def self.default_git_filters(attr_name, repo_name, refspec) + # Add a filter to @filters for `attr_name` = the latest commit available + # in `repo_name` at `refspec`. No filter is added if refspec can't be + # resolved. + commits = Commit.find_commit_range(repo_name, nil, refspec, nil) + if commit_hash = commits.first + [[attr_name, "=", commit_hash]] + else + [] + end + end + + def cancel(cascade: false, need_transaction: true) + if need_transaction + ActiveRecord::Base.transaction do + cancel(cascade: cascade, need_transaction: false) + end + return + end + + if self.state.in?([Queued, Running]) + self.state = Cancelled + self.save! + elsif self.state != Cancelled + raise InvalidStateTransitionError + end + + return if !cascade + + # cancel all children; they could be jobs or pipeline instances + children = self.components.andand.collect{|_, u| u}.compact + + return if children.empty? + + # cancel any child jobs + Job.where(uuid: children, state: [Queued, Running]).each do |job| + job.cancel(cascade: cascade, need_transaction: false) + end + + # cancel any child pipelines + PipelineInstance.where(uuid: children, state: [PipelineInstance::RunningOnServer, PipelineInstance::RunningOnClient]).each do |pi| + pi.cancel(cascade: cascade, need_transaction: false) + end + end + protected def self.sorted_hash_digest h Digest::MD5.hexdigest(Oj.dump(deep_sort_hash(h))) end - def self.deep_sort_hash h - return h unless h.is_a? Hash - h.sort.collect do |k, v| - [k, deep_sort_hash(v)] - end.to_h - end - def foreign_key_attributes super + %w(output log) end @@ -181,7 +423,7 @@ class Job < ArvadosModel assign_uuid Commit.tag_in_internal_repository repository, script_version, uuid rescue - uuid = uuid_was + self.uuid = uuid_was raise end end @@ -227,10 +469,11 @@ class Job < ArvadosModel end def find_docker_image_locator - runtime_constraints['docker_image'] = - Rails.configuration.default_docker_image_for_jobs if ((runtime_constraints.is_a? Hash) and - (runtime_constraints['docker_image']).nil? and - Rails.configuration.default_docker_image_for_jobs) + if runtime_constraints.is_a? Hash + runtime_constraints['docker_image'] ||= + Rails.configuration.default_docker_image_for_jobs + end + resolve_runtime_constraint("docker_image", :docker_image_locator) do |image_search| image_tag = runtime_constraints['docker_image_tag'] @@ -260,7 +503,7 @@ class Job < ArvadosModel output_changed? or log_changed? or tasks_summary_changed? or - state_changed? or + (state_changed? && state != Cancelled) or components_changed? logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}" return false @@ -410,24 +653,6 @@ class Job < ArvadosModel end def ensure_no_collection_uuids_in_script_params - # recursive_hash_search searches recursively through hashes and - # arrays in 'thing' for string fields matching regular expression - # 'pattern'. Returns true if pattern is found, false otherwise. - def recursive_hash_search thing, pattern - if thing.is_a? Hash - thing.each do |k, v| - return true if recursive_hash_search v, pattern - end - elsif thing.is_a? Array - thing.each do |k| - return true if recursive_hash_search k, pattern - end - elsif thing.is_a? String - return true if thing.match pattern - end - false - end - # Fail validation if any script_parameters field includes a string containing a # collection uuid pattern. if self.script_parameters_changed? @@ -438,4 +663,22 @@ class Job < ArvadosModel end true end + + # recursive_hash_search searches recursively through hashes and + # arrays in 'thing' for string fields matching regular expression + # 'pattern'. Returns true if pattern is found, false otherwise. + def recursive_hash_search thing, pattern + if thing.is_a? Hash + thing.each do |k, v| + return true if recursive_hash_search v, pattern + end + elsif thing.is_a? Array + thing.each do |k| + return true if recursive_hash_search k, pattern + end + elsif thing.is_a? String + return true if thing.match pattern + end + false + end end