--- /dev/null
+import csv
+import sys
+
+# Command line arguments: the column to match on, a file listing the
+# values to select, the CSV dataset to scan, and the cluster name to
+# record in the output.
+select_column = sys.argv[1]
+select_values = sys.argv[2]
+dataset = sys.argv[3]
+cluster = sys.argv[4]
+
+# Read the values to select, one per line.
+with open(select_values, "rt") as sv:
+    selectvals = [s.strip() for s in sv]
+
+print("selectvals", selectvals)
+
+with open(dataset, "rt", newline="") as df, \
+     open("extracted.csv", "wt", newline="") as ef:
+    ds = csv.reader(df)
+    header = next(ds)
+    print("header is", header)
+
+    # Locate the column to match on.
+    if select_column not in header:
+        raise Exception("Column %s not found" % select_column)
+    columnindex = header.index(select_column)
+    print("column index", columnindex)
+
+    # Copy the matching rows, prefixed with the cluster of origin.
+    ex = csv.writer(ef)
+    ex.writerow(["cluster"] + list(header))
+    for row in ds:
+        if row[columnindex] in selectvals:
+            ex.writerow([cluster] + list(row))
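+
+# A minimal local smoke test, assuming this script is saved as
+# "extract.py" (a hypothetical name) and using the example job
+# inputs shown below:
+#
+#   python extract.py color colors_to_select.txt items1.csv clsr1
+#
+# The matching rows land in "extracted.csv" with a leading "cluster"
+# column containing "clsr1".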
-#
-# Demonstrate Arvados federation features. This performs a parallel
-# scatter over some arbitrary number of files and federated clusters,
-# then joins the results.
-#
+# Demonstrate Arvados federation features. This example searches a
+# list of CSV files that are hosted on different Arvados clusters.
+# For each file, a task is sent to the remote cluster, which scans the
+# file and extracts the rows where the column "select_column" has one
+# of the values appearing in the "select_values" file. The home
+# cluster then runs a task which pulls the results from the remote
+# clusters and merges the results to produce a final report.
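+#
+# A typical invocation from the home cluster might look like this
+# (the workflow and job file names here are assumptions):
+#
+#   arvados-cwl-runner feddemo.cwl feddemo-job.yml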
+
cwlVersion: v1.0
class: Workflow
$namespaces:
dockerPull: arvados/jobs
# Define a record type so we can conveniently associate the input
- # file, the cluster on which the file lives, and the project on that
- # cluster that will own the container requests and intermediate
- # outputs.
+ # file and the cluster where the task should run.
SchemaDefRequirement:
types:
- - name: FileOnCluster
- type: record
- fields:
- file: File
- cluster: string
- project: string
+ - $import: FileOnCluster.yml
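+    # FileOnCluster.yml is assumed to define the record previously
+    # declared inline here, minus the "project" field (intermediate
+    # projects are now a separate input below):
+    #
+    #   - name: FileOnCluster
+    #     type: record
+    #     fields:
+    #       file: File
+    #       cluster: string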
inputs:
- # Expect an array of FileOnCluster records (defined above)
- # as our input.
- shards:
+ select_column: string
+ select_values: File
+
+ datasets:
type:
type: array
- items: FileOnCluster
+ items: FileOnCluster.yml#FileOnCluster
+
+ intermediate_projects: string[]
outputs:
# Will produce an output file with the results of the distributed
- # analysis jobs joined together.
+ # analysis jobs merged together.
joined:
type: File
- outputSource: gather-results/joined
+ outputSource: gather-results/out
steps:
distributed-analysis:
in:
- # Take "shards" array as input, we scatter over it below.
- shard: shards
-
- # Use an expression to extract the "file" field to assign to the
- # "inp" parameter of the tool.
- inp: {valueFrom: $(inputs.shard.file)}
+ select_column: select_column
+ select_values: select_values
+ dataset: datasets
+ intermediate_projects: intermediate_projects
-  # Scatter over shards, this means creating a parallel job for each
-  # element in the "shards" array. Expressions are evaluated for
-  # each element.
-    scatter: shard
+  # Scatter over datasets, creating a parallel task for each element
+  # in the "datasets" array. Expressions are evaluated separately for
+  # each scattered task.
+    scatter: [dataset, intermediate_projects]
+    scatterMethod: dotproduct
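+    # With "dotproduct", dataset[i] is paired with
+    # intermediate_projects[i]; for example, items1.csv is processed
+    # on clsr1 with its intermediate output owned by the first
+    # project listed in "intermediate_projects".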
- # Specify the cluster target for this job. This means each
- # separate scatter job will execute on the cluster that was
+ # Specify the cluster target for this task. This means each
+ # separate scatter task will execute on the cluster that was
# specified in the "cluster" field.
#
-  # Arvados handles streaming data between clusters, for example,
-  # the federation.
+  # Arvados handles streaming data between clusters on demand, so
+  # tasks can run anywhere in the federation without explicit data
+  # transfer steps.
hints:
arv:ClusterTarget:
- cluster_id: $(inputs.shard.cluster)
- project_uuid: $(inputs.shard.project)
+ cluster_id: $(inputs.dataset.cluster)
+ project_uuid: $(inputs.intermediate_projects)
out: [out]
- run: md5sum.cwl
+ run: extract.cwl
# Collect the results of the distributed step and join them into a
# single output file. Arvados handles streaming inputs,
# intermediate results, and outputs between clusters on demand.
gather-results:
in:
- inp: distributed-analysis/out
- out: [joined]
- run: cat.cwl
+ dataset: distributed-analysis/out
+ out: [out]
+ run: merge.cwl
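+    # merge.cwl is not shown in this diff; per the overview comment
+    # above, it is expected to pull each "extracted.csv" from the
+    # remote clusters and concatenate them into the final report.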
-shards:
+select_column: color
+select_values:
+ class: File
+ location: colors_to_select.txt
+
+datasets:
- cluster: clsr1
- project: clsr1-j7d0g-qxc4jcji7n4lafx
file:
class: File
- location: keep:485df2c5cec3207a32f49c42f1cdcca9+61/file-on-clsr1.dat
+ location: keep:0dcf9310e5bf0c07270416d3a0cd6a43+56/items1.csv
- cluster: clsr2
- project: clsr2-j7d0g-ivdrm1hyym21vkq
file:
class: File
- location: keep:ae6e9c3e9bfa52a0122ecb489d8198ff+61/file-on-clsr2.dat
+ location: keep:12707d325a3f4687674b858bd32beae9+56/items2.csv
- cluster: clsr3
- project: clsr3-j7d0g-e3njz2s53lyb0ka
file:
class: File
- location: keep:0b43a0ef9ea592d5d7b299978dfa8643+61/file-on-clsr3.dat
+ location: keep:dbff6bb7fc43176527af5eb9dec28871+56/items3.csv
+
+intermediate_projects:
+ - clsr1-j7d0g-qxc4jcji7n4lafx
+ - clsr2-j7d0g-e7r20egb8hlgn53
+ - clsr3-j7d0g-vrl00zoku9spnen
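+
+# colors_to_select.txt is expected to contain one value per line to
+# match against the "color" column, for example (hypothetical values):
+#
+#   blue
+#   green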