"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"os"
+ "regexp"
"sort"
+ "strings"
"sync"
"sync/atomic"
"unsafe"
// KeepClient.Want_replicas. Returns the number of replicas that were written
// and if there was an error. Note this will return InsufficientReplicasError
// whenever 0 <= replicas < this.Want_replicas.
-func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
+func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
// Buffer for reads from 'r'
var bufsize int
if expectedLength > 0 {
if expectedLength > BLOCKSIZE {
- return 0, OversizeBlockError
+ return "", 0, OversizeBlockError
}
bufsize = int(expectedLength)
} else {
// replicas is given in KeepClient.Want_replicas. Returns the number of
// replicas that were written and if there was an error. Note this will return
// InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
-func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
+func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
t := streamer.AsyncStreamFromSlice(buf)
defer t.Close()
// of replicas is given in KeepClient.Want_replicas. Returns the number of
// replicas that were written and if there was an error. Note this will return
// InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
-func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
- hash = fmt.Sprintf("%x", md5.Sum(buffer))
- replicas, err = this.PutHB(hash, buffer)
- return hash, replicas, err
+func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
+ hash := fmt.Sprintf("%x", md5.Sum(buffer))
+ return this.PutHB(hash, buffer)
}
// Put a block, given a Reader. This will read the entire reader into a buffer
-// to computed the hash. The desired number of replicas is given in
+// to compute the hash. The desired number of replicas is given in
// KeepClient.Want_replicas. Returns the number of replicas that were written
// and if there was an error. Note this will return InsufficientReplicasError
// whenever 0 <= replicas < this.Want_replicas. Also note that if the block
// hash and data size are available, PutHR() is more efficient.
-func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
+func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
if buffer, err := ioutil.ReadAll(r); err != nil {
return "", 0, err
} else {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
unsafe.Pointer(&roots))
}
+
// Locator holds the components parsed out of a Keep block locator string of
// the form "<hash>[+<hint>...]".
//
// The zero value (all fields empty) is what MakeLocator and MakeLocator2
// return when the input is not a valid locator.
type Locator struct {
	// Hash is the block hash. MakeLocator only accepts 32 lowercase hex
	// digits here; MakeLocator2 stores whatever it is given.
	Hash string
	// Size is the value of the all-digits hint, or 0 if none was present.
	Size int
	// Signature is the hex string between "A" and "@" in an
	// "A<signature>@<timestamp>" hint.
	Signature string
	// Timestamp is the 8 hex digits after "@" in that same hint.
	Timestamp string
}

// Patterns used by MakeLocator and MakeLocator2. They are compiled once at
// package initialization instead of on every call (and on every hint).
var (
	// locatorPathRe splits "<32 hex digits>[+hints]" into hash and hints.
	locatorPathRe = regexp.MustCompile("^([0-9a-f]{32})([+].*)?$")
	// locatorSizeHintRe matches a size hint (decimal digits only).
	locatorSizeHintRe = regexp.MustCompile("^[[:digit:]]+$")
	// locatorSignatureHintRe matches "A<hex signature>@<8 hex digit timestamp>".
	locatorSignatureHintRe = regexp.MustCompile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
	// locatorUnknownHintRe matches any hint starting with an upper-case ASCII
	// letter. Note the doubled brackets: a bare "[:upper:]" is just the
	// character set {':', 'u', 'p', 'e', 'r'}.
	locatorUnknownHintRe = regexp.MustCompile("^[[:upper:]]")
)

// MakeLocator2 builds a Locator from an already-separated hash and a
// "+"-delimited hints string (a leading "+" and empty hints are tolerated).
//
// Recognized hints are a decimal size and an "A<signature>@<timestamp>"
// permission signature. Any other hint starting with an upper-case letter is
// ignored for forward compatibility. Any hint that fits none of these forms,
// or a size that does not fit in an int, makes the whole locator invalid and
// the zero Locator is returned.
func MakeLocator2(hash string, hints string) (locator Locator) {
	locator.Hash = hash
	// Splitting "" yields a single empty hint, which is skipped below, so no
	// separate empty-string guard is needed.
	for _, hint := range strings.Split(hints, "+") {
		switch {
		case hint == "":
			// Produced by a leading, trailing or doubled "+"; nothing to parse.
		case locatorSizeHintRe.MatchString(hint):
			if _, err := fmt.Sscanf(hint, "%d", &locator.Size); err != nil {
				// All digits but not representable as an int.
				return Locator{}
			}
		case locatorSignatureHintRe.MatchString(hint):
			m := locatorSignatureHintRe.FindStringSubmatch(hint)
			locator.Signature = m[1]
			locator.Timestamp = m[2]
		case locatorUnknownHintRe.MatchString(hint):
			// Any unknown hint that starts with an uppercase letter is
			// presumed to be valid and ignored, to permit forward
			// compatibility.
		default:
			// Unknown format; not a valid locator.
			return Locator{}
		}
	}
	return locator
}

// MakeLocator parses a complete locator string ("<32 hex digit hash>" with
// optional "+hint" suffixes) into a Locator. If path does not have that
// shape, the failure is logged and the zero Locator is returned.
func MakeLocator(path string) Locator {
	sm := locatorPathRe.FindStringSubmatch(path)
	if sm == nil {
		log.Print("Failed match ", path)
		return Locator{}
	}
	return MakeLocator2(sm[1], sm[2])
}
<-st.handled
status := <-upload_status
- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
+ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
})
log.Printf("TestUploadToStubKeepServer done")
<-st.handled
status := <-upload_status
- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
+ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
})
log.Printf("TestUploadToStubKeepServerBufferReader done")
<-fh.handled
c.Check(err, Equals, nil)
- c.Check(phash, Equals, hash)
+ c.Check(phash, Equals, "")
c.Check(replicas, Equals, 2)
c.Check(<-st.handled, Equals, shuff[1])
c.Check(<-st.handled, Equals, shuff[2])
}
{
hash2, replicas, err := kc.PutB([]byte("foo"))
- c.Check(hash2, Equals, hash)
+ c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
c.Check(replicas, Equals, 2)
c.Check(err, Equals, nil)
}
log.Printf("TestPutProxy done")
}
+
+func (s *StandaloneSuite) TestMakeLocator(c *C) {
+ l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
+
+ c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
+ c.Check(l.Size, Equals, 3)
+ c.Check(l.Signature, Equals, "abcde")
+ c.Check(l.Timestamp, Equals, "12345678")
+}
"errors"
"fmt"
"io"
+ "io/ioutil"
"log"
"net/http"
"os"
"strconv"
+ "strings"
)
type keepDisk struct {
url string
statusCode int
replicas_stored int
+ response string
}
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
var err error
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
- upload_status <- uploadStatus{err, url, 0, 0}
+ upload_status <- uploadStatus{err, url, 0, 0, ""}
body.Close()
return
}
var resp *http.Response
if resp, err = this.Client.Do(req); err != nil {
- upload_status <- uploadStatus{err, url, 0, 0}
+ upload_status <- uploadStatus{err, url, 0, 0, ""}
body.Close()
return
}
fmt.Sscanf(xr, "%d", &rep)
}
+ respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ if err2 != nil && err2 != io.EOF {
+ upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
+ return
+ }
+
+ locator := strings.TrimSpace(string(respbody))
+
if resp.StatusCode == http.StatusOK {
- upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
+ upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
} else {
- upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
+ upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
}
}
func (this KeepClient) putReplicas(
hash string,
tr *streamer.AsyncStream,
- expectedLength int64) (replicas int, err error) {
+ expectedLength int64) (locator string, replicas int, err error) {
// Calculate the ordering for uploading to servers
sv := this.shuffledServiceRoots(hash)
active += 1
} else {
if active == 0 {
- return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
} else {
break
}
if status.statusCode == 200 {
// good news!
remaining_replicas -= status.replicas_stored
+ locator = status.response
} else {
// writing to keep server failed for some reason
log.Printf("Keep server put to %v failed with '%v'",
log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
}
- return this.Want_replicas, nil
+ return locator, this.Want_replicas, nil
}
from httplib import BadStatusLine
if 'headers' not in kwargs:
kwargs['headers'] = {}
+
+ if config.get("ARVADOS_EXTERNAL_CLIENT", "") == "true":
+ kwargs['headers']['X-External-Client'] = '1'
+
kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
try:
return self.orig_http_request(uri, **kwargs)
'arvados', apiVersion, http=http, discoveryServiceUrl=url)
http.cache = None
return services[version]
-
pass
class NotImplementedError(Exception):
pass
# Raised by the Keep client when the Keep service lookup returns an empty list.
+class NoKeepServersError(Exception):
+ pass
with self._done_lock:
return (self._done < self._todo)
- def save_response(self, response_body):
+ def save_response(self, response_body, replicas_stored):
"""
Records a response body (a locator, possibly signed) returned by
the Keep server. It is not necessary to save more than
in response to a successful request is valid.
"""
with self._done_lock:
- self._done += 1
+ self._done += replicas_stored
self._response = response_body
def response(self):
url = self.args['service_root'] + self.args['data_hash']
api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token}
+
+ if self.args['using_proxy']:
+ # We're using a proxy, so tell the proxy how many copies we
+ # want it to store
+ headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
+
try:
+ logging.debug("Uploading to {}".format(url))
resp, content = h.request(url.encode('utf-8'), 'PUT',
headers=headers,
body=self.args['data'])
(str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root']))
- return limiter.save_response(content.strip())
+ replicas_stored = 1
+ if 'x-keep-replicas-stored' in resp:
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
+ try:
+ replicas_stored = int(resp['x-keep-replicas-stored'])
+ except ValueError:
+ pass
+ return limiter.save_response(content.strip(), replicas_stored)
+
logging.warning("Request fail: PUT %s => %s %s" %
(url, resp['status'], content))
except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
self._cache = []
# default 256 megabyte cache
self.cache_max = 256 * 1024 * 1024
+ self.using_proxy = False
def shuffled_service_roots(self, hash):
if self.service_roots == None:
self.lock.acquire()
- try:
- keep_disks = arvados.api().keep_disks().list().execute()['items']
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_disks)
- self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
- finally:
- self.lock.release()
+
+ # Override normal keep disk lookup with an explicit proxy
+ # configuration.
+ keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
+ if keep_proxy_env != None and len(keep_proxy_env) > 0:
+
+ if keep_proxy_env[-1:] != '/':
+ keep_proxy_env += "/"
+ self.service_roots = [keep_proxy_env]
+ self.using_proxy = True
+ else:
+ try:
+ try:
+ keep_services = arvados.api().keep_services().accessible().execute()['items']
+ except Exception:
+ keep_services = arvados.api().keep_disks().list().execute()['items']
+
+ if len(keep_services) == 0:
+ raise arvados.errors.NoKeepServersError()
+
+ if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
+ self.using_proxy = True
+
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_services)
+ self.service_roots = sorted(set(roots))
+ logging.debug(str(self.service_roots))
+ finally:
+ self.lock.release()
# Build an ordering with which to query the Keep servers based on the
# contents of the hash.
t = KeepClient.KeepWriterThread(data=data,
data_hash=data_hash,
service_root=service_root,
- thread_limiter=thread_limiter)
+ thread_limiter=thread_limiter,
+ using_proxy=self.using_proxy,
+ want_copies=(want_copies if self.using_proxy else 1))
t.start()
threads += [t]
for t in threads:
t.join()
have_copies = thread_limiter.done()
# If we're done, return the response from Keep
- if have_copies == want_copies:
+ if have_copies >= want_copies:
return thread_limiter.response()
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
keep0 = tempfile.mkdtemp()
keep_cmd = ["bin/keep",
"-volumes={}".format(keep0),
- "-listen=:{}".format(25107+n)]
+ "-listen=:{}".format(25107+n),
+ "-pid={}".format("tmp/keep{}.pid".format(n))]
for arg, val in keep_args.iteritems():
keep_cmd.append("{}={}".format(arg, val))
kp0 = subprocess.Popen(keep_cmd)
with open("tmp/keep{}.pid".format(n), 'w') as f:
f.write(str(kp0.pid))
+
with open("tmp/keep{}.volume".format(n), 'w') as f:
f.write(keep0)
else:
os.environ["GOPATH"] = os.getcwd() + ":" + os.environ["GOPATH"]
- subprocess.call(["go", "install", "keep"])
+ subprocess.call(["./go.sh", "install", "keep"])
if not os.path.exists("tmp"):
os.mkdir("tmp")
_start_keep(0, keep_args)
_start_keep(1, keep_args)
-
os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
if os.path.exists("tmp/keep{}.volume".format(n)):
with open("tmp/keep{}.volume".format(n), 'r') as r:
shutil.rmtree(r.read(), True)
+ os.unlink("tmp/keep{}.volume".format(n))
if os.path.exists("tmp/keep.blob_signing_key"):
os.remove("tmp/keep.blob_signing_key")
_stop_keep(0)
_stop_keep(1)
- shutil.rmtree("tmp", True)
+ os.chdir(cwd)
+
def run_keep_proxy(auth):
    """Build and start a local keepproxy, then register it with the API server.

    Any previously started proxy is stopped first.  The proxy is built with
    ./go.sh inside KEEP_SERVER_DIR and launched listening on port 25101 with
    its pid file at tmp/keepproxy.pid.  A keep_services record of type
    "proxy" is then created through the API, and ARVADOS_KEEP_PROXY in the
    arvados config settings is pointed at the proxy.

    auth -- key into the "api_client_authorizations" fixture; that entry's
            api_token is exported as ARVADOS_API_TOKEN before the proxy is
            started.

    The caller's working directory is restored on exit, including when one of
    the steps raises.
    """
    stop_keep_proxy()

    cwd = os.getcwd()
    os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
    try:
        if os.environ.get('GOPATH') is None:
            os.environ["GOPATH"] = os.getcwd()
        else:
            os.environ["GOPATH"] = os.getcwd() + ":" + os.environ["GOPATH"]

        subprocess.call(["./go.sh", "install", "arvados.org/keepproxy"])
        if not os.path.exists("tmp"):
            os.mkdir("tmp")

        os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
        os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
        os.environ["ARVADOS_API_TOKEN"] = fixture("api_client_authorizations")[auth]["api_token"]

        # The Popen handle is not kept: the proxy is given -pid, and
        # stop_keep_proxy() shuts it down through that pid file.
        subprocess.Popen(["bin/keepproxy", "-pid=tmp/keepproxy.pid", "-listen=:{}".format(25101)])

        authorize_with("admin")
        api = arvados.api('v1', cache=False)
        api.keep_services().create(body={"keep_service": {"service_host": "localhost", "service_port": 25101, "service_type": "proxy"} }).execute()

        arvados.config.settings()["ARVADOS_KEEP_PROXY"] = "http://localhost:25101"
    finally:
        # Always return to the caller's directory, even after a failure.
        os.chdir(cwd)
+
def stop_keep_proxy():
    """Stop the keepproxy recorded in tmp/keepproxy.pid under KEEP_SERVER_DIR.

    The caller's working directory is restored on exit, including when
    kill_server_pid() raises.
    """
    cwd = os.getcwd()
    os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
    try:
        kill_server_pid("tmp/keepproxy.pid", 0)
    finally:
        os.chdir(cwd)
def fixture(fix):
run_keep()
elif args.action == 'stop_keep':
stop_keep()
+ elif args.action == 'start_keep_proxy':
+ run_keep_proxy("admin")
+ elif args.action == 'stop_keep_proxy':
+ stop_keep_proxy()
else:
print('Unrecognized action "{}", actions are "start", "stop", "start_keep", "stop_keep"'.format(args.action))
class KeepTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
+ super(KeepTestCase, cls).setUpClass()
try:
del os.environ['KEEP_LOCAL_STORE']
except KeyError:
pass
+
+ # Make sure these are clear, we want to talk to the Keep servers
+ # directly.
+ os.environ["ARVADOS_KEEP_PROXY"] = ""
+ os.environ["ARVADOS_EXTERNAL_CLIENT"] = ""
+
run_test_server.run()
run_test_server.run_keep()
+ arvados.keep.global_client_object = None
+ arvados.config._settings = None
+ run_test_server.authorize_with("admin")
@classmethod
def tearDownClass(cls):
+ super(KeepTestCase, cls).tearDownClass()
run_test_server.stop()
run_test_server.stop_keep()
del os.environ['KEEP_LOCAL_STORE']
except KeyError:
pass
+
run_test_server.run()
run_test_server.run_keep(blob_signing_key='abcdefghijk0123456789',
enforce_permissions=True)
self.assertEqual(arvados.Keep.get("acbd18db4cc2f85cedef654fccc4a4d8"),
'foo',
'wrong content from Keep.get(md5("foo"))')
+
+
class KeepProxyTestCase(unittest.TestCase):
    """Round-trip blocks through keepproxy, reached in two different ways."""

    @staticmethod
    def _use_keep_settings(keep_proxy, external_client):
        # Export the requested client settings, then drop the cached Keep
        # client and cached config so the next Keep call re-reads them.
        os.environ["ARVADOS_KEEP_PROXY"] = keep_proxy
        os.environ["ARVADOS_EXTERNAL_CLIENT"] = external_client
        arvados.keep.global_client_object = None
        arvados.config._settings = None

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()

        # Make sure no local-store override is in effect.
        os.environ.pop('KEEP_LOCAL_STORE', None)

        os.environ["ARVADOS_KEEP_PROXY"] = ""
        os.environ["ARVADOS_EXTERNAL_CLIENT"] = ""

        run_test_server.run()
        run_test_server.run_keep()
        # Clear cached client/config only after the servers are up and
        # before the proxy is started.
        arvados.keep.global_client_object = None
        arvados.config._settings = None
        run_test_server.run_keep_proxy("admin")
        # Remember the proxy URL that run_keep_proxy() stored in the config
        # settings, for the test that needs it.
        KeepProxyTestCase.arvados_keep_proxy = arvados.config.get("ARVADOS_KEEP_PROXY")

    @classmethod
    def tearDownClass(cls):
        super(KeepProxyTestCase, cls).tearDownClass()
        run_test_server.stop()
        run_test_server.stop_keep()
        run_test_server.stop_keep_proxy()

    def test_KeepProxyTest1(self):
        # Point the client at the proxy explicitly through the
        # ARVADOS_KEEP_PROXY environment variable, using the URL captured in
        # setUpClass().
        self._use_keep_settings(KeepProxyTestCase.arvados_keep_proxy, "")

        stored = arvados.Keep.put('baz')
        self.assertEqual(stored,
                         '73feffa4b7f6bb68e44cf984c85f6e88+3',
                         'wrong md5 hash from Keep.put("baz"): ' + stored)
        fetched = arvados.Keep.get(stored)
        self.assertEqual(fetched,
                         'baz',
                         'wrong content from Keep.get(md5("baz"))')

        client = arvados.Keep.global_client_object()
        self.assertEqual(True, client.using_proxy)

    def test_KeepProxyTest2(self):
        # No explicit proxy this time: ARVADOS_EXTERNAL_CLIENT makes the
        # client send X-External-Client, and the proxy is expected to come
        # back from the API server's keep_services/accessible.
        self._use_keep_settings("", "true")

        stored = arvados.Keep.put('baz2')
        self.assertEqual(stored,
                         '91f372a266fe2bf2823cb8ec7fda31ce+4',
                         'wrong md5 hash from Keep.put("baz2"): ' + stored)
        fetched = arvados.Keep.get(stored)
        self.assertEqual(fetched,
                         'baz2',
                         'wrong content from Keep.get(md5("baz2"))')

        client = arvados.Keep.global_client_object()
        self.assertEqual(True, client.using_proxy)
"net"
"net/http"
"os"
+ "os/signal"
"sync"
+ "syscall"
"time"
)
go RefreshServicesList(&kc)
+ // Shut down the server gracefully (by closing the listener)
+ // if SIGTERM is received.
+ term := make(chan os.Signal, 1)
+ go func(sig <-chan os.Signal) {
+ s := <-sig
+ log.Println("caught signal:", s)
+ listener.Close()
+ }(term)
+ signal.Notify(term, syscall.SIGTERM)
+
+ if pidfile != "" {
+ f, err := os.Create(pidfile)
+ if err == nil {
+ fmt.Fprint(f, os.Getpid())
+ f.Close()
+ } else {
+ log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
+ }
+ }
+
log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
// Start listening for requests.
http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
+
+ log.Println("shutting down")
+
+ if pidfile != "" {
+ os.Remove(pidfile)
+ }
}
type ApiTokenCache struct {
rest := mux.NewRouter()
if enable_get {
- gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
- ghsig := rest.Handle(
- `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
- GetBlockHandler{kc, t})
-
- gh.Methods("GET", "HEAD")
- ghsig.Methods("GET", "HEAD")
+ rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
+ GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+ rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
}
if enable_put {
+ rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
}
kc := *this.KeepClient
hash := mux.Vars(req)["hash"]
- signature := mux.Vars(req)["signature"]
- timestamp := mux.Vars(req)["timestamp"]
+ hints := mux.Vars(req)["hints"]
+
+ locator := keepclient.MakeLocator2(hash, hints)
log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
var blocklen int64
if req.Method == "GET" {
- reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
+ reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
defer reader.Close()
} else if req.Method == "HEAD" {
- blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
+ blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
}
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
kc := *this.KeepClient
hash := mux.Vars(req)["hash"]
+ hints := mux.Vars(req)["hints"]
+
+ locator := keepclient.MakeLocator2(hash, hints)
var contentLength int64 = -1
if req.Header.Get("Content-Length") != "" {
return
}
+ if locator.Size > 0 && int64(locator.Size) != contentLength {
+ http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
+ return
+ }
+
if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
return
}
// Now try to put the block through
- replicas, err := kc.PutHR(hash, req.Body, contentLength)
+ hash, replicas, err := kc.PutHR(hash, req.Body, contentLength)
// Tell the client how many successful PUTs we accomplished
resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
case nil:
// Default will return http.StatusOK
log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
+ n, err2 := io.WriteString(resp, hash)
+ if err2 != nil {
+ log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
+ }
case keepclient.OversizeBlockError:
// Too much data
// client can decide if getting less than the number of
// replications it asked for is a fatal error.
// Default will return http.StatusOK
+ n, err2 := io.WriteString(resp, hash)
+ if err2 != nil {
+ log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
+ }
} else {
http.Error(resp, "", http.StatusServiceUnavailable)
}
defer listener.Close()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ var hash2 string
{
_, _, err := kc.Ask(hash)
}
{
- hash2, rep, err := kc.PutB([]byte("foo"))
- c.Check(hash2, Equals, hash)
+ var rep int
+ var err error
+ hash2, rep, err = kc.PutB([]byte("foo"))
+ c.Check(hash2, Equals, fmt.Sprintf("%s+3", hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
log.Print("PutB")
}
{
- blocklen, _, err := kc.Ask(hash)
+ blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
log.Print("Ask 2")
}
{
- reader, blocklen, _, err := kc.Get(hash)
+ reader, blocklen, _, err := kc.Get(hash2)
c.Assert(err, Equals, nil)
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
{
hash2, rep, err := kc.PutB([]byte("bar"))
- c.Check(hash2, Equals, hash)
+ c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, Equals, keepclient.InsufficientReplicasError)
log.Print("PutB")
{
hash2, rep, err := kc.PutB([]byte("baz"))
- c.Check(hash2, Equals, hash)
+ c.Check(hash2, Equals, fmt.Sprintf("%s+3", hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
log.Print("PutB")
kc := runProxy(c, []string{"keepproxy", "-no-put"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29953)
defer listener.Close()
- hash := fmt.Sprintf("%x", md5.Sum([]byte("quux")))
-
{
hash2, rep, err := kc.PutB([]byte("quux"))
- c.Check(hash2, Equals, hash)
+ c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, Equals, keepclient.InsufficientReplicasError)
log.Print("PutB")
permission_ttl_sec int
serialize_io bool
volumearg string
+ pidfile string
)
flag.StringVar(
&data_manager_token_file,
"e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
"supplied, Keep will scan mounted filesystems for volumes "+
"with a /keep top-level directory.")
+
+ flag.StringVar(
+ &pidfile,
+ "pid",
+ "",
+ "Path to write pid file")
+
flag.Parse()
// Look for local keep volumes.
}(term)
signal.Notify(term, syscall.SIGTERM)
+ if pidfile != "" {
+ f, err := os.Create(pidfile)
+ if err == nil {
+ fmt.Fprint(f, os.Getpid())
+ f.Close()
+ } else {
+ log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
+ }
+ }
+
// Start listening for requests.
srv := &http.Server{Addr: listen}
srv.Serve(listener)
log.Println("shutting down")
+
+ if pidfile != "" {
+ os.Remove(pidfile)
+ }
}
// MakeRESTRouter