X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/682dd5b6cc23a455766a7651e3e841257660b31c..9bd3b2729a61f62ddbab10ac65fd9f7de837a10d:/services/api/app/models/job.rb diff --git a/services/api/app/models/job.rb b/services/api/app/models/job.rb index 6c24293334..248d16a4ef 100644 --- a/services/api/app/models/job.rb +++ b/services/api/app/models/job.rb @@ -2,6 +2,7 @@ class Job < ArvadosModel include HasUuid include KindAndEtag include CommonApiTemplate + serialize :components, Hash attr_protected :arvados_sdk_version, :docker_image_locator serialize :script_parameters, Hash serialize :runtime_constraints, Hash @@ -10,6 +11,7 @@ class Job < ArvadosModel after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update before_validation :set_priority before_validation :update_state_from_old_state_attrs + before_validation :update_script_parameters_digest validate :ensure_script_version_is_commit validate :find_docker_image_locator validate :find_arvados_sdk_version @@ -52,6 +54,7 @@ class Job < ArvadosModel t.add :queue_position t.add :node_uuids t.add :description + t.add :components end # Supported states for a job @@ -78,12 +81,13 @@ class Job < ArvadosModel end def queue_position - Job::queue.each_with_index do |job, index| - if job[:uuid] == self.uuid - return index - end - end - nil + # We used to report this accurately, but the implementation made queue + # API requests O(n**2) for the size of the queue. See #8800. + # We've soft-disabled it because it's not clear we even want this + # functionality: now that we have Node Manager with support for multiple + # node sizes, "queue position" tells you very little about when a job will + # run. + state == Queued ? 0 : nil end def self.running @@ -92,8 +96,7 @@ class Job < ArvadosModel end def lock locked_by_uuid - transaction do - self.reload + with_lock do unless self.state == Queued and self.is_locked_by_uuid.nil? raise AlreadyLockedError end @@ -103,8 +106,188 @@ class Job < ArvadosModel end end + def update_script_parameters_digest + self.script_parameters_digest = self.class.sorted_hash_digest(script_parameters) + end + + def self.searchable_columns operator + super - ["script_parameters_digest"] + end + + def self.load_job_specific_filters attrs, orig_filters, read_users + # Convert Job-specific @filters entries into general SQL filters. + script_info = {"repository" => nil, "script" => nil} + git_filters = Hash.new do |hash, key| + hash[key] = {"max_version" => "HEAD", "exclude_versions" => []} + end + filters = [] + orig_filters.each do |attr, operator, operand| + if (script_info.has_key? attr) and (operator == "=") + if script_info[attr].nil? + script_info[attr] = operand + elsif script_info[attr] != operand + raise ArgumentError.new("incompatible #{attr} filters") + end + end + case operator + when "in git" + git_filters[attr]["min_version"] = operand + when "not in git" + git_filters[attr]["exclude_versions"] += Array.wrap(operand) + when "in docker", "not in docker" + image_hashes = Array.wrap(operand).flat_map do |search_term| + image_search, image_tag = search_term.split(':', 2) + Collection. + find_all_for_docker_image(image_search, image_tag, read_users). + map(&:portable_data_hash) + end + filters << [attr, operator.sub(/ docker$/, ""), image_hashes] + else + filters << [attr, operator, operand] + end + end + + # Build a real script_version filter from any "not? in git" filters. + git_filters.each_pair do |attr, filter| + case attr + when "script_version" + script_info.each_pair do |key, value| + if value.nil? + raise ArgumentError.new("script_version filter needs #{key} filter") + end + end + filter["repository"] = script_info["repository"] + if attrs[:script_version] + filter["max_version"] = attrs[:script_version] + else + # Using HEAD, set earlier by the hash default, is fine. + end + when "arvados_sdk_version" + filter["repository"] = "arvados" + else + raise ArgumentError.new("unknown attribute for git filter: #{attr}") + end + revisions = Commit.find_commit_range(filter["repository"], + filter["min_version"], + filter["max_version"], + filter["exclude_versions"]) + if revisions.empty? + raise ArgumentError. + new("error searching #{filter['repository']} from " + + "'#{filter['min_version']}' to '#{filter['max_version']}', " + + "excluding #{filter['exclude_versions']}") + end + filters.append([attr, "in", revisions]) + end + + filters + end + + def self.find_reusable attrs, params, filters, read_users + if filters.empty? # Translate older creation parameters into filters. + filters = + [["repository", "=", attrs[:repository]], + ["script", "=", attrs[:script]], + ["script_version", "not in git", params[:exclude_script_versions]], + ].reject { |filter| filter.last.nil? or filter.last.empty? } + if !params[:minimum_script_version].blank? + filters << ["script_version", "in git", + params[:minimum_script_version]] + else + filters += default_git_filters("script_version", attrs[:repository], + attrs[:script_version]) + end + if image_search = attrs[:runtime_constraints].andand["docker_image"] + if image_tag = attrs[:runtime_constraints]["docker_image_tag"] + image_search += ":#{image_tag}" + end + image_locator = Collection. + for_latest_docker_image(image_search).andand.portable_data_hash + else + image_locator = nil + end + filters << ["docker_image_locator", "=", image_locator] + if sdk_version = attrs[:runtime_constraints].andand["arvados_sdk_version"] + filters += default_git_filters("arvados_sdk_version", "arvados", sdk_version) + end + filters = load_job_specific_filters(attrs, filters, read_users) + end + + # Check specified filters for some reasonableness. + filter_names = filters.map { |f| f.first }.uniq + ["repository", "script"].each do |req_filter| + if not filter_names.include?(req_filter) + return send_error("#{req_filter} filter required") + end + end + + # Search for a reusable Job, and return it if found. + candidates = Job. + readable_by(current_user). + where('state = ? or (owner_uuid = ? and state in (?))', + Job::Complete, current_user.uuid, [Job::Queued, Job::Running]). + where('script_parameters_digest = ?', Job.sorted_hash_digest(attrs[:script_parameters])). + where('nondeterministic is distinct from ?', true). + order('state desc, created_at') # prefer Running jobs over Queued + candidates = apply_filters candidates, filters + chosen = nil + incomplete_job = nil + candidates.each do |j| + if j.state != Job::Complete + # We'll use this if we don't find a job that has completed + incomplete_job ||= j + next + end + + if chosen == false + # We have already decided not to reuse any completed job + next + elsif chosen + if chosen.output != j.output + # If two matching jobs produced different outputs, run a new + # job (or use one that's already running/queued) instead of + # choosing one arbitrarily. + chosen = false + end + # ...and that's the only thing we need to do once we've chosen + # a job to reuse. + elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output) + # As soon as the output we will end up returning (if any) is + # decided, check whether it will be visible to the user; if + # not, any further investigation of reusable jobs is futile. + chosen = false + else + chosen = j + end + end + chosen || incomplete_job + end + + def self.default_git_filters(attr_name, repo_name, refspec) + # Add a filter to @filters for `attr_name` = the latest commit available + # in `repo_name` at `refspec`. No filter is added if refspec can't be + # resolved. + commits = Commit.find_commit_range(repo_name, nil, refspec, nil) + if commit_hash = commits.first + [[attr_name, "=", commit_hash]] + else + [] + end + end + protected + def self.sorted_hash_digest h + Digest::MD5.hexdigest(Oj.dump(deep_sort_hash(h))) + end + + def self.deep_sort_hash h + return h unless h.is_a? Hash + h.sort.collect do |k, v| + [k, deep_sort_hash(v)] + end.to_h + end + def foreign_key_attributes super + %w(output log) end @@ -238,7 +421,8 @@ class Job < ArvadosModel output_changed? or log_changed? or tasks_summary_changed? or - state_changed? + state_changed? or + components_changed? logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}" return false end