Vendor dependencies for GCS

This commit is contained in:
Alexander Neumann
2017-08-05 20:17:15 +02:00
parent ba75a3884c
commit 8ca6a9a240
1228 changed files with 1769186 additions and 1 deletions

View File

@@ -0,0 +1,241 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package bytestream provides a client for any service that exposes a ByteStream API.
//
// Note: This package is a work-in-progress. Backwards-incompatible changes should be expected.
package bytestream
// This file contains the client implementation of Bytestream declared at:
// https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
import (
"fmt"
"math/rand"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "google.golang.org/genproto/googleapis/bytestream"
)
const (
// MaxBufSize is the maximum buffer size (in bytes) received in a read chunk or sent in a write chunk.
MaxBufSize = 2 * 1024 * 1024
backoffBase = 10 * time.Millisecond
backoffMax = 1 * time.Second
maxTries = 5
)
// Client is the go wrapper around a ByteStreamClient and provides an interface to it.
type Client struct {
client pb.ByteStreamClient
options []grpc.CallOption
}
// NewClient creates a new bytestream.Client.
func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client {
return &Client{
client: pb.NewByteStreamClient(cc),
options: options,
}
}
// Reader reads from a byte stream.
type Reader struct {
ctx context.Context
c *Client
readClient pb.ByteStream_ReadClient
resourceName string
err error
buf []byte
}
// ResourceName gets the resource name this Reader is reading.
func (r *Reader) ResourceName() string {
return r.resourceName
}
// Read implements io.Reader.
// Read buffers received bytes that do not fit in p.
func (r *Reader) Read(p []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
var backoffDelay time.Duration
for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ {
// No data in buffer.
resp, err := r.readClient.Recv()
if err != nil {
r.err = err
return 0, err
}
r.buf = resp.Data
if len(r.buf) != 0 {
break
}
// back off
if backoffDelay < backoffBase {
backoffDelay = backoffBase
} else {
backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64()))
}
if backoffDelay > backoffMax {
backoffDelay = backoffMax
}
select {
case <-time.After(backoffDelay):
case <-r.ctx.Done():
if err := r.ctx.Err(); err != nil {
r.err = err
}
return 0, r.err
}
}
// Copy from buffer.
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
// Close implements io.Closer.
func (r *Reader) Close() error {
if r.readClient == nil {
return nil
}
err := r.readClient.CloseSend()
r.readClient = nil
return err
}
// NewReader creates a new Reader to read a resource.
func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) {
return c.NewReaderAt(ctx, resourceName, 0)
}
// NewReader creates a new Reader to read a resource from the given offset.
func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) {
// readClient is set up for Read(). ReadAt() will copy needed fields into its reentrantReader.
readClient, err := c.client.Read(ctx, &pb.ReadRequest{
ResourceName: resourceName,
ReadOffset: offset,
}, c.options...)
if err != nil {
return nil, err
}
return &Reader{
ctx: ctx,
c: c,
resourceName: resourceName,
readClient: readClient,
}, nil
}
// Writer writes to a byte stream.
type Writer struct {
ctx context.Context
writeClient pb.ByteStream_WriteClient
resourceName string
offset int64
backoffDelay time.Duration
err error
}
// ResourceName gets the resource name this Writer is writing.
func (w *Writer) ResourceName() string {
return w.resourceName
}
// Write implements io.Writer.
func (w *Writer) Write(p []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
n := 0
for n < len(p) {
bufSize := len(p) - n
if bufSize > MaxBufSize {
bufSize = MaxBufSize
}
r := pb.WriteRequest{
WriteOffset: w.offset,
FinishWrite: false,
Data: p[n : n+bufSize],
}
// Bytestream only requires the resourceName to be sent in the first WriteRequest.
if w.offset == 0 {
r.ResourceName = w.resourceName
}
err := w.writeClient.Send(&r)
if err != nil {
w.err = err
return n, err
}
w.offset += int64(bufSize)
n += bufSize
}
return n, nil
}
// Close implements io.Closer. It is the caller's responsibility to call Close() when writing is done.
func (w *Writer) Close() error {
err := w.writeClient.Send(&pb.WriteRequest{
ResourceName: w.resourceName,
WriteOffset: w.offset,
FinishWrite: true,
Data: nil,
})
if err != nil {
w.err = err
return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %v", err)
}
resp, err := w.writeClient.CloseAndRecv()
if err != nil {
w.err = err
return fmt.Errorf("CloseAndRecv: %v", err)
}
if resp == nil {
err = fmt.Errorf("expected a response on close, got %v", resp)
} else if resp.CommittedSize != w.offset {
err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset)
}
w.err = err
return err
}
// NewWriter creates a new Writer to write a resource.
//
// resourceName specifies the name of the resource.
// The resource will be available after Close has been called.
//
// It is the caller's responsibility to call Close when writing is done.
//
// TODO: There is currently no way to resume a write. Maybe NewWriter should begin with a call to QueryWriteStatus.
func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) {
wc, err := c.client.Write(ctx, c.options...)
if err != nil {
return nil, err
}
return &Writer{
ctx: ctx,
writeClient: wc,
resourceName: resourceName,
}, nil
}

View File

@@ -0,0 +1,439 @@
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bytestream
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"net"
"testing"
"golang.org/x/net/context"
"google.golang.org/api/transport/bytestream/internal"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
pb "google.golang.org/genproto/googleapis/bytestream"
)
const testData = "0123456789"
// A grpcServer is an in-process gRPC server, listening on a system-chosen port on
// the local loopback interface. Servers are for testing only and are not
// intended to be used in production code.
// (Copied from "cloud.google.com/internal/testutil/server_test.go")
//
// To create a server, make a new grpcServer, register your handlers, then call
// Start:
//
// srv, err := NewServer()
// ...
// mypb.RegisterMyServiceServer(srv.Gsrv, &myHandler)
// ....
// srv.Start()
//
// Clients should connect to the server with no security:
//
// conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
// ...
type grpcServer struct {
Addr string
l net.Listener
Gsrv *grpc.Server
}
type TestSetup struct {
ctx context.Context
rpcTest *grpcServer
server *internal.Server
client *Client
}
func TestClientRead(t *testing.T) {
testCases := []struct {
name string
input string
resourceName string
extraBufSize int
extraError error
want string
wantErr bool
wantEOF bool
}{
{
name: "test foo",
input: testData,
resourceName: "foo",
want: testData,
}, {
name: "test bar",
input: testData,
resourceName: "bar",
want: testData,
}, {
name: "test bar extraBufSize=1",
input: testData,
resourceName: "bar",
extraBufSize: 1,
want: testData,
wantEOF: true,
}, {
name: "test bar extraBufSize=2",
input: testData,
resourceName: "bar",
extraBufSize: 2,
want: testData,
wantEOF: true,
}, {
name: "empty resource name",
input: testData,
resourceName: "",
extraBufSize: 1,
wantErr: true,
}, {
name: "read after error returns error again",
input: testData,
resourceName: "does not matter",
extraBufSize: 1,
extraError: errors.New("some error"),
wantErr: true,
},
}
for _, tc := range testCases {
bufSize := len(tc.want) + tc.extraBufSize
if bufSize == 0 {
t.Errorf("%s: This is probably wrong. Read returning 0 bytes?", tc.name)
continue
}
setup := newTestSetup(tc.input)
r, err := setup.client.NewReader(setup.ctx, tc.resourceName)
if err != nil {
t.Errorf("%s: NewReader(%q): %v", tc.name, tc.resourceName, err)
continue
}
if tc.extraError != nil {
r.err = tc.extraError
}
buf := make([]byte, bufSize)
gotEOF := false
total := 0
for total < bufSize && err == nil {
var n int
n, err = r.Read(buf[total:])
total += n
}
if err == io.EOF {
gotEOF = true
err = nil
doubleCheckBuf := make([]byte, bufSize)
n2, err2 := r.Read(doubleCheckBuf)
if err2 != io.EOF {
t.Errorf("%s: read and got EOF, double-check: read %d bytes got err=%v", n2, err2)
continue
}
}
setup.Close()
if gotErr := err != nil; tc.wantErr != gotErr {
t.Errorf("%s: read %d bytes, got err=%v, wantErr=%t", tc.name, total, err, tc.wantErr)
continue
}
if tc.wantEOF != gotEOF {
t.Errorf("%s: read %d bytes, gotEOF=%t, wantEOF=%t", tc.name, total, gotEOF, tc.wantEOF)
continue
}
if got := string(buf[:total]); got != tc.want {
t.Errorf("%s: read %q, want %q", tc.name, got, tc.want)
continue
}
}
}
func TestClientWrite(t *testing.T) {
testCases := []struct {
name string
resourceName string
maxBufSize int
data string
results []int
wantWriteErr bool
wantCloseErr bool
}{
{
name: "test foo",
resourceName: "foo",
data: testData,
results: []int{len(testData)},
}, {
name: "empty resource name",
resourceName: "",
data: testData,
results: []int{10},
//wantWriteErr: true,
wantCloseErr: true,
}, {
name: "test bar",
resourceName: "bar",
data: testData,
results: []int{len(testData)},
},
}
var setup *TestSetup
tcFor:
for _, tc := range testCases {
if setup != nil {
setup.Close()
}
setup = newTestSetup("")
buf := []byte(tc.data)
var ofs int
w, err := setup.client.NewWriter(setup.ctx, tc.resourceName)
if err != nil {
t.Errorf("%s: NewWriter(): %v", tc.name, err)
continue
}
for i := 0; i < len(tc.results); i++ {
if ofs >= len(tc.data) {
t.Errorf("%s [%d]: Attempting to write more than tc.input: ofs=%d len(buf)=%d",
tc.name, i, ofs, len(tc.data))
continue tcFor
}
n, err := w.Write(buf[ofs:])
ofs += n
if gotErr := err != nil; gotErr != tc.wantWriteErr {
t.Errorf("%s [%d]: Write() got n=%d err=%v, wantWriteErr=%t", tc.name, i, n, err, tc.wantWriteErr)
continue tcFor
} else if tc.wantWriteErr && i+1 < len(tc.results) {
t.Errorf("%s: wantWriteErr and got err after %d results, len(results)=%d is too long.", tc.name, i+1, len(tc.results))
continue tcFor
}
if n != tc.results[i] {
t.Errorf("%s [%d]: Write() wrote %d bytes, want %d bytes", tc.name, i, n, tc.results[i])
continue tcFor
}
}
err = w.Close()
if gotErr := err != nil; gotErr != tc.wantCloseErr {
t.Errorf("%s: Close() got err=%v, wantCloseErr=%t", tc.name, err, tc.wantCloseErr)
continue tcFor
}
}
setup.Close()
}
func TestClientRead_AfterSetupClose(t *testing.T) {
setup := newTestSetup("closed")
setup.Close()
_, err := setup.client.NewReader(setup.ctx, "should fail")
if err == nil {
t.Errorf("NewReader(%q): err=%v", "should fail", err)
}
}
func TestClientWrite_AfterSetupClose(t *testing.T) {
setup := newTestSetup("closed")
setup.Close()
_, err := setup.client.NewWriter(setup.ctx, "should fail")
if err == nil {
t.Fatalf("NewWriter(%q): err=%v", "shoudl fail", err)
}
}
type UnsendableWriteClient struct {
closeAndRecvWriteResponse *pb.WriteResponse
closeAndRecvError error
}
func (w *UnsendableWriteClient) Send(*pb.WriteRequest) error {
if w.closeAndRecvError != nil {
return nil
}
return errors.New("UnsendableWriteClient.Send() fails unless closeAndRecvError is set")
}
func (w *UnsendableWriteClient) CloseAndRecv() (*pb.WriteResponse, error) {
if w.closeAndRecvError == nil {
log.Fatalf("UnsendableWriteClient.Close() when closeAndRecvError == nil.")
}
return w.closeAndRecvWriteResponse, w.closeAndRecvError
}
func (w *UnsendableWriteClient) Context() context.Context {
log.Fatalf("UnsendableWriteClient.Context() should never be called")
return context.Background()
}
func (w *UnsendableWriteClient) CloseSend() error {
return errors.New("UnsendableWriteClient.CloseSend() should never be called")
}
func (w *UnsendableWriteClient) Header() (metadata.MD, error) {
log.Fatalf("UnsendableWriteClient.Header() should never be called")
return metadata.MD{}, nil
}
func (w *UnsendableWriteClient) Trailer() metadata.MD {
log.Fatalf("UnsendableWriteClient.Trailer() should never be called")
return metadata.MD{}
}
func (fake *UnsendableWriteClient) SendMsg(m interface{}) error {
log.Fatalf("UnsendableWriteClient.SendMsg() should never be called")
return nil
}
func (fake *UnsendableWriteClient) RecvMsg(m interface{}) error {
log.Fatalf("UnsendableWriteClient.RecvMsg() should never be called")
return nil
}
func TestClientWrite_WriteFails(t *testing.T) {
setup := newTestSetup("")
w, err := setup.client.NewWriter(setup.ctx, "")
if err != nil {
t.Fatalf("NewWriter(): %v", err)
}
defer setup.Close()
w.writeClient = &UnsendableWriteClient{}
_, err = w.Write([]byte(testData))
if err == nil {
t.Errorf("Write() should fail")
}
}
func TestClientWrite_CloseAndRecvFails(t *testing.T) {
setup := newTestSetup("")
w, err := setup.client.NewWriter(setup.ctx, "CloseAndRecvFails")
if err != nil {
t.Fatalf("NewWriter(): %v", err)
}
defer setup.Close()
n, err := w.Write([]byte(testData))
if err != nil {
t.Errorf("Write() failed: %v", err)
return
}
if n != len(testData) {
t.Errorf("Write() got n=%d, want n=%d", n, len(testData))
return
}
w.writeClient = &UnsendableWriteClient{
closeAndRecvError: errors.New("CloseAndRecv() must fail"),
}
if err = w.Close(); err == nil {
t.Errorf("Close() should fail")
return
}
}
type TestWriteHandler struct {
buf bytes.Buffer // bytes.Buffer implements io.Writer
name string // This service can handle one name only.
}
func (w *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
if w.name == "" {
w.name = name
} else if w.name != name {
return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", w.name, name)
}
// initOffset is ignored.
return &w.buf, nil
}
func (w *TestWriteHandler) Close(ctx context.Context, name string) error {
w.name = ""
w.buf.Reset()
return nil
}
type TestReadHandler struct {
buf string
name string // This service can handle one name only.
}
// GetWriter() returns an io.ReaderAt to accept reads from the given name.
func (r *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
if r.name == "" {
r.name = name
} else if r.name != name {
return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", r.name, name)
}
return bytes.NewReader([]byte(r.buf)), nil
}
// Close does nothing.
func (r *TestReadHandler) Close(ctx context.Context, name string) error {
return nil
}
// newGRPCServer creates a new grpcServer. The grpcServer will be listening for gRPC connections
// at the address named by the Addr field, without TLS.
func newGRPCServer() (*grpcServer, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
}
s := &grpcServer{
Addr: l.Addr().String(),
l: l,
Gsrv: grpc.NewServer(),
}
return s, nil
}
// Start causes the server to start accepting incoming connections.
// Call Start after registering handlers.
func (s *grpcServer) Start() {
go s.Gsrv.Serve(s.l)
}
// Close shuts down the server.
func (s *grpcServer) Close() {
s.Gsrv.Stop()
s.l.Close()
}
func newTestSetup(input string) *TestSetup {
testSetup := &TestSetup{
ctx: context.Background(),
}
testReadHandler := &TestReadHandler{
buf: input,
}
var err error
if testSetup.rpcTest, err = newGRPCServer(); err != nil {
log.Fatalf("newGRPCServer: %v", err)
}
if testSetup.server, err = internal.NewServer(testSetup.rpcTest.Gsrv, testReadHandler, &TestWriteHandler{}); err != nil {
log.Fatalf("internal.NewServer: %v", err)
}
testSetup.rpcTest.Start()
conn, err := grpc.Dial(testSetup.rpcTest.Addr, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc.Dial: %v", err)
}
testSetup.client = NewClient(conn, grpc.FailFast(true))
return testSetup
}
func (testSetup *TestSetup) Close() {
testSetup.rpcTest.Close()
}

View File

@@ -0,0 +1,94 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bytestream
import (
"bytes"
"fmt"
"io"
"log"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
func ExampleNewClient(serverPort int, resourceName string) {
ctx := context.Background()
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", serverPort), grpc.WithInsecure())
if err != nil {
log.Printf("grpc.Dial: %v", err)
return
}
client := NewClient(conn)
reader, err := client.NewReader(ctx, resourceName)
if err != nil {
log.Printf("NewReader(%q): %v", resourceName, err)
}
var buf bytes.Buffer
n, err := buf.ReadFrom(reader)
if err != nil && err != io.EOF {
log.Printf("Read %d bytes, got err=%v", n, err)
}
log.Printf("read %q", string(buf.Bytes()))
}
func ExampleNewReader(serverPort int, resourceName string) {
ctx := context.Background()
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", serverPort), grpc.WithInsecure())
if err != nil {
log.Printf("grpc.Dial: %v", err)
return
}
client := NewClient(conn)
reader, err := client.NewReader(ctx, resourceName)
if err != nil {
log.Printf("NewReader(%q): %v", resourceName, err)
}
var buf bytes.Buffer
n, err := buf.ReadFrom(reader)
if err != nil && err != io.EOF {
log.Printf("Read %d bytes, got err=%v", n, err)
}
log.Printf("read %q", string(buf.Bytes()))
}
func ExampleNewWriter(serverPort int, resourceName string) {
ctx := context.Background()
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", serverPort), grpc.WithInsecure())
if err != nil {
log.Printf("grpc.Dial: %v", err)
return
}
client := NewClient(conn)
w, err := client.NewWriter(ctx, resourceName)
if err != nil {
log.Printf("NewWriter: %v", err)
return
}
defer func() {
err := w.Close()
if err != nil {
log.Printf("Close: %v", err)
}
}()
buf := []byte("hello world")
n, err := w.Write(buf)
if err != nil {
log.Printf("Write: %v", err)
}
log.Printf("Wrote %d bytes", n)
}

View File

@@ -0,0 +1,85 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"bytes"
"io"
"log"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type ExampleReadHandler struct {
buf []byte
name string // In this example, the service can handle one name only.
}
func (mr *ExampleReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
if mr.name == "" {
mr.name = name
log.Printf("read from name: %q", name)
} else if mr.name != name {
return nil, grpc.Errorf(codes.NotFound, "reader has name %q, name %q not allowed", mr.name, name)
}
return bytes.NewReader(mr.buf), nil
}
// Close can be a no-op.
func (mr *ExampleReadHandler) Close(ctx context.Context, name string) error {
return nil
}
type ExampleWriteHandler struct {
buf bytes.Buffer // bytes.Buffer implements io.Writer
name string // In this example, the service can handle one name only.
}
// Handle writes to a given name.
func (mw *ExampleWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
if mw.name == "" {
mw.name = name
log.Printf("write to name: %q", name)
} else if mw.name != name {
return nil, grpc.Errorf(codes.NotFound, "reader has name %q, name=%q not allowed", mw.name, name)
}
// TODO: initOffset is ignored.
return &mw.buf, nil
}
// Close can be a no-op.
func (mw *ExampleWriteHandler) Close(ctx context.Context, name string) error {
return nil
}
func ExampleNewServer() {
reader := &ExampleReadHandler{
buf: []byte("Hello World!"),
name: "foo",
}
writer := &ExampleWriteHandler{}
gsrv := grpc.NewServer()
bytestreamServer, err := NewServer(gsrv, reader, writer)
if err != nil {
log.Printf("NewServer: %v", err)
return
}
// Start accepting incoming connections.
// See gRPC docs and newGRPCServer in google.golang.org/api/transport/bytestream/client_test.go.
_ = bytestreamServer
}

View File

@@ -0,0 +1,248 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
// This file contains the server implementation of Bytestream declared at:
// https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
//
// Bytestream uses bidirectional streaming (http://grpc.io/docs/guides/concepts.html#bidirectional-streaming-rpc).
import (
"fmt"
"io"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
pb "google.golang.org/genproto/googleapis/bytestream"
)
// ReadHandler reads from the Bytestream.
// Note: error returns must return an instance of grpc.rpcError unless otherwise handled in grpc-go/rpc_util.go.
// http://google.golang.org/grpc provides Errorf(code, fmt, ...) to create instances of grpc.rpcError.
// Note: Cancelling the context will abort the stream ("drop the connection"). Consider returning a non-nil error instead.
type ReadHandler interface {
// GetReader provides an io.ReaderAt, which will not be retained by the Server after the pb.ReadRequest.
GetReader(ctx context.Context, name string) (io.ReaderAt, error)
// Close does not have to do anything, but is here for if the io.ReaderAt wants to call Close().
Close(ctx context.Context, name string) error
}
// WriteHandler handles writes from the Bytestream. For example:
// Note: error returns must return an instance of grpc.rpcError unless otherwise handled in grpc-go/rpc_util.go.
// grpc-go/rpc_util.go provides the helper func Errorf(code, fmt, ...) to create instances of grpc.rpcError.
// Note: Cancelling the context will abort the stream ("drop the connection"). Consider returning a non-nil error instead.
type WriteHandler interface {
// GetWriter provides an io.Writer that is ready to write at initOffset.
// The io.Writer will not be retained by the Server after the pb.WriteRequest.
GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error)
// Close does not have to do anything, but is related to Server.AllowOverwrite. Or if the io.Writer simply wants a Close() call.
// Close is called when the server receives a pb.WriteRequest with finish_write = true.
// If Server.AllowOverwrite == true then Close() followed by GetWriter() for the same name indicates the name is being overwritten, even if the initOffset is different.
Close(ctx context.Context, name string) error
}
// Internal service that implements pb.ByteStreamServer. Because the methods Write() and Read() are exported for grpc to link against,
// grpcService is deliberately not exported so go code cannot call grpcService.Write() or grpcService.Read().
type grpcService struct {
parent *Server
}
// Server wraps the RPCs in pb. Use bytestream.NewServer() to create a Server.
type Server struct {
status map[string]*pb.QueryWriteStatusResponse
readHandler ReadHandler
writeHandler WriteHandler
rpc *grpcService
// AllowOverwrite controls Server behavior when a WriteRequest with finish_write = true is followed by another WriteRequest.
AllowOverwrite bool
// Bytestream allows a WriteRequest to omit the resource name, in which case it will be appended to the last WriteRequest.
LastWrittenResource string
}
// NewServer creates a new bytestream.Server using gRPC.
// gsrv is the *grpc.Server this bytestream.Server will listen on.
// readHandler handles any incoming pb.ReadRequest or nil which means all pb.ReadRequests will be rejected.
// writeHandler handles any incoming pb.WriteRequest or nil which means all pb.WriteRequests will be rejected.
// readHandler and writeHandler cannot both be nil.
func NewServer(gsrv *grpc.Server, readHandler ReadHandler, writeHandler WriteHandler) (*Server, error) {
if readHandler == nil && writeHandler == nil {
return nil, fmt.Errorf("readHandler and writeHandler cannot both be nil")
}
server := &Server{
status: make(map[string]*pb.QueryWriteStatusResponse),
readHandler: readHandler,
writeHandler: writeHandler,
rpc: &grpcService{},
}
server.rpc.parent = server
// Register a server.
pb.RegisterByteStreamServer(gsrv, server.rpc)
return server, nil
}
// Write handles the pb.ByteStream_WriteServer and sends a pb.WriteResponse
// Implements bytestream.proto "rpc Write(stream WriteRequest) returns (WriteResponse)".
func (rpc *grpcService) Write(stream pb.ByteStream_WriteServer) error {
for {
writeReq, err := stream.Recv()
if err == io.EOF {
// io.EOF errors are a non-error for the Write() caller.
return nil
} else if err != nil {
return grpc.Errorf(codes.Unknown, "stream.Recv() failed: %v", err)
}
if rpc.parent.writeHandler == nil {
return grpc.Errorf(codes.Unimplemented, "instance of NewServer(writeHandler = nil) rejects all writes")
}
status, ok := rpc.parent.status[writeReq.ResourceName]
if !ok {
// writeReq.ResourceName is a new resource name.
if writeReq.ResourceName == "" {
return grpc.Errorf(codes.InvalidArgument, "WriteRequest: empty or missing resource_name")
}
status = &pb.QueryWriteStatusResponse{
CommittedSize: writeReq.WriteOffset,
}
rpc.parent.status[writeReq.ResourceName] = status
} else {
// writeReq.ResourceName has already been seen by this server.
if status.Complete {
if !rpc.parent.AllowOverwrite {
return grpc.Errorf(codes.InvalidArgument, "%q finish_write = true already, got %d byte WriteRequest and Server.AllowOverwrite = false",
writeReq.ResourceName, len(writeReq.Data))
}
// Truncate the resource stream.
status.Complete = false
status.CommittedSize = writeReq.WriteOffset
}
}
if writeReq.WriteOffset != status.CommittedSize {
return grpc.Errorf(codes.FailedPrecondition, "%q write_offset=%d differs from server internal committed_size=%d",
writeReq.ResourceName, writeReq.WriteOffset, status.CommittedSize)
}
// WriteRequest with empty data is ok.
if len(writeReq.Data) != 0 {
writer, err := rpc.parent.writeHandler.GetWriter(stream.Context(), writeReq.ResourceName, status.CommittedSize)
if err != nil {
return grpc.Errorf(codes.Internal, "GetWriter(%q): %v", writeReq.ResourceName, err)
}
wroteLen, err := writer.Write(writeReq.Data)
if err != nil {
return grpc.Errorf(codes.Internal, "Write(%q): %v", writeReq.ResourceName, err)
}
status.CommittedSize += int64(wroteLen)
}
if writeReq.FinishWrite {
r := &pb.WriteResponse{CommittedSize: status.CommittedSize}
// Note: SendAndClose does NOT close the server stream.
if err = stream.SendAndClose(r); err != nil {
return grpc.Errorf(codes.Internal, "stream.SendAndClose(%q, WriteResponse{ %d }): %v", writeReq.ResourceName, status.CommittedSize, err)
}
status.Complete = true
if status.CommittedSize == 0 {
return grpc.Errorf(codes.FailedPrecondition, "writeHandler.Close(%q): 0 bytes written", writeReq.ResourceName)
}
if err = rpc.parent.writeHandler.Close(stream.Context(), writeReq.ResourceName); err != nil {
return grpc.Errorf(codes.Internal, "writeHandler.Close(%q): %v", writeReq.ResourceName, err)
}
}
}
}
// QueryWriteStatus implements bytestream.proto "rpc QueryWriteStatus(QueryWriteStatusRequest) returns (QueryWriteStatusResponse)".
// QueryWriteStatus returns the CommittedSize known to the server.
func (rpc *grpcService) QueryWriteStatus(ctx context.Context, request *pb.QueryWriteStatusRequest) (*pb.QueryWriteStatusResponse, error) {
s, ok := rpc.parent.status[request.ResourceName]
if !ok {
return nil, grpc.Errorf(codes.NotFound, "resource_name not found: QueryWriteStatusRequest %v", request)
}
return s, nil
}
func (rpc *grpcService) readFrom(request *pb.ReadRequest, reader io.ReaderAt, stream pb.ByteStream_ReadServer) error {
limit := int(request.ReadLimit)
if limit < 0 {
return grpc.Errorf(codes.InvalidArgument, "Read(): read_limit=%d is invalid", limit)
}
offset := request.ReadOffset
if offset < 0 {
return grpc.Errorf(codes.InvalidArgument, "Read(): offset=%d is invalid", offset)
}
var buf []byte
if limit > 0 {
buf = make([]byte, limit)
} else {
buf = make([]byte, 1024*1024) // 1M buffer is reasonable.
}
bytesSent := 0
for limit == 0 || bytesSent < limit {
n, err := reader.ReadAt(buf, offset)
if n > 0 {
if err := stream.Send(&pb.ReadResponse{Data: buf[:n]}); err != nil {
return grpc.Errorf(grpc.Code(err), "Send(resourceName=%q offset=%d): %v", request.ResourceName, offset, grpc.ErrorDesc(err))
}
} else if err == nil {
return grpc.Errorf(codes.Internal, "nil error on empty read: io.ReaderAt contract violated")
}
offset += int64(n)
bytesSent += n
if err == io.EOF {
break
}
if err != nil {
return grpc.Errorf(codes.Unknown, "ReadAt(resourceName=%q offset=%d): %v", request.ResourceName, offset, err)
}
}
return nil
}
// Read handles a pb.ReadRequest sending bytes to the pb.ByteStream_ReadServer
// Implements bytestream.proto "rpc Read(ReadRequest) returns (stream ReadResponse)"
func (rpc *grpcService) Read(request *pb.ReadRequest, stream pb.ByteStream_ReadServer) error {
if rpc.parent.readHandler == nil {
return grpc.Errorf(codes.Unimplemented, "instance of NewServer(readHandler = nil) rejects all reads")
}
if request == nil {
return grpc.Errorf(codes.Internal, "Read(ReadRequest == nil)")
}
if request.ResourceName == "" {
return grpc.Errorf(codes.InvalidArgument, "ReadRequest: empty or missing resource_name")
}
reader, err := rpc.parent.readHandler.GetReader(stream.Context(), request.ResourceName)
if err != nil {
return err
}
if err = rpc.readFrom(request, reader, stream); err != nil {
rpc.parent.readHandler.Close(stream.Context(), request.ResourceName)
return err
}
if err = rpc.parent.readHandler.Close(stream.Context(), request.ResourceName); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,850 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"sync"
"testing"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "google.golang.org/genproto/googleapis/bytestream"
)
const (
testName = "testName"
testData = "0123456789"
)
var (
setupServerOnce sync.Once
server *Server
)
func TestNewServerWithInvalidInputs(t *testing.T) {
_, err := NewServer(grpc.NewServer(), nil, nil)
if err == nil {
t.Fatal("NewServer(nil, nil) should not succeed")
}
}
func TestServerWrite(t *testing.T) {
testCases := []struct {
name string
writeHandler WriteHandler
input []interface{}
writeCount int
allowEmptyCommits bool
allowOverwrite bool
wantErr bool
wantResponse int
}{
{
name: "empty resource name",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
FinishWrite: true,
Data: []byte(testData),
},
},
writeCount: 1,
wantErr: true,
wantResponse: 0,
}, {
name: "Recv returns io.EOF",
writeHandler: &TestWriteHandler{},
input: []interface{}{
io.EOF,
},
writeCount: 1,
wantErr: false,
wantResponse: 0,
}, {
name: "Recv returns error, 0 WriteRequests",
writeHandler: &TestWriteHandler{},
input: []interface{}{
errors.New("Recv returns error, 0 WriteRequests"),
},
writeCount: 1,
wantErr: true,
wantResponse: 0,
}, {
name: "simple test",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
io.EOF,
},
writeCount: 1,
wantResponse: 1,
}, {
name: "Recv returns error, 1 WriteRequests",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: false,
Data: []byte(testData),
},
errors.New("Recv returns error, 1 WriteRequests"),
},
writeCount: 1,
wantErr: true,
wantResponse: 0,
}, {
name: "attempt to overwrite the same name",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
io.EOF,
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
},
writeCount: 2,
wantErr: true,
wantResponse: 1,
}, {
name: "overwrite with the same name + AllowOverwrite",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
io.EOF,
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
io.EOF,
},
writeCount: 2,
allowOverwrite: true,
wantResponse: 2,
}, {
name: "two WriteRequests - 1st is empty",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: false,
Data: nil,
},
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
io.EOF,
},
writeCount: 1,
wantResponse: 1,
allowEmptyCommits: true,
}, {
name: "two WriteRequests - 2nd is empty",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: false,
Data: []byte(testData),
},
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: int64(len(testData)),
FinishWrite: true,
Data: nil,
},
io.EOF,
},
writeCount: 1,
wantResponse: 1,
}, {
name: "two WriteRequests - all empty",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: false,
Data: nil,
},
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: nil,
},
},
writeCount: 1,
wantErr: true,
wantResponse: 1,
allowEmptyCommits: true,
}, {
name: "two WriteRequests - varying offset",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 100,
FinishWrite: false,
Data: []byte(testData),
},
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 100 + int64(len(testData)),
FinishWrite: true,
Data: []byte(testData),
},
io.EOF,
},
writeCount: 1,
wantResponse: 1,
}, {
name: "two WriteRequests - disjoint offset",
writeHandler: &TestWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 100,
FinishWrite: false,
Data: []byte(testData),
},
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 200,
FinishWrite: true,
Data: []byte(testData),
},
},
writeCount: 1,
wantErr: true,
wantResponse: 0,
}, {
name: "fails with UngettableWriteHandler",
writeHandler: &UngettableWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
},
writeCount: 1,
wantErr: true,
}, {
name: "fails with UnwritableWriteHandler",
writeHandler: &UnwritableWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
},
writeCount: 1,
wantErr: true,
}, {
name: "fails with UnclosableWriteHandler",
writeHandler: &UnclosableWriteHandler{},
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
},
writeCount: 1,
wantErr: true,
wantResponse: 1,
}, {
name: "fails with nil WriteHandler",
writeHandler: nil,
input: []interface{}{
&pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
},
},
writeCount: 1,
wantErr: true,
},
}
ctx := context.Background()
for _, tc := range testCases {
readHandler := &TestReadHandler{}
if tc.writeHandler != nil {
readHandler = nil
}
setupServer(readHandler, tc.writeHandler)
server.AllowOverwrite = tc.allowOverwrite
var requestCount, responseCount int
var err error
for i := 0; i < tc.writeCount; i++ {
err = server.rpc.Write(&fakeWriteServerImpl{
ctx: ctx,
receiver: func() (*pb.WriteRequest, error) {
if requestCount >= len(tc.input) {
t.Fatalf("%s: got %d call(s) to Recv, want %d from len(input)", tc.name, requestCount+1, len(tc.input))
}
v := tc.input[requestCount]
requestCount++
request, ok := v.(*pb.WriteRequest)
if ok {
return request, nil
}
err, ok := v.(error)
if !ok {
t.Fatalf("%s: unknown input: %v", tc.name, v)
}
return nil, err
},
sender: func(response *pb.WriteResponse) error {
if !tc.allowEmptyCommits && response.CommittedSize == 0 {
t.Fatalf("%s: invalid response: WriteResponse %v", tc.name, response)
}
responseCount++
return nil
},
})
gotErr := (err != nil)
if i+1 < tc.writeCount {
if gotErr {
t.Errorf("%s: Write got err=%v, wantErr=%t, but on Write[%d/%d]. Error should not happen until last call to Write.", tc.name, err, tc.wantErr, i+1, tc.writeCount)
break // The t.Errorf conditions below may erroneously fire, pay them no mind.
}
} else if gotErr != tc.wantErr {
t.Errorf("%s: Write got err=%v, wantErr=%t", tc.name, err, tc.wantErr)
break // The t.Errorf conditions below may erroneously fire, pay them no mind.
}
}
if requestCount != len(tc.input) {
t.Errorf("%s: got %d call(s) to Recv, want %d", tc.name, requestCount, len(tc.input))
}
if responseCount != tc.wantResponse {
t.Errorf("%s: got %d call(s) to SendProto, want %d", tc.name, responseCount, tc.wantResponse)
}
}
}
func TestServerWrite_SendAndCloseError(t *testing.T) {
const (
wantRequest = 2
wantResponse = 1
)
ctx := context.Background()
setupServer(nil, &TestWriteHandler{})
var requestCount, responseCount int
err := server.rpc.Write(&fakeWriteServerImpl{
ctx: ctx,
receiver: func() (*pb.WriteRequest, error) {
if requestCount >= wantRequest {
t.Fatalf("got %d call(s) to Recv, want %d", requestCount+1, wantRequest)
}
requestCount++
return &pb.WriteRequest{
ResourceName: testName,
WriteOffset: 0,
FinishWrite: true,
Data: []byte(testData),
}, nil
},
sender: func(response *pb.WriteResponse) error {
responseCount++
return errors.New("TestServerWrite SendProto error")
},
})
if err == nil {
t.Errorf("Write should have failed, but succeeded")
}
if requestCount != wantRequest {
t.Errorf("got %d call(s) to Recv, want %d", requestCount, wantRequest)
}
if responseCount != wantResponse {
t.Errorf("got %d call(s) to SendProto, want %d", responseCount, wantResponse)
}
}
func TestQueryWriteStatus(t *testing.T) {
testCases := []struct {
name string
existingName string
requestName string
wantErr bool
}{
{
name: "existing name should work",
existingName: testName,
requestName: testName,
}, {
name: "missing name should break",
existingName: testName,
requestName: "invalidName",
wantErr: true,
},
}
ctx := context.Background()
for _, tc := range testCases {
setupServer(nil, &TestWriteHandler{})
server.status[tc.existingName] = &pb.QueryWriteStatusResponse{}
_, err := server.rpc.QueryWriteStatus(ctx, &pb.QueryWriteStatusRequest{
ResourceName: tc.requestName,
})
if gotErr := (err != nil); gotErr != tc.wantErr {
t.Errorf("%s: QueryWriteStatus(%q) got err=%v, wantErr=%t", tc.name, tc.requestName, err, tc.wantErr)
}
}
}
func TestServerRead(t *testing.T) {
testCases := []struct {
name string
readHandler ReadHandler
input *pb.ReadRequest
readCount int
wantErr bool
wantResponse []string
}{
{
name: "empty resource name",
readHandler: &TestReadHandler{},
input: &pb.ReadRequest{
ReadLimit: 1,
},
readCount: 1,
wantErr: true,
wantResponse: []string{},
}, {
name: "test ReadLimit=-1",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: -1,
},
readCount: 1,
wantErr: true,
}, {
name: "test ReadLimit=1",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: 1,
},
readCount: 1,
wantResponse: []string{"0"},
}, {
name: "test ReadLimit=2",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: 2,
},
readCount: 1,
wantResponse: []string{"01"},
}, {
name: "test ReadOffset=1 ReadLimit=2",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 1,
ReadLimit: 2,
},
readCount: 1,
wantResponse: []string{"12"},
}, {
name: "test ReadOffset=2 ReadLimit=2",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 2,
ReadLimit: 2,
},
readCount: 1,
wantResponse: []string{"23"},
}, {
name: "read all testData at exactly the limit",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: int64(len(testData)),
},
readCount: 1,
wantResponse: []string{"0123456789"},
}, {
name: "read all testData",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: int64(len(testData)) * 2,
},
readCount: 1,
wantResponse: []string{"0123456789"},
}, {
name: "read all testData 2 times",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: int64(len(testData)) * 2,
},
readCount: 2,
wantResponse: []string{"0123456789", "0123456789"},
}, {
name: "test ReadLimit=0",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: 0,
},
readCount: 1,
wantResponse: []string{"0123456789"},
}, {
name: "test ReadLimit=1000",
readHandler: &TestReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: 1000,
},
readCount: 1,
wantResponse: []string{"0123456789"},
}, {
name: "fails with UngettableReadHandler",
readHandler: &UngettableReadHandler{},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: int64(len(testData)),
},
readCount: 1,
wantErr: true,
}, {
name: "fails with UnreadableReadHandler",
readHandler: &UnreadableReadHandler{},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: int64(len(testData)),
},
readCount: 1,
wantErr: true,
}, {
name: "fails with UnclosableReadHandler",
readHandler: &UnclosableReadHandler{buf: testData},
input: &pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: int64(len(testData)) * 2,
},
readCount: 1,
wantErr: true,
wantResponse: []string{"0123456789"},
}, {
name: "fails with nil ReadRequest",
readHandler: &TestReadHandler{buf: testData},
readCount: 1,
wantErr: true,
}, {
name: "fails with nil ReadHandler",
readHandler: nil,
readCount: 1,
wantErr: true,
},
}
ctx := context.Background()
for _, tc := range testCases {
var writeHandler WriteHandler
if tc.readHandler == nil {
writeHandler = &TestWriteHandler{}
}
setupServer(tc.readHandler, writeHandler)
var responseCount int
var err error
for i := 0; i < tc.readCount; i++ {
err = server.rpc.Read(tc.input, &fakeReadServerImpl{
ctx: ctx,
sender: func(response *pb.ReadResponse) error {
if responseCount >= len(tc.wantResponse) {
t.Fatalf("%s: got %d call(s) to Send(), want %d", tc.name, responseCount+1, len(tc.wantResponse))
}
if got, want := string(response.Data), tc.wantResponse[responseCount]; got != want {
t.Fatalf("%s: response[%d] got %q, want %q", tc.name, responseCount, got, want)
}
responseCount++
return nil
},
})
gotErr := (err != nil)
if i+1 < tc.readCount {
if gotErr {
t.Errorf("%s: Read got err=%v, wantErr=%t, but on Read[%d/%d]. Error should not happen until last call to Read", tc.name, err, tc.wantErr, i+1, tc.readCount)
break
}
} else if gotErr != tc.wantErr {
t.Errorf("%s: Read got err=%v, wantErr=%t", tc.name, err, tc.wantErr)
break
}
}
if responseCount != len(tc.wantResponse) {
t.Errorf("%s: got %d call(s) to Send, want %d", tc.name, responseCount, len(tc.wantResponse))
}
}
}
func TestServerRead_SendError(t *testing.T) {
setupServer(&TestReadHandler{buf: testData}, nil)
err := server.rpc.Read(&pb.ReadRequest{
ResourceName: testName,
ReadOffset: 0,
ReadLimit: int64(len(testData)) * 2,
}, &fakeReadServerImpl{
ctx: context.Background(),
sender: func(response *pb.ReadResponse) error {
if string(response.Data) != testData {
t.Fatalf("Send: got %v, want %q", response, testData)
}
return errors.New("TestServerRead Send() error")
},
})
if err == nil {
t.Fatal("Read() should have failed, but succeeded")
}
}
type fakeWriteServerImpl struct {
pb.ByteStream_WriteServer
ctx context.Context
receiver func() (*pb.WriteRequest, error)
sender func(*pb.WriteResponse) error
}
func (fake *fakeWriteServerImpl) Context() context.Context {
return fake.ctx
}
func (fake *fakeWriteServerImpl) Recv() (*pb.WriteRequest, error) {
return fake.receiver()
}
func (fake *fakeWriteServerImpl) SendMsg(m interface{}) error {
return fake.sender(m.(*pb.WriteResponse))
}
func (fake *fakeWriteServerImpl) SendAndClose(m *pb.WriteResponse) error {
fake.sender(m)
return nil
}
type fakeReadServerImpl struct {
pb.ByteStream_ReadServer
ctx context.Context
sender func(*pb.ReadResponse) error
}
func (fake *fakeReadServerImpl) Context() context.Context {
return fake.ctx
}
func (fake *fakeReadServerImpl) Send(response *pb.ReadResponse) error {
return fake.sender(response)
}
type TestWriteHandler struct {
buf bytes.Buffer // bytes.Buffer implements io.Writer
name string // This service can handle one name only.
}
func (w *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
if w.name == "" {
w.name = name
} else if w.name != name {
return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", w.name, name)
}
// initOffset is ignored.
return &w.buf, nil
}
func (w *TestWriteHandler) Close(ctx context.Context, name string) error {
w.name = ""
w.buf.Reset()
return nil
}
type UngettableWriteHandler struct{}
func (w *UngettableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
return nil, errors.New("UngettableWriteHandler.GetWriter() always fails")
}
func (w *UngettableWriteHandler) Close(ctx context.Context, name string) error {
return nil
}
type UnwritableWriter struct{}
func (w *UnwritableWriter) Write(p []byte) (int, error) {
return 0, errors.New("UnwritableWriter.Write() always fails")
}
type UnwritableWriteHandler struct{}
func (w *UnwritableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
return &UnwritableWriter{}, nil
}
func (w *UnwritableWriteHandler) Close(ctx context.Context, name string) error {
return nil
}
type UnclosableWriter struct{}
func (w *UnclosableWriter) Write(p []byte) (int, error) {
return len(p), nil
}
type UnclosableWriteHandler struct{}
func (w *UnclosableWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
return &UnclosableWriter{}, nil
}
func (w *UnclosableWriteHandler) Close(ctx context.Context, name string) error {
return errors.New("UnclosableWriteHandler.Close() always fails")
}
type TestReadHandler struct {
buf string
name string // This service can handle one name only.
}
// GetWriter() returns an io.ReaderAt to accept reads from the given name.
func (r *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
if r.name == "" {
r.name = name
} else if r.name != name {
return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", r.name, name)
}
return bytes.NewReader([]byte(r.buf)), nil
}
// Close does nothing.
func (r *TestReadHandler) Close(ctx context.Context, name string) error {
return nil
}
type UngettableReadHandler struct{}
func (r *UngettableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
return nil, errors.New("UngettableReadHandler.GetReader() always fails")
}
func (r *UngettableReadHandler) Close(ctx context.Context, name string) error {
return nil
}
type UnreadableReader struct{}
func (r *UnreadableReader) ReadAt(p []byte, offset int64) (int, error) {
return 0, errors.New("UnreadableReader.ReadAt() always fails")
}
type UnreadableReadHandler struct{}
func (r *UnreadableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
return &UnreadableReader{}, nil
}
func (r *UnreadableReadHandler) Close(ctx context.Context, name string) error {
return nil
}
type UnclosableReadHandler struct {
buf string
}
func (r *UnclosableReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
return bytes.NewReader([]byte(r.buf)), nil
}
func (r *UnclosableReadHandler) Close(ctx context.Context, name string) error {
return fmt.Errorf("UnclosableReader.Close(%s) always fails", name)
}
func registerServer() {
gsrv := grpc.NewServer()
var err error
server, err = NewServer(gsrv, &TestReadHandler{}, &TestWriteHandler{})
if err != nil {
log.Fatalf("NewServer() failed: %v", err)
}
}
func setupServer(readHandler ReadHandler, writeHandler WriteHandler) {
setupServerOnce.Do(registerServer)
server.status = make(map[string]*pb.QueryWriteStatusResponse)
server.readHandler = readHandler
server.writeHandler = writeHandler
}

61
vendor/google.golang.org/api/transport/dial.go generated vendored Normal file
View File

@@ -0,0 +1,61 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package transport supports network connections to HTTP and GRPC servers.
// This package is not intended for use by end developers. Use the
// google.golang.org/api/option package to configure API clients.
package transport
import (
"net/http"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/api/internal"
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
htransport "google.golang.org/api/transport/http"
)
// NewHTTPClient returns an HTTP client for use communicating with a Google cloud
// service, configured with the given ClientOptions. It also returns the endpoint
// for the service as specified in the options.
func NewHTTPClient(ctx context.Context, opts ...option.ClientOption) (*http.Client, string, error) {
return htransport.NewClient(ctx, opts...)
}
// DialGRPC returns a GRPC connection for use communicating with a Google cloud
// service, configured with the given ClientOptions.
func DialGRPC(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
return gtransport.Dial(ctx, opts...)
}
// DialGRPCInsecure returns an insecure GRPC connection for use communicating
// with fake or mock Google cloud service implementations, such as emulators.
// The connection is configured with the given ClientOptions.
func DialGRPCInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
return gtransport.DialInsecure(ctx, opts...)
}
// Creds constructs a google.DefaultCredentials from the information in the options,
// or obtains the default credentials in the same way as google.FindDefaultCredentials.
func Creds(ctx context.Context, opts ...option.ClientOption) (*google.DefaultCredentials, error) {
var ds internal.DialSettings
for _, opt := range opts {
opt.Apply(&ds)
}
return internal.Creds(ctx, &ds)
}

86
vendor/google.golang.org/api/transport/grpc/dial.go generated vendored Normal file
View File

@@ -0,0 +1,86 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package transport/grpc supports network connections to GRPC servers.
// This package is not intended for use by end developers. Use the
// google.golang.org/api/option package to configure API clients.
package transport
import (
"errors"
"golang.org/x/net/context"
"google.golang.org/api/internal"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
)
// Set at init time by dial_appengine.go. If nil, we're not on App Engine.
var appengineDialerHook func(context.Context) grpc.DialOption
// Dial returns a GRPC connection for use communicating with a Google cloud
// service, configured with the given ClientOptions.
func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
var o internal.DialSettings
for _, opt := range opts {
opt.Apply(&o)
}
if o.HTTPClient != nil {
return nil, errors.New("unsupported HTTP client specified")
}
if o.GRPCConn != nil {
return o.GRPCConn, nil
}
creds, err := internal.Creds(ctx, &o)
if err != nil {
return nil, err
}
grpcOpts := []grpc.DialOption{
grpc.WithPerRPCCredentials(oauth.TokenSource{creds.TokenSource}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
if appengineDialerHook != nil {
// Use the Socket API on App Engine.
grpcOpts = append(grpcOpts, appengineDialerHook(ctx))
}
grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
if o.UserAgent != "" {
grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
}
return grpc.DialContext(ctx, o.Endpoint, grpcOpts...)
}
// DialInsecure returns an insecure GRPC connection for use communicating
// with fake or mock Google cloud service implementations, such as emulators.
// The connection is configured with the given ClientOptions.
func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
var o internal.DialSettings
for _, opt := range opts {
opt.Apply(&o)
}
if o.HTTPClient != nil {
return nil, errors.New("unsupported HTTP client specified")
}
if o.GRPCConn != nil {
return o.GRPCConn, nil
}
grpcOpts := []grpc.DialOption{grpc.WithInsecure()}
grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
if o.UserAgent != "" {
grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
}
return grpc.DialContext(ctx, o.Endpoint, grpcOpts...)
}

View File

@@ -0,0 +1,34 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build appengine
package transport
import (
"net"
"time"
"golang.org/x/net/context"
"google.golang.org/appengine/socket"
"google.golang.org/grpc"
)
func init() {
appengineDialerHook = func(ctx context.Context) grpc.DialOption {
return grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return socket.DialTimeout(ctx, "tcp", addr, timeout)
})
}
}

View File

@@ -0,0 +1,66 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transport
import (
"errors"
"net"
"testing"
"time"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
// Check that user optioned grpc.WithDialer option overrides the App Engine hook.
func TestGRPCHook(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
expected := false
appengineDialerHook = (func(ctx context.Context) grpc.DialOption {
return grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
t.Error("did not expect a call to notExpected dialer, got one")
cancel()
return nil, errors.New("not expected")
})
})
defer func() {
appengineDialerHook = nil
}()
expectedDialer := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
expected = true
cancel()
return nil, errors.New("expected")
})
conn, err := Dial(ctx,
option.WithTokenSource(oauth2.StaticTokenSource(nil)), // No creds.
option.WithGRPCDialOption(expectedDialer),
option.WithEndpoint("example.google.com:443"))
if err != nil {
t.Errorf("DialGRPC: error %v, want nil", err)
}
// gRPC doesn't connect before the first call.
grpc.Invoke(ctx, "foo", nil, nil, conn)
conn.Close()
if !expected {
t.Error("expected a call to expected dialer, didn't get one")
}
}

107
vendor/google.golang.org/api/transport/http/dial.go generated vendored Normal file
View File

@@ -0,0 +1,107 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package transport/http supports network connections to HTTP servers.
// This package is not intended for use by end developers. Use the
// google.golang.org/api/option package to configure API clients.
package transport
import (
"errors"
"net/http"
"golang.org/x/net/context"
"golang.org/x/oauth2"
gtransport "google.golang.org/api/googleapi/transport"
"google.golang.org/api/internal"
"google.golang.org/api/option"
)
// NewClient returns an HTTP client for use communicating with a Google cloud
// service, configured with the given ClientOptions. It also returns the endpoint
// for the service as specified in the options.
func NewClient(ctx context.Context, opts ...option.ClientOption) (*http.Client, string, error) {
var o internal.DialSettings
for _, opt := range opts {
opt.Apply(&o)
}
if o.GRPCConn != nil {
return nil, "", errors.New("unsupported gRPC connection specified")
}
// TODO(cbro): consider injecting the User-Agent even if an explicit HTTP client is provided?
if o.HTTPClient != nil {
return o.HTTPClient, o.Endpoint, nil
}
if o.APIKey != "" {
hc := &http.Client{
Transport: &gtransport.APIKey{
Key: o.APIKey,
Transport: userAgentTransport{
base: baseTransport(ctx),
userAgent: o.UserAgent,
},
},
}
return hc, o.Endpoint, nil
}
creds, err := internal.Creds(ctx, &o)
if err != nil {
return nil, "", err
}
hc := &http.Client{
Transport: &oauth2.Transport{
Source: creds.TokenSource,
Base: userAgentTransport{
base: baseTransport(ctx),
userAgent: o.UserAgent,
},
},
}
return hc, o.Endpoint, nil
}
type userAgentTransport struct {
userAgent string
base http.RoundTripper
}
func (t userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) {
rt := t.base
if rt == nil {
return nil, errors.New("transport: no Transport specified")
}
if t.userAgent == "" {
return rt.RoundTrip(req)
}
newReq := *req
newReq.Header = make(http.Header)
for k, vv := range req.Header {
newReq.Header[k] = vv
}
// TODO(cbro): append to existing User-Agent header?
newReq.Header["User-Agent"] = []string{t.userAgent}
return rt.RoundTrip(&newReq)
}
// Set at init time by dial_appengine.go. If nil, we're not on App Engine.
var appengineUrlfetchHook func(context.Context) http.RoundTripper
// baseTransport returns the base HTTP transport.
// On App Engine, this is urlfetch.Transport, otherwise it's http.DefaultTransport.
func baseTransport(ctx context.Context) http.RoundTripper {
if appengineUrlfetchHook != nil {
return appengineUrlfetchHook(ctx)
}
return http.DefaultTransport
}

View File

@@ -0,0 +1,30 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build appengine
package transport
import (
"net/http"
"golang.org/x/net/context"
"google.golang.org/appengine/urlfetch"
)
func init() {
appengineUrlfetchHook = func(ctx context.Context) http.RoundTripper {
return &urlfetch.Transport{Context: ctx}
}
}