Developing Microservices & APIs using gRPC

Every microservice is expected to serve some part of the business function and can be built using multiple approaches. One of the proven approaches is REST (HTTP + JSON). For example, the chart below shows four different microservices working in conjunction with each other.

These microservices must exchange data amongst themselves, and the following are the aspects we need to thoroughly pin down before taking any microservice live :-

  • The API used to exchange data. e.g. the API endpoints could look like :-

POST /api/v1/customer

GET /api/v1/customer/876/posts/3409

  • The data format itself. e.g. it can be JSON, XML or binary.
  • Efficiency of the API and load balancing. There can be instances when a lot of data is transferred and instances when very little is.
  • The error patterns.
  • Rate limiting and API authentication.
  • Scaling for millions of requests.
  • Latency of the APIs.
  • Interoperability across many platforms & programming languages.
  • Efficient monitoring, logging and alerting.

Introducing gRPC

gRPC is a framework which can help us solve many of the problems listed above. As developers of business APIs, we need to think only about the data and the contract; gRPC handles the rest. gRPC is a free and open-source framework originally developed by Google. It is now a Cloud Native Computing Foundation project, in the same ecosystem as Docker & Kubernetes.

  • It allows us to define the request/response for RPCs (Remote Procedure Calls) and handles everything else out of the box.
  • It is an efficient and fast way of invoking APIs.
  • It is built on top of HTTP/2.
  • It has low latency and is language independent.
  • It makes it easy to plug in authentication, load balancing, logging and monitoring.

Although with RPC it looks as if we are just invoking a method on the client side, the call actually happens over the network. For example, in the picture below, a ‘createUser’ call looks like a plain local method call :-

Below is how gRPC supports language-agnostic calls :-

Using gRPC, we can generate stubs which sit at the client's end and help in invoking the services; interaction between microservices thus becomes easy. For example, a mobile app written in Java can easily invoke the ‘Purchase-service’ implemented using gRPC, as shown below :-

The typical way of working with gRPC is :-

  • We first define the messages and services using Protocol Buffers, and the rest of the code is generated for us automatically.
  • One .proto file works for over 12 programming languages and can scale to millions of requests per second.
  • The client sends a proto request to the server, and the server in turn sends a proto response back to the client.

Who is using gRPC in production?

  • Google, for cloud services like Pub/Sub.
  • Netflix.
  • CockroachDB.
  • Square.
  • CoreOS (etcd v3 is built on top of gRPC and uses it for transporting data).

Basic semantics of Protocol Buffers :-

A ‘message’ is the Protobuf term for the unit of data which shall be transferred over the network. Basically, it is a type of object that we define. We can have numerous ‘message’ definitions inside one ‘.proto’ file.

Advantages of working with Protocol Buffers :-

  • It is language-agnostic.
  • Code can be generated for any language.
  • Data is binary and efficiently serialised.
  • It is easy and convenient for transporting a lot of data.
  • It allows for easy API evolution using rules.

How do Protocol Buffers work?

  • Using Protocol Buffers, we get the same serialisation and deserialisation behaviour across all languages.

Here is how serialisation happens: the message, together with the message format (defined in .proto), gets converted to a byte array.

Here is how deserialisation happens: the message is reconstructed from the byte array using the message format (defined in .proto).

What does the simplest ‘.proto’ file look like?
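Here is a minimal sketch (the message name and field are purely illustrative) :-

syntax = "proto3";

message HelloMessage {
  string text = 1;
}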

Supported field types while using Protobuf :-

  • Supported scalar NUMBER types are: int32, int64, double, float, sint32, sint64, uint32, uint64, fixed32, fixed64, sfixed32, sfixed64. Please note that fixed32 always uses 4 bytes, whereas int32 and sint32 use variable-length encoding, so small values take up less space.
  • Supported scalar BOOLEAN type: represented as ‘bool’ in protobuf.
  • Supported scalar STRING type: represented as ‘string’ in protobuf. It should always contain UTF-8 encoded or 7-bit ASCII text.
  • Supported scalar BYTES type: represented as ‘bytes’ in protobuf. It represents a sequence of bytes. e.g. → we can use this type to represent an image.
  • Supported LIST type: represented as ‘repeated’ in protobuf. It represents a list of the corresponding scalar type. e.g. → the list of phone numbers which a Human holds.
  • Supported ENUM type: represented as ‘enum’ in protobuf. It represents a field whose possible values are all known in advance. e.g. → the eye colour of a Human.

Here is how we can create a sample .proto file containing a ‘message’ of type Human with some attributes :-

syntax = "proto3";

/*
Human represents a User of our system.
*/
message Human {
int32 age = 1;
string first_name = 2;
string last_name = 3;
bytes small_picture = 4;
bool is_profile_verified = 5;
float height = 6;
repeated string phone_numbers = 7;

enum EyeColor {
UNKNOWN_COLOR = 0;
GREEN = 1;
BROWN = 2;
BLACK = 3;
}

EyeColor eyecolor = 8;
my.Date.Date birthday = 9; message Address {
string address_line_1 = 0;
string address_line_2 = 1;
string zip_code = 2 ;
string city = 3;
string country = 4;
}

repeated Address addressOfHuman = 10;
}

In the above ‘message’, we also created a nested message of type Address; this is very much possible with protobuf.

Now, let's see an example of creating a message of type Date, as used above inside the ‘Human’ message. Please note that protobuf has no way to enforce constraints on value ranges; that is something the code has to take care of. e.g. the value of month can very well go beyond 12, and protobuf has no option to prevent it. Also observe that we create the Date ‘message’ in a separate file called date.proto and then import this Date type into the aforesaid ‘Human’ message; however, we could just as well define multiple ‘message’ types in the same file :-

syntax = "proto3";package my.Date;message Date {
// Year of date. Must be from 1 to 9999, or 0 if specifying a date without
// a year.
int32 year = 1;

// Month of year. Must be from 1 to 12.
int32 month = 2;

// Day of month. Must be from 1 to 31 and valid for the year and month, or 0
// if specifying a year/month where the day is not significant.
int32 day = 3;
}

Advanced data types supported by Protobuf :-

  • map → It can be used to map scalar keys (except float/double) to values of any type. Map fields cannot be repeated. Example →
map<string, CustomValue> custom_values = 2;
  • Timestamp → Below is an example of using Timestamp, which comes ready-made from Google :-
syntax = "proto3";

import "google/protobuf/timestamp.proto";

package
example.simple;

message SimpleMessage {
int32 id = 1;
google.protobuf.Timestamp created_date = 2;
}

What is a TAG? Pay attention to the numbers used for every field of the ‘message’ object; each such number is also called a TAG.

  • The range of values a TAG can take is 1 to 536,870,911.
  • The values 19,000 to 19,999 cannot be used, as they are reserved for the Protobuf implementation.
  • Tag numbers 1 to 15 use ONE byte of space once the message is encoded, so preferably use these tag numbers for frequently populated fields.
  • Tag numbers from 16 to 2047 use TWO bytes of space.

We generally define a package in which our protocol-buffer messages live. After the code gets compiled, the generated classes are placed in the indicated package.

What is ‘protoc’? ‘protoc’ is the compiler we use to generate the code. We specify the .proto files as input and can generate code for the following languages: C++/Java/C#/Python/Ruby/PHP/Objective-C/etc.
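For example, generating Java sources from a proto file could look like this (the output directory here is illustrative) :-

protoc --java_out=build/generated simple.proto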

Let's see an example of generating code from a ‘.proto’ file and writing a message object to a file. Below is a simple.proto file, where we have defined a SimpleMessage.

syntax = "proto3";

package example.simple;

message SimpleMessage {
int32 id = 1;
bool is_simple = 2;
string name = 3;
repeated int32 sample_list = 4;
}

Upon compiling this file, the Java source code is auto-generated for us.

Now, let's create a SimpleMessage object, write it to a file and then read the same object back. Please note that an object, once written to the file, can be read back in any language.

public static void main(String[] args) throws IOException {
System.out.println("Hello world!");

SimpleMessage.Builder simpleMessageBuilder = SimpleMessage.newBuilder();
simpleMessageBuilder
.setId(4567)
.setIsSimple(true)
.setName("Honesty is the best policy.")
.addAllSampleList(Arrays.asList(1,2,3));

SimpleMessage simpleMessage = simpleMessageBuilder.build();
FileOutputStream fileOutputStream = new FileOutputStream("simpleMessage_bin");
simpleMessage.writeTo(fileOutputStream);


FileInputStream fileInputStream = new FileInputStream("simpleMessage_bin");
SimpleMessage messageAsReadFromInputStream = SimpleMessage.parseFrom(fileInputStream);

System.out.println(messageAsReadFromInputStream);
}

Let's now see an example of an enum proto and its usage. Below is the sample proto file we have created; using the protoc compiler, the Java source code is auto-generated for us.

syntax = "proto3";
package example.enumerations;
message WeekDay {
int32 id = 1;
DayOfTheWeek day_of_the_week = 2;
}
enum DayOfTheWeek {
UNKNOWN = 0;
MONDAY = 1;
TUESDAY = 2;
WEDNESDAY = 3;
THURSDAY = 4;
FRIDAY = 5;
SATURDAY = 6;
SUNDAY = 7;
}

Below is how we use the auto-generated POJO code :-

WeekDay.Builder weekDayBuilder = WeekDay.newBuilder();
weekDayBuilder
.setId(1)
.setDayOfTheWeek(DayOfTheWeek.SATURDAY);

WeekDay weekDayBuiltIs = weekDayBuilder.build();
System.out.println(weekDayBuiltIs);
}

We can also enforce the Java package name of the auto-generated classes using the below option inside the proto file :-

option java_package = "com.example.options";

If one proto file contains multiple messages and we want a separate POJO to be auto-generated for every ‘message’, we can use the below option inside the proto file :-

option java_multiple_files = true;

Protocol Buffers also get full marks when it comes to upgrading a ‘Message’ defined in a ‘.proto’ file. We might have a scenario where one application writes the message object in an enhanced format while some application still reads the old format of the ‘Message’, or vice versa.

Rules for updating the protocol (i.e. the format of a Message inside the .proto file) :-

  • Numeric tags of existing fields must not be changed. When we add newer fields, older code will simply ignore them.
  • Vice versa, if newer code reads a message that was written by older code, it initialises the missing fields with default values.
  • Renaming fields is very simple, as long as the tag numbers of the renamed fields are not changed; only the tag number matters to Protobuf.
  • Fields can be deleted from the .proto file as long as their tag numbers are not re-used by newly added fields. To avoid this happening accidentally, we can keep the deleted fields but suffix them with _OBSOLETE, or we can reserve the deleted fields, as sketched below :-
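As a sketch (the field choices below are purely illustrative) :-

message Human {
  // Tag numbers and names of deleted fields are reserved so they can never be reused.
  reserved 4, 9;
  reserved "small_picture", "birthday";

  int32 age = 1;
  string first_name = 2;
}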

A point to note here: never remove the reserved tags, as a future developer might otherwise reuse a reserved tag number and cause conflicts.

Protocol Buffer services :- Using Protocol Buffers, we can also define services on top of the messages. A service is a set of endpoints exposed by a server. Below is how a service (i.e. an RPC call) can sit on top of messages: the SearchRequest takes a ‘person_id’ in the request and returns a SearchResponse containing the ‘person_id’ and ‘person_name’. For every rpc call, we need to define a request message and a response message.
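Here is a sketch of what that could look like (the service and rpc names are illustrative) :-

message SearchRequest {
  int32 person_id = 1;
}

message SearchResponse {
  int32 person_id = 1;
  string person_name = 2;
}

service SearchService {
  rpc Search(SearchRequest) returns (SearchResponse) {};
}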

Efficiency of Protobuf over JSON :-

First, the Protobuf format is more efficient in terms of payload size. All of this data eventually has to travel over the network, so this saves a lot of bandwidth. Let's see the comparison :-

  • Say we have this JSON; its size is around 55 bytes :-
{
"age":29,
"firstname": "Aditya",
"lastname":"Goel"
}
  • In contrast, say we have this protobuf message; its serialised size is around 20 bytes :-
message Person {
int32 age = 1;
string first_name = 2;
string last_name = 3;
}

Second, parsing JSON is more CPU-intensive (since it is human-readable text) compared to parsing protobuf (since it is much closer to how data is represented in memory).

Thus, using Protobuf means faster and more efficient communication, and hence it is the first choice for mobile devices, which have slower CPUs.

Conclusions: Protocol Buffers (proto3) :-

  • The API can be defined in a simple and easy manner.
  • The definition is separate from the implementation.
  • A lot of boiler-plate code is auto-generated by the proto compiler.
  • Since the payload is binary, it is a lot more efficient to send/receive over the network and to serialise/deserialise on the CPU.

HTTP/2 :- It is a revolutionary technology that has almost become the new standard for internet communication, and it improves communication over the network tremendously. Any given image or video loads much faster over HTTP/2 than over HTTP/1.1; similarly, making 50 API calls takes a lot less time over HTTP/2 than making the same 50 API calls over HTTP/1.1. The bottom line is that with HTTP/2 we get reduced latency, increased security and more efficiency. Let's see some differences between HTTP/2 and HTTP/1.1 :-

HTTP/1.1 :→

# It was introduced in 1997 and is still widely used.

# It opens up a new TCP connection to the server for each new request.

# It doesn’t compress the headers; they are kept in plaintext.

# It works with only a request/response mechanism; it doesn’t support server push.

# It was originally composed of 2 verbs primarily: GET and POST. With time, many more verbs have been added, like PATCH, DELETE, PUT etc.

Because of the aforementioned points, this architecture (HTTP/1.1) is far less efficient and adds latency to API calls. Below is how it appears :-

HTTP/2 :→

# It was introduced in 2015 and is backed by Google.

# It supports multiplexing, i.e. client & server can push messages in parallel over the same TCP connection, which greatly helps in reducing latency.

# It supports compression of the headers and therefore saves a lot of network bandwidth, as the packet size is smaller.

# It supports server push, i.e. servers can push streams (multiple messages) for one request from the client. This saves a lot of unwanted round trips and hence again saves on latency.

# It is binary in nature and secure by default.

Types of APIs in gRPC :-

  • Unary → This is how a traditional API looks, with the usual request/response: the client sends one message to the server and receives one response back. Unary RPC calls are well suited when the data to be transferred is small.
  • Server / Client / Bidirectional streaming → With HTTP/2 we have streaming capabilities, i.e. the server and the client can push multiple messages as part of one request. We can use these streaming RPC calls if we have scaling and optimisation concerns.

Below is how we define these APIs (Protobuf services) using gRPC (note: we saw the Unary API above) :-
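A sketch, using the GreetService that is developed in detail later in this article :-

service GreetService {
  // Unary
  rpc Greet(GreetRequest) returns (GreetResponse) {};

  // Server streaming
  rpc GreetServerStreaming(GreetRequest) returns (stream GreetResponse) {};

  // Client streaming
  rpc GreetClientStreaming(stream GreetRequest) returns (GreetResponse) {};

  // Bidirectional streaming
  rpc GreetBidirectionalStreaming(stream GreetRequest) returns (stream GreetResponse) {};
}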

Massive scalability using gRPC :- (Google serves more than 10 billion requests per second internally over gRPC.)

  • gRPC servers are asynchronous by default, i.e. they don't block threads on a request. Therefore each gRPC server can serve millions of requests per second in parallel.
  • gRPC clients can be either synchronous or asynchronous. gRPC clients are also well positioned to perform client-side load balancing.

Differences between REST & gRPC :-

  • REST uses JSON as the data-transfer standard, which is slower, bigger and text-based, while gRPC uses protobuf, which is smaller and faster.
  • REST uses HTTP/1.1 while gRPC uses HTTP/2, which is a lot faster and more efficient.
  • REST only supports client-to-server calls while gRPC also supports bidirectional and async calls.
  • REST only supports request/response while gRPC also supports streaming.
  • REST is purely resource-oriented while gRPC is purely API-oriented with a free design.
  • REST supports auto code generation via Swagger and OpenAPI as 2nd-class citizens while gRPC supports auto code generation via protobuf as a 1st-class citizen.
  • REST is verb-based, so we have to do the basic plumbing ourselves, while gRPC is RPC-based, i.e. we can invoke functions on the server easily.

Launching the gRPC client :- Let's see an example of launching a client for a protobuf service. Here is the sample ‘.proto’ file we are considering :-

syntax = "proto3";

package test;

option java_package = "com.proto.test";

option java_multiple_files = true;

message TestMessage {}

service TestService {}

Here is how we can create the client :-

public static void main(String[] args) throws SSLException {
System.out.println("Hello I'm a gRPC client");
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();

TestServiceGrpc.TestServiceBlockingStub synchronousClient = TestServiceGrpc.newBlockingStub(channel);
TestServiceGrpc.TestServiceFutureStub asynchronousClient = TestServiceGrpc.newFutureStub(channel);

System.out.println("Shutting down the channel.");
channel.shutdown();

}

Full end-to-end gRPC Unary API :- Let's see an example of creating a full end-to-end API with a well-defined request and response. We define the below proto file :-

syntax = "proto3";

package greet;

option java_package = "com.proto.greet";
option java_multiple_files = true;

message Greeting {
string first_name = 1;
string last_name = 2;
}

message GreetRequest {
Greeting greeting = 1;
}

message GreetResponse {
string result = 1;
}

service GreetService {
// Unary API
rpc Greet(GreetRequest) returns (GreetResponse) {};
}

We now run the protoc compiler through Gradle in order to auto-generate the code; here is how it looks :-
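A sketch of the relevant build.gradle configuration, assuming the com.google.protobuf Gradle plugin (the versions shown are illustrative) :-

plugins {
  id 'java'
  id 'com.google.protobuf' version '0.8.17'
}

dependencies {
  implementation 'io.grpc:grpc-netty-shaded:1.32.1'
  implementation 'io.grpc:grpc-protobuf:1.32.1'
  implementation 'io.grpc:grpc-stub:1.32.1'
}

protobuf {
  protoc {
    artifact = 'com.google.protobuf:protoc:3.12.0'
  }
  plugins {
    grpc {
      artifact = 'io.grpc:protoc-gen-grpc-java:1.32.1'
    }
  }
  generateProtoTasks {
    // Attach the gRPC plugin to every proto compilation task, so the service stubs get generated too.
    all()*.plugins {
      grpc {}
    }
  }
}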

Some important naming conventions are :-

  • Every service should have the suffix ‘Service’ in its name.
  • Every request should have the suffix ‘Request’ and every response the suffix ‘Response’ in its name.
  • If the name of a field of any message is multi-word, we should use ‘_’ as the separator.

Let's now define the crux of our service, i.e. the business logic of what our service shall do whenever it receives a request :-

public class GreetServiceImpl extends GreetServiceGrpc.GreetServiceImplBase {

@Override
public void greet(GreetRequest request, StreamObserver<GreetResponse> responseObserver) {
// Extract information from Request as it is received.
Greeting greetRequest = request.getGreeting();
String firstName = greetRequest.getFirstName();

// Frame the response, to be sent back
String framedResponse = "Welcome " + firstName;

GreetResponse greetResponse = GreetResponse.newBuilder().setResult(framedResponse).build();

// Sending back the response to the client
responseObserver.onNext(greetResponse);

// complete the call
responseObserver.onCompleted();
}
}

Let's now define a server to host our above service :-

public class GreetingServer {

public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("Starting the gRPC Server with GreetService hossted in it.");

// plaintext server
Server server = ServerBuilder.forPort(50051)
.addService(new GreetServiceImpl())
.build();

server.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Received Shutdown Request");
server.shutdown();
System.out.println("Successfully stopped the server");
}));

server.awaitTermination();
}
}

Let's now define a client, which shall invoke the API (or method) that we just exposed :-

public static void main(String[] args) throws SSLException {
System.out.println("Starting our gRPC client");

ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext() // Not recommended for production.
.build();

// Build the protobuf Greeting message.
Greeting greetingRequestMessage = Greeting.newBuilder()
.setFirstName("Harey")
.setLastName("Krishna")
.build();

// Build the GreetingRequest, which has to be sent to the server.
GreetRequest greetRequest = GreetRequest.newBuilder()
.setGreeting(greetingRequestMessage)
.build();

// Create the Synchronous blocking client.
GreetServiceGrpc.GreetServiceBlockingStub greetClient = GreetServiceGrpc.newBlockingStub(channel);

// Invoke the function (being implemented at server).
// This is actually a N/w call, but looks like method call.
GreetResponse greetResponse = greetClient.greet(greetRequest);

System.out.println("The result thus obtained here is: "+ greetResponse.getResult());
channel.shutdown();
}

Please note that it only appears as if we are invoking a local method call; it is actually a server call which goes over the network. Thus, our client has invoked the ‘greet()’ method at the server over the network. Let's now launch our server and then launch the client; here is the response received :-
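Given the first name set by the client above, the client's console output should end with something like :-

The result thus obtained here is: Welcome Harey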

Full end-to-end gRPC Server-Streaming API :- Here the client sends one message and receives many messages back in streaming fashion, possibly an infinite number of messages from the server's end. This is well suited in the following circumstances :-

  • The server has to send big data back to the client. Say the response consists of 10 GB; sending all of this data at once presents the risk of network failure and the whole call failing. Instead, the server can send a stream of messages of, say, 100 KB each; in that case there is a high chance that the transfer will succeed.
  • The server needs to PUSH data to the client without a corresponding client request, e.g. a live feed. Basically, the server pushes data to the client without the client asking for it.

Let's see how we can define a server-streaming API using gRPC. Below is how the syntax looks :-

service GreetService { 
// Server Streaming API
rpc GreetServerStreaming(GreetRequest) returns (stream GreetResponse) {};

}

Let's now define the crux of our service, i.e. the business logic of what our service shall do whenever it receives a request. In our case, we send multiple responses, one per second, back to the client :-

public class GreetServerStreamingServiceImpl extends GreetServiceGrpc.GreetServiceImplBase {

@Override
public void greetServerStreaming(GreetRequest request, StreamObserver<GreetResponse> responseObserver) {
// Extract information from Request as it is received.
String firstName = request.getGreeting().getFirstName();

// Server shall be sending a stream of responses.
try {
for(int i=1; i <10; i++) {
// Frame the response
String framedResponse = "Welcome " + firstName + " for the " + i + " time.";
GreetResponse greetResponse = GreetResponse.newBuilder()
.setResult(framedResponse).build();

// Send back the response
responseObserver.onNext(greetResponse);

Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// complete the call
responseObserver.onCompleted();
}
}
}

Please note that the server hosting our above service remains the same as in the Unary API case. Let's now define a client, which shall invoke the API (or method) that we just exposed :-

// Receive the Streaming response in blocking manner.
greetClient.greetServerStreaming(greetRequest)
.forEachRemaining(greetResponse -> {
System.out.println("Good-Morning: "+ greetResponse.getResult());
});

Full end-to-end gRPC Client-Streaming API :- Here the client sends many messages to the server in streaming fashion and receives one response message back. It is purely up to the server when it wants to send the response back; it may send it after receiving all messages or just after receiving a single message. This is well suited in the following circumstances :-

  • When the client needs to send big data, like uploading something.
  • When the server-side processing is quite expensive and should happen as and when the client sends the data. For example, say the client has to send 10K messages to the server; the server can then start processing the data one by one rather than waiting until the client has sent all 10K messages.
  • When the client needs to PUSH data to the server without really expecting a response back.

Let's see how we can define a client-streaming API using gRPC. Below is how the syntax looks :-

// Client Streaming API
rpc GreetClientStreaming(stream GreetRequest) returns (GreetResponse) {};

Let's now define the crux of our service, i.e. the business logic of what our service shall do whenever it receives a request. In our case, the client sends multiple requests. We define the processing logic inside the ‘onNext()’ method, which is invoked for every single message received from the client. The logic for when the client is done sending messages goes inside the ‘onCompleted()’ method, as shown below :-

@Override
public StreamObserver<GreetRequest> greetClientStreaming(StreamObserver<GreetResponse> responseObserver) {
StreamObserver<GreetRequest> greetRequestStreamObserver = new StreamObserver<GreetRequest>() {

String result = "";

@Override
public void onNext(GreetRequest value) {
// Client Sends a messageRecord
result += "Hello " + value.getGreeting().getFirstName() + " !.";
}

@Override
public void onError(Throwable t) {
// Client Sends an Error
}

@Override
public void onCompleted() {
// Client is done and we want to send the response back.
responseObserver.onNext(
GreetResponse.newBuilder()
.setResult(result)
.build()
);
responseObserver.onCompleted();
}
};
return greetRequestStreamObserver;
}

Please note that the server hosting our above service remains the same as in the Unary API case. Let's now define a client, which shall invoke the API (or method) that we just exposed in a client-streaming fashion. Please note that we use a CountDownLatch in order to wait for the response: we send a stream of messages to the server and the server sends us back a single message.

private static void doClientStreamingCall(ManagedChannel channel) throws InterruptedException {
// Create the ASynchronous non-blocking client.
GreetServiceGrpc.GreetServiceStub greetClient = GreetServiceGrpc.newStub(channel);

CountDownLatch countDownLatch = new CountDownLatch(1);

StreamObserver<GreetRequest> requestStreamObserver = greetClient.greetClientStreaming(
new StreamObserver<GreetResponse>() {
@Override
public void onNext(GreetResponse value) {
// We get a response back from server. It shall be called ONCE.
System.out.println("Received some response from Server.");
System.out.println(value.getResult());
}

@Override
public void onError(Throwable t) {
// We get a Error back from server. It shall be called ONCE.

}

@Override
public void onCompleted() {
// The server is done. Shall be called after onNext method.
countDownLatch.countDown();

}
}
);

System.out.println("Sending Message1 to Server.");
requestStreamObserver.onNext(GreetRequest.newBuilder()
.setGreeting(Greeting.newBuilder()
.setFirstName("Harey")
.build())
.build());

System.out.println("Sending Message2 to Server.");
requestStreamObserver.onNext(GreetRequest.newBuilder()
.setGreeting(Greeting.newBuilder()
.setFirstName("Krishna")
.build())
.build());

// Tell the server that client is done sending the data.
requestStreamObserver.onCompleted();

countDownLatch.await(3L, TimeUnit.SECONDS);
}

Full end-to-end gRPC Bidirectional-Streaming API :- Here the client sends many messages to the server in streaming fashion and the server also sends many messages back in response. The number of messages from each side need not match. This is well suited in the following circumstances :-

  • When the client and server both have to send a lot of data asynchronously.
  • A “Chat” protocol, where both client and server send messages to each other.
  • Long running connections, where data can be streamed back and forth between client and server.

Let's see how we can define a bidirectional-streaming API using gRPC. Below is how the syntax looks :-

// Bidirectional Streaming API
rpc GreetBidirectionalStreaming(stream GreetRequest) returns (stream GreetResponse) {};

Let's now define the crux of our service, i.e. the business logic of what our service shall do whenever it receives a request. In our case, the client sends multiple requests. We define the processing logic inside the ‘onNext()’ method, which is invoked for every single message received from the client; in this example, we respond to every client request by sending a response back. The logic for when the client is done sending messages goes inside the ‘onCompleted()’ method, as shown below :-

@Override
public StreamObserver<GreetRequest> greetBidirectionalStreaming(StreamObserver<GreetResponse> responseObserver) {
StreamObserver<GreetRequest> greetRequestStreamObserver = new StreamObserver<GreetRequest>() {

String result = "";

@Override
public void onNext(GreetRequest value) {
// Client Sends a messageRecord and In turn, Server sends back a response.
result += "Hello " + value.getGreeting().getFirstName() + " !.";
GreetResponse greetResponse = GreetResponse.newBuilder()
.setResult(result)
.build();
responseObserver.onNext(greetResponse);
}

@Override
public void onError(Throwable t) {
// Client Sends an Error
}

@Override
public void onCompleted() {
// Client is done now.
responseObserver.onCompleted();
}
};
return greetRequestStreamObserver;
}

Please note that the server hosting our above service remains the same as in the Unary API case. Let's now define a client, which shall invoke the API (or method) that we just exposed in a bidirectional-streaming fashion. Please note that we again use a CountDownLatch in order to wait for the responses: we send a stream of messages to the server and the server sends a stream of responses back.

private static void doBidirectionalStreamingCall(ManagedChannel channel) throws InterruptedException {
// Create the ASynchronous non-blocking client.
GreetServiceGrpc.GreetServiceStub greetClient = GreetServiceGrpc.newStub(channel);

CountDownLatch countDownLatch = new CountDownLatch(1);

StreamObserver<GreetRequest> requestStreamObserver = greetClient.greetBidirectionalStreaming(
new StreamObserver<GreetResponse>() {
@Override
public void onNext(GreetResponse value) {
// Everytime the response comes back from server, this method is going to be called.
System.out.println("Received some response from Server." + value.getResult());
}

@Override
public void onError(Throwable t) {
countDownLatch.countDown();
}

@Override
public void onCompleted() {
// The server is done sending all the responses for 1 cycle.
countDownLatch.countDown();
}
}
);

System.out.println("Sending Stream of messages to the Server.");
Arrays.asList("Honesty", "is", "the", "best", "policy").forEach(
item -> {
System.out.println("Sending message to the server now.");
requestStreamObserver.onNext(GreetRequest.newBuilder().setGreeting(Greeting.newBuilder().setFirstName(item).build()).build());
}
);

// Tell the server that client is done sending the data.
requestStreamObserver.onCompleted();

countDownLatch.await(3L, TimeUnit.SECONDS);
}

Example use-case of a Bidirectional-Streaming API :- Say we have an infinite stream of numbers coming in and we have to return the maximum of this stream as and when a new maximum arrives. We can use a bidirectional-streaming API to send multiple messages to the server and also receive multiple responses back from the server.
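As a sketch, such an RPC could be declared as follows (the message and rpc names here are illustrative) :-

message MaxNumberRequest {
  int32 number = 1;
}

message MaxNumberResponse {
  int32 current_maximum = 1;
}

service MaxNumberService {
  // Bidirectional streaming: numbers flow in, new maximums flow out.
  rpc FindMaximum(stream MaxNumberRequest) returns (stream MaxNumberResponse) {};
}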

Error handling with gRPC :- gRPC provides built-in error codes in order to handle multiple use-cases. Here are the possible codes, straight from the gRPC website :-
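Among others, the defined codes include OK, CANCELLED, INVALID_ARGUMENT, DEADLINE_EXCEEDED, NOT_FOUND, ALREADY_EXISTS, PERMISSION_DENIED, UNAUTHENTICATED, RESOURCE_EXHAUSTED, FAILED_PRECONDITION, ABORTED, OUT_OF_RANGE, UNIMPLEMENTED, INTERNAL, UNAVAILABLE and DATA_LOSS.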

Let's see some sample code for handling error codes. We have a simple unary API for finding the square root of a given number :-

service CalculatorService {
// Unary API
rpc SquareRootFinder(SquareRootRequest) returns (SquareRootResponse) {};
}

Next, here is the crux of doing the error handling with the ‘onError’ method :-

@Override
public void squareRootFinder(SquareRootRequest request, StreamObserver<SquareRootResponse> responseObserver) {
// Extract information from Request as it is received.
int inputNumber = request.getInputNumber();

if(inputNumber > 0) {
// Send back the response
double squareRootNumber = Math.sqrt(inputNumber);
responseObserver.onNext(SquareRootResponse.newBuilder()
.setOutputNumber(squareRootNumber).build());
// complete the call only on the success path; onError() below already terminates the call
responseObserver.onCompleted();
} else {
// Send back the response not possible.
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription("Number is Not positive.").asRuntimeException());
}

}

Below is how the output looks at the client's side :-
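A sketch of the client side (assuming a blocking stub named calculatorClient built from the generated CalculatorServiceGrpc); a negative input triggers the error path :-

try {
  SquareRootResponse response = calculatorClient.squareRootFinder(
      SquareRootRequest.newBuilder().setInputNumber(-75).build());
  System.out.println("Square root received: " + response.getOutputNumber());
} catch (StatusRuntimeException e) {
  // For a negative input, the status code is INVALID_ARGUMENT and the description
  // is the one set by the server ("Number is Not positive.").
  System.out.println("Got an exception: " + e.getStatus().getCode()
      + " - " + e.getStatus().getDescription());
}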

Timeout handling (deadlines) at the client end with gRPC :- With deadlines, we can specify how long our client is willing to wait for an RPC call to complete before the RPC is terminated with the error DEADLINE_EXCEEDED. Say our SquareRoot finder service above takes more time than the limit the client has specified; the call shall then be cut off. Let's see this in the service implementation, which simulates 200 ms of processing time :-

@Override
public void squareRootFinder(SquareRootRequest request, StreamObserver<SquareRootResponse> responseObserver) {
System.out.println("Method control reached to this RPC method.");
// Fetches the current context.
Context context = Context.current();

// Extract information from Request as it is received.
int inputNumber = request.getInputNumber();

if(inputNumber > 0) {
// Send back the response
try {
System.out.println("Computation under progress still.");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}

if(context.isCancelled()) {
System.out.println("Client has cancelled the request");
} else {
double squareRootNumber = Math.sqrt(inputNumber);
responseObserver.onNext(SquareRootResponse.newBuilder()
.setOutputNumber(squareRootNumber).build());
// complete the call only when it was neither cancelled nor errored
responseObserver.onCompleted();
}
} else {
// Send back the error; onError() itself terminates the call.
responseObserver
.onError(Status.INVALID_ARGUMENT.withDescription("Number is Not positive.").asRuntimeException());
}

}

Below is how our client implementation looks. Here, the response should come within 700 ms; if it doesn't, a StatusRuntimeException is triggered as shown below :-

// Invoke the function with Deadline being set.
// This is actually a N/w call, but looks like method call.
try {
System.out.println("Sending request with a deadline of 500 ms.");
SquareRootResponse squareRootResponse = calculatorServiceBlockingStub
.withDeadline(Deadline.after(700, TimeUnit.MILLISECONDS))
.squareRootFinder(squareRootRequest);

System.out.println("Square root response thus received is : " + squareRootResponse.getOutputNumber());
} catch(StatusRuntimeException e) {
if(e.getStatus().equals(Status.DEADLINE_EXCEEDED)) {
System.out.println("Deadline has been exceeded and response not received wven within 500 ms.");
}
e.printStackTrace();
}

SSL encryption with gRPC :- In production, gRPC calls should run with encryption enabled. This is done by generating SSL certificates. SSL allows communication to be secured end-to-end, ensuring no man-in-the-middle attack can be performed.

Why do we need SSL encryption at all? :- While we communicate over the internet, the data transferred is visible to all the servers involved in the service path. Any hop/router/machine can intercept the traffic and read its contents if it is being sent as PLAINTEXT over HTTP.

With SSL, the data gets encrypted at the client's end and decrypted at the server's end, so no intermediate router/hop is able to decipher the packets passing through it.

TLS is the successor to SSL, and it encrypts the connection between two endpoints for secure data exchange. Typically, the gRPC server requests a Certificate Authority (like Google) to sign its certificate and then presents this signed certificate to the client. Once the client verifies this SSL certificate, it establishes the secure SSL connection.

We will now create certificates which we can use locally :-

  • Step 1.) Let's create our Certificate Authority private key file (this shouldn't be shared in real life) :-
openssl genrsa -passout pass:1111 -des3 -out ca.key 4096
  • Step 2.) Let's create our Certificate Authority trust certificate (this can be shared with users in real life, as it is a public certificate) :-
openssl req -passin pass:1111 -new -x509 -days 365 -key ca.key -out ca.crt -subj "/CN=${SERVER_CN}"
  • Step 3.) Let's create our server private key :-
openssl genrsa -passout pass:1111 -des3 -out server.key 4096
  • Step 4.) Now, let's create the certificate signing request (CSR) for the server :-
openssl req -passin pass:1111 -new -key server.key -out server.csr -subj "/CN=${SERVER_CN}"
  • Step 5.) We now sign the server's certificate with the CA we created :-
openssl x509 -req -passin pass:1111 -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt
  • Step 6.) We convert the server private key to .pem format (usable by gRPC) :-
openssl pkcs8 -topk8 -nocrypt -passin pass:1111 -in server.key -out server.pem

Let's see this in an example service implementation. Below is the Gradle dependency required for using the SSL feature :-

implementation 'io.grpc:grpc-netty-shaded:1.32.1' // Includes SSL Libraries.

Here is how our server now looks. While initialising the server, we use the self-signed server certificate and the server private key.

Server server = ServerBuilder.forPort(50051)
.addService(new CalculatorServiceImpl())
.useTransportSecurity(
new File("ssl/server.crt"),
new File("ssl/server.pem"))
.build();

With our server all set, if we try to start our (plaintext) client, it will fail, since we haven't yet implemented the security settings on the client side :-

io.grpc.StatusRuntimeException: UNAVAILABLE: Network closed for unknown reason
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)

Let's now implement security at the client's end. Here we need the trust certificate of the CA :-

ManagedChannel securedChannel = NettyChannelBuilder.forAddress("localhost", 50051)
.sslContext(GrpcSslContexts.forClient().trustManager(new File("ssl/ca.crt")).build())
.build();

Reflection with gRPC :- Let's see an example of enabling reflection with gRPC. The following dependency is required :-

compile 'io.grpc:grpc-services:1.32.1' // Reflections.

We also need to register the reflection service while initialising the server :-

Server server = ServerBuilder.forPort(50051)
.addService(new CalculatorServiceImpl())
.addService(ProtoReflectionService.newInstance())
.build();

Let's now install the ‘evans’ command-line tool and verify it. Next, we can inspect the signatures of requests/responses and the services :-

brew tap ktr0731/evans
brew install evans

aditya-MAC:~ aditya$ evans

evans 0.9.1

Below is how we can invoke an RPC through the command line and verify the results :-
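An illustrative session (exact flags and REPL commands may vary across evans versions) :-

# start the REPL against our local server, using reflection to discover its services
evans --host localhost --port 50051 -r repl

# inside the REPL
show service
call SquareRootFinder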

References :-