Hands-on with gRPC, Java & MongoDB

aditya goel
Oct 18, 2020

We shall build a Blog application that supports CRUD operations over gRPC, backed by an on-premises MongoDB instance.

Step 1.) Let's begin by installing MongoDB on our local machine and starting it :-

aditya-MAC:bin aditya$ pwd

/Users/aditya/Documents/LEARNING/MongoDB/mongodb-macos-x86_64-4.4.1/bin

aditya-MAC:bin aditya$ ./mongod --dbpath ../data/db

{"t":{"$date":"2020-10-18T13:42:27.926+05:30"},"s":"I", "c":"CONTROL", "id":23285, "ctx":"main","msg":"Automatically disabling TLS 1.0, to force-enable TLS 1.0 specify --sslDisabledProtocols 'none'"}

{"t":{"$date":"2020-10-18T13:42:28.867+05:30"},"s":"I", "c":"NETWORK", "id":23015, "ctx":"listener","msg":"Listening on","attr":{"address":"/tmp/mongodb-27017.sock"}}

{"t":{"$date":"2020-10-18T13:42:28.867+05:30"},"s":"I", "c":"NETWORK", "id":23015, "ctx":"listener","msg":"Listening on","attr":{"address":"127.0.0.1"}}
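Before moving on, it can help to confirm that mongod is really accepting TCP connections. The sketch below assumes the default port 27017 on 127.0.0.1, as the log lines above suggest. The class and method names are made up for illustration, and the check only proves that something is listening on the port, not that it is MongoDB.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the original project): checks whether
// anything is listening on host:port.
class MongoPortCheck {

    // Returns true if a TCP connection can be opened within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 27017 is assumed to be the port mongod was started on.
        boolean up = isListening("127.0.0.1", 27017, 1000);
        System.out.println(up ? "Port 27017 is open" : "Nothing is listening on port 27017");
    }
}
```

If this reports that nothing is listening, the gRPC service defined later would fail on its first MongoDB call, so it is worth checking before debugging anything else.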

Part1.) Create Records API :-

Step 1.) The next course of action is to define the service and message structures in the .proto file :-

syntax = "proto3";

package greet;

option java_package = "com.proto.blog";
option java_multiple_files = true;

message Blog {
    string id = 1;
    string author_id = 2;
    string title = 3;
    string content = 4;
}

message CreateBlogRequest {
    Blog blog = 1;
}

message CreateBlogResponse {
    Blog blog = 1;
}

service BlogService {
    // Unary API
    rpc CreateBlog(CreateBlogRequest) returns (CreateBlogResponse) {};
}
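The Java code generated from this file exposes each snake_case proto field through a camelCase accessor, which is why author_id is read with getAuthorId() in the service code. The sketch below is a simplified imitation of that naming rule, written only to make the mapping visible. It is not the protoc implementation, the class and method names are hypothetical, and it ignores edge cases such as digits in field names.

```java
// Hypothetical helper: a simplified imitation of how a proto field name
// maps to a generated Java getter name. Not the real protoc algorithm.
class ProtoAccessorNames {

    static String getterFor(String protoFieldName) {
        StringBuilder name = new StringBuilder("get");
        boolean upperNext = true; // the first letter after "get" is capitalized
        for (char c : protoFieldName.toCharArray()) {
            if (c == '_') {
                upperNext = true; // drop the underscore, capitalize what follows
            } else {
                name.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return name.toString();
    }

    public static void main(String[] args) {
        String[] fields = {"id", "author_id", "title", "content", "blog_id"};
        for (String field : fields) {
            System.out.println(field + " -> " + getterFor(field) + "()");
        }
    }
}
```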

Step 2.) The next course of action is to define the logic of our service :-

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.proto.blog.Blog;
import com.proto.blog.BlogServiceGrpc;
import com.proto.blog.CreateBlogRequest;
import com.proto.blog.CreateBlogResponse;
import io.grpc.stub.StreamObserver;
import org.bson.Document;

public class BlogServiceImpl extends BlogServiceGrpc.BlogServiceImplBase {

    private final MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
    private final MongoDatabase mongoDatabase = mongoClient.getDatabase("myDB");
    private final MongoCollection<Document> mongoCollection = mongoDatabase.getCollection("blog");

    @Override
    public void createBlog(CreateBlogRequest request, StreamObserver<CreateBlogResponse> responseObserver) {
        System.out.println("Received request for creating a Blog in MongoDB.");
        Blog blogReceivedInRequest = request.getBlog();

        // The id is left out on purpose: MongoDB generates the _id on insert.
        Document mongoDocToBeSaved = new Document("author_id", blogReceivedInRequest.getAuthorId())
                .append("title", blogReceivedInRequest.getTitle())
                .append("content", blogReceivedInRequest.getContent());

        System.out.println("Inserting record to MongoDB.");

        // Save the Blog received in the request to the local MongoDB.
        mongoCollection.insertOne(mongoDocToBeSaved);

        // insertOne populates _id on the document that was passed in.
        String id = mongoDocToBeSaved.getObjectId("_id").toString();

        CreateBlogResponse createBlogResponse = CreateBlogResponse.newBuilder()
                .setBlog(blogReceivedInRequest.toBuilder().setId(id).build())
                .build();

        responseObserver.onNext(createBlogResponse);
        responseObserver.onCompleted();
    }

    // The readBlog, updateBlog and deleteBlog methods from the later parts go here.
}

Step 3.) Next, let's see a basic way to write the server with gRPC :-

import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;

public class BlogServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("Starting the gRPC Server with BlogService hosted in it.");

        // Plaintext server (no TLS).
        Server server = ServerBuilder.forPort(50051)
                .addService(new BlogServiceImpl())
                .build();

        server.start();

        // Stop the server cleanly when the JVM is asked to shut down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Received Shutdown Request");
            server.shutdown();
            System.out.println("Successfully stopped the server");
        }));

        // Block the main thread until the server terminates.
        server.awaitTermination();
    }
}

Step 4.) Next, let's see a simple implementation of the client :-

import com.proto.blog.Blog;
import com.proto.blog.BlogServiceGrpc;
import com.proto.blog.CreateBlogRequest;
import com.proto.blog.CreateBlogResponse;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class BlogClient {

    public static void main(String[] args) {
        System.out.println("Starting our gRPC client");

        ManagedChannel unsecuredChannel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext() // Not recommended for production.
                .build();

        // Build the Blog request, which has to be sent to the server.
        Blog blogCreated = Blog.newBuilder()
                .setAuthorId("123")
                .setContent("GRPC Blog Content for CRUD application.")
                .setTitle("GRPC Blog Title for CRUD application.")
                .build();

        CreateBlogRequest createBlogRequest = CreateBlogRequest.newBuilder()
                .setBlog(blogCreated)
                .build();

        // Create the synchronous (blocking) client.
        BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
                BlogServiceGrpc.newBlockingStub(unsecuredChannel);

        // Invoke the function implemented at the server. This is a network call, even though it looks like a local method call.
        CreateBlogResponse createBlogResponse = blogServiceBlockingStub.createBlog(createBlogRequest);

        System.out.println("Received response is :" + createBlogResponse.toString());

        unsecuredChannel.shutdown();
    }
}

Part2.) Read Records API :-

Step 1.) Let's define the service and message formats :-

message ReadBlogRequest {
    string blog_id = 1;
}

message ReadBlogResponse {
    Blog blog = 1;
}

service BlogService {
    // Unary API
    rpc ReadBlog(ReadBlogRequest) returns (ReadBlogResponse) {};
}

Step 2.) The next course of action is to define the logic of our service :-

// Additional imports: com.mongodb.client.model.Filters, org.bson.types.ObjectId, io.grpc.Status
@Override
public void readBlog(ReadBlogRequest request, StreamObserver<ReadBlogResponse> responseObserver) {
    System.out.println("Received request for Fetching a Blog from MongoDB.");

    Document fetchedDocFromMongo =
            mongoCollection.find(Filters.eq("_id", new ObjectId(request.getBlogId()))).first();

    if (fetchedDocFromMongo == null) {
        // onError is terminal, so onCompleted must not be called after it.
        responseObserver
                .onError(Status.NOT_FOUND.withDescription("No blog exists with this Id.").asRuntimeException());
    } else {
        ReadBlogResponse fetchedBlogResponse = ReadBlogResponse.newBuilder()
                .setBlog(Blog.newBuilder()
                        .setContent(fetchedDocFromMongo.getString("content"))
                        .setAuthorId(fetchedDocFromMongo.getString("author_id"))
                        .setTitle(fetchedDocFromMongo.getString("title"))
                        .setId(request.getBlogId())
                        .build())
                .build();

        responseObserver.onNext(fetchedBlogResponse);
        responseObserver.onCompleted();
    }
}
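One thing to watch in this handler: the ObjectId constructor throws an IllegalArgumentException when the incoming blog_id is not a valid ObjectId string, so a malformed id from a client surfaces as an unhandled exception instead of a clean gRPC status. The BSON library offers ObjectId.isValid(String) for this purpose. The stand-alone sketch below shows the same idea without needing the driver on the classpath, under the assumption that a valid id is exactly 24 hexadecimal characters. The class and method names are hypothetical.

```java
// Hypothetical stand-alone check; inside the service, ObjectId.isValid(...)
// from the BSON library would be the natural choice.
class BlogIdValidator {

    // A MongoDB ObjectId is 12 bytes, written as 24 hexadecimal characters.
    static boolean looksLikeObjectId(String id) {
        if (id == null || id.length() != 24) {
            return false;
        }
        for (int i = 0; i < id.length(); i++) {
            char c = id.charAt(i);
            boolean isHex = (c >= '0' && c <= '9')
                    || (c >= 'a' && c <= 'f')
                    || (c >= 'A' && c <= 'F');
            if (!isHex) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikeObjectId("5f8c11bd8f7bc8bf15927ca9"));
        System.out.println(looksLikeObjectId("not-an-object-id"));
    }
}
```

A handler could run such a check first and answer with Status.INVALID_ARGUMENT when it fails, keeping NOT_FOUND for well-formed ids that simply do not exist. That mapping is a suggestion, not something the original code does.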

Step 3.) The server remains the same as defined above.

Step 4.) Next, let's see how the client looks for this use-case :-

private static Blog readTheBlog(String id, ManagedChannel unsecuredChannel) {
    // Create the synchronous (blocking) client.
    BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
            BlogServiceGrpc.newBlockingStub(unsecuredChannel);

    // Build the read request, which has to be sent to the server.
    ReadBlogRequest readBlogRequest = ReadBlogRequest.newBuilder()
            .setBlogId(id)
            .build();

    // Invoke the function implemented at the server. This is a network call, even though it looks like a local method call.
    ReadBlogResponse readBlogResponse = blogServiceBlockingStub.readBlog(readBlogRequest);

    System.out.println("Received response from ReadBlogCall is :" + readBlogResponse.toString());

    return readBlogResponse.getBlog();
}

Part3.) Update Records API :-

Step 1.) Let's define the service and message formats :-

message UpdateBlogRequest {
    Blog blog = 1;
}

message UpdateBlogResponse {
    Blog blog = 1;
}

service BlogService {
    rpc UpdateBlog(UpdateBlogRequest) returns (UpdateBlogResponse) {};
}

Step 2.) The next course of action is to define the logic of our service :-

@Override
public void updateBlog(UpdateBlogRequest request, StreamObserver<UpdateBlogResponse> responseObserver) {
    System.out.println("Received request for Updating a Blog in MongoDB.");

    Document fetchedDocFromMongo =
            mongoCollection.find(Filters.eq("_id", new ObjectId(request.getBlog().getId()))).first();

    if (fetchedDocFromMongo == null) {
        // onError is terminal, so onCompleted must not be called after it.
        responseObserver
                .onError(Status.NOT_FOUND.withDescription("No blog exists with this Id.").asRuntimeException());
    } else {
        Document updatedMongoDocToBeSaved = new Document("author_id", request.getBlog().getAuthorId())
                .append("title", request.getBlog().getTitle())
                .append("content", request.getBlog().getContent());

        System.out.println("Updating record in MongoDB.");

        // Replace the stored Blog with the one received in the request.
        mongoCollection
                .replaceOne(
                        Filters.eq("_id", fetchedDocFromMongo.getObjectId("_id")),
                        updatedMongoDocToBeSaved);

        responseObserver.onNext(UpdateBlogResponse.newBuilder().setBlog(Blog.newBuilder()
                        .setContent(updatedMongoDocToBeSaved.getString("content"))
                        .setAuthorId(updatedMongoDocToBeSaved.getString("author_id"))
                        .setTitle(updatedMongoDocToBeSaved.getString("title"))
                        .setId(fetchedDocFromMongo.getObjectId("_id").toString())
                        .build())
                .build());
        responseObserver.onCompleted();
    }
}
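Note that replaceOne swaps the whole stored document, apart from _id, for the one supplied, so any field missing from the replacement disappears from the stored blog. The sketch below models that difference with plain maps. It is an in-memory illustration with hypothetical names, not the MongoDB driver API. With the real driver, a field-by-field change would typically go through updateOne together with Updates.set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory model of the two update styles; this is NOT the MongoDB driver.
class ReplaceVersusSet {

    // Replace semantics: keep only _id from the stored document and take
    // every other field from the replacement.
    static Map<String, Object> replace(Map<String, Object> stored, Map<String, Object> replacement) {
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("_id", stored.get("_id"));
        result.putAll(replacement);
        return result;
    }

    // Set semantics: overwrite only the named fields, leave the rest untouched.
    static Map<String, Object> set(Map<String, Object> stored, Map<String, Object> changes) {
        Map<String, Object> result = new LinkedHashMap<>(stored);
        result.putAll(changes);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new LinkedHashMap<>();
        stored.put("_id", "abc");
        stored.put("author_id", "123");
        stored.put("title", "Old title");
        stored.put("content", "Old content");

        Map<String, Object> changes = new LinkedHashMap<>();
        changes.put("title", "New title");

        // author_id and content are gone after a replace:
        System.out.println(replace(stored, changes)); // {_id=abc, title=New title}
        // author_id and content survive a set:
        System.out.println(set(stored, changes)); // {_id=abc, author_id=123, title=New title, content=Old content}
    }
}
```

For this service the replace style is fine as long as clients always send the complete Blog. A client that sends only a changed title would blank out the other fields.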

Step 3.) The server remains the same as defined above.

Step 4.) Next, let's see how the client looks for this use-case :-

private static Blog updateTheBlog(ManagedChannel unsecuredChannel) {
    // Create the synchronous (blocking) client.
    BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
            BlogServiceGrpc.newBlockingStub(unsecuredChannel);

    // Build the update request, which has to be sent to the server.
    UpdateBlogRequest updateBlogRequest = UpdateBlogRequest.newBuilder()
            .setBlog(Blog.newBuilder()
                    .setId("5f8c11bd8f7bc8bf15927ca9")
                    .setAuthorId("ChangedAuthorId")
                    .setContent("Updated Content for this Blog")
                    .setTitle("Updated title for this blog")
                    .build())
            .build();

    // Invoke the function implemented at the server. This is a network call, even though it looks like a local method call.
    UpdateBlogResponse updateBlogResponse = blogServiceBlockingStub.updateBlog(updateBlogRequest);

    System.out.println("Received response from UpdateBlogCall is :" + updateBlogResponse.toString());

    return updateBlogResponse.getBlog();
}

Part4.) Delete Records API :-

Step 1.) Let's define the service and message formats :-

message DeleteBlogRequest {
    string blog_id = 1;
}

message DeleteBlogResponse {
    string blog_id = 1;
}

service BlogService {
    rpc DeleteBlog(DeleteBlogRequest) returns (DeleteBlogResponse) {};
}

Step 2.) The next course of action is to define the logic of our service :-

// Additional import: com.mongodb.client.result.DeleteResult
@Override
public void deleteBlog(DeleteBlogRequest request, StreamObserver<DeleteBlogResponse> responseObserver) {
    System.out.println("Received request for Deleting a Blog from MongoDB.");

    DeleteResult deleteResult =
            mongoCollection.deleteOne(Filters.eq("_id", new ObjectId(request.getBlogId())));

    if (deleteResult.getDeletedCount() == 0) {
        // onError is terminal, so onCompleted must not be called after it.
        responseObserver
                .onError(Status.NOT_FOUND.withDescription("No blog exists with this Id.").asRuntimeException());
    } else {
        DeleteBlogResponse deleteBlogResponse = DeleteBlogResponse.newBuilder()
                .setBlogId(request.getBlogId())
                .build();

        responseObserver.onNext(deleteBlogResponse);
        responseObserver.onCompleted();
    }
}
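All three handlers with a NOT_FOUND branch share one rule: on a gRPC StreamObserver, onError and onCompleted are both terminal, so a unary handler should end with exactly one of them. The sketch below demonstrates the rule with a tiny recording stand-in. It is deliberately not io.grpc.stub.StreamObserver, so it runs without gRPC on the classpath, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in used only to illustrate the terminal-call rule; this is NOT
// io.grpc.stub.StreamObserver.
class TerminalCallDemo {

    // Records every call and rejects anything that follows a terminal call.
    static class RecordingObserver {
        final List<String> calls = new ArrayList<>();
        private boolean terminated = false;

        void onNext(String value) {
            ensureOpen();
            calls.add("onNext:" + value);
        }

        void onError(String reason) {
            ensureOpen();
            terminated = true;
            calls.add("onError:" + reason);
        }

        void onCompleted() {
            ensureOpen();
            terminated = true;
            calls.add("onCompleted");
        }

        private void ensureOpen() {
            if (terminated) {
                throw new IllegalStateException("call already closed");
            }
        }
    }

    // Hypothetical handler shape: exactly one terminal call per request.
    static void respond(String blogOrNull, RecordingObserver observer) {
        if (blogOrNull == null) {
            observer.onError("NOT_FOUND");
            return; // nothing may follow onError
        }
        observer.onNext(blogOrNull);
        observer.onCompleted();
    }

    public static void main(String[] args) {
        RecordingObserver missing = new RecordingObserver();
        respond(null, missing);
        System.out.println(missing.calls);

        RecordingObserver found = new RecordingObserver();
        respond("some blog", found);
        System.out.println(found.calls);
    }
}
```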

Step 3.) The server remains the same as defined above.

Step 4.) Next, let's see how the client looks for this use-case :-

private static String deleteTheBlog(ManagedChannel unsecuredChannel) {
    // Create the synchronous (blocking) client.
    BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
            BlogServiceGrpc.newBlockingStub(unsecuredChannel);

    // Build the delete request, which has to be sent to the server.
    DeleteBlogRequest deleteBlogRequest = DeleteBlogRequest.newBuilder()
            .setBlogId("5f8c11bd8f7bc8bf15927ca9")
            .build();

    // Invoke the function implemented at the server. This is a network call, even though it looks like a local method call.
    DeleteBlogResponse deleteBlogResponse = blogServiceBlockingStub.deleteBlog(deleteBlogRequest);

    System.out.println("Received response from DeleteBlogCall is :" + deleteBlogResponse.toString());

    return deleteBlogResponse.getBlogId();
}

With all four APIs in place, they can be exercised interactively through the evans CLI.
