-class JobSummarizer(Summarizer):
- def __init__(self, job, **kwargs):
- arv = arvados.api('v1')
- if isinstance(job, basestring):
- self.job = arv.jobs().get(uuid=job).execute()
- else:
- self.job = job
- if self.job['log']:
- rdr = CollectionReader(self.job['log'])
- label = self.job['uuid']
- else:
- rdr = LiveLogReader(self.job['uuid'])
- label = self.job['uuid'] + ' (partial)'
- super(JobSummarizer, self).__init__(rdr, **kwargs)
+def NewSummarizer(process_or_uuid, **kwargs):
+    """Construct with the appropriate subclass for this uuid/object.
+
+    process_or_uuid is either a uuid string or an already-fetched record
+    (a dict with a 'uuid' key). When only a uuid is given, the record is
+    fetched from the API resource that matches the uuid's infix.
+
+    Extra keyword arguments are forwarded to the chosen summarizer class,
+    except for collection uuids ('-4zz18-'), where only collection_id is
+    passed.
+
+    Raises ArgumentError if the uuid matches none of the known infixes.
+    """
+
+    if isinstance(process_or_uuid, dict):
+        process = process_or_uuid
+        uuid = process['uuid']
+    else:
+        uuid = process_or_uuid
+        process = None
+        # An API client is only needed when the record has to be fetched.
+        arv = arvados.api('v1', model=OrderedJsonModel())
+
+    if '-dz642-' in uuid:
+        # Container.
+        if process is None:
+            process = arv.containers().get(uuid=uuid).execute()
+        klass = ContainerTreeSummarizer
+    elif '-xvhdp-' in uuid:
+        # Container request.
+        if process is None:
+            process = arv.container_requests().get(uuid=uuid).execute()
+        klass = ContainerTreeSummarizer
+    elif '-8i9sb-' in uuid:
+        # Job.
+        if process is None:
+            process = arv.jobs().get(uuid=uuid).execute()
+        klass = JobSummarizer
+    elif '-d1hrv-' in uuid:
+        # Pipeline instance.
+        if process is None:
+            process = arv.pipeline_instances().get(uuid=uuid).execute()
+        klass = PipelineSummarizer
+    elif '-4zz18-' in uuid:
+        # Collections are summarized straight from the uuid; no record is
+        # fetched and kwargs are not forwarded.
+        return CollectionSummarizer(collection_id=uuid)
+    else:
+        # Interpolate the uuid here: exception constructors, unlike logging
+        # calls, do not apply %-formatting to their arguments.
+        # NOTE(review): ArgumentError is not defined in this hunk -- confirm
+        # it is in scope (arvados.errors.ArgumentError?).
+        raise ArgumentError("Unrecognized uuid %s" % (uuid,))
+    return klass(process, uuid=uuid, **kwargs)
+
+
+class ProcessSummarizer(Summarizer):
+    """Summarizer for a job, pipeline, container, or container request."""
+
+    def __init__(self, process, label=None, **kwargs):
+        """Choose a log reader for `process` and initialize the base class.
+
+        process is the record being summarized; it must have a 'uuid' key
+        and may have 'name', 'log' and 'runtime_constraints' keys.
+
+        label defaults to the record's 'name', or its 'uuid' when there is
+        no 'name' key. If the log collection is missing or unreadable, the
+        live event log is used instead and ' (partial)' is appended to the
+        label.
+
+        Remaining keyword arguments are passed through to the base class.
+        """
+        self.process = process
+        if label is None:
+            label = self.process.get('name', self.process['uuid'])
+
+        reader = None
+        if self.process.get('log'):
+            try:
+                reader = crunchstat_summary.reader.CollectionReader(
+                    self.process['log'])
+            except arvados.errors.NotFoundError as err:
+                # Not fatal: fall through to the live log reader below.
+                logger.warning("Trying event logs after failing to read "
+                               "log collection %s: %s",
+                               self.process['log'], err)
+
+        if reader is None:
+            reader = crunchstat_summary.reader.LiveLogReader(
+                self.process['uuid'])
+            label += ' (partial)'
+
+        super(ProcessSummarizer, self).__init__(
+            reader, label=label, **kwargs)
+        self.existing_constraints = self.process.get(
+            'runtime_constraints', {})
+
+
+class JobSummarizer(ProcessSummarizer):
+    """ProcessSummarizer configured with job-style constraint names."""
+
+    # 1024 * 1024 == 1048576. The mapped constraint names below carry an
+    # "_mb_" unit. NOTE(review): presumably the base class uses this factor
+    # to convert between bytes and the job's units -- confirm.
+    runtime_constraint_mem_unit = 1024 * 1024
+
+    # Generic runtime constraint name -> job-specific constraint name.
+    map_runtime_constraint = dict(
+        keep_cache_ram='keep_cache_mb_per_task',
+        ram='min_ram_mb_per_node',
+        vcpus='min_cores_per_node',
+    )
+
+
+class ContainerSummarizer(ProcessSummarizer):
+    """ProcessSummarizer variant whose memory unit is 1."""
+
+    # NOTE(review): presumably container memory constraints are already
+    # expressed in bytes, so no scaling factor is needed -- confirm.
+    runtime_constraint_mem_unit = 1
+
+
+class MultiSummarizer(object):
+ def __init__(self, children={}, label=None, threads=1, **kwargs):
+ self.throttle = threading.Semaphore(threads)
+ self.children = children