Hands-on with gRPC with Java & Mongo

Part1.) Create Records API :-

syntax = "proto3";

package greet;

option java_package = "com.proto.blog";
option java_multiple_files = true;

message Blog {
string id = 1;
string author_id = 2;
string title = 3;
string content = 4;
}

message CreateBlogRequest {
Blog blog = 1;
}

message CreateBlogResponse {
Blog blog = 1;
}

service BlogService {
// Unary API
rpc CreateBlog(CreateBlogRequest) returns (CreateBlogResponse) {};
}
public class BlogServiceImpl extends BlogServiceGrpc.BlogServiceImplBase {

private MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
private MongoDatabase mongoDatabase = mongoClient.getDatabase("myDB");
private MongoCollection<Document> mongoCollection = mongoDatabase.getCollection("blog");

@Override
public void createBlog(CreateBlogRequest request, StreamObserver<CreateBlogResponse> responseObserver) {
System.out.println("Received request for indexing a Blog to MongoDB.");
Blog blogReceivedInRequest = request.getBlog();

Document mongoDocToBeSaved = new Document("author_id", blogReceivedInRequest.getAuthorId())
.append("title", blogReceivedInRequest.getTitle())
.append("content", blogReceivedInRequest.getContent());

System.out.println("Inserting record to MongoDB.");

// Save the Blog received in Request to the MongoDB local.
mongoCollection.insertOne(mongoDocToBeSaved);

String id = mongoDocToBeSaved.getObjectId("_id").toString();

CreateBlogResponse createBlogResponse = CreateBlogResponse.newBuilder()
.setBlog(blogReceivedInRequest.toBuilder().setId(id).build())
.build();

responseObserver.onNext(createBlogResponse);

responseObserver.onCompleted();

}
public class BlogServer {

public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("Starting the gRPC Server with BlogService hosted in it.");

// plaintext server
Server server = ServerBuilder.forPort(50051)
.addService(new BlogServiceImpl())
.build();

server.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Received Shutdown Request");
server.shutdown();
System.out.println("Successfully stopped the server");
}));

server.awaitTermination();
}
}
public class BlogClient {

public static void main(String[] args) throws SSLException {
System.out.println("Starting our gRPC client");

ManagedChannel unsecuredChannel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext() // Not recommended for production.
.build();

// Build the Blog request, which has to be sent to the server.
Blog blogCreated = Blog.newBuilder()
.setAuthorId("123")
.setContent("GRPC Blog COntent for CRUD application.")
.setTitle("GRPC Blog Title for CRUD application.")
.build();

CreateBlogRequest createBlogRequest = CreateBlogRequest.newBuilder()
.setBlog(blogCreated)
.build();

// Create the Synchronous blocking client.
BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
BlogServiceGrpc.newBlockingStub(unsecuredChannel);

// Invoke the function (being implemented at server). This is actually a N/w call, but looks like method call.
CreateBlogResponse createBlogResponse = blogServiceBlockingStub.createBlog(createBlogRequest);

System.out.println("Received response is :" + createBlogResponse.toString());

unsecuredChannel.shutdown();
}
}

Part2.) Read Records API :-

message ReadBlogRequest {
string blog_id = 1;
}

message ReadBlogResponse {
Blog blog = 1;
}


service BlogService {
// Unary API
rpc ReadBlog(ReadBlogRequest) returns (ReadBlogResponse) {};
}
@Override
public void readBlog(ReadBlogRequest request, StreamObserver<ReadBlogResponse> responseObserver) {
System.out.println("Received request for Fetching a Blog from MongoDB.");

Document fetchedDocFromMongo =
mongoCollection.find(Filters.eq("_id", new ObjectId(request.getBlogId()))).first();

if(fetchedDocFromMongo == null) {
responseObserver
.onError(Status.NOT_FOUND.withDescription("No blog exists with this Id.").asRuntimeException());
} else {
ReadBlogResponse fetchedBlogResponse = ReadBlogResponse.newBuilder()
.setBlog(Blog.newBuilder()
.setContent(fetchedDocFromMongo.getString("content"))
.setAuthorId(fetchedDocFromMongo.getString("author_id"))
.setTitle(fetchedDocFromMongo.getString("title"))
.setId(request.getBlogId())
.build())
.build();

responseObserver.onNext(fetchedBlogResponse);
}
responseObserver.onCompleted();
}
private static Blog readTheBlog(String id, ManagedChannel unsecuredChannel) {
// Create the Synchronous blocking client.
BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
BlogServiceGrpc.newBlockingStub(unsecuredChannel);


// Build the Blog request, which has to be sent to the server.
ReadBlogRequest readBlogRequest = ReadBlogRequest.newBuilder()
.setBlogId(id).build();

// Invoke the function (being implemented at server). This is actually a N/w call, but looks like method call.
ReadBlogResponse readBlogResponse = blogServiceBlockingStub.readBlog(readBlogRequest);

System.out.println("Received response from ReadBlogCall is :" + readBlogResponse.toString());

return readBlogResponse.getBlog();
}

Part3.) Update Records API :-

message UpdateBlogRequest {
Blog blog = 1;
}

message UpdateBlogResponse {
Blog blog = 1;
}

service BlogService {
rpc UpdateBlog(UpdateBlogRequest) returns (UpdateBlogResponse) {};

}
@Override
public void updateBlog(UpdateBlogRequest request, StreamObserver<UpdateBlogResponse> responseObserver) {
System.out.println("Received request for Updating a Blog from MongoDB.");

Document fetchedDocFromMongo =
mongoCollection.find(Filters.eq("_id", new ObjectId(request.getBlog().getId()))).first();

if(fetchedDocFromMongo == null) {
responseObserver
.onError(Status.NOT_FOUND.withDescription("No blog exists with this Id.").asRuntimeException());
} else {
Document updatedMongoDocToBeSaved = new Document("author_id", request.getBlog().getAuthorId())
.append("title", request.getBlog().getTitle())
.append("content", request.getBlog().getContent());

System.out.println("Updating record to MongoDB.");

// Update the Blog (as received) in the Mongo local.
mongoCollection
.replaceOne(
Filters.eq("_id", fetchedDocFromMongo.getObjectId("_id")),
updatedMongoDocToBeSaved);

responseObserver.onNext(UpdateBlogResponse.newBuilder().setBlog(Blog.newBuilder()
.setContent(updatedMongoDocToBeSaved.getString("content"))
.setAuthorId(updatedMongoDocToBeSaved.getString("author_id"))
.setTitle(updatedMongoDocToBeSaved.getString("title"))
.setId(fetchedDocFromMongo.getObjectId("_id").toString())
.build())
.build());
}
responseObserver.onCompleted();
}
private static Blog updateTheBlog(ManagedChannel unsecuredChannel) {
// Create the Synchronous blocking client.
BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
BlogServiceGrpc.newBlockingStub(unsecuredChannel);

// Build the Blog request, which has to be sent to the server.
UpdateBlogRequest updateBlogRequest = UpdateBlogRequest.newBuilder()
.setBlog(Blog.newBuilder()
.setId("5f8c11bd8f7bc8bf15927ca9")
.setAuthorId("ChangedAuthorId")
.setContent("Updated Content for this Blog")
.setTitle("Updated title for this blog")
.build())
.build();

// Invoke the function (being implemented at server). This is actually a N/w call, but looks like method call.
UpdateBlogResponse updateBlogResponse = blogServiceBlockingStub.updateBlog(updateBlogRequest);

System.out.println("Received response from UpdateBlogCall is :" + updateBlogResponse.toString());

return updateBlogResponse.getBlog();
}

Part4.) Delete Records API :-

message DeleteBlogRequest {
string blog_id = 1;
}

message DeleteBlogResponse {
string blog_id = 1;
}


service BlogService {
rpc DeleteBlog(DeleteBlogRequest) returns (DeleteBlogResponse) {};
}
@Override
public void deleteBlog(DeleteBlogRequest request, StreamObserver<DeleteBlogResponse> responseObserver) {
System.out.println("Received request for Deleting a Blog from MongoDB.");

DeleteResult deleteResult =
mongoCollection.deleteOne(Filters.eq("_id", new ObjectId(request.getBlogId())));

if(deleteResult.getDeletedCount() == 0) {
responseObserver
.onError(Status.NOT_FOUND.withDescription("No blog exists with this Id.").asRuntimeException());
} else {
DeleteBlogResponse deleteBlogResponse = DeleteBlogResponse.newBuilder()
.setBlogId(request.getBlogId())
.build();

responseObserver.onNext(deleteBlogResponse);
}
responseObserver.onCompleted();
}
private static String deleteTheBlog(ManagedChannel unsecuredChannel) {
// Create the Synchronous blocking client.
BlogServiceGrpc.BlogServiceBlockingStub blogServiceBlockingStub =
BlogServiceGrpc.newBlockingStub(unsecuredChannel);

// Build the Blog request, which has to be sent to the server.
DeleteBlogRequest deleteBlogRequest = DeleteBlogRequest.newBuilder()
.setBlogId("5f8c11bd8f7bc8bf15927ca9")
.build();

// Invoke the function (being implemented at server). This is actually a N/w call, but looks like method call.
DeleteBlogResponse deleteBlogResponse = blogServiceBlockingStub.deleteBlog(deleteBlogRequest);

System.out.println("Received response from Deleting BlogCall is :" + deleteBlogResponse.toString());

return deleteBlogResponse.getBlogId();
}

--

--

--

Software Engineer for Big Data distributed systems

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Why I fell out of love with Storyboards …

Nervos attends Grin meetup and shares Mimblewimble update

Attendees listen to a speaker at the Grin Community meetup

Interface vs Abstract classes

GitGraber — Tool to scan github for secrets

Integrating a file with AWS S3 using WSO2 Streaming Integrator

The Truth About Spreadsheets

Coding Asynchronously in JavaScript (and other languages).

SRE and DevOps

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
aditya goel

aditya goel

Software Engineer for Big Data distributed systems

More from Medium

Manual message acknowledgement in Kafka

5 Common REST API Challenges

[Spring Boot] Batching GraphQL Queries with @BatchMapping

Deploying the JDA discord bot with Docker