language: go
matrix:
include:
- go: 1.12.x
env: GO111MODULE=on
- go: 1.11.x
env: VET=1 GO111MODULE=on
- go: 1.11.x
env: RACE=1 GO111MODULE=on
- go: 1.11.x
env: RUN386=1
- go: 1.11.x
env: GRPC_GO_RETRY=on
- go: 1.10.x
- go: 1.9.x
- go: 1.9.x
env: GAE=1
go_import_path: google.golang.org/grpc
before_install:
- if [[ "${GO111MODULE}" = "on" ]]; then mkdir "${HOME}/go"; export GOPATH="${HOME}/go"; fi
- if [[ -n "${RUN386}" ]]; then export GOARCH=386; fi
- if [[ "${TRAVIS_EVENT_TYPE}" = "cron" && -z "${RUN386}" ]]; then RACE=1; fi
- if [[ "${TRAVIS_EVENT_TYPE}" != "cron" ]]; then VET_SKIP_PROTO=1; fi
install:
- try3() { eval "$*" || eval "$*" || eval "$*"; }
- try3 'if [[ "${GO111MODULE}" = "on" ]]; then go mod download; else make testdeps; fi'
- if [[ "${GAE}" = 1 ]]; then source ./install_gae.sh; make testappenginedeps; fi
- if [[ "${VET}" = 1 ]]; then ./vet.sh -install; fi
script:
- set -e
- if [[ "${VET}" = 1 ]]; then ./vet.sh; fi
- if [[ "${GAE}" = 1 ]]; then make testappengine; exit 0; fi
- if [[ "${RACE}" = 1 ]]; then make testrace; exit 0; fi
- make test
# How to contribute
We definitely welcome your patches and contributions to gRPC!
If you are new to GitHub, please start by reading [Pull Request howto](https://help.github.com/articles/about-pull-requests/).
## Legal requirements
In order to protect both you and ourselves, you will need to sign the
[Contributor License Agreement](https://identity.linuxfoundation.org/projects/cncf).
## Guidelines for Pull Requests
How to get your contributions merged smoothly and quickly.
- Create **small PRs** that are narrowly focused on **addressing a single concern**. We often receive PRs that try to fix several things at once; if one of the fixes is unacceptable, nothing gets merged, and both the author's and the reviewers' time is wasted. Create separate PRs to address separate concerns and everyone will be happy.
- For speculative changes, consider opening an issue and discussing it first. If you are suggesting a behavioral or API change, consider starting with a [gRFC proposal](https://github.com/grpc/proposal).
- Provide a good **PR description** as a record of **what** change is being made and **why** it was made. Link to a GitHub issue if it exists.
- Don't fix code style and formatting unless you are already changing that line to address an issue. PRs with irrelevant changes won't be merged. If you do want to fix formatting or style, do that in a separate PR.
- Unless your PR is trivial, you should expect there will be reviewer comments that you'll need to address before merging. We expect you to be reasonably responsive to those comments, otherwise the PR will be closed after 2-3 weeks of inactivity.
- Maintain **clean commit history** and use **meaningful commit messages**. PRs with messy commit history are difficult to review and won't be merged. Use `rebase -i upstream/master` to curate your commit history and/or to bring in latest changes from master (but avoid rebasing in the middle of a code review).
- Keep your PR up to date with upstream/master (if there are merge conflicts, we can't really merge your change).
- **All tests need to be passing** before your change can be merged. We recommend you **run tests locally** before creating your PR to catch breakages early on.
- `make all` to test everything, OR
- `make vet` to catch vet errors
- `make test` to run the tests
- `make testrace` to run tests in race mode
- optionally, `make testappengine` to run tests with App Engine
- Exceptions to the rules can be made if there's a compelling reason for doing so.
# Compression
The preferred method for configuring message compression on both clients and
servers is to use
[`encoding.RegisterCompressor`](https://godoc.org/google.golang.org/grpc/encoding#RegisterCompressor)
to register an implementation of a compression algorithm. See
`grpc/encoding/gzip/gzip.go` for an example of how to implement one.
Once a compressor has been registered on the client-side, RPCs may be sent using
it via the
[`UseCompressor`](https://godoc.org/google.golang.org/grpc#UseCompressor)
`CallOption`. Remember that `CallOption`s may be turned into defaults for all
calls from a `ClientConn` by using the
[`WithDefaultCallOptions`](https://godoc.org/google.golang.org/grpc#WithDefaultCallOptions)
`DialOption`. If `UseCompressor` is used and the corresponding compressor has
not been installed, an `Internal` error will be returned to the application
before the RPC is sent.
Server-side, registered compressors will be used automatically to decode request
messages and encode the responses. Servers currently always respond using the
same compression method specified by the client. If the corresponding
compressor has not been registered, an `Unimplemented` status will be returned
to the client.
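For illustration, here is a minimal client sketch that registers the gzip compressor via a blank import and compresses a single RPC. It assumes the generated helloworld client from the examples directory and an insecure local connection:
```go
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	_ "google.golang.org/grpc/encoding/gzip" // registers the gzip compressor

	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewGreeterClient(conn)
	// Compress only this RPC. To compress every RPC on the connection, pass
	// grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")) to grpc.Dial instead.
	reply, err := client.SayHello(context.Background(),
		&pb.HelloRequest{Name: "gzip"}, grpc.UseCompressor("gzip"))
	if err != nil {
		log.Fatalf("SayHello failed: %v", err)
	}
	log.Printf("Greeting: %s", reply.Message)
}
```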
## Deprecated API
There is a deprecated API for setting compression as well. It is not
recommended for use. However, if you were previously using it, the following
section may be helpful in understanding how it works in combination with the new
API.
### Client-Side
There are two legacy functions and one new function to configure compression:
```go
func WithCompressor(grpc.Compressor) DialOption {}
func WithDecompressor(grpc.Decompressor) DialOption {}
func UseCompressor(name string) CallOption {}
```
For outgoing requests, the following rules are applied in order:
1. If `UseCompressor` is used, messages will be compressed using the compressor
named.
   * If the named compressor is not registered, an `Internal` error is returned
     to the application before the RPC is sent.
   * If `UseCompressor("identity")` is used, no compressor will be used, but
     "identity" will be sent in the header to the server.
1. If `WithCompressor` is used, messages will be compressed using that
compressor implementation.
1. Otherwise, outbound messages will be uncompressed.
For incoming responses, the following rules are applied in order:
1. If `WithDecompressor` is used and it matches the message's encoding, it will
be used.
1. If a registered compressor matches the response's encoding, it will be used.
1. Otherwise, the stream will be closed and an `Unimplemented` status error will
be returned to the application.
### Server-Side
There are two legacy functions to configure compression:
```go
func RPCCompressor(grpc.Compressor) ServerOption {}
func RPCDecompressor(grpc.Decompressor) ServerOption {}
```
For incoming requests, the following rules are applied in order:
1. If `RPCDecompressor` is used and that decompressor matches the request's
   encoding, it will be used.
1. If a registered compressor matches the request's encoding, it will be used.
1. Otherwise, an `Unimplemented` status will be returned to the client.
For outgoing responses, the following rules are applied in order:
1. If `RPCCompressor` is used, that compressor will be used to compress all
response messages.
1. If compression was used for the incoming request and a registered compressor
supports it, that same compression method will be used for the outgoing
response.
1. Otherwise, no compression will be used for the outgoing response.
# Concurrency
In general, gRPC-go provides a concurrency-friendly API. What follows are some
guidelines.
## Clients
A [ClientConn][client-conn] can safely be accessed concurrently. Using
[helloworld][helloworld] as an example, one could share the `ClientConn` across
multiple goroutines to create multiple `GreeterClient` types. In this case, RPCs
would be sent in parallel.
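As a sketch (again assuming the generated helloworld client), sharing one `ClientConn` across goroutines might look like this:
```go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// All goroutines share the one ClientConn; each creates its own
			// (cheap) stub, and the RPCs proceed in parallel.
			client := pb.NewGreeterClient(conn)
			if _, err := client.SayHello(context.Background(),
				&pb.HelloRequest{Name: fmt.Sprintf("worker-%d", id)}); err != nil {
				log.Printf("worker-%d: %v", id, err)
			}
		}(i)
	}
	wg.Wait()
}
```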
## Streams
When using streams, one must take care to avoid calling either `SendMsg` or
`RecvMsg` multiple times against the same [Stream][stream] from different
goroutines. In other words, it's safe to have a goroutine calling `SendMsg` and
another goroutine calling `RecvMsg` on the same stream at the same time. But it
is not safe to call `SendMsg` on the same stream in different goroutines, or to
call `RecvMsg` on the same stream in different goroutines.
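A sketch of the safe split, assuming the generated route guide client (`pb "google.golang.org/grpc/examples/route_guide/routeguide"` plus `context` and `io`): one goroutine receives while the calling goroutine sends.
```go
func runRouteChat(client pb.RouteGuideClient, notes []*pb.RouteNote) error {
	stream, err := client.RouteChat(context.Background())
	if err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() {
		for {
			// Only this goroutine calls Recv on the stream.
			if _, err := stream.Recv(); err != nil {
				if err == io.EOF {
					done <- nil // server finished sending
				} else {
					done <- err
				}
				return
			}
		}
	}()
	for _, note := range notes {
		// Only this goroutine calls Send on the stream.
		if err := stream.Send(note); err != nil {
			return err
		}
	}
	stream.CloseSend()
	return <-done
}
```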
## Servers
Each RPC handler attached to a registered server will be invoked in its own
goroutine. For example, [SayHello][say-hello] will be invoked in its own
goroutine. The same is true for service handlers for streaming RPCs, as seen
in the route guide example [here][route-guide-stream].
[helloworld]: https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_client/main.go#L43
[client-conn]: https://godoc.org/google.golang.org/grpc#ClientConn
[stream]: https://godoc.org/google.golang.org/grpc#Stream
[say-hello]: https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_server/main.go#L41
[route-guide-stream]: https://github.com/grpc/grpc-go/blob/master/examples/route_guide/server/server.go#L126
# Encoding
The gRPC API for sending and receiving is based upon *messages*. However,
messages cannot be transmitted directly over a network; they must first be
converted into *bytes*. This document describes how gRPC-Go converts messages
into bytes and vice-versa for the purposes of network transmission.
## Codecs (Serialization and Deserialization)
A `Codec` contains code to serialize a message into a byte slice (`Marshal`) and
deserialize a byte slice back into a message (`Unmarshal`). `Codec`s are
registered by name into a global registry maintained in the `encoding` package.
### Implementing a `Codec`
A typical `Codec` will be implemented in its own package with an `init` function
that registers itself, and is imported anonymously. For example:
```go
package proto
import "google.golang.org/grpc/encoding"
func init() {
encoding.RegisterCodec(protoCodec{})
}
// ... implementation of protoCodec ...
```
For an example, gRPC's implementation of the `proto` codec can be found in
[`encoding/proto`](https://godoc.org/google.golang.org/grpc/encoding/proto).
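For illustration, a complete (if naive) JSON codec might look like the following sketch. It uses only the standard library and is not part of gRPC; the package and constant names are made up for the example:
```go
package json

import (
	"encoding/json"

	"google.golang.org/grpc/encoding"
)

// Name is the identifier clients would pass via grpc.CallContentSubtype.
const Name = "json"

func init() {
	encoding.RegisterCodec(codec{})
}

type codec struct{}

// Marshal serializes a message into a byte slice.
func (codec) Marshal(v interface{}) ([]byte, error) { return json.Marshal(v) }

// Unmarshal deserializes a byte slice back into a message.
func (codec) Unmarshal(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

func (codec) Name() string { return Name }
```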
### Using a `Codec`
By default, gRPC registers and uses the "proto" codec, so it is not necessary to
do this in your own code to send and receive proto messages. To use another
`Codec` from a client or server:
```go
package myclient
import _ "path/to/another/codec"
```
`Codec`s, by definition, must be symmetric, so the same desired `Codec` should
be registered in both client and server binaries.
On the client-side, to specify a `Codec` to use for message transmission, the
`CallOption` `CallContentSubtype` should be used as follows:
```go
response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
```
As a reminder, all `CallOption`s may be converted into `DialOption`s that become
the default for all RPCs sent through a client using `grpc.WithDefaultCallOptions`:
```go
myclient, err := grpc.DialContext(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))
```
When specified in either of these ways, messages will be encoded using this
codec and sent along with headers indicating the codec (`content-type` set to
`application/grpc+<codec name>`).
On the server-side, using a `Codec` is as simple as registering it into the
global registry (i.e. `import`ing it). If a message is encoded with the content
sub-type supported by a registered `Codec`, it will be used automatically for
decoding the request and encoding the response. Otherwise, for
backward-compatibility reasons, gRPC will attempt to use the "proto" codec. In
an upcoming change (tracked in [this
issue](https://github.com/grpc/grpc-go/issues/1824)), such requests will be
rejected with status code `Unimplemented` instead.
## Compressors (Compression and Decompression)
Sometimes, the resulting serialization of a message is not space-efficient, and
it may be beneficial to compress this byte stream before transmitting it over
the network. To facilitate this operation, gRPC supports a mechanism for
performing compression and decompression.
A `Compressor` contains code to compress and decompress by wrapping `io.Writer`s
and `io.Reader`s, respectively. (The forms of `Compress` and `Decompress` were
chosen to most closely match Go's standard package
[implementations](https://golang.org/pkg/compress/) of compressors.) Like
`Codec`s, `Compressor`s are registered by name into a global registry maintained
in the `encoding` package.
### Implementing a `Compressor`
A typical `Compressor` will be implemented in its own package with an `init`
function that registers itself, and is imported anonymously. For example:
```go
package gzip
import "google.golang.org/grpc/encoding"
func init() {
encoding.RegisterCompressor(compressor{})
}
// ... implementation of compressor ...
```
An implementation of a `gzip` compressor can be found in
[`encoding/gzip`](https://godoc.org/google.golang.org/grpc/encoding/gzip).
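As another illustration, a minimal compressor built on the standard library's `compress/flate` might look like this sketch. It is not part of gRPC, and the "deflate" name and package are assumptions for the example:
```go
package flatecomp

import (
	"compress/flate"
	"io"

	"google.golang.org/grpc/encoding"
)

const Name = "deflate"

func init() {
	encoding.RegisterCompressor(compressor{})
}

type compressor struct{}

// Compress wraps w; bytes written to the returned WriteCloser are compressed.
func (compressor) Compress(w io.Writer) (io.WriteCloser, error) {
	return flate.NewWriter(w, flate.DefaultCompression)
}

// Decompress wraps r; bytes read from the returned Reader are decompressed.
func (compressor) Decompress(r io.Reader) (io.Reader, error) {
	return flate.NewReader(r), nil
}

func (compressor) Name() string { return Name }
```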
### Using a `Compressor`
By default, gRPC does not register or use any compressors. To use a
`Compressor` from a client or server:
```go
package myclient
import _ "google.golang.org/grpc/encoding/gzip"
```
`Compressor`s, by definition, must be symmetric, so the same desired
`Compressor` should be registered in both client and server binaries.
On the client-side, to specify a `Compressor` to use for message transmission,
the `CallOption` `UseCompressor` should be used as follows:
```go
response, err := myclient.MyCall(ctx, request, grpc.UseCompressor("gzip"))
```
As a reminder, all `CallOption`s may be converted into `DialOption`s that become
the default for all RPCs sent through a client using `grpc.WithDefaultCallOptions`:
```go
myclient, err := grpc.DialContext(ctx, target, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
```
When specified in either of these ways, messages will be compressed using this
compressor and sent along with headers indicating the compressor
(`grpc-encoding` set to `<compressor name>`).
On the server-side, using a `Compressor` is as simple as registering it into the
global registry (i.e. `import`ing it). If a message is compressed with the
content coding supported by a registered `Compressor`, it will be used
automatically for decompressing the request and compressing the response.
Otherwise, the request will be rejected with status code `Unimplemented`.
# Mocking Service for gRPC
[Example code unary RPC](https://github.com/grpc/grpc-go/tree/master/examples/helloworld/mock_helloworld)
[Example code streaming RPC](https://github.com/grpc/grpc-go/tree/master/examples/route_guide/mock_routeguide)
## Why?
To test client-side logic without the overhead of connecting to a real server. Mocking enables users to write lightweight unit tests that check client-side functionality without invoking RPC calls to a server.
## Idea: Mock the client stub that connects to the server.
We use Gomock to mock the client interface (in the generated code) and programmatically set its methods to expect and return pre-determined values. This enables users to write tests around the client logic and use this mocked stub while making RPC calls.
## How to use Gomock?
Documentation on Gomock can be found [here](https://github.com/golang/mock).
A quick reading of the documentation should enable users to follow the code below.
Consider a gRPC service based on following proto file:
```proto
//helloworld.proto
syntax = "proto3";
package helloworld;
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
```
The generated file helloworld.pb.go will have a client interface for each service defined in the proto file. This interface will have methods corresponding to each rpc inside that service.
```Go
type GreeterClient interface {
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}
```
The generated code also contains a struct that implements this interface.
```Go
type greeterClient struct {
cc *grpc.ClientConn
}
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error){
// ...
// gRPC specific code here
// ...
}
```
Along with this, the generated code has a function to create an instance of this struct.
```Go
func NewGreeterClient(cc *grpc.ClientConn) GreeterClient
```
The user code uses this function to create an instance of the struct `greeterClient`, which can then be used to make RPC calls to the server.
We will mock this interface `GreeterClient` and use an instance of that mock to make RPC calls. These calls, instead of going to the server, will return pre-determined values.
To create a mock we’ll use [mockgen](https://github.com/golang/mock#running-mockgen).
From the `examples/helloworld` directory, run: `mockgen google.golang.org/grpc/examples/helloworld/helloworld GreeterClient > mock_helloworld/hw_mock.go`
Notice that in the above command we specify `GreeterClient` as the interface to be mocked.
The user test code can import the package generated by mockgen along with the gomock library package to write unit tests around client-side logic.
```Go
import "github.com/golang/mock/gomock"
import hwmock "google.golang.org/grpc/examples/helloworld/mock_helloworld"
```
An instance of the mocked interface can be created as:
```Go
mockGreeterClient := hwmock.NewMockGreeterClient(ctrl)
```
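The `ctrl` above is a gomock controller; a typical test creates and finishes one like so (a sketch, with the test name made up for the example):
```Go
func TestSayHello(t *testing.T) {
	ctrl := gomock.NewController(t)
	// Finish verifies that every expected call was made.
	defer ctrl.Finish()

	mockGreeterClient := hwmock.NewMockGreeterClient(ctrl)
	// ... set expectations and exercise the client-side logic ...
	_ = mockGreeterClient
}
```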
This mocked object can be programmed to expect calls to its methods and return pre-determined values. For instance, we can program mockGreeterClient to expect a call to its method SayHello and return a HelloReply with the message "Mocked RPC".
```Go
mockGreeterClient.EXPECT().SayHello(
gomock.Any(), // expect any value for first parameter
gomock.Any(), // expect any value for second parameter
).Return(&helloworld.HelloReply{Message: "Mocked RPC"}, nil)
```
`gomock.Any()` indicates that the parameter can have any value or type. We can indicate specific values for built-in types with `gomock.Eq()`.
However, if the test code needs to specify the parameter to have a proto message type, we can replace `gomock.Any()` with an instance of a struct that implements the `gomock.Matcher` interface.
```Go
type rpcMsg struct {
msg proto.Message
}
func (r *rpcMsg) Matches(msg interface{}) bool {
m, ok := msg.(proto.Message)
if !ok {
return false
}
return proto.Equal(m, r.msg)
}
func (r *rpcMsg) String() string {
return fmt.Sprintf("is %s", r.msg)
}
...
req := &helloworld.HelloRequest{Name: "unit_test"}
mockGreeterClient.EXPECT().SayHello(
gomock.Any(),
&rpcMsg{msg: req},
).Return(&helloworld.HelloReply{Message: "Mocked Interface"}, nil)
```
## Mock streaming RPCs:
For our example we consider the case of bi-directional streaming RPCs. Concretely, we'll write a test for RouteChat function from the route guide example to demonstrate how to write mocks for streams.
RouteChat is a bi-directional streaming RPC, which means calling RouteChat returns a stream that can __Send__ and __Recv__ messages to and from the server, respectively. We'll start by creating a mock of this stream interface returned by RouteChat and then we'll mock the client interface and set expectation on the method RouteChat to return our mocked stream.
### Generating mocking code:
Like before we'll use [mockgen](https://github.com/golang/mock#running-mockgen). From the `examples/route_guide` directory run: `mockgen google.golang.org/grpc/examples/route_guide/routeguide RouteGuideClient,RouteGuide_RouteChatClient > mock_route_guide/rg_mock.go`
Notice that we are mocking both the client (`RouteGuideClient`) and the stream (`RouteGuide_RouteChatClient`) interfaces here.
This will create a file `rg_mock.go` under directory `mock_route_guide`. This file contains all the mocking code we need to write our test.
In our test code, like before, we import this mocking code along with the generated code:
```go
import (
rgmock "google.golang.org/grpc/examples/route_guide/mock_routeguide"
rgpb "google.golang.org/grpc/examples/route_guide/routeguide"
)
```
Now consider a test that takes the RouteGuide client object as a parameter, makes a RouteChat RPC call, and sends a message on the resulting stream. Furthermore, the test expects to receive the same message back on the stream.
```go
var msg = ...
// Creates a RouteChat call and sends msg on it.
// Checks if the received message was equal to msg.
func testRouteChat(client rgpb.RouteGuideClient) error {
...
}
```
We can inject our mock here simply by passing it as an argument to the method.
Creating a mock for the stream interface:
```go
stream := rgmock.NewMockRouteGuide_RouteChatClient(ctrl)
```
Setting Expectations:
```go
stream.EXPECT().Send(gomock.Any()).Return(nil)
stream.EXPECT().Recv().Return(msg, nil)
```
Creating mock for client interface:
```go
rgclient := rgmock.NewMockRouteGuideClient(ctrl)
```
Setting Expectations:
```go
rgclient.EXPECT().RouteChat(gomock.Any()).Return(stream, nil)
```
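Putting it together, a complete test might look like the following sketch; the `msg` value and the body of `testRouteChat` are assumed from the snippets above:
```go
func TestRouteChat(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	msg := &rgpb.RouteNote{Message: "hello"}

	// Mock the stream and program one Send/Recv round trip.
	stream := rgmock.NewMockRouteGuide_RouteChatClient(ctrl)
	stream.EXPECT().Send(gomock.Any()).Return(nil)
	stream.EXPECT().Recv().Return(msg, nil)

	// Mock the client and have RouteChat hand back the mocked stream.
	rgclient := rgmock.NewMockRouteGuideClient(ctrl)
	rgclient.EXPECT().RouteChat(gomock.Any()).Return(stream, nil)

	if err := testRouteChat(rgclient); err != nil {
		t.Fatalf("testRouteChat(_) = %v, want nil", err)
	}
}
```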
# Authentication
As outlined in the [gRPC authentication guide](https://grpc.io/docs/guides/auth.html) there are a number of different mechanisms for asserting identity between a client and server. We'll present some code samples here demonstrating how to provide TLS encryption and identity assertions, as well as how to pass OAuth2 tokens to services that support it.
# Enabling TLS on a gRPC client
```Go
conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")))
```
# Enabling TLS on a gRPC server
```Go
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
if err != nil {
log.Fatalf("Failed to generate credentials %v", err)
}
lis, err := net.Listen("tcp", ":0")
server := grpc.NewServer(grpc.Creds(creds))
...
server.Serve(lis)
```
# OAuth2
For an example of how to configure client and server to use OAuth2 tokens, see
[here](https://github.com/grpc/grpc-go/tree/master/examples/features/authentication).
## Validating a token on the server
Clients may use
[metadata.MD](https://godoc.org/google.golang.org/grpc/metadata#MD)
to store tokens and other authentication-related data. To gain access to the
`metadata.MD` object, a server may use
[metadata.FromIncomingContext](https://godoc.org/google.golang.org/grpc/metadata#FromIncomingContext).
With a reference to `metadata.MD` on the server, one simply needs to look up the
`authorization` key. Note that all keys stored within `metadata.MD` are normalized
to lowercase. See [here](https://godoc.org/google.golang.org/grpc/metadata#New).
It is possible to configure token validation for all RPCs using an interceptor.
A server may configure either a
[grpc.UnaryInterceptor](https://godoc.org/google.golang.org/grpc#UnaryInterceptor)
or a
[grpc.StreamInterceptor](https://godoc.org/google.golang.org/grpc#StreamInterceptor).
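As a sketch, a unary interceptor that rejects RPCs lacking a valid token might look like the following; it assumes imports of `google.golang.org/grpc/codes`, `google.golang.org/grpc/metadata`, and `google.golang.org/grpc/status`, and `valid` is a hypothetical application-defined check:
```Go
func ensureValidToken(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Errorf(codes.InvalidArgument, "missing metadata")
	}
	// Keys in metadata.MD are normalized to lowercase.
	if !valid(md["authorization"]) { // valid is application-defined (hypothetical)
		return nil, status.Errorf(codes.Unauthenticated, "invalid token")
	}
	return handler(ctx, req)
}

// Installed on the server with:
//   s := grpc.NewServer(grpc.UnaryInterceptor(ensureValidToken))
```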
## Adding a token to all outgoing client RPCs
To send an OAuth2 token with each RPC, a client may configure the
`grpc.DialOption`
[grpc.WithPerRPCCredentials](https://godoc.org/google.golang.org/grpc#WithPerRPCCredentials).
Alternatively, a client may also use the `grpc.CallOption`
[grpc.PerRPCCredentials](https://godoc.org/google.golang.org/grpc#PerRPCCredentials)
on each invocation of an RPC.
To create a `credentials.PerRPCCredentials`, use
[oauth.NewOauthAccess](https://godoc.org/google.golang.org/grpc/credentials/oauth#NewOauthAccess).
Note, the OAuth2 implementation of `grpc.PerRPCCredentials` requires a client to use
[grpc.WithTransportCredentials](https://godoc.org/google.golang.org/grpc#WithTransportCredentials)
to prevent any insecure transmission of tokens.
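For example (a sketch assuming imports of `golang.org/x/oauth2`, `google.golang.org/grpc/credentials`, and `google.golang.org/grpc/credentials/oauth`; obtaining a real token is elided and the placeholder value is an assumption):
```Go
perRPC := oauth.NewOauthAccess(&oauth2.Token{AccessToken: "my-access-token"})
conn, err := grpc.Dial(
	serverAddr,
	// TLS transport credentials are required when sending OAuth2 tokens.
	grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
	grpc.WithPerRPCCredentials(perRPC),
)
```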
# Authenticating with Google
## Google Compute Engine (GCE)
```Go
conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), grpc.WithPerRPCCredentials(oauth.NewComputeEngine()))
```
## JWT
```Go
jwtCreds, err := oauth.NewServiceAccountFromFile(*serviceAccountKeyFile, *oauthScope)
if err != nil {
log.Fatalf("Failed to create JWT credentials: %v", err)
}
conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), grpc.WithPerRPCCredentials(jwtCreds))
```
# Metadata
gRPC supports sending metadata between client and server.
This doc shows how to send and receive metadata in gRPC-go.
## Background
There are four kinds of service methods:
- [Unary RPC](https://grpc.io/docs/guides/concepts.html#unary-rpc)
- [Server streaming RPC](https://grpc.io/docs/guides/concepts.html#server-streaming-rpc)
- [Client streaming RPC](https://grpc.io/docs/guides/concepts.html#client-streaming-rpc)
- [Bidirectional streaming RPC](https://grpc.io/docs/guides/concepts.html#bidirectional-streaming-rpc)
And there is the concept of [metadata](https://grpc.io/docs/guides/concepts.html#metadata).
## Constructing metadata
Metadata can be created using the package [metadata](https://godoc.org/google.golang.org/grpc/metadata).
The type `MD` is actually a map from strings to lists of strings:
```go
type MD map[string][]string
```
Metadata can be read like a normal map.
Note that the value type of this map is `[]string`,
so that users can attach multiple values using a single key.
### Creating new metadata
Metadata can be created from a `map[string]string` using the function `New`:
```go
md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
```
Another way is to use `Pairs`.
Values with the same key will be merged into a list:
```go
md := metadata.Pairs(
"key1", "val1",
"key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"}
"key2", "val2",
)
```
__Note:__ all the keys will be automatically converted to lowercase,
so "key1" and "kEy1" will be the same key and their values will be merged into the same list.
This happens for both `New` and `Pairs`.
### Storing binary data in metadata
In metadata, keys are always strings, but values can be strings or binary data.
To store binary data as a value in metadata, simply add a "-bin" suffix to the key.
Values with "-bin"-suffixed keys will be encoded when creating the metadata:
```go
md := metadata.Pairs(
"key", "string value",
"key-bin", string([]byte{96, 102}), // this binary data will be encoded (base64) before sending
// and will be decoded after being transferred.
)
```
## Retrieving metadata from context
Metadata can be retrieved from context using `FromIncomingContext`:
```go
func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
```
## Sending and receiving metadata - client side
Client side metadata sending and receiving examples are available [here](../examples/features/metadata/client/main.go).
### Sending metadata
There are two ways to send metadata to the server. The recommended way is to append kv pairs to the context using
`AppendToOutgoingContext`. This can be used with or without existing metadata on the context. When there is no prior
metadata, metadata is added; when metadata already exists on the context, kv pairs are merged in.
```go
// create a new context with some metadata
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// later, add some more metadata to the context (e.g. in an interceptor)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// make unary RPC
response, err := client.SomeRPC(ctx, someRequest)
// or make streaming RPC
stream, err := client.SomeStreamingRPC(ctx)
```
Alternatively, metadata may be attached to the context using `NewOutgoingContext`. However, this
replaces any existing metadata in the context, so care must be taken to preserve the existing
metadata if desired. This is slower than using `AppendToOutgoingContext`. An example of this
is below:
```go
// create a new context with some metadata
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// later, add some more metadata to the context (e.g. in an interceptor)
md, _ := metadata.FromOutgoingContext(ctx)
newMD := metadata.Pairs("k3", "v3")
ctx = metadata.NewOutgoingContext(ctx, metadata.Join(md, newMD))
// make unary RPC
response, err := client.SomeRPC(ctx, someRequest)
// or make streaming RPC
stream, err := client.SomeStreamingRPC(ctx)
```
### Receiving metadata
Metadata that a client can receive includes header and trailer.
#### Unary call
Header and trailer sent along with a unary call can be retrieved using the [Header](https://godoc.org/google.golang.org/grpc#Header) and [Trailer](https://godoc.org/google.golang.org/grpc#Trailer) functions in [CallOption](https://godoc.org/google.golang.org/grpc#CallOption):
```go
var header, trailer metadata.MD // variable to store header and trailer
r, err := client.SomeRPC(
ctx,
someRequest,
grpc.Header(&header), // will retrieve header
grpc.Trailer(&trailer), // will retrieve trailer
)
// do something with header and trailer
```
#### Streaming call
For streaming calls including:
- Server streaming RPC
- Client streaming RPC
- Bidirectional streaming RPC
Header and trailer can be retrieved from the returned stream using the `Header` and `Trailer` methods of the [ClientStream](https://godoc.org/google.golang.org/grpc#ClientStream) interface:
```go
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
// retrieve trailer
trailer := stream.Trailer()
```
## Sending and receiving metadata - server side
Server side metadata sending and receiving examples are available [here](../examples/features/metadata/server/main.go).
### Receiving metadata
To read metadata sent by the client, the server needs to retrieve it from the RPC context.
If it is a unary call, the RPC handler's context can be used.
For streaming calls, the server needs to get the context from the stream.
#### Unary call
```go
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
```
#### Streaming call
```go
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}
```
### Sending metadata
#### Unary call
To send header and trailer to the client in a unary call, the server can call the [SendHeader](https://godoc.org/google.golang.org/grpc#SendHeader) and [SetTrailer](https://godoc.org/google.golang.org/grpc#SetTrailer) functions in package [grpc](https://godoc.org/google.golang.org/grpc).
These two functions take a context as the first parameter.
It should be the RPC handler's context or one derived from it:
```go
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// create and send header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
```
#### Streaming call
For streaming calls, header and trailer can be sent using the `SendHeader` and `SetTrailer` methods of the [ServerStream](https://godoc.org/google.golang.org/grpc#ServerStream) interface:
```go
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
```
# Keepalive
gRPC sends HTTP/2 pings on the transport to detect whether the connection is down. If
the ping is not acknowledged by the other side within a certain period, the
connection will be closed. Note that pings are only necessary when there's no
activity on the connection.
For how to configure keepalive, see
https://godoc.org/google.golang.org/grpc/keepalive for the options.
## What should I set?
It should be sufficient for most users to set [client
parameters](https://godoc.org/google.golang.org/grpc/keepalive) as a [dial
option](https://godoc.org/google.golang.org/grpc#WithKeepaliveParams).
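For example (a sketch; the durations are placeholders, not recommendations, and `grpc.WithInsecure()` is used only to keep the example short):
```go
import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func dial(target string) (*grpc.ClientConn, error) {
	return grpc.Dial(target,
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second, // ping after 10s with no activity
			Timeout:             time.Second,      // wait 1s for the ping ack before closing
			PermitWithoutStream: true,             // also ping when there are no active RPCs
		}))
}
```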
## What will happen?
(The behavior described here is specific to gRPC-Go; it might be slightly
different in other languages.)
When there's no activity on a connection (note that an ongoing stream results in
__no activity__ when there's no message being sent), after `Time`, a ping will
be sent by the client, and the server will send a ping ack when it gets the ping.
The client will wait for `Timeout` and check whether there's any activity on the
connection during this period (a ping ack counts as activity).
## What about server side?
The server has `Time` and `Timeout` settings similar to the client's. The server can also
configure connection max-age. See [server
parameters](https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters)
for details.
### Enforcement policy
[Enforcement
policy](https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy) is
a special setting on the server side to protect the server from malicious or
misbehaving clients.
The server sends a GOAWAY with ENHANCE_YOUR_CALM and closes the connection when bad
behavior is detected:
- The client sends pings too frequently.
- The client sends pings when there are no streams and this is disallowed by the
server's config.
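A server-side sketch combining server parameters and an enforcement policy (the values are placeholders, not recommendations):
```go
srv := grpc.NewServer(
	grpc.KeepaliveParams(keepalive.ServerParameters{
		Time:             2 * time.Hour,    // server pings after 2h of inactivity
		MaxConnectionAge: 30 * time.Minute, // gracefully close connections older than 30m
	}),
	grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
		MinTime:             5 * time.Minute, // reject pings arriving more often than this
		PermitWithoutStream: false,           // disallow pings when there are no active streams
	}),
)
```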
# Log Levels
This document describes the different log levels supported by the grpc-go
library, and under what conditions they should be used.
### Info
Info messages are for informational purposes and may aid in the debugging of
applications or the gRPC library.
Examples:
- The name resolver received an update.
- The balancer updated its picker.
- Significant gRPC state is changing.
At verbosity of 0 (the default), any single info message should not be output
more than once every 5 minutes under normal operation.
### Warning
Warning messages indicate problems that are non-fatal for the application, but
could lead to unexpected behavior or subsequent errors.
Examples:
- Resolver could not resolve target name.
- Error received while connecting to a server.
- Lost or corrupt connection with remote endpoint.
### Error
Error messages represent errors in the usage of gRPC that cannot be returned to
the application as errors, or internal gRPC-Go errors that are recoverable.
Internal errors are detected during gRPC tests and will result in test failures.
Examples:
- Invalid arguments passed to a function that cannot return an error.
- An internal error that cannot be returned or would be inappropriate to return
to the user.
### Fatal
Fatal errors are severe internal errors that are unrecoverable. These lead
directly to panics, and are avoided as much as possible.
Example:
- Internal invariant was violated.
- User attempted an action that cannot return an error gracefully, but would
lead to an invalid state if performed.
# Proxy
HTTP CONNECT proxies are supported by default in gRPC. The proxy address can be
specified by the environment variables HTTP_PROXY, HTTPS_PROXY and NO_PROXY (or
the lowercase versions thereof).
## Custom proxy
Currently, proxy support is implemented in the default dialer. It does one more
handshake (a CONNECT handshake in the case of HTTP CONNECT proxy) on the
connection before giving it to gRPC.
If the default proxy doesn't work for you, replace the default dialer with your
custom proxy dialer. This can be done using
[`WithDialer`](https://godoc.org/google.golang.org/grpc#WithDialer).
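A sketch of a custom proxy dialer; `proxyAddr` and the handshake are assumptions standing in for your proxy's protocol:
```go
conn, err := grpc.Dial(target, grpc.WithDialer(
	func(addr string, timeout time.Duration) (net.Conn, error) {
		// Dial the proxy rather than the target address.
		c, err := net.DialTimeout("tcp", proxyAddr, timeout)
		if err != nil {
			return nil, err
		}
		// Perform the proxy handshake here (e.g. send an HTTP CONNECT
		// request for addr and read the response) before returning the
		// connection to gRPC. Elided in this sketch.
		return c, nil
	}))
```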
# RPC Errors
All service method handlers should return `nil` or errors from the
`status.Status` type. Clients have direct access to the errors.
Upon encountering an error, a gRPC server method handler should create a
`status.Status`. In typical usage, one would use [status.New][new-status]
passing in an appropriate [codes.Code][code] as well as a description of the
error to produce a `status.Status`. Calling [status.Err][status-err] converts
the `status.Status` type into an `error`. As a convenience method, there is also
[status.Error][status-error] which obviates the conversion step. Compare:
```
st := status.New(codes.NotFound, "some description")
err := st.Err()
// vs.
err := status.Error(codes.NotFound, "some description")
```
## Adding additional details to errors
In some cases, it may be necessary to add details for a particular error on the
server side. The [status.WithDetails][with-details] method exists for this
purpose. Clients may then read those details by first converting the plain
`error` type back to a [status.Status][status] and then using
[status.Details][details].
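A server-side sketch using the standard error-details protos, assuming the import alias `epb "google.golang.org/genproto/googleapis/rpc/errdetails"`:
```go
st := status.New(codes.ResourceExhausted, "request limit exceeded")
ds, err := st.WithDetails(&epb.QuotaFailure{
	Violations: []*epb.QuotaFailure_Violation{{
		Subject:     "name:world",
		Description: "Limit one greeting per person",
	}},
})
if err != nil {
	return st.Err() // fall back to the bare status if details can't be attached
}
return ds.Err()
```
On the client side, the details can be recovered with `status.Convert(err).Details()`.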
## Example
The [example][example] demonstrates the API discussed above and shows how to add
information about rate limits to the error message using `status.Status`.
To run the example, first start the server:
```
$ go run examples/rpc_errors/server/main.go
```
In a separate session, run the client:
```
$ go run examples/rpc_errors/client/main.go
```
On the first run of the client, all is well:
```
2018/03/12 19:39:33 Greeting: Hello world
```
Upon running the client a second time, the client exceeds the rate limit and
receives an error with details:
```
2018/03/19 16:42:01 Quota failure: violations:<subject:"name:world" description:"Limit one greeting per person" >
exit status 1
```
[status]: https://godoc.org/google.golang.org/grpc/status#Status
[new-status]: https://godoc.org/google.golang.org/grpc/status#New
[code]: https://godoc.org/google.golang.org/grpc/codes#Code
[with-details]: https://godoc.org/google.golang.org/grpc/status#Status.WithDetails
[details]: https://godoc.org/google.golang.org/grpc/status#Status.Details
[status-err]: https://godoc.org/google.golang.org/grpc/status#Status.Err
[status-error]: https://godoc.org/google.golang.org/grpc/status#Error
[example]: https://github.com/grpc/grpc-go/blob/master/examples/rpc_errors
# gRPC Server Reflection Tutorial
gRPC Server Reflection provides information about publicly-accessible gRPC
services on a server, and assists clients at runtime to construct RPC requests
and responses without precompiled service information. It is used by gRPC CLI,
which can be used to introspect server protos and send/receive test RPCs.
## Enable Server Reflection
gRPC-go Server Reflection is implemented in package
[reflection](https://github.com/grpc/grpc-go/tree/master/reflection). To enable
server reflection, you need to import this package and register reflection
service on your gRPC server.
For example, to enable server reflection in `examples/helloworld`, we need to
make the following changes:
```diff
--- a/examples/helloworld/greeter_server/main.go
+++ b/examples/helloworld/greeter_server/main.go
@@ -40,6 +40,7 @@ import (
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
+ "google.golang.org/grpc/reflection"
)
const (
@@ -61,6 +62,8 @@ func main() {
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
+ // Register reflection service on gRPC server.
+ reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
```
An example server with reflection registered can be found at
`examples/features/reflection/server`.
## gRPC CLI
After enabling Server Reflection in a server application, you can use gRPC CLI
to check its services. gRPC CLI is only available in C++. Instructions on how to
use gRPC CLI can be found at
[command_line_tool.md](https://github.com/grpc/grpc/blob/master/doc/command_line_tool.md).
To build gRPC CLI:
```sh
git clone https://github.com/grpc/grpc
cd grpc
make grpc_cli
cd bins/opt # grpc_cli is in directory bins/opt/
```
## Use gRPC CLI to check services
First, start the helloworld server in the grpc-go directory:
```sh
$ cd <grpc-go-directory>
$ go run examples/features/reflection/server/main.go
```
Open a new terminal and make sure you are in the directory where grpc_cli lives:
```sh
$ cd <grpc-cpp-directory>/bins/opt
```
### List services
The `grpc_cli ls` command lists services and methods exposed at a given port:
- List all the services exposed at a given port
```sh
$ ./grpc_cli ls localhost:50051
```
output:
```sh
grpc.examples.echo.Echo
grpc.reflection.v1alpha.ServerReflection
helloworld.Greeter
```
- List one service with details
The `grpc_cli ls` command also inspects a service given its full name (in the
format of \<package\>.\<service\>). It can print information in a long listing
format when the `-l` flag is set, which gives more details about the
service.
```sh
$ ./grpc_cli ls localhost:50051 helloworld.Greeter -l
```
output:
```sh
filename: helloworld.proto
package: helloworld;
service Greeter {
rpc SayHello(helloworld.HelloRequest) returns (helloworld.HelloReply) {}
}
```
### List methods
- List one method with details
The `grpc_cli ls` command can likewise inspect a method given its full name (in
the format of \<package\>.\<service\>.\<method\>).
```sh
$ ./grpc_cli ls localhost:50051 helloworld.Greeter.SayHello -l
```
output:
```sh
rpc SayHello(helloworld.HelloRequest) returns (helloworld.HelloReply) {}
```
### Inspect message types
We can use the `grpc_cli type` command to inspect request/response types given
the full name of the type (in the format of \<package\>.\<type\>).
- Get information about the request type
```sh
$ ./grpc_cli type localhost:50051 helloworld.HelloRequest
```
output:
```sh
message HelloRequest {
optional string name = 1[json_name = "name"];
}
```
### Call a remote method
We can send RPCs to a server and get responses using the `grpc_cli call` command.
- Call a unary method
```sh
$ ./grpc_cli call localhost:50051 SayHello "name: 'gRPC CLI'"
```
output:
```sh
message: "Hello gRPC CLI"
```
# Versioning and Releases
Note: This document references terminology defined at http://semver.org.
## Release Frequency
Regular MINOR releases of gRPC-Go are performed every six weeks. Patch releases
to the previous two MINOR releases may be performed on demand or if serious
security problems are discovered.
## Versioning Policy
The gRPC-Go versioning policy follows the Semantic Versioning 2.0.0
specification, with the following exceptions:
- A MINOR version will not _necessarily_ add new functionality.
- MINOR releases will not break backward compatibility, except in the following
circumstances:
- An API was marked as EXPERIMENTAL upon its introduction.
- An API was marked as DEPRECATED in the initial MAJOR release.
- An API is inherently flawed and cannot provide correct or secure behavior.
In these cases, APIs MAY be changed or removed without a MAJOR release.
Otherwise, backward compatibility will be preserved by MINOR releases.
For an API marked as DEPRECATED, an alternative will be available (if
appropriate) for at least three months prior to its removal.
## Release History
Please see our release history on GitHub:
https://github.com/grpc/grpc-go/releases
all: vet test testrace
build: deps
go build google.golang.org/grpc/...
clean:
go clean -i google.golang.org/grpc/...
deps:
go get -d -v google.golang.org/grpc/...
proto:
@ if ! which protoc > /dev/null; then \
echo "error: protoc not installed" >&2; \
exit 1; \
fi
go generate google.golang.org/grpc/...
test: testdeps
go test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
testappengine: testappenginedeps
goapp test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
testappenginedeps:
goapp get -d -v -t -tags 'appengine appenginevm' google.golang.org/grpc/...
testdeps:
go get -d -v -t google.golang.org/grpc/...
testrace: testdeps
go test -race -cpu 1,4 -timeout 7m google.golang.org/grpc/...
updatedeps:
go get -d -v -u -f google.golang.org/grpc/...
updatetestdeps:
go get -d -v -t -u -f google.golang.org/grpc/...
vet: vetdeps
./vet.sh
vetdeps:
./vet.sh -install
.PHONY: \
all \
build \
clean \
deps \
proto \
test \
testappengine \
testappenginedeps \
testdeps \
testrace \
updatedeps \
updatetestdeps \
vet \
vetdeps
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// See internal/backoff package for the backoff implementation. This file is
// kept for the exported types and API backward compatibility.
package grpc
import (
"time"
)
// DefaultBackoffConfig uses values specified for backoff in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
var DefaultBackoffConfig = BackoffConfig{
MaxDelay: 120 * time.Second,
}
// BackoffConfig defines the parameters for the default gRPC backoff strategy.
type BackoffConfig struct {
// MaxDelay is the upper bound of backoff delay.
MaxDelay time.Duration
}
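// Example (illustrative, not part of this file): the default can be
// overridden per connection with the WithBackoffConfig dial option, e.g.
//
//	conn, err := grpc.Dial(target, grpc.WithBackoffConfig(grpc.BackoffConfig{
//		MaxDelay: 240 * time.Second, // placeholder value
//	}))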
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package base
import (
"context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
type baseBuilder struct {
name string
pickerBuilder PickerBuilder
config Config
}
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
// Initialize picker to a picker that always returns
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
// may call UpdateBalancerState with this picker.
picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
config: bb.config,
}
}
func (bb *baseBuilder) Name() string {
return bb.name
}
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
}
func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
panic("not implemented")
}
func (b *baseBalancer) UpdateResolverState(s resolver.State) {
// TODO: handle s.Err (log if not nil) once implemented.
// TODO: handle s.ServiceConfig?
grpclog.Infoln("base.baseBalancer: got new resolver state: ", s)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.Addresses {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
b.scStates[sc] = connectivity.Idle
sc.Connect()
}
}
for a, sc := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(sc)
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
}
// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker is
// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
// - built by the pickerBuilder with all READY SubConns otherwise.
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
return
}
readySCs := make(map[resolver.Address]balancer.SubConn)
// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[addr] = sc
}
}
b.picker = b.pickerBuilder.Build(readySCs)
}
func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
panic("not implemented")
}
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
oldS, ok := b.scStates[sc]
if !ok {
grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
b.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}
oldAggrState := b.state
b.state = b.csEvltr.RecordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
b.regeneratePicker()
}
b.cc.UpdateBalancerState(b.state, b.picker)
}
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (b *baseBalancer) Close() {
}
// NewErrPicker returns a picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
}
type errPicker struct {
err error // Pick() always returns this err.
}
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package base defines a balancer base that can be used to build balancers with
// different picking algorithms.
//
// The base balancer creates a new SubConn for each resolved address. The
// provided picker will only be notified about READY SubConns.
//
// This package is the base of round_robin balancer, its purpose is to be used
// to build round_robin like balancers with complex picking algorithms.
// Balancers with more complicated logic should try to implement a balancer
// builder from scratch.
//
// All APIs in this package are experimental.
package base
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
// Build takes a slice of ready SubConns, and returns a picker that will be
// used by gRPC to pick a SubConn.
Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
return NewBalancerBuilderWithConfig(name, pb, Config{})
}
// Config contains the config info about the base balancer builder.
type Config struct {
// HealthCheck indicates whether health checking should be enabled for this specific balancer.
HealthCheck bool
}
// NewBalancerBuilderWithConfig returns a base balancer builder configured by the provided config.
func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
config: config,
}
}
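// Example (illustrative, not part of this package): from an importing package,
// a PickerBuilder that always picks the first ready SubConn could be registered
// as a balancer named "first_ready":
//
//	type firstPickerBuilder struct{}
//
//	func (firstPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
//		for _, sc := range readySCs {
//			return &firstPicker{sc: sc}
//		}
//		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
//	}
//
//	type firstPicker struct{ sc balancer.SubConn }
//
//	func (p *firstPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
//		return p.sc, nil, nil
//	}
//
//	func init() {
//		balancer.Register(base.NewBalancerBuilder("first_ready", firstPickerBuilder{}))
//	}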
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"context"
"sync"
"sync/atomic"
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/status"
)
// rpcStats is the same as lbpb.ClientStats, except that numCallsDropped is a map
// instead of a slice.
type rpcStats struct {
// Only access the following fields atomically.
numCallsStarted int64
numCallsFinished int64
numCallsFinishedWithClientFailedToSend int64
numCallsFinishedKnownReceived int64
mu sync.Mutex
// map load_balance_token -> num_calls_dropped
numCallsDropped map[string]int64
}
func newRPCStats() *rpcStats {
return &rpcStats{
numCallsDropped: make(map[string]int64),
}
}
// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
func (s *rpcStats) toClientStats() *lbpb.ClientStats {
stats := &lbpb.ClientStats{
NumCallsStarted: atomic.SwapInt64(&s.numCallsStarted, 0),
NumCallsFinished: atomic.SwapInt64(&s.numCallsFinished, 0),
NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.numCallsFinishedWithClientFailedToSend, 0),
NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.numCallsFinishedKnownReceived, 0),
}
s.mu.Lock()
dropped := s.numCallsDropped
s.numCallsDropped = make(map[string]int64)
s.mu.Unlock()
for token, count := range dropped {
stats.CallsFinishedWithDrop = append(stats.CallsFinishedWithDrop, &lbpb.ClientStatsPerToken{
LoadBalanceToken: token,
NumCalls: count,
})
}
return stats
}
func (s *rpcStats) drop(token string) {
atomic.AddInt64(&s.numCallsStarted, 1)
s.mu.Lock()
s.numCallsDropped[token]++
s.mu.Unlock()
atomic.AddInt64(&s.numCallsFinished, 1)
}
func (s *rpcStats) failedToSend() {
atomic.AddInt64(&s.numCallsStarted, 1)
atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, 1)
atomic.AddInt64(&s.numCallsFinished, 1)
}
func (s *rpcStats) knownReceived() {
atomic.AddInt64(&s.numCallsStarted, 1)
atomic.AddInt64(&s.numCallsFinishedKnownReceived, 1)
atomic.AddInt64(&s.numCallsFinished, 1)
}
type errPicker struct {
// Pick always returns this err.
err error
}
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from remote balancer, and grpclb falls back to the resolved
// backends.
//
// It's guaranteed that len(subConns) > 0.
type rrPicker struct {
mu sync.Mutex
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
}
func newRRPicker(readySCs []balancer.SubConn) *rrPicker {
return &rrPicker{
subConns: readySCs,
subConnsNext: grpcrand.Intn(len(readySCs)),
}
}
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
return sc, nil, nil
}
// lbPicker does two layers of picks:
//
// First layer: roundrobin on all servers in serverList, including drops and backends.
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
//
// Second layer: roundrobin on all READY backends.
//
// It's guaranteed that len(serverList) > 0.
type lbPicker struct {
mu sync.Mutex
serverList []*lbpb.Server
serverListNext int
subConns []balancer.SubConn // The subConns that were READY when taking the snapshot.
subConnsNext int
stats *rpcStats
}
func newLBPicker(serverList []*lbpb.Server, readySCs []balancer.SubConn, stats *rpcStats) *lbPicker {
return &lbPicker{
serverList: serverList,
subConns: readySCs,
subConnsNext: grpcrand.Intn(len(readySCs)),
stats: stats,
}
}
func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
defer p.mu.Unlock()
// Layer one roundrobin on serverList.
s := p.serverList[p.serverListNext]
p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
// If it's a drop, return an error and fail the RPC.
if s.Drop {
p.stats.drop(s.LoadBalanceToken)
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
}
// Not a drop, but there are no READY SubConns to pick from.
if len(p.subConns) == 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
// Return the next ready subConn in the list, also collect rpc stats.
sc := p.subConns[p.subConnsNext]
p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
done := func(info balancer.DoneInfo) {
if !info.BytesSent {
p.stats.failedToSend()
} else if info.BytesReceived {
p.stats.knownReceived()
}
}
return sc, done, nil
}
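// exampleLBPickerDrop is an illustrative sketch (editor addition, names
// hypothetical): with a two-entry server list containing one drop entry, every
// other pick fails with codes.Unavailable and is counted via rpcStats.drop,
// while the remaining picks fall through to the second-layer roundrobin over
// READY SubConns.
func exampleLBPickerDrop() {
servers := []*lbpb.Server{
{Drop: true, LoadBalanceToken: "token-a"}, // layer one: dropped
{},                                        // layer one: backend, go to layer two
}
scs := []balancer.SubConn{&editorStubSC{id: 0}} // reuses the stub defined above
p := newLBPicker(servers, scs, newRPCStats())
for i := 0; i < 4; i++ {
if _, _, err := p.Pick(context.Background(), balancer.PickOptions{}); err != nil {
_ = err // status error with codes.Unavailable on the drop entries
}
}
}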
func (p *lbPicker) updateReadySCs(readySCs []balancer.SubConn) {
p.mu.Lock()
defer p.mu.Unlock()
p.subConns = readySCs
if len(readySCs) == 0 {
// Pick already handles an empty snapshot; reset the index here to
// avoid a modulo-by-zero panic.
p.subConnsNext = 0
return
}
p.subConnsNext = p.subConnsNext % len(readySCs)
}
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"context"
"fmt"
"io"
"net"
"reflect"
"time"
timestamppb "github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
// processServerList updates the balancer's internal state, creates/removes
// SubConns, and regenerates the picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: processing server list: %+v", l)
}
lb.mu.Lock()
defer lb.mu.Unlock()
// Set serverListReceived to true so that fallback will not take effect
// when the fallback timeout fires later.
lb.serverListReceived = true
// If the new server list == old server list, do nothing.
if reflect.DeepEqual(lb.fullServerList, l.Servers) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
}
return
}
lb.fullServerList = l.Servers
var backendAddrs []resolver.Address
for i, s := range l.Servers {
if s.Drop {
continue
}
md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
ip := net.IP(s.IpAddress)
ipStr := ip.String()
if ip.To4() == nil {
// Add square brackets to ipv6 addresses; otherwise net.Dial() and
// net.SplitHostPort() will return a "too many colons" error.
ipStr = fmt.Sprintf("[%s]", ipStr)
}
addr := resolver.Address{
Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
Metadata: &md,
}
if grpclog.V(2) {
grpclog.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
i, ipStr, s.Port, s.LoadBalanceToken)
}
backendAddrs = append(backendAddrs, addr)
}
// Call refreshSubConns to create/remove SubConns.
lb.refreshSubConns(backendAddrs, true)
// Regenerate and update the picker regardless of whether the set of
// backends changed (i.e. whether any SubConn was created/removed): since
// the full serverList differs from the previous one, there may be changes
// in drops or in pick weights (a different number of duplicate entries),
// so the picker must be rebuilt from the full list.
//
// Note that with the SubConn cache, even when a SubConn is created or
// removed there might be no state change to trigger a rebuild otherwise.
lb.regeneratePicker(true)
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}
// refreshSubConns creates/removes SubConns with backendAddrs, diffing the new
// addresses against the cached backendAddrs to decide which SubConns to keep.
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) {
opts := balancer.NewSubConnOptions{}
if fromGRPCLBServer {
opts.CredsBundle = lb.grpclbBackendCreds
}
lb.backendAddrs = nil
if lb.usePickFirst {
var sc balancer.SubConn
for _, sc = range lb.subConns {
break
}
if sc != nil {
sc.UpdateAddresses(backendAddrs)
sc.Connect()
return
}
// This bypasses the cc wrapper (and therefore the SubConn cache).
sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
if err != nil {
grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
return
}
sc.Connect()
lb.subConns[backendAddrs[0]] = sc
lb.scStates[sc] = connectivity.Idle
return
}
// addrsSet is the set converted from backendAddrs; it's used for quick
// address lookups.
addrsSet := make(map[resolver.Address]struct{})
// Create new SubConns.
for _, addr := range backendAddrs {
addrWithoutMD := addr
addrWithoutMD.Metadata = nil
addrsSet[addrWithoutMD] = struct{}{}
lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
if _, ok := lb.subConns[addrWithoutMD]; !ok {
// Use addrWithMD to create the SubConn.
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
continue
}
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
if _, ok := lb.scStates[sc]; !ok {
// Only set state of new sc to IDLE. The state could already be
// READY for cached SubConns.
lb.scStates[sc] = connectivity.Idle
}
sc.Connect()
}
}
for a, sc := range lb.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
lb.cc.RemoveSubConn(sc)
delete(lb.subConns, a)
// Keep the state of this sc in lb.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
}
func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
for {
reply, err := s.Recv()
if err != nil {
if err == io.EOF {
return errServerTerminatedConnection
}
return fmt.Errorf("grpclb: failed to recv server list: %v", err)
}
if serverList := reply.GetServerList(); serverList != nil {
lb.processServerList(serverList)
}
}
}
func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-s.Context().Done():
return
}
stats := lb.clientStats.toClientStats()
t := time.Now()
stats.Timestamp = &timestamppb.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
if err := s.Send(&lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
ClientStats: stats,
},
}); err != nil {
return
}
}
}
func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}
// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{
Name: lb.target,
},
},
}
if err := stream.Send(initReq); err != nil {
return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
}
reply, err := stream.Recv()
if err != nil {
return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
}
initResp := reply.GetInitialResponse()
if initResp == nil {
return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
}
if initResp.LoadBalancerDelegate != "" {
return true, fmt.Errorf("grpclb: Delegation is not supported")
}
go func() {
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
}
}()
// No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream)
}
func (lb *lbBalancer) watchRemoteBalancer() {
var retryCount int
for {
doBackoff, err := lb.callRemoteBalancer()
select {
case <-lb.doneCh:
return
default:
if err != nil {
if err == errServerTerminatedConnection {
grpclog.Info(err)
} else {
grpclog.Warning(err)
}
}
}
// Trigger a re-resolve when the stream errors.
lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})
if !doBackoff {
retryCount = 0
continue
}
timer := time.NewTimer(lb.backoff.Backoff(retryCount))
select {
case <-timer.C:
case <-lb.doneCh:
timer.Stop()
return
}
retryCount++
}
}
func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var dopts []grpc.DialOption
if creds := lb.opt.DialCreds; creds != nil {
if err := creds.OverrideServerName(remoteLBName); err == nil {
dopts = append(dopts, grpc.WithTransportCredentials(creds))
} else {
grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
dopts = append(dopts, grpc.WithInsecure())
}
} else if bundle := lb.grpclbClientConnCreds; bundle != nil {
dopts = append(dopts, grpc.WithCredentialsBundle(bundle))
} else {
dopts = append(dopts, grpc.WithInsecure())
}
if lb.opt.Dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
dopts = append(dopts, wrb(lb.manualResolver))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
}
// DialContext using manualResolver.Scheme, a random scheme generated when
// grpclb is initialized. The target scheme here is not important.
//
// The grpc dial target will be used by the creds (ALTS) as the authority,
// so it has to be set to remoteLBName that comes from resolver.
cc, err := grpc.DialContext(context.Background(), remoteLBName, dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
}
lb.ccRemoteLB = cc
go lb.watchRemoteBalancer()
}
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"fmt"
"sync"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
)
// The parent ClientConn should re-resolve when grpclb loses connection to the
// remote balancer. When the ClientConn inside grpclb gets a TransientFailure,
// it calls lbManualResolver.ResolveNow(), which calls parent ClientConn's
// ResolveNow, and eventually results in re-resolve happening in parent
// ClientConn's resolver (DNS for example).
//
// parent
// ClientConn
// +-----------------------------------------------------------------+
// | parent +---------------------------------+ |
// | DNS ClientConn | grpclb | |
// | resolver balancerWrapper | | |
// | + + | grpclb grpclb | |
// | | | | ManualResolver ClientConn | |
// | | | | + + | |
// | | | | | | Transient | |
// | | | | | | Failure | |
// | | | | | <--------- | | |
// | | | <--------------- | ResolveNow | | |
// | | <--------- | ResolveNow | | | | |
// | | ResolveNow | | | | | |
// | | | | | | | |
// | + + | + + | |
// | +---------------------------------+ |
// +-----------------------------------------------------------------+
// lbManualResolver is used by the ClientConn inside grpclb. It's a manual
// resolver with a special ResolveNow() function.
//
// When ResolveNow() is called, it calls ResolveNow() on the parent
// ClientConn, so when the grpclb client loses contact with remote
// balancers, the parent ClientConn's resolver will re-resolve.
type lbManualResolver struct {
scheme string
ccr resolver.ClientConn
ccb balancer.ClientConn
}
func (r *lbManualResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
r.ccr = cc
return r, nil
}
func (r *lbManualResolver) Scheme() string {
return r.scheme
}
// ResolveNow calls resolveNow on the parent ClientConn.
func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOption) {
r.ccb.ResolveNow(o)
}
// Close is a noop for Resolver.
func (*lbManualResolver) Close() {}
// UpdateState calls cc.UpdateState.
func (r *lbManualResolver) UpdateState(s resolver.State) {
r.ccr.UpdateState(s)
}
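// editorStubBalancerCC and exampleManualResolverForwarding are an illustrative
// sketch (editor addition, hypothetical names): they show that a ResolveNow on
// the grpclb-internal resolver is forwarded to the parent balancer.ClientConn,
// which is how losing the remote balancer triggers a re-resolve in the parent
// ClientConn's resolver. The stub embeds balancer.ClientConn and overrides
// only ResolveNow.
type editorStubBalancerCC struct {
balancer.ClientConn
resolveNowCalls int
}

func (cc *editorStubBalancerCC) ResolveNow(resolver.ResolveNowOption) { cc.resolveNowCalls++ }

func exampleManualResolverForwarding() {
parent := &editorStubBalancerCC{}
r := &lbManualResolver{scheme: "editor-example", ccb: parent}
r.ResolveNow(resolver.ResolveNowOption{})
// parent.resolveNowCalls == 1: the request reached the parent ClientConn.
}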
const subConnCacheTime = time.Second * 10
// lbCacheClientConn is a wrapper around balancer.ClientConn with a SubConn
// cache. SubConns are kept in the cache for subConnCacheTime before being
// removed.
//
// Its NewSubConn and RemoveSubConn methods consult the cache first.
type lbCacheClientConn struct {
cc balancer.ClientConn
timeout time.Duration
mu sync.Mutex
// subConnCache only keeps subConns that are being deleted.
subConnCache map[resolver.Address]*subConnCacheEntry
subConnToAddr map[balancer.SubConn]resolver.Address
}
type subConnCacheEntry struct {
sc balancer.SubConn
cancel func()
abortDeleting bool
}
func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn {
return &lbCacheClientConn{
cc: cc,
timeout: subConnCacheTime,
subConnCache: make(map[resolver.Address]*subConnCacheEntry),
subConnToAddr: make(map[balancer.SubConn]resolver.Address),
}
}
func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) != 1 {
return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs))
}
addrWithoutMD := addrs[0]
addrWithoutMD.Metadata = nil
ccc.mu.Lock()
defer ccc.mu.Unlock()
if entry, ok := ccc.subConnCache[addrWithoutMD]; ok {
// If the entry is in subConnCache, the SubConn was being deleted; its
// cancel function is never nil here.
entry.cancel()
delete(ccc.subConnCache, addrWithoutMD)
return entry.sc, nil
}
scNew, err := ccc.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
ccc.subConnToAddr[scNew] = addrWithoutMD
return scNew, nil
}
func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
ccc.mu.Lock()
defer ccc.mu.Unlock()
addr, ok := ccc.subConnToAddr[sc]
if !ok {
return
}
if entry, ok := ccc.subConnCache[addr]; ok {
if entry.sc != sc {
// This could happen if NewSubConn was called multiple times for the
// same address, and those SubConns are all removed. We remove sc
// immediately here.
delete(ccc.subConnToAddr, sc)
ccc.cc.RemoveSubConn(sc)
}
return
}
entry := &subConnCacheEntry{
sc: sc,
}
ccc.subConnCache[addr] = entry
timer := time.AfterFunc(ccc.timeout, func() {
ccc.mu.Lock()
defer ccc.mu.Unlock()
if entry.abortDeleting {
// cancel() won the race; keep the entry so the SubConn can be reused.
return
}
ccc.cc.RemoveSubConn(sc)
delete(ccc.subConnToAddr, sc)
delete(ccc.subConnCache, addr)
})
entry.cancel = func() {
if !timer.Stop() {
// If stop was not successful, the timer has fired (this can only
// happen in a race). But the deleting function is blocked on ccc.mu
// because the mutex was held by the caller of this function.
//
// Set abortDeleting to true to abort the deleting function. When
// the lock is released, the deleting function will acquire the
// lock, check the value of abortDeleting and return.
entry.abortDeleting = true
}
}
}
func (ccc *lbCacheClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
ccc.cc.UpdateBalancerState(s, p)
}
func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
// Only cancel all existing timers. There's no need to remove SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}
ccc.mu.Unlock()
}
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"fmt"
"sync"
"testing"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
type mockSubConn struct {
balancer.SubConn
}
type mockClientConn struct {
balancer.ClientConn
mu sync.Mutex
subConns map[balancer.SubConn]resolver.Address
}
func newMockClientConn() *mockClientConn {
return &mockClientConn{
subConns: make(map[balancer.SubConn]resolver.Address),
}
}
func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &mockSubConn{}
mcc.mu.Lock()
defer mcc.mu.Unlock()
mcc.subConns[sc] = addrs[0]
return sc, nil
}
func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
mcc.mu.Lock()
defer mcc.mu.Unlock()
delete(mcc.subConns, sc)
}
const testCacheTimeout = 100 * time.Millisecond
func checkMockCC(mcc *mockClientConn, scLen int) error {
mcc.mu.Lock()
defer mcc.mu.Unlock()
if len(mcc.subConns) != scLen {
return fmt.Errorf("mcc = %+v, want len(mcc.subConns) = %v", mcc.subConns, scLen)
}
return nil
}
func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error {
ccc.mu.Lock()
defer ccc.mu.Unlock()
if len(ccc.subConnCache) != sccLen {
return fmt.Errorf("ccc = %+v, want len(ccc.subConnCache) = %v", ccc.subConnCache, sccLen)
}
if len(ccc.subConnToAddr) != sctaLen {
return fmt.Errorf("ccc = %+v, want len(ccc.subConnToAddr) = %v", ccc.subConnToAddr, sctaLen)
}
return nil
}
// Test that SubConn won't be immediately removed.
func TestLBCacheClientConnExpire(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
t.Fatal(err)
}
ccc := newLBCacheClientConn(mcc)
ccc.timeout = testCacheTimeout
if err := checkCacheCC(ccc, 0, 0); err != nil {
t.Fatal(err)
}
sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
// One subconn in MockCC.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// No subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 0, 1); err != nil {
t.Fatal(err)
}
ccc.RemoveSubConn(sc)
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// One subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 1, 1); err != nil {
t.Fatal(err)
}
// Should all become empty after timeout.
var err error
for i := 0; i < 2; i++ {
time.Sleep(testCacheTimeout)
err = checkMockCC(mcc, 0)
if err != nil {
continue
}
err = checkCacheCC(ccc, 0, 0)
if err != nil {
continue
}
}
if err != nil {
t.Fatal(err)
}
}
// Test that NewSubConn with the same address as a SubConn being removed
// reuses that SubConn and cancels the removal.
func TestLBCacheClientConnReuse(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
t.Fatal(err)
}
ccc := newLBCacheClientConn(mcc)
ccc.timeout = testCacheTimeout
if err := checkCacheCC(ccc, 0, 0); err != nil {
t.Fatal(err)
}
sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
// One subconn in MockCC.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// No subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 0, 1); err != nil {
t.Fatal(err)
}
ccc.RemoveSubConn(sc)
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// One subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 1, 1); err != nil {
t.Fatal(err)
}
// Recreate the old subconn; this should cancel the deletion.
sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
// One subconn in MockCC.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// No subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 0, 1); err != nil {
t.Fatal(err)
}
var err error
// Should not become empty after 2*timeout.
time.Sleep(2 * testCacheTimeout)
err = checkMockCC(mcc, 1)
if err != nil {
t.Fatal(err)
}
err = checkCacheCC(ccc, 0, 1)
if err != nil {
t.Fatal(err)
}
// Call remove again; the SubConn will be deleted after the timeout.
ccc.RemoveSubConn(sc)
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
}
// One subconn being deleted, and one in CacheCC.
if err := checkCacheCC(ccc, 1, 1); err != nil {
t.Fatal(err)
}
// Should all become empty after timeout.
for i := 0; i < 2; i++ {
time.Sleep(testCacheTimeout)
err = checkMockCC(mcc, 0)
if err != nil {
continue
}
err = checkCacheCC(ccc, 0, 0)
if err != nil {
continue
}
}
if err != nil {
t.Fatal(err)
}
}
#!/bin/bash
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eux -o pipefail
TMP=$(mktemp -d)
function finish {
rm -rf "$TMP"
}
trap finish EXIT
pushd "$TMP"
mkdir -p grpc/lb/v1
curl -f https://raw.githubusercontent.com/grpc/grpc-proto/master/grpc/lb/v1/load_balancer.proto > grpc/lb/v1/load_balancer.proto
protoc --go_out=plugins=grpc,paths=source_relative:. -I. grpc/lb/v1/*.proto
popd
rm -f grpc_lb_v1/*.pb.go
cp "$TMP"/grpc/lb/v1/*.pb.go grpc_lb_v1/
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package roundrobin defines a roundrobin balancer. The roundrobin balancer
// is installed as one of the default balancers in gRPC; users don't need to
// install this balancer explicitly.
package roundrobin
import (
"context"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
)
// Name is the name of the round_robin balancer.
const Name = "round_robin"
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilderWithConfig(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}
func init() {
balancer.Register(newBuilder())
}
type rrPickerBuilder struct{}
func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
if len(readySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
for _, sc := range readySCs {
scs = append(scs, sc)
}
return &rrPicker{
subConns: scs,
// Start at a random index, as the same RR balancer rebuilds a new
// picker when SubConn states change, and we don't want to apply excess
// load to the first server in the list.
next: grpcrand.Intn(len(scs)),
}
}
type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Pick() does a roundrobin selection
// over it and returns the selected SubConn.
subConns []balancer.SubConn
mu sync.Mutex
next int
}
func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return sc, nil, nil
}
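// Illustrative usage sketch (editor addition, a separate user-side file, not
// part of this commit). It selects round_robin by name at dial time, which was
// the prevailing API when this commit was made; the target "localhost:50051"
// is a placeholder.
package main

import (
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
)

func main() {
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithBalancerName(roundrobin.Name), // round-robin across resolved addresses
)
if err != nil {
panic(err)
}
defer conn.Close()
}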
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"sync"
)
type dropper struct {
// Drop rate will be numerator/denominator.
numerator uint32
denominator uint32
mu sync.Mutex
i uint32
}
func newDropper(numerator, denominator uint32) *dropper {
return &dropper{
numerator: numerator,
denominator: denominator,
}
}
func (d *dropper) drop() (ret bool) {
d.mu.Lock()
defer d.mu.Unlock()
// TODO: the drop algorithm needs a design.
// Currently, for drop rate 3/5:
// 0 1 2 3 4
// d d d n n
if d.i < d.numerator {
ret = true
}
d.i++
if d.i >= d.denominator {
d.i = 0
}
return
}
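// exampleDropperPattern is an illustrative sketch (editor addition,
// hypothetical name): for rate 3/5 the current algorithm drops the first three
// of every five calls, matching the "d d d n n" pattern in the comment above.
func exampleDropperPattern() {
d := newDropper(3, 5)
got := make([]bool, 0, 10)
for i := 0; i < 10; i++ {
got = append(got, d.drop())
}
// got == [true true true false false true true true false false]
_ = got
}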
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"testing"
)
func TestDropper(t *testing.T) {
const repeat = 2
type args struct {
numerator uint32
denominator uint32
}
tests := []struct {
name string
args args
}{
{
name: "2_3",
args: args{
numerator: 2,
denominator: 3,
},
},
{
name: "4_8",
args: args{
numerator: 4,
denominator: 8,
},
},
{
name: "7_20",
args: args{
numerator: 7,
denominator: 20,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := newDropper(tt.args.numerator, tt.args.denominator)
var (
dCount int
wantCount = int(tt.args.numerator) * repeat
loopCount = int(tt.args.denominator) * repeat
)
for i := 0; i < loopCount; i++ {
if d.drop() {
dCount++
}
}
if dCount != (wantCount) {
t.Errorf("with numerator %v, denominator %v repeat %v, got drop count: %v, want %v",
tt.args.numerator, tt.args.denominator, repeat, dCount, wantCount)
}
})
}
}
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.testing;
message ByteBufferParams {
int32 req_size = 1;
int32 resp_size = 2;
}
message SimpleProtoParams {
int32 req_size = 1;
int32 resp_size = 2;
}
message ComplexProtoParams {
// TODO (vpai): Fill this in once the details of complex, representative
// protos are decided
}
message PayloadConfig {
oneof payload {
ByteBufferParams bytebuf_params = 1;
SimpleProtoParams simple_params = 2;
ComplexProtoParams complex_params = 3;
}
}
// +build !linux appengine
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package service
import (
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/internal/channelz"
)
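// sockoptToProto is a no-op on non-linux platforms and on App Engine (per the
// build tag above), where per-socket option data is not available to channelz.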
func sockoptToProto(skopts *channelz.SocketOptionData) []*channelzpb.SocketOption {
return nil
}