'show' == ctrl.action_name
}
+ def generate_provenance(cr)
+ return if params['tab_pane'] != "Provenance"
+
+ nodes = {}
+ nodes[cr[:uuid]] = cr
+ if cr[:container_uuid]
+ ContainerRequest.where(requesting_container_uuid: cr[:container_uuid]).each do |child|
+ nodes[child[:uuid]] = child
+ end
+ end
+ @svg = ProvenanceHelper::create_provenance_graph nodes,
+ "provenance_svg",
+ {
+ :request => request,
+ :direction => :top_down,
+ }
+ end
+
def show_pane_list
- panes = %w(Status Log Advanced)
+ panes = %w(Status Log Provenance Advanced)
if @object.andand.state == 'Uncommitted'
- panes = %w(Inputs) + panes - %w(Log)
+ panes = %w(Inputs) + panes - %w(Log Provenance)
end
panes
end
+ def show
+ generate_provenance(@object)
+ super
+ end
+
def cancel
@object.update_attributes! priority: 0
if params[:return_to]
gr
end
+ def cr_edges cr
+ uuid = cr[:uuid]
+ gr = ""
+
+ # Search for input mounts
+ input_obj = cr[:mounts].andand[:"/var/lib/cwl/cwl.input.json"].andand[:content] || cr[:mounts] || {}
+ if input_obj
+ ProvenanceHelper::find_collections input_obj, 'input' do |col_hash, col_uuid, key|
+ if col_uuid
+ gr += describe_node(col_uuid)
+ gr += edge(col_uuid, uuid, {:label => key})
+ else
+ gr += describe_node(col_hash)
+ gr += edge(col_hash, uuid, {:label => key})
+ end
+ end
+ end
+
+ [
+ [:output_uuid, 'output'],
+ [:log_uuid, 'log']
+ ].each do |attr, label|
+ if cr[attr]
+ gr += describe_node(cr[attr])
+ gr += edge(uuid, cr[attr], {label: label})
+ end
+ end
+
+ gr
+ end
+
def job_edges job, edge_opts={}
uuid = job_uuid(job)
gr = ""
if rsc == Job
job = @pdata[uuid]
gr += job_edges job if job
+ elsif rsc == ContainerRequest
+ cr = @pdata[uuid]
+ if cr
+ gr += cr_edges cr
+ gr += describe_node(uuid, {href: {controller: 'container_requests',
+ id: uuid},
+ label: @pdata[uuid][:name],
+ shape: 'oval'})
+ # Search for child CRs
+ if cr[:container_uuid]
+ child_crs = ContainerRequest.where(requesting_container_uuid: cr[:container_uuid])
+ child_crs.each do |child|
+ gr += generate_provenance_edges(child[:uuid])
+ gr += edge(uuid, child[:uuid], {label: 'child'})
+ end
+ end
+ end
end
end
--- /dev/null
+<%= render partial: 'application/svg_div', locals: {
+ divId: "provenance_graph",
+ svgId: "provenance_svg",
+ svg: @svg } %>
assert_equal completed_cr['cmd'], copied_cr['cmd']
assert_equal completed_cr['runtime_constraints']['ram'], copied_cr['runtime_constraints'][:ram]
end
+
+ [
+ ['completed', true],
+ ['running', true],
+ ['queued', true],
+ ['uncommitted', false],
+ ].each do |cr_fixture, should_show|
+ test "provenance tab should #{should_show ? '' : 'not'} be shown on #{cr_fixture} container requests" do
+ cr = api_fixture('container_requests')[cr_fixture]
+ assert_not_nil cr
+ get(:show,
+ {id: cr['uuid']},
+ session_for(:active))
+ assert_response :success
+ if should_show
+ assert_includes @response.body, "href=\"#Provenance\""
+ else
+ assert_not_includes @response.body, "href=\"#Provenance\""
+ end
+ end
+ end
end
assert_text 'This workflow does not need any further inputs'
page.assert_selector 'a', text: 'Run'
end
+
+ test "Provenance graph shown on committed container requests" do
+ cr = api_fixture('container_requests', 'completed')
+ visit page_with_token("active", "/container_requests/#{cr['uuid']}")
+ assert page.has_text? 'Provenance'
+ click_link 'Provenance'
+ wait_for_ajax
+    # Check for provenance graph existence
+ page.assert_selector '#provenance_svg'
+ page.assert_selector 'ellipse+text', text: cr['name'], visible: false
+ page.assert_selector 'g.node>title', text: cr['uuid'], visible: false
+ end
end
The @arv-migrate-docker19@ tool converts Docker images stored in Arvados from image format v1 (Docker <= 1.9) to image format v2 (Docker >= 1.10).
-Requires Docker running on the local host (can be either 1.9 or 1.10+).
+Requires Docker running on the local host (can be either 1.9 or 1.10+) and a Linux kernel >= 3.18-rc6 to support overlayfs.
Usage:
-# Run @arvados/docker/migrate-docker19/build.sh@ to create @arvados/migrate-docker19@ Docker image.
+# Install the arvados/migrate-docker19 image: @docker pull arvados/migrate-docker19:1.0@. If you're unable to do this, you can run @arvados/docker/migrate-docker19/build.sh@ to create the @arvados/migrate-docker19@ Docker image.
+# Make sure you have the right modules installed: @sudo modprobe overlayfs bridge br_netfilter nf_nat@
# Set ARVADOS_API_HOST and ARVADOS_API_TOKEN to the cluster you want to migrate.
-# Run @arv-migrate-docker19@ from the Arvados Python SDK on the host (not in a container).
+# Your temporary directory should be large enough to hold all the layers of the biggest image in the cluster. This is hard to estimate, so starting with five times the size of that image is reasonable. You can choose a different directory with the @--tmp-dir@ switch. Make sure the user running the Docker daemon has permission to write to that directory.
+# Run @arv-migrate-docker19 --dry-run@ from the Arvados Python SDK on the host (not in a container). This will print out some information useful for the migration.
+# Finally, to perform the migration, run @arv-migrate-docker19@ from the Arvados Python SDK on the host (not in a container), as in the sketch below.
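For example, a migration session driven from Python on the host might look like the following sketch (the cluster host, token, and scratch directory are placeholders):

import os
import subprocess

env = dict(os.environ)
env["ARVADOS_API_HOST"] = "zzzzz.arvadosapi.com"        # placeholder cluster
env["ARVADOS_API_TOKEN"] = "xxxxxxxxxxxxxxxxxxxxxxxxx"  # placeholder token

# Dry run: prints information useful for planning the migration.
subprocess.check_call(["arv-migrate-docker19", "--dry-run"], env=env)

# Perform the migration, pointing the tool at a scratch directory with
# enough free space for the image layers.
subprocess.check_call(["arv-migrate-docker19", "--tmp-dir", "/scratch/migrate19"], env=env)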
This will query Arvados for v1 format Docker images. For each image that does not already have a corresponding v2 format image (as indicated by a docker_image_migration tag) it will perform the following process:
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles, adjustDirObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location
from .fsaccess import CollectionFetcher
-from .pathmapper import NoFollowPathMapper
+from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
mounts = {
self.outdir: {
"kind": "tmp"
+ },
+ self.tmpdir: {
+ "kind": "tmp"
}
}
scheduling_parameters = {}
- dirs = set()
- for f in self.pathmapper.files():
- pdh, p, tp, stg = self.pathmapper.mapper(f)
- if tp == "Directory" and '/' not in pdh:
- mounts[p] = {
- "kind": "collection",
- "portable_data_hash": pdh[5:]
- }
- dirs.add(pdh)
-
- for f in self.pathmapper.files():
- res, p, tp, stg = self.pathmapper.mapper(f)
- if res.startswith("keep:"):
- res = res[5:]
- elif res.startswith("/keep/"):
- res = res[6:]
- else:
+ rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
+ rf.sort(key=lambda k: k.resolved)
+ prevdir = None
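+        # rf is sorted by resolved Keep path, so a mounted directory appears
+        # before the entries it contains; prevdir lets the loop skip targets
+        # already covered by that directory mount.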
+ for resolved, target, tp, stg in rf:
+ if not stg:
continue
- sp = res.split("/", 1)
- pdh = sp[0]
- if pdh not in dirs:
- mounts[p] = {
- "kind": "collection",
- "portable_data_hash": pdh
- }
- if len(sp) == 2:
- mounts[p]["path"] = urllib.unquote(sp[1])
+ if prevdir and target.startswith(prevdir):
+ continue
+ if tp == "Directory":
+ targetdir = target
+ else:
+ targetdir = os.path.dirname(target)
+ sp = resolved.split("/", 1)
+ pdh = sp[0][5:] # remove "keep:"
+ mounts[targetdir] = {
+ "kind": "collection",
+ "portable_data_hash": pdh
+ }
+ if len(sp) == 2:
+ if tp == "Directory":
+ path = sp[1]
+ else:
+ path = os.path.dirname(sp[1])
+ if path and path != "/":
+ mounts[targetdir]["path"] = path
+ prevdir = targetdir + "/"
with Perf(metrics, "generatefiles %s" % self.name):
if self.generatefiles["listing"]:
"""
adjustDirObjs(self.job_order, trim_listing)
+ adjustFileObjs(self.job_order, trim_anonymous_location)
+ adjustDirObjs(self.job_order, trim_anonymous_location)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
-from cwltool.pathmapper import adjustDirObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from schema_salad.sourceline import SourceLine
import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
-from .pathmapper import VwdPathMapper
+from .runner import Runner, arvados_jobs_image, packed_workflow, upload_workflow_collection, trim_anonymous_location
+from .pathmapper import VwdPathMapper, trim_listing
from .perf import Perf
from . import done
from ._version import __version__
self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
adjustDirObjs(self.job_order, trim_listing)
+ adjustFileObjs(self.job_order, trim_anonymous_location)
+ adjustDirObjs(self.job_order, trim_anonymous_location)
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
import ruamel.yaml as yaml
-from .runner import upload_dependencies, trim_listing, packed_workflow, upload_workflow_collection
+from .runner import upload_dependencies, packed_workflow, upload_workflow_collection, trim_anonymous_location
+from .pathmapper import trim_listing
from .arvtool import ArvadosCommandTool
from .perf import Perf
packed = packed_workflow(arvRunner, tool)
adjustDirObjs(job_order, trim_listing)
+ adjustFileObjs(job_order, trim_anonymous_location)
+ adjustDirObjs(job_order, trim_anonymous_location)
main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
for inp in main["inputs"]:
return v
def keeppathObj(v):
- v["location"] = keeppath(v["location"])
+ if "location" in v:
+ v["location"] = keeppath(v["location"])
for k,v in job_order_object.items():
if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
logger = logging.getLogger('arvados.cwl-runner')
+def trim_listing(obj):
+ """Remove 'listing' field from Directory objects that are keep references.
+
+ When Directory objects represent Keep references, it is redundant and
+ potentially very expensive to pass fully enumerated Directory objects
+    between instances of cwl-runner (e.g. when submitting a job, or using the
+ RunInSingleContainer feature), so delete the 'listing' field when it is
+ safe to do so.
+
+ """
+
+ if obj.get("location", "").startswith("keep:") and "listing" in obj:
+ del obj["listing"]
+
+
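# Illustrative sketch (hypothetical values): trim_listing only strips the
# listing from Keep-backed Directories; literal directories keep theirs
# because their location does not start with "keep:".
#
#     d = {"class": "Directory",
#          "location": "keep:99999999999999999999999999999994+44",
#          "listing": [{"class": "File",
#                       "location": "keep:99999999999999999999999999999994+44/file1"}]}
#     trim_listing(d)     # "listing" is deleted from d
#
#     lit = {"class": "Directory", "location": "_:anonymous", "listing": []}
#     trim_listing(lit)   # lit is unchanged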
class ArvPathMapper(PathMapper):
"""Convert container-local paths to and from Keep collection ids."""
self.collection_pattern = collection_pattern
self.file_pattern = file_pattern
self.name = name
+ self.referenced_files = [r["location"] for r in referenced_files]
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
def visit(self, srcobj, uploadfiles):
import ruamel.yaml as yaml
from .arvdocker import arv_docker_get_image
-from .pathmapper import ArvPathMapper
+from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
logger = logging.getLogger('arvados.cwl-runner')
-def trim_listing(obj):
- """Remove 'listing' field from Directory objects that are keep references.
+def trim_anonymous_location(obj):
+ """Remove 'location' field from File and Directory literals.
+
+ To make internal handling easier, literals are assigned a random id for
+ 'location'. However, when writing the record back out, this can break
+    reproducibility. Since it is valid for literals not to have a 'location'
+ field, remove it.
- When Directory objects represent Keep references, it redundant and
- potentially very expensive to pass fully enumerated Directory objects
- between instances of cwl-runner (e.g. a submitting a job, or using the
- RunInSingleContainer feature), so delete the 'listing' field when it is
- safe to do so.
"""
- if obj.get("location", "").startswith("keep:") and "listing" in obj:
- del obj["listing"]
if obj.get("location", "").startswith("_:"):
del obj["location"]
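# Illustrative sketch (hypothetical values): a File literal carries a random
# "_:" location, which trim_anonymous_location removes; Keep-backed locations
# are left alone.
#
#     lit = {"class": "File", "basename": "greeting.txt", "contents": "hello",
#            "location": "_:3a1b2c4d"}
#     trim_anonymous_location(lit)   # "location" is deleted from lit
#
#     f = {"class": "File",
#          "location": "keep:99999999999999999999999999999994+44/file1"}
#     trim_anonymous_location(f)     # f is unchanged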
'use_existing': enable_reuse,
'priority': 1,
'mounts': {
+ '/tmp': {'kind': 'tmp'},
'/var/spool/cwl': {'kind': 'tmp'}
},
'state': 'Committed',
'use_existing': True,
'priority': 1,
'mounts': {
+ '/tmp': {'kind': 'tmp'},
'/var/spool/cwl': {'kind': 'tmp'}
},
'state': 'Committed',
'use_existing': True,
'priority': 1,
'mounts': {
+ '/tmp': {'kind': 'tmp'},
'/var/spool/cwl': {'kind': 'tmp'},
'/var/spool/cwl/foo': {
'kind': 'collection',
'use_existing': True,
'priority': 1,
'mounts': {
+ '/tmp': {'kind': 'tmp'},
'/var/spool/cwl': {'kind': 'tmp'},
"stderr": {
"kind": "file",
arvjob.collect_outputs.assert_called_with("keep:abc+123")
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
+
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_mounts(self, keepdocker):
+ arv_docker_clear_cache()
+
+ runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ tool = cmap({
+ "inputs": [
+ {"id": "p1",
+ "type": "Directory"}
+ ],
+ "outputs": [],
+ "baseCommand": "ls",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+ })
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ arvtool.formatgraph = None
+ job_order = {
+ "p1": {
+ "class": "Directory",
+ "location": "keep:99999999999999999999999999999994+44",
+ "listing": [
+ {
+ "class": "File",
+ "location": "keep:99999999999999999999999999999994+44/file1",
+ },
+ {
+ "class": "File",
+ "location": "keep:99999999999999999999999999999994+44/file2",
+ }
+ ]
+ }
+ }
+ for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_run_mounts",
+ make_fs_access=make_fs_access, tmpdir="/tmp"):
+ j.run()
+ runner.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher({
+ 'environment': {
+ 'HOME': '/var/spool/cwl',
+ 'TMPDIR': '/tmp'
+ },
+ 'name': 'test_run_mounts',
+ 'runtime_constraints': {
+ 'vcpus': 1,
+ 'ram': 1073741824
+ },
+ 'use_existing': True,
+ 'priority': 1,
+ 'mounts': {
+ "/keep/99999999999999999999999999999994+44": {
+ "kind": "collection",
+ "portable_data_hash": "99999999999999999999999999999994+44"
+ },
+ '/tmp': {'kind': 'tmp'},
+ '/var/spool/cwl': {'kind': 'tmp'}
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': 'arvados/jobs',
+ 'command': ['ls', '/var/spool/cwl'],
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {},
+ 'properties': {},
+ }))
import (
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- . "gopkg.in/check.v1"
"net"
"net/http"
"os"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ . "gopkg.in/check.v1"
)
// Gocheck boilerplate
Dict{"log": Dict{"bogus_attr": "foo"}},
&getback)
c.Assert(err, ErrorMatches, "arvados API server error: .*")
- c.Assert(err, ErrorMatches, ".*unknown attribute: bogus_attr.*")
+ c.Assert(err, ErrorMatches, ".*unknown attribute(: | ')bogus_attr.*")
c.Assert(err, FitsTypeOf, APIServerError{})
c.Assert(err.(APIServerError).HttpStatusCode, Equals, 422)
}
"encoding/json"
"fmt"
"log"
+ "net/http"
"os"
"os/signal"
"reflect"
if this.Arvados.KeepServiceURIs != nil {
this.foundNonDiskSvc = true
this.replicasPerService = 0
- this.setClientSettingsNonDisk()
+ if c, ok := this.Client.(*http.Client); ok {
+ this.setClientSettingsNonDisk(c)
+ }
roots := make(map[string]string)
for i, uri := range this.Arvados.KeepServiceURIs {
roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
gatewayRoots[service.Uuid] = url
}
- if this.foundNonDiskSvc {
- this.setClientSettingsNonDisk()
- } else {
- this.setClientSettingsDisk()
+ if client, ok := this.Client.(*http.Client); ok {
+ if this.foundNonDiskSvc {
+ this.setClientSettingsNonDisk(client)
+ } else {
+ this.setClientSettingsDisk(client)
+ }
}
this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// A Keep "block" is 64MB.
multipleResponseError
}
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
+type InsufficientReplicasError error
+
+type OversizeBlockError error
+
+var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")"))
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
var InvalidLocatorError = errors.New("Invalid locator")
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
+type HTTPClient interface {
+ Do(*http.Request) (*http.Response, error)
+}
+
// Information about Arvados and Keep servers.
type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
writableLocalRoots *map[string]string
gatewayRoots *map[string]string
lock sync.RWMutex
- Client *http.Client
+ Client HTTPClient
Retries int
BlockCache *BlockCache
// Returns the locator for the written block, the number of replicas
// written, and an error.
//
-// Returns an InsufficientReplicas error if 0 <= replicas <
+// Returns an InsufficientReplicasError if 0 <= replicas <
// kc.Wants_replicas.
func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
// Buffer for reads from 'r'
var bufsize int
if dataBytes > 0 {
if dataBytes > BLOCKSIZE {
- return "", 0, OversizeBlockError
+ return "", 0, ErrOversizeBlock
}
bufsize = int(dataBytes)
} else {
import (
"crypto/md5"
+ "errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
- . "gopkg.in/check.v1"
"io"
"io/ioutil"
"log"
"strings"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
+ . "gopkg.in/check.v1"
)
// Gocheck boilerplate
_, replicas, err := kc.PutB([]byte("foo"))
- c.Check(err, Equals, InsufficientReplicasError)
+ c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
c.Check(replicas, Equals, 1)
c.Check(<-st.handled, Equals, ks1[0].url)
}
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
- c.Check(err, Equals, InsufficientReplicasError)
+ c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
c.Check(replicas, Equals, 2)
}
_, replicas, err := kc.PutB([]byte("foo"))
- c.Check(err, Equals, InsufficientReplicasError)
+ c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
c.Check(replicas, Equals, 1)
c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
_, replicas, err := kc.PutB([]byte("foo"))
- c.Check(err, Equals, InsufficientReplicasError)
+ c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
c.Check(replicas, Equals, 0)
}
c.Assert(kc.replicasPerService, Equals, 0)
c.Assert(kc.foundNonDiskSvc, Equals, true)
- c.Assert(kc.Client.Timeout, Equals, 300*time.Second)
+ c.Assert(kc.Client.(*http.Client).Timeout, Equals, 300*time.Second)
}
"crypto/md5"
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
"log"
"regexp"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// Function used to emit debug messages. The easiest way to enable
// Set timeouts applicable when connecting to non-disk services
// (assumed to be over the Internet).
-func (this *KeepClient) setClientSettingsNonDisk() {
- if this.Client.Timeout == 0 {
- // Maximum time to wait for a complete response
- this.Client.Timeout = 300 * time.Second
-
- // TCP and TLS connection settings
- this.Client.Transport = &http.Transport{
- Dial: (&net.Dialer{
- // The maximum time to wait to set up
- // the initial TCP connection.
- Timeout: 30 * time.Second,
-
- // The TCP keep alive heartbeat
- // interval.
- KeepAlive: 120 * time.Second,
- }).Dial,
-
- TLSHandshakeTimeout: 10 * time.Second,
- }
+func (*KeepClient) setClientSettingsNonDisk(client *http.Client) {
+ // Maximum time to wait for a complete response
+ client.Timeout = 300 * time.Second
+
+ // TCP and TLS connection settings
+ client.Transport = &http.Transport{
+ Dial: (&net.Dialer{
+ // The maximum time to wait to set up
+ // the initial TCP connection.
+ Timeout: 30 * time.Second,
+
+ // The TCP keep alive heartbeat
+ // interval.
+ KeepAlive: 120 * time.Second,
+ }).Dial,
+
+ TLSHandshakeTimeout: 10 * time.Second,
}
}
// Set timeouts applicable when connecting to keepstore services directly
// (assumed to be on the local network).
-func (this *KeepClient) setClientSettingsDisk() {
- if this.Client.Timeout == 0 {
- // Maximum time to wait for a complete response
- this.Client.Timeout = 20 * time.Second
-
- // TCP and TLS connection timeouts
- this.Client.Transport = &http.Transport{
- Dial: (&net.Dialer{
- // The maximum time to wait to set up
- // the initial TCP connection.
- Timeout: 2 * time.Second,
-
- // The TCP keep alive heartbeat
- // interval.
- KeepAlive: 180 * time.Second,
- }).Dial,
-
- TLSHandshakeTimeout: 4 * time.Second,
- }
+func (*KeepClient) setClientSettingsDisk(client *http.Client) {
+ // Maximum time to wait for a complete response
+ client.Timeout = 20 * time.Second
+
+ // TCP and TLS connection timeouts
+ client.Transport = &http.Transport{
+ Dial: (&net.Dialer{
+ // The maximum time to wait to set up
+ // the initial TCP connection.
+ Timeout: 2 * time.Second,
+
+ // The TCP keep alive heartbeat
+ // interval.
+ KeepAlive: 180 * time.Second,
+ }).Dial,
+
+ TLSHandshakeTimeout: 4 * time.Second,
}
}
DebugPrintf("DEBUG: [%08x] Upload %v success", requestID, url)
upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
} else {
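+			// If the failed response had an empty body, fall back to the
+			// HTTP status line so the reported error is still informative.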
+ if resp.StatusCode >= 300 && response == "" {
+ response = resp.Status
+ }
DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, resp.StatusCode, response)
upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
}
retriesRemaining := 1 + this.Retries
var retryServers []string
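+	// Remember the most recent error from each server so that, if all
+	// retries are exhausted, InsufficientReplicasError can report them.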
+ lastError := make(map[string]string)
+
for retriesRemaining > 0 {
retriesRemaining -= 1
next_server = 0
active += 1
} else {
if active == 0 && retriesRemaining == 0 {
- return locator, replicasDone, InsufficientReplicasError
+ msg := "Could not write sufficient replicas: "
+ for _, resp := range lastError {
+ msg += resp + "; "
+ }
+ msg = msg[:len(msg)-2]
+ return locator, replicasDone, InsufficientReplicasError(errors.New(msg))
} else {
break
}
replicasDone += status.replicas_stored
replicasTodo -= status.replicas_stored
locator = status.response
- } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+ delete(lastError, status.url)
+ } else {
+ msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
+ if len(msg) > 100 {
+ msg = msg[:100]
+ }
+ lastError[status.url] = msg
+ }
+
+ if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
(status.statusCode >= 500 && status.statusCode != 503) {
// Timeout, too many requests, or other server side failure
// Do not retry when status code is 503, which means the keep server is full
env = os.environ.copy()
env['RAILS_ENV'] = 'test'
env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
- if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'):
- env.pop('ARVADOS_WEBSOCKETS', None)
- else:
- env['ARVADOS_WEBSOCKETS'] = 'yes'
+ env.pop('ARVADOS_WEBSOCKETS', None)
env.pop('ARVADOS_TEST_API_HOST', None)
env.pop('ARVADOS_API_HOST', None)
env.pop('ARVADOS_API_HOST_INSECURE', None)
inflect.irregular 'human', 'humans'
end
-module Kernel
- def suppress_warnings
- original_verbosity = $VERBOSE
- $VERBOSE = nil
- result = yield
- $VERBOSE = original_verbosity
- return result
- end
-end
-
class Arvados
-
class TransactionFailedError < StandardError
end
@attributes = j
end
end
+
+ protected
+
+ def suppress_warnings
+ original_verbosity = $VERBOSE
+ begin
+ $VERBOSE = nil
+ yield
+ ensure
+ $VERBOSE = original_verbosity
+ end
+ end
end
def self.parse(tok)
begin
Locator.parse!(tok)
- rescue ArgumentError => e
+ rescue ArgumentError
nil
end
end
stream_name = nil
block_tokens = []
file_tokens = []
- line.scan /\S+/ do |token|
+ line.scan(/\S+/) do |token|
if stream_name.nil?
stream_name = unescape token
elsif file_tokens.empty? and Locator.valid? token
@text.each_line do |line|
stream_name = nil
in_file_tokens = false
- line.scan /\S+/ do |token|
+ line.scan(/\S+/) do |token|
if stream_name.nil?
stream_name = unescape token
elsif in_file_tokens or not Locator.valid? token
/db/*.sqlite3
# Ignore all logfiles and tempfiles.
-/log/*.log
-/log/*.log.gz
+/log
/tmp
# Sensitive files and local configuration
source 'https://rubygems.org'
-gem 'rails', '~> 3.2'
-
-# Bundle edge Rails instead:
-# gem 'rails', :git => 'git://github.com/rails/rails.git'
+gem 'rails', '~> 4.0'
+gem 'responders', '~> 2.0'
+gem 'protected_attributes'
group :test, :development do
gem 'factory_girl_rails'
# pg is the only supported database driver.
gem 'pg'
-# Start using multi_json once we are on Rails 3.2;
-# Rails 3.1 has a dependency on multi_json < 1.3.0 but we need version 1.3.4 to
-# fix bug https://github.com/collectiveidea/json_spec/issues/27
gem 'multi_json'
gem 'oj'
+gem 'oj_mimic_json'
-# Gems used only for assets and not required
-# in production environments by default.
-group :assets do
- gem 'sass-rails', '~> 3.2'
- gem 'coffee-rails', '~> 3.2'
-
- # See https://github.com/sstephenson/execjs#readme for more supported runtimes
- gem 'therubyracer'
-
- gem 'uglifier', '~> 2.0'
-end
+# for building assets
+gem 'sass-rails', '~> 4.0'
+gem 'coffee-rails', '~> 4.0'
+gem 'therubyracer'
+gem 'uglifier', '~> 2.0'
gem 'jquery-rails'
-# To use ActiveModel has_secure_password
-# gem 'bcrypt-ruby', '~> 3.0.0'
-
-# Use unicorn as the web server
-# gem 'unicorn'
-
-# Deploy with Capistrano
-# gem 'capistrano'
-
-# To use debugger
-# gem 'ruby-debug'
-
gem 'rvm-capistrano', :group => :test
gem 'acts_as_api'
gem 'passenger'
-gem 'omniauth', '~> 1.1'
+# Restricted because omniauth >= 1.5.0 requires Ruby >= 2.1.9:
+gem 'omniauth', '~> 1.4.0'
gem 'omniauth-oauth2', '~> 1.1'
gem 'andand'
gem 'trollop'
gem 'faye-websocket'
-gem 'themes_for_rails'
+gem 'themes_for_rails', git: 'https://github.com/curoverse/themes_for_rails'
gem 'arvados', '>= 0.1.20150615153458'
gem 'arvados-cli', '>= 0.1.20161017193526'
-# pg_power lets us use partial indexes in schema.rb in Rails 3
-gem 'pg_power'
-
gem 'puma', '~> 2.0'
gem 'sshkey'
gem 'safe_yaml'
gem 'lograge'
gem 'logstash-event'
+gem 'rails-observers'
+
# Install any plugin gems
-Dir.glob(File.join(File.dirname(__FILE__), 'lib', '**', "Gemfile")) do |gemfile|
- eval(IO.read(gemfile), binding)
+Dir.glob(File.join(File.dirname(__FILE__), 'lib', '**', "Gemfile")) do |f|
+ eval(IO.read(f), binding)
end
+GIT
+ remote: https://github.com/curoverse/themes_for_rails
+ revision: 61154877047d2346890bda0b7be5827cf51a6a76
+ specs:
+ themes_for_rails (0.5.1)
+ rails (>= 3.0.0)
+
GEM
remote: https://rubygems.org/
specs:
- actionmailer (3.2.22.5)
- actionpack (= 3.2.22.5)
- mail (~> 2.5.4)
- actionpack (3.2.22.5)
- activemodel (= 3.2.22.5)
- activesupport (= 3.2.22.5)
- builder (~> 3.0.0)
+ actionmailer (4.2.5.2)
+ actionpack (= 4.2.5.2)
+ actionview (= 4.2.5.2)
+ activejob (= 4.2.5.2)
+ mail (~> 2.5, >= 2.5.4)
+ rails-dom-testing (~> 1.0, >= 1.0.5)
+ actionpack (4.2.5.2)
+ actionview (= 4.2.5.2)
+ activesupport (= 4.2.5.2)
+ rack (~> 1.6)
+ rack-test (~> 0.6.2)
+ rails-dom-testing (~> 1.0, >= 1.0.5)
+ rails-html-sanitizer (~> 1.0, >= 1.0.2)
+ actionview (4.2.5.2)
+ activesupport (= 4.2.5.2)
+ builder (~> 3.1)
erubis (~> 2.7.0)
- journey (~> 1.0.4)
- rack (~> 1.4.5)
- rack-cache (~> 1.2)
- rack-test (~> 0.6.1)
- sprockets (~> 2.2.1)
- activemodel (3.2.22.5)
- activesupport (= 3.2.22.5)
- builder (~> 3.0.0)
- activerecord (3.2.22.5)
- activemodel (= 3.2.22.5)
- activesupport (= 3.2.22.5)
- arel (~> 3.0.2)
- tzinfo (~> 0.3.29)
- activeresource (3.2.22.5)
- activemodel (= 3.2.22.5)
- activesupport (= 3.2.22.5)
- activesupport (3.2.22.5)
- i18n (~> 0.6, >= 0.6.4)
- multi_json (~> 1.0)
- acts_as_api (0.4.3)
+ rails-dom-testing (~> 1.0, >= 1.0.5)
+ rails-html-sanitizer (~> 1.0, >= 1.0.2)
+ activejob (4.2.5.2)
+ activesupport (= 4.2.5.2)
+ globalid (>= 0.3.0)
+ activemodel (4.2.5.2)
+ activesupport (= 4.2.5.2)
+ builder (~> 3.1)
+ activerecord (4.2.5.2)
+ activemodel (= 4.2.5.2)
+ activesupport (= 4.2.5.2)
+ arel (~> 6.0)
+ activesupport (4.2.5.2)
+ i18n (~> 0.7)
+ json (~> 1.7, >= 1.7.7)
+ minitest (~> 5.1)
+ thread_safe (~> 0.3, >= 0.3.4)
+ tzinfo (~> 1.1)
+ acts_as_api (1.0.0)
activemodel (>= 3.0.0)
activesupport (>= 3.0.0)
rack (>= 1.1.0)
- addressable (2.4.0)
+ addressable (2.5.0)
+ public_suffix (~> 2.0, >= 2.0.2)
andand (1.3.3)
- arel (3.0.3)
- arvados (0.1.20160513152536)
+ arel (6.0.4)
+ arvados (0.1.20170215224121)
activesupport (>= 3, < 4.2.6)
andand (~> 1.3, >= 1.3.3)
google-api-client (>= 0.7, < 0.8.9)
i18n (~> 0)
json (~> 1.7, >= 1.7.7)
jwt (>= 0.1.5, < 2)
- arvados-cli (0.1.20161017193526)
- activesupport (~> 3.2, >= 3.2.13)
+ arvados-cli (0.1.20170322173355)
+ activesupport (>= 3.2.13, < 5)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1, >= 0.1.20150128223554)
curb (~> 0.8)
addressable (>= 2.3.1)
extlib (>= 0.9.15)
multi_json (>= 1.0.0)
- builder (3.0.4)
+ builder (3.2.3)
capistrano (2.15.9)
highline
net-scp (>= 1.0.0)
net-sftp (>= 2.0.0)
net-ssh (>= 2.0.14)
net-ssh-gateway (>= 1.1.0)
- coffee-rails (3.2.2)
+ coffee-rails (4.2.1)
coffee-script (>= 2.2.0)
- railties (~> 3.2.0)
+ railties (>= 4.0.0, < 5.2.x)
coffee-script (2.4.1)
coffee-script-source
execjs
- coffee-script-source (1.10.0)
+ coffee-script-source (1.12.2)
curb (0.9.3)
database_cleaner (1.5.3)
erubis (2.7.0)
- eventmachine (1.2.0.1)
+ eventmachine (1.2.3)
execjs (2.7.0)
extlib (0.9.16)
- factory_girl (4.7.0)
+ factory_girl (4.8.0)
activesupport (>= 3.0.0)
- factory_girl_rails (4.7.0)
- factory_girl (~> 4.7.0)
+ factory_girl_rails (4.8.0)
+ factory_girl (~> 4.8.0)
railties (>= 3.0.0)
- faraday (0.9.2)
+ faraday (0.11.0)
multipart-post (>= 1.2, < 3)
- faye-websocket (0.10.4)
+ faye-websocket (0.10.7)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
+ globalid (0.3.7)
+ activesupport (>= 4.1.0)
google-api-client (0.8.7)
activesupport (>= 3.2, < 5.0)
addressable (~> 2.3)
multi_json (~> 1.11)
os (~> 0.9)
signet (~> 0.7)
- hashie (3.4.6)
+ hashie (3.5.5)
highline (1.7.8)
hike (1.2.3)
- i18n (0.7.0)
- journey (1.0.4)
- jquery-rails (3.1.4)
- railties (>= 3.0, < 5.0)
+ i18n (0.8.1)
+ jquery-rails (4.2.2)
+ rails-dom-testing (>= 1, < 3)
+ railties (>= 4.2.0)
thor (>= 0.14, < 2.0)
- json (1.8.3)
+ json (1.8.6)
jwt (1.5.6)
launchy (2.4.3)
addressable (~> 2.3)
- libv8 (3.16.14.15)
+ libv8 (3.16.14.19)
little-plugger (1.1.4)
- logging (2.1.0)
+ logging (2.2.0)
little-plugger (~> 1.1)
multi_json (~> 1.10)
- lograge (0.3.6)
- actionpack (>= 3)
- activesupport (>= 3)
- railties (>= 3)
+ lograge (0.4.1)
+ actionpack (>= 4, < 5.1)
+ activesupport (>= 4, < 5.1)
+ railties (>= 4, < 5.1)
logstash-event (1.2.02)
- mail (2.5.4)
- mime-types (~> 1.16)
- treetop (~> 1.4.8)
+ loofah (2.0.3)
+ nokogiri (>= 1.5.9)
+ mail (2.6.4)
+ mime-types (>= 1.16, < 4)
memoist (0.15.0)
metaclass (0.0.4)
- mime-types (1.25.1)
- mocha (1.2.0)
+ mime-types (3.1)
+ mime-types-data (~> 3.2015)
+ mime-types-data (3.2016.0521)
+ mini_portile2 (2.1.0)
+ minitest (5.10.1)
+ mocha (1.2.1)
metaclass (~> 0.0.1)
multi_json (1.12.1)
- multi_xml (0.5.5)
+ multi_xml (0.6.0)
multipart-post (2.0.0)
net-scp (1.2.1)
net-ssh (>= 2.6.5)
net-sftp (2.1.2)
net-ssh (>= 2.6.5)
- net-ssh (3.2.0)
- net-ssh-gateway (1.2.0)
- net-ssh (>= 2.6.5)
- oauth2 (1.2.0)
- faraday (>= 0.8, < 0.10)
+ net-ssh (4.1.0)
+ net-ssh-gateway (2.0.0)
+ net-ssh (>= 4.0.0)
+ nokogiri (1.7.1)
+ mini_portile2 (~> 2.1.0)
+ oauth2 (1.3.1)
+ faraday (>= 0.8, < 0.12)
jwt (~> 1.0)
multi_json (~> 1.3)
multi_xml (~> 0.5)
rack (>= 1.2, < 3)
- oj (2.15.0)
- omniauth (1.3.1)
+ oj (2.18.5)
+ oj_mimic_json (1.0.1)
+ omniauth (1.4.2)
hashie (>= 1.2, < 4)
rack (>= 1.0, < 3)
omniauth-oauth2 (1.4.0)
oauth2 (~> 1.0)
omniauth (~> 1.2)
os (0.9.6)
- passenger (5.0.30)
+ passenger (5.1.2)
rack
rake (>= 0.8.1)
- pg (0.19.0)
- pg_power (1.6.4)
- pg
- rails (~> 3.1)
- polyglot (0.3.5)
- power_assert (0.3.1)
+ pg (0.20.0)
+ power_assert (1.0.1)
+ protected_attributes (1.1.3)
+ activemodel (>= 4.0.1, < 5.0)
+ public_suffix (2.0.5)
puma (2.16.0)
- rack (1.4.7)
- rack-cache (1.6.1)
- rack (>= 0.4)
- rack-ssl (1.3.4)
- rack
+ rack (1.6.5)
rack-test (0.6.3)
rack (>= 1.0)
- rails (3.2.22.5)
- actionmailer (= 3.2.22.5)
- actionpack (= 3.2.22.5)
- activerecord (= 3.2.22.5)
- activeresource (= 3.2.22.5)
- activesupport (= 3.2.22.5)
- bundler (~> 1.0)
- railties (= 3.2.22.5)
- railties (3.2.22.5)
- actionpack (= 3.2.22.5)
- activesupport (= 3.2.22.5)
- rack-ssl (~> 1.3.2)
+ rails (4.2.5.2)
+ actionmailer (= 4.2.5.2)
+ actionpack (= 4.2.5.2)
+ actionview (= 4.2.5.2)
+ activejob (= 4.2.5.2)
+ activemodel (= 4.2.5.2)
+ activerecord (= 4.2.5.2)
+ activesupport (= 4.2.5.2)
+ bundler (>= 1.3.0, < 2.0)
+ railties (= 4.2.5.2)
+ sprockets-rails
+ rails-deprecated_sanitizer (1.0.3)
+ activesupport (>= 4.2.0.alpha)
+ rails-dom-testing (1.0.8)
+ activesupport (>= 4.2.0.beta, < 5.0)
+ nokogiri (~> 1.6)
+ rails-deprecated_sanitizer (>= 1.0.1)
+ rails-html-sanitizer (1.0.3)
+ loofah (~> 2.0)
+ rails-observers (0.1.2)
+ activemodel (~> 4.0)
+ railties (4.2.5.2)
+ actionpack (= 4.2.5.2)
+ activesupport (= 4.2.5.2)
rake (>= 0.8.7)
- rdoc (~> 3.4)
- thor (>= 0.14.6, < 2.0)
- rake (11.3.0)
- rdoc (3.12.2)
- json (~> 1.4)
+ thor (>= 0.18.1, < 2.0)
+ rake (12.0.0)
ref (2.0.0)
+ responders (2.3.0)
+ railties (>= 4.2.0, < 5.1)
retriable (1.4.1)
ruby-prof (0.16.2)
rvm-capistrano (1.5.6)
capistrano (~> 2.15.4)
safe_yaml (1.0.4)
- sass (3.4.22)
- sass-rails (3.2.6)
- railties (~> 3.2.0)
- sass (>= 3.1.10)
- tilt (~> 1.3)
+ sass (3.2.19)
+ sass-rails (4.0.5)
+ railties (>= 4.0.0, < 5.0)
+ sass (~> 3.2.2)
+ sprockets (~> 2.8, < 3.0)
+ sprockets-rails (~> 2.0)
signet (0.7.3)
addressable (~> 2.3)
faraday (~> 0.9)
simplecov-html (0.7.1)
simplecov-rcov (0.2.3)
simplecov (>= 0.4.1)
- sprockets (2.2.3)
+ sprockets (2.12.4)
hike (~> 1.2)
multi_json (~> 1.0)
rack (~> 1.0)
tilt (~> 1.1, != 1.3.0)
- sshkey (1.8.0)
- test-unit (3.2.1)
+ sprockets-rails (2.3.3)
+ actionpack (>= 3.0)
+ activesupport (>= 3.0)
+ sprockets (>= 2.8, < 4.0)
+ sshkey (1.9.0)
+ test-unit (3.2.3)
power_assert
test_after_commit (1.1.0)
activerecord (>= 3.2)
- themes_for_rails (0.5.1)
- rails (>= 3.0.0)
- therubyracer (0.12.2)
- libv8 (~> 3.16.14.0)
+ therubyracer (0.12.3)
+ libv8 (~> 3.16.14.15)
ref
- thor (0.19.1)
+ thor (0.19.4)
+ thread_safe (0.3.6)
tilt (1.4.1)
- treetop (1.4.15)
- polyglot
- polyglot (>= 0.3.1)
trollop (2.1.2)
- tzinfo (0.3.51)
+ tzinfo (1.2.2)
+ thread_safe (~> 0.1)
uglifier (2.7.2)
execjs (>= 0.3.0)
json (>= 1.8.0)
- websocket-driver (0.6.4)
+ websocket-driver (0.6.5)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.2)
andand
arvados (>= 0.1.20150615153458)
arvados-cli (>= 0.1.20161017193526)
- coffee-rails (~> 3.2)
+ coffee-rails (~> 4.0)
database_cleaner
factory_girl_rails
faye-websocket
mocha
multi_json
oj
- omniauth (~> 1.1)
+ oj_mimic_json
+ omniauth (~> 1.4.0)
omniauth-oauth2 (~> 1.1)
passenger
pg
- pg_power
+ protected_attributes
puma (~> 2.0)
- rails (~> 3.2)
+ rails (~> 4.0)
+ rails-observers
+ responders (~> 2.0)
ruby-prof
rvm-capistrano
safe_yaml
- sass-rails (~> 3.2)
+ sass-rails (~> 4.0)
simplecov (~> 0.7.1)
simplecov-rcov
sshkey
test-unit (~> 3.0)
test_after_commit
- themes_for_rails
+ themes_for_rails!
therubyracer
trollop
uglifier (~> 2.0)
BUNDLED WITH
- 1.13.6
+ 1.14.3
require File.expand_path('../config/application', __FILE__)
-begin
- ok = PgPower
-rescue
- abort "Hm, pg_power is missing. Make sure you use 'bundle exec rake ...'"
-end
-
Server::Application.load_tasks
namespace :test do
require 'load_param'
class ApplicationController < ActionController::Base
- include CurrentApiClient
include ThemesForRails::ActionController
+ include CurrentApiClient
include LoadParam
include DbCurrentTime
before_filter(:render_404_if_no_object,
except: [:index, :create] + ERROR_ACTIONS)
- theme :select_theme
+ theme Rails.configuration.arvados_theme
attr_writer :resource_attrs
end
def index
- @objects.uniq!(&:id) if @select.nil? or @select.include? "id"
+ if @select.nil? || @select.include?("id")
+ @objects = @objects.uniq(&:id)
+ end
if params[:eager] and params[:eager] != '0' and params[:eager] != 0 and params[:eager] != ''
@objects.each(&:eager_load_associations)
end
def remote_ip
# Caveat: this is highly dependent on the proxy setup. YMMV.
- if request.headers.has_key?('HTTP_X_REAL_IP') then
+ if request.headers.key?('HTTP_X_REAL_IP') then
# We're behind a reverse proxy
@remote_ip = request.headers['HTTP_X_REAL_IP']
else
end
super(*opts)
end
-
- def select_theme
- return Rails.configuration.arvados_theme
- end
end
include DbCurrentTime
def update
- if resource_attrs[:job_uuid]
- @object.job_readable = readable_job_uuids(resource_attrs[:job_uuid]).any?
+ if resource_attrs[:job_uuid].is_a? String
+ @object.job_readable = readable_job_uuids([resource_attrs[:job_uuid]]).any?
end
super
end
protected
- def readable_job_uuids(*uuids)
+ def readable_job_uuids(uuids)
Job.readable_by(*@read_users).select(:uuid).where(uuid: uuids).map(&:uuid)
end
end
crunchLogThrottleLines: Rails.application.config.crunch_log_throttle_lines,
crunchLimitLogBytesPerJob: Rails.application.config.crunch_limit_log_bytes_per_job,
crunchLogPartialLineThrottlePeriod: Rails.application.config.crunch_log_partial_line_throttle_period,
+ websocketUrl: Rails.application.config.websocket_address,
parameters: {
alt: {
type: "string",
resources: {}
}
- if Rails.application.config.websocket_address
- discovery[:websocketUrl] = Rails.application.config.websocket_address
- elsif ENV['ARVADOS_WEBSOCKETS']
- discovery[:websocketUrl] = root_url.sub(/^http/, 'ws') + "websocket"
- end
-
ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
begin
ctl_class = "Arvados::V1::#{k.to_s.pluralize}Controller".constantize
# setup succeeded. send email to user
if params[:send_notification_email] == true || params[:send_notification_email] == 'true'
- UserNotifier.account_is_setup(@object).deliver
+ UserNotifier.account_is_setup(@object).deliver_now
end
send_json kind: "arvados#HashList", items: @response.as_api_response(nil)
# create_fixtures() is a no-op for cached fixture sets, so
# uncache them all.
- ActiveRecord::Fixtures.reset_cache
- ActiveRecord::Fixtures.
+ ActiveRecord::FixtureSet.reset_cache
+ ActiveRecord::FixtureSet.
create_fixtures(Rails.root.join('test', 'fixtures'), fixturesets)
# Dump cache of permissions etc.
# Stub: automatically register all new API clients
api_client_url_prefix = callback_url.match(%r{^.*?://[^/]+})[0] + '/'
act_as_system_user do
- @api_client = ApiClient.find_or_create_by_url_prefix api_client_url_prefix
+ @api_client = ApiClient.
+ find_or_create_by(url_prefix: api_client_url_prefix)
end
api_client_auth = ApiClientAuthorization.
end
def scopes_allow_request?(request)
- scopes_allow? [request.request_method, request.path].join(' ')
+ method = request.request_method
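+    # A HEAD request is permitted if the token's scopes allow either HEAD
+    # or GET at the same path.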
+ if method == 'HEAD'
+ (scopes_allow?(['HEAD', request.path].join(' ')) ||
+ scopes_allow?(['GET', request.path].join(' ')))
+ else
+ scopes_allow?([method, request.path].join(' '))
+ end
end
def logged_attributes
include DbCurrentTime
extend RecordFilters
- attr_protected :created_at
- attr_protected :modified_by_user_uuid
- attr_protected :modified_by_client_uuid
- attr_protected :modified_at
after_initialize :log_start_state
before_save :ensure_permission_to_save
before_save :ensure_owner_uuid_is_permitted
after_find :convert_serialized_symbols_to_strings
before_validation :normalize_collection_uuids
before_validation :set_default_owner
- validate :ensure_serialized_attribute_type
validate :ensure_valid_uuids
# Note: This only returns permission links. It does not account for
# permissions obtained via user.is_admin or
# user.uuid==object.owner_uuid.
has_many(:permissions,
+ ->{where(link_class: 'permission')},
foreign_key: :head_uuid,
class_name: 'Link',
- primary_key: :uuid,
- conditions: "link_class = 'permission'")
+ primary_key: :uuid)
class PermissionDeniedError < StandardError
def http_status
"#{current_api_base}/#{self.class.to_s.pluralize.underscore}/#{self.uuid}"
end
+ def self.permit_attribute_params raw_params
+ # strong_parameters does not provide security: permissions are
+ # implemented with before_save hooks.
+ #
+ # The following permit! is necessary even with
+ # "ActionController::Parameters.permit_all_parameters = true",
+ # because permit_all does not permit nested attributes.
+ if raw_params
+ serialized_attributes.each do |colname, coder|
+ param = raw_params[colname.to_sym]
+ if param.nil?
+ # ok
+ elsif !param.is_a?(coder.object_class)
+ raise ArgumentError.new("#{colname} parameter must be #{coder.object_class}, not #{param.class}")
+ elsif has_nonstring_keys?(param)
+ raise ArgumentError.new("#{colname} parameter cannot have non-string hash keys")
+ end
+ end
+ end
+ ActionController::Parameters.new(raw_params).permit!
+ end
+
+ def initialize raw_params={}, *args
+ super(self.class.permit_attribute_params(raw_params), *args)
+ end
+
+ def self.create raw_params={}, *args
+ super(permit_attribute_params(raw_params), *args)
+ end
+
+ def update_attributes raw_params={}, *args
+ super(self.class.permit_attribute_params(raw_params), *args)
+ end
+
def self.selectable_attributes(template=:user)
# Return an array of attribute name strings that can be selected
# in the given template.
def update_modified_by_fields
current_time = db_current_time
+ self.created_at = created_at_was || current_time
self.updated_at = current_time
self.owner_uuid ||= current_default_owner if self.respond_to? :owner_uuid=
self.modified_at = current_time
true
end
+ def self.has_nonstring_keys? x
+ if x.is_a? Hash
+ x.each do |k,v|
+ return true if !(k.is_a?(String) || k.is_a?(Symbol)) || has_nonstring_keys?(v)
+ end
+ elsif x.is_a? Array
+ x.each do |v|
+ return true if has_nonstring_keys?(v)
+ end
+ end
+ false
+ end
+
def self.has_symbols? x
if x.is_a? Hash
x.each do |k,v|
end
def self.where_serialized(colname, value)
- sorted = deep_sort_hash(value)
- where("#{colname.to_s} IN (?)", [sorted.to_yaml, SafeJSON.dump(sorted)])
+ if value.empty?
+ # rails4 stores as null, rails3 stored as serialized [] or {}
+ sql = "#{colname.to_s} is null or #{colname.to_s} IN (?)"
+ sorted = value
+ else
+ sql = "#{colname.to_s} IN (?)"
+ sorted = deep_sort_hash(value)
+ end
+ where(sql, [sorted.to_yaml, SafeJSON.dump(sorted)])
end
Serializer = {
}
def self.serialize(colname, type)
- super(colname, Serializer[type])
+ coder = Serializer[type]
+ @serialized_attributes ||= {}
+ @serialized_attributes[colname.to_s] = coder
+ super(colname, coder)
end
- def ensure_serialized_attribute_type
- # Specifying a type in the "serialize" declaration causes rails to
- # raise an exception if a different data type is retrieved from
- # the database during load(). The validation preventing such
- # crash-inducing records from being inserted in the database in
- # the first place seems to have been left as an exercise to the
- # developer.
- self.class.serialized_attributes.each do |colname, attr|
- if attr.object_class
- if self.attributes[colname].class != attr.object_class
- self.errors.add colname.to_sym, "must be a #{attr.object_class.to_s}, not a #{self.attributes[colname].class.to_s}"
- elsif self.class.has_symbols? attributes[colname]
- self.errors.add colname.to_sym, "must not contain symbols: #{attributes[colname].inspect}"
- end
- end
- end
+ def self.serialized_attributes
+ @serialized_attributes ||= {}
+ end
+
+ def serialized_attributes
+ self.class.serialized_attributes
end
def convert_serialized_symbols_to_strings
self.class.serialized_attributes.each do |colname, attr|
if self.class.has_symbols? attributes[colname]
attributes[colname] = self.class.recursive_stringify attributes[colname]
- self.send(colname + '=',
- self.class.recursive_stringify(attributes[colname]))
+ send(colname + '=',
+ self.class.recursive_stringify(attributes[colname]))
end
end
end
before_save :set_file_names
# Query only untrashed collections by default.
- default_scope where("is_trashed = false")
+ default_scope { where("is_trashed = false") }
api_accessible :user, extend: :common do |t|
t.add :name
t.add :properties
end
- def properties
- @properties ||= Hash.new
- super
- end
-
def head_kind
if k = ArvadosModel::resource_class_for_uuid(head_uuid)
k.kind
end
def send_notify
- connection.execute "NOTIFY logs, '#{self.id}'"
+ ActiveRecord::Base.connection.execute "NOTIFY logs, '#{self.id}'"
end
end
has_many :api_client_authorizations
validates(:username,
format: {
- with: /^[A-Za-z][A-Za-z0-9]*$/,
+ with: /\A[A-Za-z][A-Za-z0-9]*\z/,
message: "must begin with a letter and contain only alphanumerics",
},
uniqueness: true,
# Send admin notifications
def send_admin_notifications
- AdminNotifier.new_user(self).deliver
+ AdminNotifier.new_user(self).deliver_now
if not self.is_active then
- AdminNotifier.new_inactive_user(self).deliver
+ AdminNotifier.new_inactive_user(self).deliver_now
end
end
if self.prefs_changed?
if self.prefs_was.andand.empty? || !self.prefs_was.andand['profile']
profile_notification_address = Rails.configuration.user_profile_notification_address
- ProfileNotifier.profile_created(self, profile_notification_address).deliver if profile_notification_address
+ ProfileNotifier.profile_created(self, profile_notification_address).deliver_now if profile_notification_address
end
end
end
include KindAndEtag
include CommonApiTemplate
- has_many :login_permissions, :foreign_key => :head_uuid, :class_name => 'Link', :primary_key => :uuid, :conditions => "link_class = 'permission' and name = 'can_login'"
+ has_many(:login_permissions,
+ -> { where("link_class = 'permission' and name = 'can_login'") },
+ foreign_key: :head_uuid,
+ class_name: 'Link',
+ primary_key: :uuid)
api_accessible :user, extend: :common do |t|
t.add :hostname
# to log in.
workbench_address: false
- # The ARVADOS_WEBSOCKETS environment variable determines whether to
- # serve http, websockets, or both.
+ # Client-facing URI for websocket service. Nginx should be
+ # configured to proxy this URI to arvados-ws; see
+ # http://doc.arvados.org/install/install-ws.html
#
- # If ARVADOS_WEBSOCKETS="true", http and websockets are both served
- # from the same process.
+ # If websocket_address is false (which is the default), no websocket
+ # server will be advertised to clients. This configuration is not
+ # supported.
#
- # If ARVADOS_WEBSOCKETS="ws-only", only websockets is served.
- #
- # If ARVADOS_WEBSOCKETS="false" or not set at all, only http is
- # served. In this case, you should have a separate process serving
- # websockets, and the address of that service should be given here
- # as websocket_address.
- #
- # If websocket_address is false (which is the default), the
- # discovery document will tell clients to use the current server as
- # the websocket service, or (if the current server does not have
- # websockets enabled) not to use websockets at all.
- #
- # Example: Clients will connect to the specified endpoint.
- #websocket_address: wss://127.0.0.1:3333/websocket
- # Default: Clients will connect to this server if it's running
- # websockets, otherwise none at all.
+ # Example:
+ #websocket_address: wss://ws.zzzzz.arvadosapi.com/websocket
websocket_address: false
# Maximum number of websocket connections allowed
action_mailer.perform_deliveries: false
active_support.deprecation: :log
action_dispatch.best_standards_support: :builtin
- active_record.mass_assignment_sanitizer: :strict
active_record.auto_explain_threshold_in_seconds: 0.5
assets.compress: false
assets.debug: true
cache_classes: true
consider_all_requests_local: false
action_controller.perform_caching: true
- serve_static_assets: false
+ serve_static_files: false
assets.compress: true
assets.compile: false
assets.digest: true
test:
force_ssl: false
cache_classes: true
- serve_static_assets: true
+ serve_static_files: true
static_cache_control: public, max-age=3600
whiny_nils: true
consider_all_requests_local: true
action_controller.allow_forgery_protection: false
action_mailer.delivery_method: :test
active_support.deprecation: :stderr
- active_record.mass_assignment_sanitizer: :strict
uuid_prefix: zzzzz
sso_app_id: arvados-server
sso_app_secret: <%= rand(2**512).to_s(36) %>
workbench_address: https://localhost:3001/
git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
- websocket_address: <% if ENV['ARVADOS_TEST_EXPERIMENTAL_WS'] %>"wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"<% else %>false<% end %>
+ websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"
trash_sweep_interval: -1
docker_image_formats: ["v1"]
require 'rails/all'
require 'digest'
+module Kernel
+ def suppress_warnings
+ verbose_orig = $VERBOSE
+ begin
+ $VERBOSE = nil
+ yield
+ ensure
+ $VERBOSE = verbose_orig
+ end
+ end
+end
+
if defined?(Bundler)
- # If you precompile assets before deploying to production, use this line
- Bundler.require(*Rails.groups(:assets => %w(development test)))
- # If you want your assets lazily compiled in production, use this line
- # Bundler.require(:default, :assets, Rails.env)
+ suppress_warnings do
+ # If you precompile assets before deploying to production, use this line
+ Bundler.require(*Rails.groups(:assets => %w(development test)))
+ # If you want your assets lazily compiled in production, use this line
+ # Bundler.require(:default, :assets, Rails.env)
+ end
end
module Server
# Configure sensitive parameters which will be filtered from the log file.
config.filter_parameters += [:password]
+ # Load entire application at startup.
+ config.eager_load = true
+
+ config.active_record.raise_in_transactional_callbacks = true
+
+ config.active_support.test_order = :sorted
+
I18n.enforce_available_locales = false
# Before using the filesystem backend for Rails.cache, check
# Only use best-standards-support built into browsers
config.action_dispatch.best_standards_support = :builtin
- # Raise exception on mass assignment protection for Active Record models
- config.active_record.mass_assignment_sanitizer = :strict
-
# Log the query plan for queries taking more than this (works
# with SQLite, MySQL, and PostgreSQL)
config.active_record.auto_explain_threshold_in_seconds = 0.5
config.action_controller.perform_caching = true
# Disable Rails's static asset server (Apache or nginx will already do this)
- config.serve_static_assets = false
+ config.serve_static_files = false
# Compress JavaScripts and CSS
config.assets.compress = true
config.cache_classes = true
# Configure static asset server for tests with Cache-Control for performance
- config.serve_static_assets = true
+ config.serve_static_files = true
config.static_cache_control = "public, max-age=3600"
# Log error messages when you accidentally call methods on nil
# Print deprecation notices to the stderr
config.active_support.deprecation = :stderr
- # Raise exception on mass assignment protection for Active Record models
- config.active_record.mass_assignment_sanitizer = :strict
-
# No need for SSL while testing
config.force_ssl = false
-require 'eventbus'
+if ENV['ARVADOS_WEBSOCKETS']
+ Server::Application.configure do
+ Rails.logger.error "Built-in websocket server is disabled. See note (2017-03-23, e8cc0d7) at https://dev.arvados.org/projects/arvados/wiki/Upgrading_to_master"
-# See application.yml for details about configuring the websocket service.
+ class EventBusRemoved
+ def overloaded?
+ false
+ end
+ def on_connect ws
+ ws.on :open do |e|
+ EM::Timer.new 1 do
+ ws.send(SafeJSON.dump({status: 501, message: "Server misconfigured? see http://doc.arvados.org/install/install-ws.html"}))
+ end
+ EM::Timer.new 3 do
+ ws.close
+ end
+ end
+ end
+ end
-Server::Application.configure do
- # Enables websockets if ARVADOS_WEBSOCKETS is defined with any value. If
- # ARVADOS_WEBSOCKETS=ws-only, server will only accept websocket connections
- # and return an error response for all other requests.
- if ENV['ARVADOS_WEBSOCKETS']
- config.middleware.insert_after ArvadosApiToken, RackSocket, {
- :handler => EventBus,
- :mount => "/websocket",
- :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
- }
- Rails.logger.info "Websockets #{ENV['ARVADOS_WEBSOCKETS']}, running at /websocket"
- else
- Rails.logger.info "Websockets disabled"
+ config.middleware.insert_after(ArvadosApiToken, RackSocket, {
+ handler: EventBusRemoved,
+ mount: "/websocket",
+ websocket_only: (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
+ })
end
end
EOS
end
+ config.secret_key_base = config.secret_token
end
class Request < Rack::Request
# This Rails method messes with valid JSON, for example turning the empty
# array [] into 'nil'. We don't want that, so turn it into a no-op.
+ remove_method :deep_munge
def deep_munge(hash)
hash
end
--- /dev/null
+ActionController::Parameters.permit_all_parameters = true
class ActiveSupport::TimeWithZone
+ remove_method :as_json
def as_json *args
strftime "%Y-%m-%dT%H:%M:%S.%NZ"
end
end
class Time
+ remove_method :as_json
def as_json *args
strftime "%Y-%m-%dT%H:%M:%S.%NZ"
end
Server::Application.routes.draw do
themes_for_rails
- # See http://guides.rubyonrails.org/routing.html
-
# OPTIONS requests are not allowed at routes that use cookies.
['/auth/*a', '/login', '/logout'].each do |nono|
- match nono, :to => 'user_sessions#cross_origin_forbidden', :via => 'OPTIONS'
+ match nono, to: 'user_sessions#cross_origin_forbidden', via: 'OPTIONS'
end
# OPTIONS at discovery and API paths get an empty response with CORS headers.
- match '/discovery/v1/*a', :to => 'static#empty', :via => 'OPTIONS'
- match '/arvados/v1/*a', :to => 'static#empty', :via => 'OPTIONS'
+ match '/discovery/v1/*a', to: 'static#empty', via: 'OPTIONS'
+ match '/arvados/v1/*a', to: 'static#empty', via: 'OPTIONS'
namespace :arvados do
namespace :v1 do
get 'logins', on: :member
get 'get_all_logins', on: :collection
end
- get '/permissions/:uuid', :to => 'links#get_permissions'
+ get '/permissions/:uuid', to: 'links#get_permissions'
end
end
end
# omniauth
- match '/auth/:provider/callback', :to => 'user_sessions#create'
- match '/auth/failure', :to => 'user_sessions#failure'
+ match '/auth/:provider/callback', to: 'user_sessions#create', via: [:get, :post]
+ match '/auth/failure', to: 'user_sessions#failure', via: [:get, :post]
# not handled by omniauth provider -> 403 with no CORS headers.
- get '/auth/*a', :to => 'user_sessions#cross_origin_forbidden'
+ get '/auth/*a', to: 'user_sessions#cross_origin_forbidden'
# Custom logout
- match '/login', :to => 'user_sessions#login'
- match '/logout', :to => 'user_sessions#logout'
+ match '/login', to: 'user_sessions#login', via: [:get, :post]
+ match '/logout', to: 'user_sessions#logout', via: [:get, :post]
- match '/discovery/v1/apis/arvados/v1/rest', :to => 'arvados/v1/schema#index'
+ match '/discovery/v1/apis/arvados/v1/rest', to: 'arvados/v1/schema#index', via: [:get, :post]
- match '/static/login_failure', :to => 'static#login_failure', :as => :login_failure
+ match '/static/login_failure', to: 'static#login_failure', as: :login_failure, via: [:get, :post]
# Send unroutable requests to an arbitrary controller
# (ends up at ApplicationController#render_not_found)
- match '*a', :to => 'static#render_not_found'
+ match '*a', to: 'static#render_not_found', via: [:get, :post, :put, :patch, :delete, :options]
- root :to => 'static#home'
+ root to: 'static#home'
end
--- /dev/null
+class SerializedColumnsAcceptNull < ActiveRecord::Migration
+ def change
+ change_column :api_client_authorizations, :scopes, :text, null: true, default: '["all"]'
+ end
+end
created_at timestamp without time zone NOT NULL,
updated_at timestamp without time zone NOT NULL,
default_owner_uuid character varying(255),
- scopes text DEFAULT '---
-- all
-'::text NOT NULL,
+ scopes text DEFAULT '["all"]'::text,
uuid character varying(255) NOT NULL
);
INSERT INTO schema_migrations (version) VALUES ('20170301225558');
+INSERT INTO schema_migrations (version) VALUES ('20170319063406');
+
INSERT INTO schema_migrations (version) VALUES ('20170328215436');
INSERT INTO schema_migrations (version) VALUES ('20170330012505');
INSERT INTO schema_migrations (version) VALUES ('20170419173712');
-INSERT INTO schema_migrations (version) VALUES ('20170419175801');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20170419175801');
+
base.has_many(t.to_sym,
foreign_key: :owner_uuid,
primary_key: :uuid,
- dependent: :restrict)
+ dependent: :restrict_with_exception)
end
# We need custom protection for changing an owner's primary
# key. (Apart from this restriction, admins are allowed to change
# need to create a token
if !api_client_auth
# Get (or create) trusted api client
- apiClient = ApiClient.find_or_create_by_url_prefix_and_is_trusted("ssh://root@localhost/", true)
+ apiClient = ApiClient.
+ find_or_create_by(url_prefix: "ssh://root@localhost/",
+ is_trusted: true)
# Check if there is an unexpired superuser token corresponding to this api client
api_client_auth =
+++ /dev/null
-# If any threads raise an unhandled exception, make them all die.
-# We trust a supervisor like runit to restart the server in this case.
-Thread.abort_on_exception = true
-
-require 'eventmachine'
-require 'faye/websocket'
-require 'load_param'
-require 'oj'
-require 'record_filters'
-require 'safe_json'
-require 'set'
-require 'thread'
-
-# Patch in user, last_log_id and filters fields into the Faye::Websocket class.
-module Faye
- class WebSocket
- attr_accessor :user
- attr_accessor :last_log_id
- attr_accessor :filters
- attr_accessor :sent_ids
- attr_accessor :queue
- attr_accessor :frame_mtx
- end
-end
-
-module WebSocket
- class Driver
-
- class Server
- alias_method :_write, :write
-
- def write(data)
- # Most of the sending activity will be from the thread set up in
- # on_connect. However, there is also some automatic activity in the
- # form of ping/pong messages, so ensure that the write method used to
- # send one complete message to the underlying socket can only be
- # called by one thread at a time.
- self.frame_mtx.synchronize do
- _write(data)
- end
- end
- end
- end
-end
-
-# Store the filters supplied by the user that will be applied to the logs table
-# to determine which events to return to the listener.
-class Filter
- include LoadParam
-
- attr_accessor :filters
-
- def initialize p
- @params = p
- load_filters_param
- end
-
- def params
- @params
- end
-end
-
-# Manages websocket connections, accepts subscription messages and publishes
-# log table events.
-class EventBus
- include CurrentApiClient
- include RecordFilters
-
- # used in RecordFilters
- def model_class
- Log
- end
-
- # Initialize EventBus. Takes no parameters.
- def initialize
- @channel = EventMachine::Channel.new
- @mtx = Mutex.new
- @bgthread = false
- @connection_count = 0
- end
-
- def send_message(ws, obj)
- ws.send(SafeJSON.dump(obj))
- end
-
- # Push out any pending events to the connection +ws+
- # +notify_id+ the id of the most recent row in the log table, may be nil
- #
- # This accepts a websocket and a notify_id (this is the row id from Postgres
- # LISTEN/NOTIFY, it may be nil if called from somewhere else)
- #
- # It queries the database for log rows that are either
- # a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
- # b) if ws.last_log_id is nil, then it queries the row notify_id
- #
- # Regular Arvados permissions are applied using readable_by() and filters using record_filters().
- def push_events ws, notify_id
- begin
- # Must have at least one filter set up to receive events
- if ws.filters.length > 0
- # Start with log rows readable by user
- logs = Log.readable_by(ws.user)
-
- cond_id = nil
- cond_out = []
- param_out = []
-
- if not ws.last_log_id.nil?
- # We are catching up from some starting point.
- cond_id = "logs.id > ?"
- param_out << ws.last_log_id
- elsif not notify_id.nil?
- # Get next row being notified.
- cond_id = "logs.id = ?"
- param_out << notify_id
- else
- # No log id to start from, nothing to do, return
- return
- end
-
- # Now build filters provided by client
- ws.filters.each do |filter|
- ft = record_filters filter.filters, Log
- if ft[:cond_out].any?
- # Join the clauses within a single subscription filter with AND
- # so it is consistent with regular queries
- cond_out << "(#{ft[:cond_out].join ') AND ('})"
- param_out += ft[:param_out]
- end
- end
-
- # Add filters to query
- if cond_out.any?
- # Join subscriptions with OR
- logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
- else
- logs = logs.where(cond_id, *param_out)
- end
-
- # Execute query and actually send the matching log rows. Load
- # the full log records only when we're ready to send them,
- # though: otherwise, (1) postgres has to build the whole
- # result set and return it to us before we can send the first
- # event, and (2) we store lots of records in memory while
- # waiting to spool them out to the client. Both of these are
- # troublesome when log records are large (e.g., a collection
- # update contains both old and new manifest_text).
- #
- # Note: find_each implies order('id asc'), which is what we
- # want.
- logs.select('logs.id').find_each do |l|
- if not ws.sent_ids.include?(l.id)
- # only send if not a duplicate
- send_message(ws, Log.find(l.id).as_api_response)
- end
- if not ws.last_log_id.nil?
- # record ids only when sending "catchup" messages, not notifies
- ws.sent_ids << l.id
- end
- end
- ws.last_log_id = nil
- end
- rescue ArgumentError => e
- # There was some kind of user error.
- Rails.logger.warn "Error publishing event: #{$!}"
- send_message(ws, {status: 500, message: $!})
- ws.close
- rescue => e
- Rails.logger.warn "Error publishing event: #{$!}"
- Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- send_message(ws, {status: 500, message: $!})
- ws.close
- # These exceptions typically indicate serious server trouble:
- # out of memory issues, database connection problems, etc. Go ahead and
- # crash; we expect that a supervisor service like runit will restart us.
- raise
- end
- end
-
- # Handle inbound subscribe or unsubscribe message.
- def handle_message ws, event
- begin
- begin
- # Parse event data as JSON
- p = SafeJSON.load(event.data).symbolize_keys
- filter = Filter.new(p)
- rescue Oj::Error => e
- send_message(ws, {status: 400, message: "malformed request"})
- return
- end
-
- if p[:method] == 'subscribe'
- # Handle subscribe event
-
- if p[:last_log_id]
- # Set or reset the last_log_id. The event bus only reports events
- # for rows that come after last_log_id.
- ws.last_log_id = p[:last_log_id].to_i
- # Reset sent_ids for consistency
- # (always re-deliver all matching messages following last_log_id)
- ws.sent_ids = Set.new
- end
-
- if ws.filters.length < Rails.configuration.websocket_max_filters
- # Add a filter. This gets the :filters field which is the same
- # format as used for regular index queries.
- ws.filters << filter
- send_message(ws, {status: 200, message: 'subscribe ok', filter: p})
-
- # Send any pending events
- push_events ws, nil
- else
- send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"})
- end
-
- elsif p[:method] == 'unsubscribe'
- # Handle unsubscribe event
-
- len = ws.filters.length
- ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
- if ws.filters.length < len
- send_message(ws, {status: 200, message: 'unsubscribe ok'})
- else
- send_message(ws, {status: 404, message: 'filter not found'})
- end
-
- else
- send_message(ws, {status: 400, message: "missing or unrecognized method"})
- end
- rescue => e
- Rails.logger.warn "Error handling message: #{$!}"
- Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- send_message(ws, {status: 500, message: 'error'})
- ws.close
- end
- end
-
- def overloaded?
- @mtx.synchronize do
- @connection_count >= Rails.configuration.websocket_max_connections
- end
- end
-
- # Called by RackSocket when a new websocket connection has been established.
- def on_connect ws
- # Disconnect if no valid API token.
- # current_user is included from CurrentApiClient
- if not current_user
- send_message(ws, {status: 401, message: "Valid API token required"})
- # Wait for the handshake to complete before closing the
- # socket. Otherwise, nginx responds with HTTP 502 Bad gateway,
- # and the client never sees our real error message.
- ws.on :open do |event|
- ws.close
- end
- return
- end
-
- # Initialize our custom fields on the websocket connection object.
- ws.user = current_user
- ws.filters = []
- ws.last_log_id = nil
- ws.sent_ids = Set.new
- ws.queue = Queue.new
- ws.frame_mtx = Mutex.new
-
- @mtx.synchronize do
- @connection_count += 1
- end
-
- # Subscribe to internal postgres notifications through @channel and
- # forward them to the thread associated with the connection.
- sub = @channel.subscribe do |msg|
- if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
- send_message(ws, {status: 500, message: 'Notify backlog too long'})
- ws.close
- @channel.unsubscribe sub
- ws.queue.clear
- else
- ws.queue << [:notify, msg]
- end
- end
-
- # Set up callback for inbound message dispatch.
- ws.on :message do |event|
- ws.queue << [:message, event]
- end
-
- # Set up socket close callback
- ws.on :close do |event|
- @channel.unsubscribe sub
- ws.queue.clear
- ws.queue << [:close, nil]
- end
-
- # Spin off a new thread to handle sending events to the client. We need a
- # separate thread per connection so that a slow client doesn't interfere
- # with other clients.
- #
- # We don't want the loop in the request thread because on a TERM signal,
- # Puma waits for outstanding requests to complete, and long-lived websocket
- # connections may not complete in a timely manner.
- Thread.new do
- # Loop and react to socket events.
- begin
- loop do
- eventType, msg = ws.queue.pop
- if eventType == :message
- handle_message ws, msg
- elsif eventType == :notify
- push_events ws, msg
- elsif eventType == :close
- break
- end
- end
- ensure
- @mtx.synchronize do
- @connection_count -= 1
- end
- ActiveRecord::Base.connection.close
- end
- end
-
- # Start up thread to monitor the Postgres database, if none exists already.
- @mtx.synchronize do
- unless @bgthread
- @bgthread = true
- Thread.new do
- # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
- ActiveRecord::Base.connection_pool.with_connection do |connection|
- conn = connection.instance_variable_get(:@connection)
- begin
- conn.async_exec "LISTEN logs"
- while true
- # wait_for_notify will block until there is a change
- # notification from Postgres about the logs table, then push
- # the notification into the EventMachine channel. Each
- # websocket connection subscribes to the other end of the
- # channel and calls #push_events to actually dispatch the
- # events to the client.
- conn.wait_for_notify do |channel, pid, payload|
- @channel.push payload.to_i
- end
- end
- ensure
- # Don't want the connection to still be listening once we return
- # it to the pool - could result in weird behavior for the next
- # thread to check it out.
- conn.async_exec "UNLISTEN *"
- end
- end
- @bgthread = false
- end
- end
- end
-
- end
-end
base.validate :validate_uuid
base.before_create :assign_uuid
base.before_destroy :destroy_permission_links
- base.has_many :links_via_head, class_name: 'Link', foreign_key: :head_uuid, primary_key: :uuid, conditions: "not (link_class = 'permission')", dependent: :destroy
- base.has_many :links_via_tail, class_name: 'Link', foreign_key: :tail_uuid, primary_key: :uuid, conditions: "not (link_class = 'permission')", dependent: :destroy
+ base.has_many(:links_via_head,
+ -> { where("not (link_class = 'permission')") },
+ class_name: 'Link',
+ foreign_key: :head_uuid,
+ primary_key: :uuid,
+ dependent: :destroy)
+ base.has_many(:links_via_tail,
+ -> { where("not (link_class = 'permission')") },
+ class_name: 'Link',
+ foreign_key: :tail_uuid,
+ primary_key: :uuid,
+ dependent: :destroy)
end
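These associations are rewritten for Rails 4, which drops the `:conditions` option: the condition now travels as a scope lambda in the association's second argument. Generic sketch (illustrative names only):

    # Rails 3 (no longer supported):
    #   has_many :items, conditions: "state = 'active'", class_name: 'Item'
    # Rails 4, as used above:
    has_many :items, -> { where("state = 'active'") }, class_name: 'Item'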
module ClassMethods
# Any ordering columns must be selected when doing select,
# otherwise it is an SQL error, so filter out invalid orderings.
@orders.select! { |o|
- col, dir = o.split
+ col, _ = o.split
# match select column against order array entry
@select.select { |s| col == "#{table_name}.#{s}" }.any?
}
require 'safe_json'
class Serializer
+ class TypeMismatch < ArgumentError
+ end
+
def self.dump(val)
+ if !val.is_a?(object_class)
+ raise TypeMismatch.new("cannot serialize #{val.class} as #{object_class}")
+ end
SafeJSON.dump(val)
end
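`Serializer.dump` now refuses values of the wrong type before JSON-encoding them. A sketch of how concrete serializers are assumed to plug in (the class name below is illustrative):

    class HashSerializer < Serializer
      def self.object_class
        ::Hash
      end
    end

    HashSerializer.dump({'foo' => 'bar'})   # => '{"foo":"bar"}'
    HashSerializer.dump(['foo'])            # raises Serializer::TypeMismatch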
desc "Remove old container log entries from the logs table"
task delete_old_container_logs: :environment do
- delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND containers.finished_at < '#{Rails.configuration.clean_container_log_rows_after.ago}')"
+ delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND clock_timestamp() - containers.finished_at > interval '#{Rails.configuration.clean_container_log_rows_after} seconds')"
ActiveRecord::Base.connection.execute(delete_sql)
end
namespace :db do
desc "Remove old job stderr entries from the logs table"
task delete_old_job_logs: :environment do
- delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN jobs ON logs.object_uuid = jobs.uuid WHERE event_type = 'stderr' AND jobs.log IS NOT NULL AND jobs.finished_at < '#{Rails.configuration.clean_job_log_rows_after.ago}')"
+ delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN jobs ON logs.object_uuid = jobs.uuid WHERE event_type = 'stderr' AND jobs.log IS NOT NULL AND clock_timestamp() - jobs.finished_at > interval '#{Rails.configuration.clean_job_log_rows_after} seconds')"
ActiveRecord::Base.connection.execute(delete_sql)
end
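Pushing the age comparison into SQL (`clock_timestamp() - finished_at > interval '... seconds'`) lets Postgres compute the cutoff at query time instead of interpolating a Ruby timestamp. Assuming the configuration value is a plain number of seconds or an ActiveSupport duration such as `30.days` (which stringifies to its length in seconds), the interpolation yields a valid interval either way:

    # Assumed config value, for illustration:
    Rails.configuration.clean_job_log_rows_after = 30.days
    "interval '#{Rails.configuration.clean_job_log_rows_after} seconds'"
    # => "interval '2592000 seconds'"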
module WhitelistUpdate
def check_update_whitelist permitted_fields
attribute_names.each do |field|
- if not permitted_fields.include? field.to_sym and self.send((field.to_s + "_changed?").to_sym)
- errors.add field, "cannot be modified in this state"
+ if !permitted_fields.include?(field.to_sym) && really_changed(field)
+ errors.add field, "cannot be modified in this state (#{send(field+"_was").inspect}, #{send(field).inspect})"
end
end
end
+ def really_changed(attr)
+ return false if !send(attr+"_changed?")
+ old = send(attr+"_was")
+ new = send(attr)
+ if (old.nil? || old == [] || old == {}) && (new.nil? || new == [] || new == {})
+ false
+ else
+ old != new
+ end
+ end
+
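`really_changed` treats nil, `[]`, and `{}` as interchangeable empty values, so serialized attributes that merely flip between empty representations no longer trip the update whitelist. A hypothetical illustration, assuming a model that includes WhitelistUpdate (e.g. Container):

    c = Container.new
    c.mounts = {}                               # attribute was nil
    c.really_changed("mounts")                  # => false: nil -> {} is not a real change
    c.mounts = {"/tmp" => {"kind" => "tmp"}}
    c.really_changed("mounts")                  # => true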
def validate_state_change
if self.state_changed?
unless state_transitions[self.state_was].andand.include? self.state
assert_response :success
assert_not_nil assigns(:object)
resp = assigns(:object)
- assert_equal foo_collection[:portable_data_hash], resp['portable_data_hash']
- assert_signed_manifest resp['manifest_text']
+ assert_equal foo_collection[:portable_data_hash], resp[:portable_data_hash]
+ assert_signed_manifest resp[:manifest_text]
# The manifest in the response will have had permission hints added.
# Remove any permission hints in the response before comparing it to the source.
- stripped_manifest = resp['manifest_text'].gsub(/\+A[A-Za-z0-9@_-]+/, '')
+ stripped_manifest = resp[:manifest_text].gsub(/\+A[A-Za-z0-9@_-]+/, '')
assert_equal foo_collection[:manifest_text], stripped_manifest
end
end
setup do
@initial_link_count = Link.count
@vm_uuid = virtual_machines(:testvm).uuid
+ ActionMailer::Base.deliveries = []
end
test "activate a user after signing UA" do
get :logins, id: vm.uuid
assert_response :success
assert_equal 1, json_response['items'].length
- assert_equal nil, json_response['items'][0]['public_key']
- assert_equal nil, json_response['items'][0]['authorized_key_uuid']
+ assert_nil json_response['items'][0]['public_key']
+ assert_nil json_response['items'][0]['authorized_key_uuid']
assert_equal u.uuid, json_response['items'][0]['user_uuid']
assert_equal 'bobblogin', json_response['items'][0]['username']
end
begin
Rails.env = 'production'
Rails.application.reload_routes!
- assert_raises ActionController::RoutingError do
+ assert_raises ActionController::UrlGenerationError do
post :reset
end
ensure
require 'test_helper'
-class Arvados::V1::ApiTokensScopeTest < ActionController::IntegrationTest
+class ApiTokensScopeTest < ActionDispatch::IntegrationTest
fixtures :all
def v1_url(*parts)
- (['arvados', 'v1'] + parts).join('/')
+ (['', 'arvados', 'v1'] + parts).join('/')
end
test "user list token can only list users" do
assert_response 403
get(v1_url('specimens', specimens(:owned_by_active_user).uuid), *get_args)
assert_response :success
+ head(v1_url('specimens', specimens(:owned_by_active_user).uuid), *get_args)
+ assert_response :success
get(v1_url('specimens', specimens(:owned_by_spectator).uuid), *get_args)
assert_includes(403..404, @response.status)
end
require 'test_helper'
require 'helpers/git_test_helper'
-class CrunchDispatchTest < ActionDispatch::IntegrationTest
+class CrunchDispatchIntegrationTest < ActionDispatch::IntegrationTest
include GitTestHelper
fixtures :all
require 'test_helper'
class DatabaseResetTest < ActionDispatch::IntegrationTest
- self.use_transactional_fixtures = false
-
slow_test "reset fails when Rails.env != 'test'" do
rails_env_was = Rails.env
begin
# Generally, new routes should appear under /arvados/v1/. If
# they appear elsewhere, that might have been caused by default
# rails generator behavior that we don't want.
- assert_match(/^\/(|\*a|arvados\/v1\/.*|auth\/.*|login|logout|database\/reset|discovery\/.*|static\/.*|themes\/.*)(\(\.:format\))?$/,
+ assert_match(/^\/(|\*a|arvados\/v1\/.*|auth\/.*|login|logout|database\/reset|discovery\/.*|static\/.*|themes\/.*|assets)(\(\.:format\))?$/,
route.path.spec.to_s,
"Unexpected new route: #{route.path.spec}")
end
require 'test_helper'
-class PipelineTest < ActionDispatch::IntegrationTest
+class PipelineIntegrationTest < ActionDispatch::IntegrationTest
# These tests simulate the workflow of arv-run-pipeline-instance
# and other pipeline-running code.
require 'test_helper'
-class Arvados::V1::ReaderTokensTest < ActionController::IntegrationTest
+class ReaderTokensTest < ActionDispatch::IntegrationTest
fixtures :all
def spectator_specimen
+++ /dev/null
-require 'database_cleaner'
-require 'safe_json'
-require 'test_helper'
-
-DatabaseCleaner.strategy = :deletion
-
-class WebsocketTest < ActionDispatch::IntegrationTest
- self.use_transactional_fixtures = false
-
- setup do
- DatabaseCleaner.start
- end
-
- teardown do
- DatabaseCleaner.clean
- end
-
- def self.startup
- s = TCPServer.new('0.0.0.0', 0)
- @@port = s.addr[1]
- s.close
- @@pidfile = "tmp/pids/passenger.#{@@port}.pid"
- DatabaseCleaner.start
- Dir.chdir(Rails.root) do |apidir|
- # Only passenger seems to be able to run the websockets server
- # successfully.
- _system('passenger', 'start', '-d',
- "-p#{@@port}",
- "--log-file", "/dev/stderr",
- "--pid-file", @@pidfile)
- timeout = Time.now.tv_sec + 10
- begin
- sleep 0.2
- begin
- server_pid = IO.read(@@pidfile).to_i
- good_pid = (server_pid > 0) and (Process.kill(0, pid) rescue false)
- rescue Errno::ENOENT
- good_pid = false
- end
- end while (not good_pid) and (Time.now.tv_sec < timeout)
- if not good_pid
- raise RuntimeError, "could not find API server Rails pid"
- end
- STDERR.puts "Started websocket server on port #{@@port} with pid #{server_pid}"
- end
- end
-
- def self.shutdown
- Dir.chdir(Rails.root) do
- _system('passenger', 'stop', "-p#{@@port}",
- "--pid-file", @@pidfile)
- end
- # DatabaseCleaner leaves the database empty. Prefer to leave it full.
- dc = DatabaseController.new
- dc.define_singleton_method :render do |*args| end
- dc.reset
- end
-
- def self._system(*cmd)
- Bundler.with_clean_env do
- env = {
- 'ARVADOS_WEBSOCKETS' => 'ws-only',
- 'RAILS_ENV' => 'test',
- }
- if not system(env, *cmd)
- raise RuntimeError, "Command exited #{$?}: #{cmd.inspect}"
- end
- end
- end
-
- def ws_helper(token: nil, timeout: 8)
- opened = false
- close_status = nil
- too_long = false
-
- EM.run do
- if token
- ws = Faye::WebSocket::Client.new("ws://localhost:#{@@port}/websocket?api_token=#{api_client_authorizations(token).api_token}")
- else
- ws = Faye::WebSocket::Client.new("ws://localhost:#{@@port}/websocket")
- end
-
- ws.on :open do |event|
- opened = true
- if timeout
- EM::Timer.new(timeout) do
- too_long = true if close_status.nil?
- EM.stop_event_loop
- end
- end
- end
-
- ws.on :error do |event|
- STDERR.puts "websocket client error: #{event.inspect}"
- end
-
- ws.on :close do |event|
- close_status = [:close, event.code, event.reason]
- EM.stop_event_loop
- end
-
- yield ws
- end
-
- assert opened, "Should have opened web socket"
- assert (not too_long), "Test took too long"
- assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
- end
-
- test "connect with no token" do
- status = nil
-
- ws_helper do |ws|
- ws.on :message do |event|
- d = SafeJSON.load event.data
- status = d["status"]
- ws.close
- end
- end
-
- assert_equal 401, status
- end
-
- test "connect, subscribe and get response" do
- status = nil
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- status = d["status"]
- ws.close
- end
- end
-
- assert_equal 200, status
- end
-
- def subscribe_test
- state = 1
- spec = nil
- ev_uuid = nil
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- spec = Specimen.create
- state = 2
- when 2
- ev_uuid = d["object_uuid"]
- ws.close
- end
- end
-
- end
-
- assert_not_nil spec
- assert_equal spec.uuid, ev_uuid
- end
-
- test "connect, subscribe, get event" do
- subscribe_test()
- end
-
- test "connect, subscribe, get two events" do
- state = 1
- spec = nil
- human = nil
- spec_ev_uuid = nil
- human_ev_uuid = nil
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- spec = Specimen.create
- human = Human.create
- state = 2
- when 2
- spec_ev_uuid = d["object_uuid"]
- state = 3
- when 3
- human_ev_uuid = d["object_uuid"]
- state = 4
- ws.close
- when 4
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_not_nil spec
- assert_not_nil human
- assert_equal spec.uuid, spec_ev_uuid
- assert_equal human.uuid, human_ev_uuid
- end
-
- test "connect, subscribe, filter events" do
- state = 1
- human = nil
- human_ev_uuid = nil
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- Specimen.create
- human = Human.create
- state = 2
- when 2
- human_ev_uuid = d["object_uuid"]
- state = 3
- ws.close
- when 3
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_not_nil human
- assert_equal human.uuid, human_ev_uuid
- end
-
-
- test "connect, subscribe, multiple filters" do
- state = 1
- spec = nil
- human = nil
- spec_ev_uuid = nil
- human_ev_uuid = nil
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
- ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- state = 2
- when 2
- assert_equal 200, d["status"]
- spec = Specimen.create
- Trait.create # not part of filters, should not be received
- human = Human.create
- state = 3
- when 3
- spec_ev_uuid = d["object_uuid"]
- state = 4
- when 4
- human_ev_uuid = d["object_uuid"]
- state = 5
- ws.close
- when 5
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_not_nil spec
- assert_not_nil human
- assert_equal spec.uuid, spec_ev_uuid
- assert_equal human.uuid, human_ev_uuid
- end
-
-
- test "connect, subscribe, compound filter" do
- state = 1
- t1 = nil
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#trait'], ['event_type', '=', 'update']]}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- t1 = Trait.create("name" => "foo")
- t1.name = "bar"
- t1.save!
- state = 2
- when 2
- assert_equal 'update', d['event_type']
- state = 3
- ws.close
- when 3
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_equal 3, state
- assert_not_nil t1
- end
-
- test "connect, subscribe, ask events starting at seq num" do
- state = 1
-
- authorize_with :active
-
- lastid = logs(:admin_changes_specimen).id
- l1 = nil
- l2 = nil
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- state = 2
- when 2
- l1 = d["object_uuid"]
- assert_not_nil l1, "Unexpected message: #{d}"
- state = 3
- when 3
- l2 = d["object_uuid"]
- assert_not_nil l2, "Unexpected message: #{d}"
- state = 4
- ws.close
- when 4
- assert false, "Should not get any more events"
- end
- end
- end
-
- expect_next_logs = Log.where('id > ?', lastid).order('id asc')
- assert_equal expect_next_logs[0].object_uuid, l1
- assert_equal expect_next_logs[1].object_uuid, l2
- end
-
- slow_test "connect, subscribe, get event, unsubscribe" do
- state = 1
- spec = nil
- spec_ev_uuid = nil
-
- authorize_with :active
-
- ws_helper(token: :active, timeout: false) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
- EM::Timer.new 3 do
- # Set a time limit on the test because after unsubscribing the server
- # still has to process the next event (and then hopefully correctly
- # decides not to send it because we unsubscribed.)
- ws.close
- end
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- spec = Specimen.create
- state = 2
- when 2
- spec_ev_uuid = d["object_uuid"]
- ws.send ({method: 'unsubscribe'}.to_json)
-
- EM::Timer.new 1 do
- Specimen.create
- end
-
- state = 3
- when 3
- assert_equal 200, d["status"]
- state = 4
- when 4
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_not_nil spec
- assert_equal spec.uuid, spec_ev_uuid
- end
-
- slow_test "connect, subscribe, get event, unsubscribe with filter" do
- state = 1
- spec = nil
- spec_ev_uuid = nil
-
- authorize_with :active
-
- ws_helper(token: :active, timeout: false) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
- EM::Timer.new 6 do
- # Set a time limit on the test because after unsubscribing the server
- # still has to process the next event (and then hopefully correctly
- # decides not to send it because we unsubscribed.)
- ws.close
- end
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- spec = Human.create
- state = 2
- when 2
- spec_ev_uuid = d["object_uuid"]
- ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
-
- EM::Timer.new 1 do
- Human.create
- end
-
- state = 3
- when 3
- assert_equal 200, d["status"]
- state = 4
- when 4
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_not_nil spec
- assert_equal spec.uuid, spec_ev_uuid
- end
-
-
- slow_test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
- state = 1
- spec = nil
- spec_ev_uuid = nil
- human = nil
- human_ev_uuid = nil
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- spec = Specimen.create
- state = 2
- when 2
- spec_ev_uuid = d["object_uuid"]
- ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
-
- EM::Timer.new 1 do
- human = Human.create
- end
-
- state = 3
- when 3
- assert_equal 404, d["status"]
- state = 4
- when 4
- human_ev_uuid = d["object_uuid"]
- state = 5
- ws.close
- when 5
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_not_nil spec
- assert_not_nil human
- assert_equal spec.uuid, spec_ev_uuid
- assert_equal human.uuid, human_ev_uuid
- end
-
- slow_test "connected, not subscribed, no event" do
- authorize_with :active
-
- ws_helper(token: :active, timeout: false) do |ws|
- ws.on :open do |event|
- EM::Timer.new 1 do
- Specimen.create
- end
-
- EM::Timer.new 3 do
- ws.close
- end
- end
-
- ws.on :message do |event|
- assert false, "Should not get any messages, message was #{event.data}"
- end
- end
- end
-
- slow_test "connected, not authorized to see event" do
- state = 1
-
- authorize_with :admin
-
- ws_helper(token: :active, timeout: false) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
-
- EM::Timer.new 3 do
- ws.close
- end
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- Specimen.create
- state = 2
- when 2
- assert false, "Should not get any messages, message was #{event.data}"
- end
- end
-
- end
-
- end
-
- test "connect, try bogus method" do
- status = nil
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({method: 'frobnabble'}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- status = d["status"]
- ws.close
- end
- end
-
- assert_equal 400, status
- end
-
- test "connect, missing method" do
- status = nil
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send ({fizzbuzz: 'frobnabble'}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- status = d["status"]
- ws.close
- end
- end
-
- assert_equal 400, status
- end
-
- test "connect, send malformed request" do
- status = nil
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- ws.send '<XML4EVER></XML4EVER>'
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- status = d["status"]
- ws.close
- end
- end
-
- assert_equal 400, status
- end
-
-
- test "connect, try subscribe too many filters" do
- state = 1
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- (1..17).each do |i|
- ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
- end
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when (1..Rails.configuration.websocket_max_filters)
- assert_equal 200, d["status"]
- state += 1
- when (Rails.configuration.websocket_max_filters+1)
- assert_equal 403, d["status"]
- ws.close
- end
- end
-
- end
-
- assert_equal Rails.configuration.websocket_max_filters+1, state
-
- end
-
- slow_test "connect, subscribe, lots of events" do
- state = 1
- event_count = 0
- log_start = Log.order(:id).last.id
-
- authorize_with :active
-
- ws_helper(token: :active, timeout: false) do |ws|
- EM::Timer.new 45 do
- # Needs a longer timeout than the default
- ws.close
- end
-
- ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- ActiveRecord::Base.transaction do
- (1..202).each do
- Specimen.create
- end
- end
- state = 2
- when 2
- event_count += 1
- assert_equal d['id'], event_count+log_start
- if event_count == 202
- ws.close
- end
- end
- end
-
- end
-
- assert_equal 202, event_count
- end
-
-
- test "connect, subscribe with invalid filter" do
- state = 1
-
- authorize_with :active
-
- ws_helper(token: :active) do |ws|
- ws.on :open do |event|
- # test that #6451 is fixed (invalid filter crashes websockets)
- ws.send ({method: 'subscribe', filters: [['object_blarg', 'is_a', 'arvados#human']]}.to_json)
- end
-
- ws.on :message do |event|
- d = SafeJSON.load event.data
- case state
- when 1
- assert_equal 200, d["status"]
- Specimen.create
- Human.create
- state = 2
- when 2
- assert_equal 500, d["status"]
- state = 3
- ws.close
- when 3
- assert false, "Should not get any more events"
- end
- end
-
- end
-
- assert_equal 3, state
-
- # Try connecting again, ensure that websockets server is still running and
- # didn't crash per #6451
- subscribe_test()
-
- end
-
-
-end
ENV["RAILS_ENV"] = "test"
unless ENV["NO_COVERAGE_TEST"]
begin
- require 'simplecov'
- require 'simplecov-rcov'
+ verbose_orig = $VERBOSE
+ begin
+ $VERBOSE = nil
+ require 'simplecov'
+ require 'simplecov-rcov'
+ ensure
+ $VERBOSE = verbose_orig
+ end
class SimpleCov::Formatter::MergedFormatter
def format(result)
SimpleCov::Formatter::HTMLFormatter.new.format(result)
require File.expand_path('../../config/environment', __FILE__)
require 'rails/test_help'
require 'mocha'
+require 'mocha/mini_test'
module ArvadosTestSupport
def json_response
include ArvadosTestSupport
include CurrentApiClient
- setup do
- Rails.logger.warn "\n\n#{'=' * 70}\n#{self.class}\##{method_name}\n#{'-' * 70}\n\n"
- end
-
teardown do
Thread.current[:api_client_ip_address] = nil
Thread.current[:api_client_authorization] = nil
Thread.current[:api_client] = nil
Thread.current[:user] = nil
restore_configuration
+ User.invalidate_permissions_cache
+ end
+
+ def assert_equal(expect, *args)
+ if expect.nil?
+ assert_nil(*args)
+ else
+ super
+ end
end
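The `assert_equal` override keeps older tests that expect nil working under newer Minitest, which deprecates `assert_equal nil, actual` (and makes it an error in Minitest 6) in favor of `assert_nil`. For example:

    assert_equal nil, json_response['errors']   # now routed to...
    assert_nil json_response['errors']          # ...so no deprecation warning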
def assert_not_allowed
def self.slow_test(name, &block)
define_method(name, block) unless skip_slow_tests?
end
-
- alias_method :skip, :omit
end
class ActionController::TestCase
{'a' => {'foo' => {:bar => 'baz'}}},
{'a' => {'foo' => {'bar' => :baz}}},
{'a' => {'foo' => ['bar', :baz]}},
+ ].each do |x|
+ test "prevent symbol keys in serialized db columns: #{x.inspect}" do
+ set_user_from_auth :active
+ link = Link.create!(link_class: 'test',
+ properties: x)
+ raw = ActiveRecord::Base.connection.
+ select_value("select properties from links where uuid='#{link.uuid}'")
+ refute_match(/:[fb]/, raw)
+ end
+ end
+
+ [ {['foo'] => 'bar'},
+ {'a' => {['foo', :foo] => 'bar'}},
+ {'a' => {{'foo' => 'bar'} => 'bar'}},
{'a' => {['foo', :foo] => ['bar', 'baz']}},
].each do |x|
- test "refuse symbol keys in serialized attribute: #{x.inspect}" do
- set_user_from_auth :admin_trustedclient
- assert_nothing_raised do
- Link.create!(link_class: 'test',
- properties: {})
- end
- assert_raises ActiveRecord::RecordInvalid do
+ test "refuse non-string keys in serialized db columns: #{x.inspect}" do
+ set_user_from_auth :active
+ assert_raises(ArgumentError) do
Link.create!(link_class: 'test',
properties: x)
end
test "No HashWithIndifferentAccess in database" do
set_user_from_auth :admin_trustedclient
- assert_raises ActiveRecord::RecordInvalid do
- Link.create!(link_class: 'test',
- properties: {'foo' => 'bar'}.with_indifferent_access)
- end
+ link = Link.create!(link_class: 'test',
+ properties: {'foo' => 'bar'}.with_indifferent_access)
+ raw = ActiveRecord::Base.connection.
+ select_value("select properties from links where uuid='#{link.uuid}'")
+ assert_equal '{"foo":"bar"}', raw
end
test "store long string" do
end
test "full text search index exists on models" do
+ indexes = {}
+ conn = ActiveRecord::Base.connection
+ conn.exec_query("SELECT i.relname as indname,
+ i.relowner as indowner,
+ idx.indrelid::regclass::text as table,
+ am.amname as indam,
+ idx.indkey,
+ ARRAY(
+ SELECT pg_get_indexdef(idx.indexrelid, k + 1, true)
+ FROM generate_subscripts(idx.indkey, 1) as k
+ ORDER BY k
+ ) as keys,
+ idx.indexprs IS NOT NULL as indexprs,
+ idx.indpred IS NOT NULL as indpred
+ FROM pg_index as idx
+ JOIN pg_class as i
+ ON i.oid = idx.indexrelid
+ JOIN pg_am as am
+ ON i.relam = am.oid
+ JOIN pg_namespace as ns
+ ON ns.oid = i.relnamespace
+ AND ns.nspname = ANY(current_schemas(false))").each do |idx|
+ if idx['keys'].match(/to_tsvector/)
+ indexes[idx['table']] ||= []
+ indexes[idx['table']] << idx
+ end
+ end
fts_tables = ["collections", "container_requests", "groups", "jobs",
"pipeline_instances", "pipeline_templates", "workflows"]
fts_tables.each do |table|
table_class = table.classify.constantize
if table_class.respond_to?('full_text_searchable_columns')
- fts_index_columns = table_class.full_text_searchable_columns
- index_columns = nil
- indexes = ActiveRecord::Base.connection.indexes(table)
- fts_index_by_columns = indexes.select do |index|
- if index.columns.first.match(/to_tsvector/)
- index_columns = index.columns.first.scan(/\((?<columns>[A-Za-z_]+)\,/).flatten!
- index_columns.sort == fts_index_columns.sort
- else
- false
+ expect = table_class.full_text_searchable_columns
+ ok = false
+ indexes[table].andand.each do |idx|
+ if expect == idx['keys'].scan(/COALESCE\(([A-Za-z_]+)/).flatten
+ ok = true
end
end
- assert !fts_index_by_columns.empty?, "#{table} has no FTS index with columns #{fts_index_columns}. Instead found FTS index with columns #{index_columns}"
+ assert ok, "#{table} has no full-text index\nexpect: #{expect.inspect}\nfound: #{indexes[table].inspect}"
end
end
end
active_user_token = api_client_authorizations("admin_vm").api_token
ApiClientAuthorization.
where(user_id: system_user.id).
- update_all(scopes: SafeJSON.dump(["GET /"]))
+ update_all(scopes: ["GET /"])
fixture_tokens = ApiClientAuthorization.all.collect(&:api_token)
new_token = create_superuser_token
refute_includes(fixture_tokens, new_token)
[
{script_parameters: ""},
{script_parameters: []},
- {script_parameters: {symbols: :are_not_allowed_here}},
+ {script_parameters: {["foo"] => ["bar"]}},
{runtime_constraints: ""},
{runtime_constraints: []},
{tasks_summary: ""},
{tasks_summary: []},
- {script_version: "no/branch/could/ever/possibly/have/this/name"},
].each do |invalid_attrs|
test "validation failures set error messages: #{invalid_attrs.to_json}" do
# Ensure valid_attrs doesn't produce errors -- otherwise we will
# not know whether errors reported below are actually caused by
# invalid_attrs.
- Job.create! job_attrs
+ Job.new(job_attrs).save!
- job = Job.create job_attrs(invalid_attrs)
- assert_raises(ActiveRecord::RecordInvalid, ArgumentError,
- "save! did not raise the expected exception") do
- job.save!
+ err = assert_raises(ArgumentError) do
+ Job.new(job_attrs(invalid_attrs)).save!
end
- assert_not_empty job.errors, "validation failure did not provide errors"
+ assert_match(/parameters|constraints|summary/, err.message)
end
end
+ test "invalid script_version" do
+ invalid = {
+ script_version: "no/branch/could/ever/possibly/have/this/name",
+ }
+ err = assert_raises(ActiveRecord::RecordInvalid) do
+ Job.new(job_attrs(invalid)).save!
+ end
+ assert_match(/Script version .* does not resolve to a commit/, err.message)
+ end
+
[
# Each test case is of the following format
# Array of parameters where each parameter is of the format:
if auto_admin_first_user_config
# This test requires no admin users exist (except for the system user)
users(:admin).delete
- @all_users = User.where("uuid not like '%-000000000000000'").where(:is_admin => true).find(:all)
- assert_equal 0, @all_users.size, "No admin users should exist (except for the system user)"
+ @all_users = User.where("uuid not like '%-000000000000000'").where(:is_admin => true)
+ assert_equal 0, @all_users.count, "No admin users should exist (except for the system user)"
end
Rails.configuration.auto_admin_first_user = auto_admin_first_user_config
end
test "find user method checks" do
- User.find(:all).each do |user|
+ User.all.each do |user|
assert_not_nil user.uuid, "non-null uuid expected for " + user.full_name
end
test "create new user" do
set_user_from_auth :admin
- @all_users = User.find(:all)
+ @all_users = User.all.to_a
user = User.new
user.first_name = "first_name_for_newly_created_user"
user.save
# verify there is one extra user in the db now
- assert_equal @all_users.size+1, User.find(:all).size
+ assert_equal @all_users.size+1, User.all.count
user = User.find(user.id) # get the user back
assert_equal(user.first_name, 'first_name_for_newly_created_user')
@active_user.delete
found_deleted_user = false
- User.find(:all).each do |user|
+ User.all.each do |user|
if user.uuid == active_user_uuid
found_deleted_user = true
break
definition = "more: etc"
w.update_attributes!(definition: definition)
w.reload
- assert_equal nil, w.name
- assert_equal nil, w.description
+ assert_nil w.name
+ assert_nil w.description
# Workflow name and desc set using definition yaml should be cleared
# if definition yaml is cleared
definition = nil
w.update_attributes!(definition: definition)
w.reload
- assert_equal nil, w.name
- assert_equal nil, w.description
+ assert_nil w.name
+ assert_nil w.description
# Workflow name and desc should be set to provided custom values
definition = "name: test name 3\ndescription: test desc 3\nother: some more"
HostOutputDir string
CleanupTempDir []string
Binds []string
+ Volumes map[string]struct{}
OutputPDH *string
SigChan chan os.Signal
ArvMountExit chan error
collectionPaths := []string{}
runner.Binds = nil
+ runner.Volumes = make(map[string]struct{})
needCertMount := true
var binds []string
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
case mnt.Kind == "tmp":
- runner.Binds = append(runner.Binds, bind)
+ runner.Volumes[bind] = struct{}{}
case mnt.Kind == "json":
jsondata, err := json.Marshal(mnt.Content)
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
+ runner.ContainerConfig.Volumes = runner.Volumes
+
runner.HostConfig = dockercontainer.HostConfig{
Binds: runner.Binds,
Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
} else if len(pathParts) >= 3 && pathParts[0] == "collections" {
if len(pathParts) >= 5 && pathParts[1] == "download" {
// /collections/download/ID/TOKEN/PATH...
- targetID = pathParts[2]
+ targetID = parseCollectionIDFromURL(pathParts[2])
tokens = []string{pathParts[3]}
targetPath = pathParts[4:]
pathToken = true
} else {
// /collections/ID/PATH...
- targetID = pathParts[1]
+ targetID = parseCollectionIDFromURL(pathParts[1])
tokens = h.Config.AnonymousTokens
targetPath = pathParts[2:]
}
- } else {
+ }
+
+ if targetID == "" {
statusCode = http.StatusNotFound
return
}
statusCode, statusText = http.StatusInternalServerError, err.Error()
return
}
- if kc.Client != nil && kc.Client.Transport != nil {
+ if client, ok := kc.Client.(*http.Client); ok && client.Transport != nil {
// Workaround for https://dev.arvados.org/issues/9005
- if t, ok := kc.Client.Transport.(*http.Transport); ok {
+ if t, ok := client.Transport.(*http.Transport); ok {
defer t.CloseIdleConnections()
}
}
c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed)
}
+func (s *UnitSuite) TestInvalidUUID(c *check.C) {
+ bogusID := strings.Replace(arvadostest.FooPdh, "+", "-", 1) + "-"
+ token := arvadostest.ActiveToken
+ for _, trial := range []string{
+ "http://keep-web/c=" + bogusID + "/foo",
+ "http://keep-web/c=" + bogusID + "/t=" + token + "/foo",
+ "http://keep-web/collections/download/" + bogusID + "/" + token + "/foo",
+ "http://keep-web/collections/" + bogusID + "/foo",
+ "http://" + bogusID + ".keep-web/" + bogusID + "/foo",
+ "http://" + bogusID + ".keep-web/t=" + token + "/" + bogusID + "/foo",
+ } {
+ c.Log(trial)
+ u, err := url.Parse(trial)
+ c.Assert(err, check.IsNil)
+ req := &http.Request{
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ }
+ resp := httptest.NewRecorder()
+ h := handler{Config: &Config{
+ AnonymousTokens: []string{arvadostest.AnonymousToken},
+ }}
+ h.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ }
+}
+
func mustParseURL(s string) *url.URL {
r, err := url.Parse(s)
if err != nil {
"os"
"os/signal"
"regexp"
+ "strings"
"sync"
"syscall"
"time"
}
}
-var listener net.Listener
+var (
+ listener net.Listener
+ router http.Handler
+)
func main() {
cfg := DefaultConfig()
if cfg.DefaultReplicas > 0 {
kc.Want_replicas = cfg.DefaultReplicas
}
- kc.Client.Timeout = time.Duration(cfg.Timeout)
+ kc.Client.(*http.Client).Timeout = time.Duration(cfg.Timeout)
go kc.RefreshServices(5*time.Minute, 3*time.Second)
listener, err = net.Listen("tcp", cfg.Listen)
signal.Notify(term, syscall.SIGINT)
// Start serving requests.
- http.Serve(listener, MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc))
+ router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc)
+ http.Serve(listener, router)
log.Println("shutting down")
}
return true, tok
}
-type GetBlockHandler struct {
+type proxyHandler struct {
+ http.Handler
*keepclient.KeepClient
*ApiTokenCache
}
-type PutBlockHandler struct {
- *keepclient.KeepClient
- *ApiTokenCache
-}
-
-type IndexHandler struct {
- *keepclient.KeepClient
- *ApiTokenCache
-}
-
-type InvalidPathHandler struct{}
-
-type OptionsHandler struct{}
-
-// MakeRESTRouter
-// Returns a mux.Router that passes GET and PUT requests to the
-// appropriate handlers.
-//
-func MakeRESTRouter(
- enable_get bool,
- enable_put bool,
- kc *keepclient.KeepClient) *mux.Router {
-
- t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
-
+// MakeRESTRouter returns an http.Handler that passes GET and PUT
+// requests to the appropriate handlers.
+func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient) http.Handler {
rest := mux.NewRouter()
+ h := &proxyHandler{
+ Handler: rest,
+ KeepClient: kc,
+ ApiTokenCache: &ApiTokenCache{
+ tokens: make(map[string]int64),
+ expireTime: 300,
+ },
+ }
if enable_get {
- rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
- GetBlockHandler{kc, t}).Methods("GET", "HEAD")
- rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+ rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Get).Methods("GET", "HEAD")
+ rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Get).Methods("GET", "HEAD")
// List all blocks
- rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+ rest.HandleFunc(`/index`, h.Index).Methods("GET")
// List blocks whose hash has the given prefix
- rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, h.Index).Methods("GET")
}
if enable_put {
- rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
- rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
- rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
- rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
- rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
+ rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Put).Methods("PUT")
+ rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Put).Methods("PUT")
+ rest.HandleFunc(`/`, h.Put).Methods("POST")
+ rest.HandleFunc(`/{any}`, h.Options).Methods("OPTIONS")
+ rest.HandleFunc(`/`, h.Options).Methods("OPTIONS")
}
rest.NotFoundHandler = InvalidPathHandler{}
+ return h
+}
- return rest
+var errLoopDetected = errors.New("loop detected")
+
+func (*proxyHandler) checkLoop(resp http.ResponseWriter, req *http.Request) error {
+ if via := req.Header.Get("Via"); strings.Index(via, " "+viaAlias) >= 0 {
+ log.Printf("proxy loop detected (request has Via: %q): perhaps keepproxy is misidentified by gateway config as an external client, or its keep_services record does not have service_type=proxy?", via)
+ http.Error(resp, errLoopDetected.Error(), http.StatusInternalServerError)
+ return errLoopDetected
+ }
+ return nil
}
func SetCorsHeaders(resp http.ResponseWriter) {
resp.Header().Set("Access-Control-Max-Age", "86486400")
}
-func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+type InvalidPathHandler struct{}
+
+func (InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
http.Error(resp, "Bad request", http.StatusBadRequest)
}
-func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Options(resp http.ResponseWriter, req *http.Request) {
log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
SetCorsHeaders(resp)
}
var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
-func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
+ if err := h.checkLoop(resp, req); err != nil {
+ return
+ }
SetCorsHeaders(resp)
+ resp.Header().Set("Via", req.Proto+" "+viaAlias)
locator := mux.Vars(req)["locator"]
var err error
}
}()
- kc := *this.KeepClient
+ kc := *h.KeepClient
+ kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
}
var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
-func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
+ if err := h.checkLoop(resp, req); err != nil {
+ return
+ }
SetCorsHeaders(resp)
+ resp.Header().Set("Via", "HTTP/1.1 "+viaAlias)
+
+ kc := *h.KeepClient
+ kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
- kc := *this.KeepClient
var err error
var expectLength int64
var status = http.StatusInternalServerError
var pass bool
var tok string
- if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
+ if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
err = BadAuthorizationHeader
status = http.StatusForbidden
return
// Tell the client how many successful PUTs we accomplished
resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
- switch err {
+ switch err.(type) {
case nil:
status = http.StatusOK
_, err = io.WriteString(resp, locatorOut)
// Expects "complete" response (terminating with blank new line)
// Aborts on any errors
// Concatenates responses from all those keep servers and returns
-func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
prefix := mux.Vars(req)["prefix"]
}
}()
- kc := *handler.KeepClient
+ kc := *h.KeepClient
- ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
+ ok, token := CheckAuthorizationHeader(&kc, h.ApiTokenCache, req)
if !ok {
status, err = http.StatusForbidden, BadAuthorizationHeader
return
import (
"bytes"
"crypto/md5"
+ "errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
"net/http"
"testing"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
. "gopkg.in/check.v1"
)
return kc
}
+func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
+ runProxy(c, nil, false)
+ defer closeListener()
+
+ req, err := http.NewRequest("POST",
+ "http://"+listener.Addr().String()+"/",
+ strings.NewReader("TestViaHeader"))
+ req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+ resp, err := (&http.Client{}).Do(req)
+ c.Assert(err, Equals, nil)
+ c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+ locator, err := ioutil.ReadAll(resp.Body)
+ c.Assert(err, Equals, nil)
+ resp.Body.Close()
+
+ req, err = http.NewRequest("GET",
+ "http://"+listener.Addr().String()+"/"+string(locator),
+ nil)
+ c.Assert(err, Equals, nil)
+ resp, err = (&http.Client{}).Do(req)
+ c.Assert(err, Equals, nil)
+ c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+ resp.Body.Close()
+}
+
+func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+
+ sr := map[string]string{
+ TestProxyUUID: "http://" + listener.Addr().String(),
+ }
+ router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+ content := []byte("TestLoopDetection")
+ _, _, err := kc.PutB(content)
+ c.Check(err, ErrorMatches, `.*loop detected.*`)
+
+ hash := fmt.Sprintf("%x", md5.Sum(content))
+ _, _, _, err = kc.Get(hash)
+ c.Check(err, ErrorMatches, `.*loop detected.*`)
+}
+
func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
bytes.NewReader(content))
c.Assert(err, IsNil)
req.Header.Set("Content-Length", t.sendLength)
- req.Header.Set("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+ req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
req.Header.Set("Content-Type", "application/octet-stream")
resp := httptest.NewRecorder()
hash2, rep, err := kc.PutB([]byte("bar"))
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
- c.Check(err, Equals, keepclient.InsufficientReplicasError)
+ c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
log.Print("PutB")
}
hash2, rep, err := kc.PutB([]byte("quux"))
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
- c.Check(err, Equals, keepclient.InsufficientReplicasError)
+ c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
}
func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
req, err := http.NewRequest("POST",
"http://"+listener.Addr().String()+"/",
strings.NewReader("qux"))
- req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+ req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
req.Header.Add("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
c.Check(err, Equals, nil)
--- /dev/null
+package main
+
+import (
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+var viaAlias = "keepproxy"
+
+type proxyClient struct {
+ client keepclient.HTTPClient
+ proto string
+}
+
+func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
+ req.Header.Add("Via", pc.proto+" "+viaAlias)
+ return pc.client.Do(req)
+}
"io"
"io/ioutil"
"log"
+ "net/http"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
log.Fatal(err)
}
kc.Want_replicas = *Replicas
- kc.Client.Timeout = 10 * time.Minute
+ kc.Client.(*http.Client).Timeout = 10 * time.Minute
overrideServices(kc)