-func (this *KeepClient) putReplicas(
- hash string,
- getReader func() io.Reader,
- expectedLength int64) (locator string, replicas int, err error) {
-
- reqid := this.getRequestID()
+func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ var resp arvados.BlockWriteResponse
+ var getReader func() io.Reader
+ if req.Data == nil && req.Reader == nil {
+ return resp, errors.New("invalid BlockWriteOptions: Data and Reader are both nil")
+ }
+ if req.DataSize < 0 {
+ return resp, fmt.Errorf("invalid BlockWriteOptions: negative DataSize %d", req.DataSize)
+ }
+ if req.DataSize > BLOCKSIZE || len(req.Data) > BLOCKSIZE {
+ return resp, ErrOversizeBlock
+ }
+ if req.Data != nil {
+ if req.DataSize > len(req.Data) {
+ return resp, errors.New("invalid BlockWriteOptions: DataSize > len(Data)")
+ }
+ if req.DataSize == 0 {
+ req.DataSize = len(req.Data)
+ }
+ getReader = func() io.Reader { return bytes.NewReader(req.Data[:req.DataSize]) }
+ } else {
+ buf := asyncbuf.NewBuffer(make([]byte, 0, req.DataSize))
+ reader := req.Reader
+ if req.Hash != "" {
+ reader = HashCheckingReader{req.Reader, md5.New(), req.Hash}
+ }
+ go func() {
+ _, err := io.Copy(buf, reader)
+ buf.CloseWithError(err)
+ }()
+ getReader = buf.NewReader
+ }
+ if req.Hash == "" {
+ m := md5.New()
+ _, err := io.Copy(m, getReader())
+ if err != nil {
+ return resp, err
+ }
+ req.Hash = fmt.Sprintf("%x", m.Sum(nil))
+ }
+ if req.StorageClasses == nil {
+ if len(kc.StorageClasses) > 0 {
+ req.StorageClasses = kc.StorageClasses
+ } else {
+ req.StorageClasses = kc.DefaultStorageClasses
+ }
+ }
+ if req.Replicas == 0 {
+ req.Replicas = kc.Want_replicas
+ }
+ if req.RequestID == "" {
+ req.RequestID = kc.getRequestID()
+ }
+ if req.Attempts == 0 {
+ req.Attempts = 1 + kc.Retries
+ }