Alexander

Kotlin gRPC with Spring 👋✨💫

👨‍💻 Full list of what has been used:

Spring: web framework
Spring WebFlux: reactive REST services
gRPC: Kotlin gRPC
gRPC-Spring-Boot-Starter: gRPC Spring Boot Starter
Spring Data R2DBC: a specification to integrate SQL databases using reactive drivers
Zipkin: open source, end-to-end distributed tracing
Spring Cloud Sleuth: autoconfiguration for distributed tracing
Prometheus: monitoring and alerting
Grafana: for composing observability dashboards with everything from Prometheus
Kubernetes: automating deployment, scaling, and management of containerized applications
Docker and docker-compose
Helm: the package manager for Kubernetes
Flywaydb: for migrations

You can find the source code in the GitHub repository.
For this project, let's implement a Kotlin gRPC microservice using Spring and PostgreSQL.
gRPC is well suited to low-latency, high-throughput communication, so it's a great fit for microservices where efficiency is critical.
Messages are encoded with Protobuf by default. While Protobuf is efficient to send and receive, its binary format is not human-readable.
Spring doesn't provide a gRPC starter out of the box, so we have to use a community one. The most popular are yidongnan and LogNet;
both are good and ready to use,
and for this project I selected the first one.
As a first step, we have to add the gRPC Kotlin codegen plugin for the Protobuf compiler.

All UIs will be available on the following ports:

Swagger UI: http://localhost:8000/webjars/swagger-ui/index.html

Grafana UI: http://localhost:3000

Zipkin UI: http://localhost:9411

Prometheus UI: http://localhost:9090

Docker-compose file for this project:

version: "3.9"

services:
  microservices_postgresql:
    image: postgres:latest
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=bank_accounts
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./docker_data/microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: microservices_node_exporter
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: microservices_grafana
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "microservices" ]

  zipkin:
    image: openzipkin/zipkin:latest
    restart: always
    container_name: microservices_zipkin
    ports:
      - "9411:9411"
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices

gRPC messages are serialized using Protobuf, an efficient binary message format. It serializes very quickly on the server and client,
and its serialization results in small message payloads, which is important in limited-bandwidth scenarios like mobile apps.
The interface contract that specifies the RPC definitions for each service is defined using Protocol Buffers.
Each microservice has a proto file for this.
First we have to define a service in a proto file and compile it. It has mostly unary methods and one server-streaming method:

syntax = "proto3";

package com.example.grpc.bank.service;

import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

service BankAccountService {
  rpc createBankAccount (CreateBankAccountRequest) returns (CreateBankAccountResponse);
  rpc getBankAccountById (GetBankAccountByIdRequest) returns (GetBankAccountByIdResponse);
  rpc depositBalance (DepositBalanceRequest) returns (DepositBalanceResponse);
  rpc withdrawBalance (WithdrawBalanceRequest) returns (WithdrawBalanceResponse);
  rpc getAllByBalance (GetAllByBalanceRequest) returns (stream GetAllByBalanceResponse);
  rpc getAllByBalanceWithPagination(GetAllByBalanceWithPaginationRequest) returns (GetAllByBalanceWithPaginationResponse);
}

message BankAccountData {
  string id = 1;
  string firstName = 2;
  string lastName = 3;
  string email = 4;
  string address = 5;
  string currency = 6;
  string phone = 7;
  double balance = 8;
  string createdAt = 9;
  string updatedAt = 10;
}

message CreateBankAccountRequest {
  string email = 1;
  string firstName = 2;
  string lastName = 3;
  string address = 4;
  string currency = 5;
  string phone = 6;
  double balance = 7;
}

message CreateBankAccountResponse {
  BankAccountData bankAccount = 1;
}

message GetBankAccountByIdRequest {
  string id = 1;
}

message GetBankAccountByIdResponse {
  BankAccountData bankAccount = 1;
}

message DepositBalanceRequest {
  string id = 1;
  double balance = 2;
}

message DepositBalanceResponse {
  BankAccountData bankAccount = 1;
}

message WithdrawBalanceRequest {
  string id = 1;
  double balance = 2;
}

message WithdrawBalanceResponse {
  BankAccountData bankAccount = 1;
}

message GetAllByBalanceRequest {
  double min = 1;
  double max = 2;
  int32 page = 3;
  int32 size = 4;
}

message GetAllByBalanceResponse {
  BankAccountData bankAccount = 1;
}

message GetAllByBalanceWithPaginationRequest {
  double min = 1;
  double max = 2;
  int32 page = 3;
  int32 size = 4;
}

message GetAllByBalanceWithPaginationResponse {
  repeated BankAccountData bankAccount = 1;
  int32 page = 2;
  int32 size = 3;
  int32 totalElements = 4;
  int32 totalPages = 5;
  bool isFirst = 6;
  bool isLast = 7;
}
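
To give a feel for why Protobuf payloads stay small, here is a minimal sketch of the base-128 varint encoding that Protobuf uses for integer fields such as page and size above. The name encodeVarint is hypothetical and only for illustration; the generated classes handle the encoding for you.

```kotlin
// Illustrative sketch only: the generated Protobuf classes do this internally.
// encodeVarint is a hypothetical helper name, not part of any library.
fun encodeVarint(value: Int): ByteArray {
    require(value >= 0) { "this sketch only handles non-negative values" }
    val out = ArrayList<Byte>()
    var rest = value
    // Emit 7 bits at a time, lowest group first; the high bit marks "more bytes follow".
    while (rest >= 0x80) {
        out.add(((rest and 0x7F) or 0x80).toByte())
        rest = rest ushr 7
    }
    out.add(rest.toByte())
    return out.toByteArray()
}

// Render bytes as two-digit hex for printing.
fun toHex(bytes: ByteArray): String =
    bytes.joinToString(" ") { (it.toInt() and 0xFF).toString(16).padStart(2, '0') }

fun main() {
    // A page size of 10 fits in one byte instead of the four bytes of a fixed-width int.
    println(encodeVarint(10).size)      // 1
    println(toHex(encodeVarint(300)))   // ac 02
}
```

Note that balance is declared as double, which Protobuf always encodes as 8 bytes, so the saving applies to the integer fields.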

The actual Maven dependencies for gRPC:

<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-kotlin-stub</artifactId>
        <version>${grpc.kotlin.version}</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>${java.grpc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-kotlin</artifactId>
        <version>${protobuf.version}</version>
    </dependency>
</dependencies>

And the Maven Protobuf plugin:

<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.6.1</version>
    <executions>
        <execution>
            <id>compile</id>
            <goals>
                <goal>compile</goal>
                <goal>compile-custom</goal>
            </goals>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
                </protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${java.grpc.version}:exe:${os.detected.classifier}
                </pluginArtifact>
                <protocPlugins>
                    <protocPlugin>
                        <id>grpc-kotlin</id>
                        <groupId>io.grpc</groupId>
                        <artifactId>protoc-gen-grpc-kotlin</artifactId>
                        <version>${grpc.kotlin.version}</version>
                        <classifier>jdk8</classifier>
                        <mainClass>io.grpc.kotlin.generator.GeneratorRunner</mainClass>
                    </protocPlugin>
                </protocPlugins>
            </configuration>
        </execution>
        <execution>
            <id>compile-kt</id>
            <goals>
                <goal>compile-custom</goal>
            </goals>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
                </protocArtifact>
                <outputDirectory>${project.build.directory}/generated-sources/protobuf/kotlin</outputDirectory>
                <pluginId>kotlin</pluginId>
            </configuration>
        </execution>
    </executions>
</plugin>

The plugin generates a class for each of your gRPC services.
For example: BankAccountServiceGrpcKt, where BankAccountService is the name of the gRPC service in the proto file.
This class contains both the client stub and the server CoroutineImplBase that you will need to extend.
After compilation is done, we can implement our gRPC service.
@GrpcService allows us to pass a list of interceptors specific to this service, so we can add LogGrpcInterceptor here.
For request validation, let's use spring-boot-starter-validation, which uses Hibernate Validator:

@GrpcService(interceptors = [LogGrpcInterceptor::class])
class BankAccountGrpcService(
    private val bankAccountService: BankAccountService,
    private val tracer: Tracer,
    private val validator: Validator
) : BankAccountServiceGrpcKt.BankAccountServiceCoroutineImplBase() {


    override suspend fun createBankAccount(request: CreateBankAccountRequest): CreateBankAccountResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)

                runWithTracing(span) {
                    bankAccountService.createBankAccount(validate(BankAccount.of(request)))
                        .let { CreateBankAccountResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { response ->
                            // Name the parameter: a nested implicit `it` would refer to Unit from log.info
                            log.info("created bank account: $response")
                            span.tag("account", response.toString())
                        }
                }
            }
        }

    override suspend fun getBankAccountById(request: GetBankAccountByIdRequest): GetBankAccountByIdResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)

                runWithTracing(span) {
                    bankAccountService.getBankAccountById(UUID.fromString(request.id))
                        .let { GetBankAccountByIdResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { response -> log.info("response: $response").also { span.tag("response", response.toString()) } }
                }
            }
        }

    override suspend fun depositBalance(request: DepositBalanceRequest): DepositBalanceResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(DEPOSIT_BALANCE)

                runWithTracing(span) {
                    bankAccountService.depositAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
                        .let { DepositBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { response -> log.info("response: $response").also { span.tag("response", response.toString()) } }
                }
            }
        }

    override suspend fun withdrawBalance(request: WithdrawBalanceRequest): WithdrawBalanceResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(WITHDRAW_BALANCE)

                runWithTracing(span) {
                    bankAccountService.withdrawAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
                        .let { WithdrawBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { response -> log.info("response: $response").also { span.tag("response", response.toString()) } }
                }
            }
        }

    override fun getAllByBalance(request: GetAllByBalanceRequest): Flow<GetAllByBalanceResponse> {
        runWithTracing(tracer, GET_ALL_BY_BALANCE) {
            return bankAccountService.findAllByBalanceBetween(validate(FindByBalanceRequestDto.of(request)))
                .map { GetAllByBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
                .flowOn(Dispatchers.IO + tracer.asContextElement())
        }
    }

    override suspend fun getAllByBalanceWithPagination(request: GetAllByBalanceWithPaginationRequest): GetAllByBalanceWithPaginationResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)

                runWithTracing(span) {
                    bankAccountService.findByBalanceAmount(validate(FindByBalanceRequestDto.of(request)))
                        .toGetAllByBalanceWithPaginationResponse()
                        .also { log.info("response: $it") }.also { span.tag("response", it.toString()) }
                }
            }

        }


    private fun <T> validate(data: T): T {
        return data.run {
            val errors = validator.validate(data)
            if (errors.isNotEmpty()) throw ConstraintViolationException(errors).also { log.error("validation error: ${it.localizedMessage}") }
            data
        }
    }


    companion object {
        private val log = LoggerFactory.getLogger(BankAccountGrpcService::class.java)
        private const val timeOutMillis = 5000L

        private const val CREATE_BANK_ACCOUNT = "BankAccountGrpcService.createBankAccount"
        private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountGrpcService.getBankAccountById"
        private const val DEPOSIT_BALANCE = "BankAccountGrpcService.depositBalance"
        private const val WITHDRAW_BALANCE = "BankAccountGrpcService.withdrawBalance"
        private const val GET_ALL_BY_BALANCE = "BankAccountGrpcService.getAllByBalance"
        private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountGrpcService.getAllByBalanceWithPagination"
    }
}

fun Page<BankAccount>.toGetAllByBalanceWithPaginationResponse(): GetAllByBalanceWithPaginationResponse {
    return GetAllByBalanceWithPaginationResponse
        .newBuilder()
        .setIsFirst(this.isFirst)
        .setIsLast(this.isLast)
        .setTotalElements(this.totalElements.toInt())
        .setTotalPages(this.totalPages)
        .setPage(this.pageable.pageNumber)
        .setSize(this.pageable.pageSize)
        .addAllBankAccount(this.content.map { it.toProto() })
        .build()
}
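
The service calls a runWithTracing helper that is not shown in this post. Below is a minimal sketch of what such a helper might look like, assuming it only needs to record errors and close the span. SimpleSpan is a hypothetical stand-in for the Sleuth span type so that the snippet runs without Spring; the real helper lives in the repository and may differ.

```kotlin
// Hypothetical stand-in for a tracing span so the sketch runs without Spring Cloud Sleuth.
class SimpleSpan(val name: String) {
    var recordedError: Throwable? = null
        private set
    var ended = false
        private set

    fun error(t: Throwable) { recordedError = t }
    fun end() { ended = true }
}

// Assumed shape of the helper: run the block, record any failure, always end the span.
// Being inline is what lets callers write `return` inside the lambda.
inline fun <T> runWithTracing(span: SimpleSpan, block: () -> T): T =
    try {
        block()
    } catch (ex: Exception) {
        span.error(ex)
        throw ex
    } finally {
        span.end()
    }

fun main() {
    val ok = SimpleSpan("ok")
    println(runWithTracing(ok) { 21 * 2 })   // 42
    println(ok.ended)                        // true

    val failing = SimpleSpan("failing")
    runCatching { runWithTracing(failing) { error("boom") } }
    println(failing.recordedError?.message)  // boom
    println(failing.ended)                   // true
}
```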

Interceptors are a gRPC concept that allows apps to interact with incoming or outgoing gRPC calls.
They offer a way to enrich the request processing pipeline.
We can add our own gRPC interceptors; here we implement LogGrpcInterceptor:

class LogGrpcInterceptor : ServerInterceptor {

    override fun <ReqT : Any?, RespT : Any?> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        log.info("Service: ${call.methodDescriptor.serviceName}, Method: ${call.methodDescriptor.bareMethodName}, Headers: $headers")
        return next.startCall(call, headers)
    }

    companion object {
        private val log = LoggerFactory.getLogger(LogGrpcInterceptor::class.java)
    }
}
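
Conceptually, each interceptor wraps the next handler in the chain. The sketch below models that idea with plain Kotlin functions. Handler, Interceptor and intercept are hypothetical names for illustration and are not the grpc-java API.

```kotlin
// Simplified model, not the grpc-java API: a handler turns a request into a response.
typealias Handler = (String) -> String

// An interceptor receives the request and the next handler and decides how to call it.
typealias Interceptor = (request: String, next: Handler) -> String

// Wrap the handler so that the first interceptor in the list runs outermost.
fun intercept(handler: Handler, interceptors: List<Interceptor>): Handler =
    interceptors.foldRight(handler) { interceptor, next ->
        { request: String -> interceptor(request, next) }
    }

fun main() {
    val calls = mutableListOf<String>()
    // Logs before and after delegating, similar in spirit to LogGrpcInterceptor.
    val logging: Interceptor = { request, next ->
        calls.add("before $request")
        next(request).also { calls.add("after $it") }
    }
    val service: Handler = { request -> "handled $request" }

    val chained = intercept(service, listOf(logging))
    println(chained("getBankAccountById"))  // handled getBankAccountById
    println(calls)  // [before getBankAccountById, after handled getBankAccountById]
}
```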

Then register LogGrpcInterceptor globally with @GrpcGlobalServerInterceptor:

@Configuration(proxyBeanMethods = false)
class GlobalInterceptorConfiguration {

    @GrpcGlobalServerInterceptor
    fun logServerInterceptor(): LogGrpcInterceptor = LogGrpcInterceptor()
}

The service layer of the microservice has a few methods. For working with lists of data it has two methods:
one returns a PageImpl and is used in the unary method response, and one returns a Flow for the gRPC streaming response method.
The current Spring version supports the @Transactional annotation with R2DBC.
The interface and implementation are below:

@Service
interface BankAccountService {

    suspend fun createBankAccount(bankAccount: BankAccount): BankAccount

    suspend fun getBankAccountById(id: UUID): BankAccount

    suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount

    suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount

    fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount>

    suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount>
}
@Service
class BankAccountServiceImpl(
    private val bankRepository: BankRepository,
    private val tracer: Tracer
) : BankAccountService {

    @Transactional
    override suspend fun createBankAccount(@Valid bankAccount: BankAccount): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)

            runWithTracing(span) {
                bankRepository.save(bankAccount).also { span.tag("saved account", it.toString()) }
            }
        }

    @Transactional(readOnly = true)
    override suspend fun getBankAccountById(id: UUID): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)

            runWithTracing(span) {
                bankRepository.findById(id).also { span.tag("bank account", it.toString()) }
                    ?: throw BankAccountNotFoundException(id.toString())
            }
        }

    @Transactional
    override suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(DEPOSIT_AMOUNT)

            runWithTracing(span) {
                bankRepository.findById(id)
                    ?.let { bankRepository.save(it.depositAmount(amount)) }
                    .also { span.tag("bank account", it.toString()) }
                    ?: throw BankAccountNotFoundException(id.toString())
            }
        }

    @Transactional
    override suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(WITHDRAW_AMOUNT)

            runWithTracing(span) {
                bankRepository.findById(id)
                    ?.let { bankRepository.save(it.withdrawAmount(amount)) }
                    .also { span.tag("bank account", it.toString()) }
                    ?: throw BankAccountNotFoundException(id.toString())
            }
        }

    @Transactional(readOnly = true)
    override fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount> {
        val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE)

        runWithTracing(span) {
            return bankRepository.findAllByBalanceBetween(
                requestDto.minBalance,
                requestDto.maxBalance,
                requestDto.pageable
            )
        }
    }

    @Transactional(readOnly = true)
    override suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount> =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)

            runWithTracing(span) {
                bankRepository.findByBalanceAmount(requestDto.minBalance, requestDto.maxBalance, requestDto.pageable)
                    .also { span.tag("pagination", it.toString()) }
            }
        }


    companion object {
        private const val CREATE_BANK_ACCOUNT = "BankAccountService.createBankAccount"
        private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountService.getBankAccountById"
        private const val DEPOSIT_AMOUNT = "BankAccountService.depositAmount"
        private const val WITHDRAW_AMOUNT = "BankAccountService.withdrawAmount"
        private const val GET_ALL_BY_BALANCE = "BankAccountService.findAllByBalanceBetween"
        private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountService.findByBalanceAmount"
    }
}

R2DBC is an API that provides reactive, non-blocking access to relational databases.
Using it, your reactive APIs in Spring Boot can read and write information to the database in a reactive, asynchronous way.
The BankRepository is a combination of CoroutineSortingRepository from Spring Data and our custom BankPostgresRepository implementation.
The custom BankPostgresRepository implementation uses R2dbcEntityTemplate and DatabaseClient.
If we want the same pagination response that JPA provides,
we have to create PageImpl manually.


@Repository
interface BankRepository : CoroutineSortingRepository<BankAccount, UUID>, BankPostgresRepository {
    fun findAllByBalanceBetween(min: BigDecimal, max: BigDecimal, pageable: Pageable): Flow<BankAccount>
}
@Repository
interface BankPostgresRepository {
    suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount>
}
@Repository
class BankPostgresRepositoryImpl(
    private val template: R2dbcEntityTemplate,
    private val databaseClient: DatabaseClient,
    private val tracer: Tracer,
) : BankPostgresRepository {

    override suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount> =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_AMOUNT)
            val query = Query.query(Criteria.where(BALANCE).between(min, max))

            runWithTracing(span) {
                val accountsList = async {
                    template.select(query.with(pageable), BankAccount::class.java)
                        .asFlow()
                        .toList()
                }

                val totalCount = async {
                    databaseClient.sql("SELECT count(bank_account_id) as total FROM microservices.bank_accounts WHERE balance BETWEEN :min AND :max")
                        .bind("min", min)
                        .bind("max", max)
                        .fetch()
                        .one()
                        .awaitFirst()
                }

                PageImpl(accountsList.await(), pageable, totalCount.await()["total"] as Long)
                    .also { span.tag("pagination", it.toString()) }
                    .also { log.debug("pagination: $it") }
            }
        }

    companion object {
        private val log = LoggerFactory.getLogger(BankPostgresRepositoryImpl::class.java)
        private const val GET_ALL_BY_BALANCE_AMOUNT = "BankPostgresRepository.findByBalanceAmount"
    }
}
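
PageImpl derives values such as totalPages, isFirst and isLast from the page request and the total count. The arithmetic can be sketched roughly in plain Kotlin; PageMeta and pageMetaOf are hypothetical names for illustration, not Spring Data types.

```kotlin
// Hypothetical illustration of the page arithmetic, not a Spring Data type.
data class PageMeta(
    val page: Int,
    val size: Int,
    val totalElements: Long,
    val totalPages: Int,
    val isFirst: Boolean,
    val isLast: Boolean
)

fun pageMetaOf(page: Int, size: Int, totalElements: Long): PageMeta {
    require(page >= 0) { "page must not be negative" }
    require(size > 0) { "size must be positive" }
    // Ceiling division: 25 elements with size 10 need 3 pages.
    val totalPages = ((totalElements + size - 1) / size).toInt()
    return PageMeta(
        page = page,
        size = size,
        totalElements = totalElements,
        totalPages = totalPages,
        isFirst = page == 0,
        // The last page is the one after which no further page exists.
        isLast = page + 1 >= totalPages
    )
}

fun main() {
    println(pageMetaOf(page = 0, size = 10, totalElements = 25).totalPages)  // 3
    println(pageMetaOf(page = 2, size = 10, totalElements = 25).isLast)      // true
}
```

This is why the repository needs the separate count query: without the total number of matching rows, the page count and the last-page flag cannot be computed.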

For error handling, the gRPC starter provides @GrpcAdvice, which marks a class to be checked for exception handling methods.
@GrpcExceptionHandler marks the annotated method to be executed when the specified exception is thrown.
Status codes are well described in the gRPC documentation.

@GrpcAdvice
class GrpcExceptionAdvice {

    @GrpcExceptionHandler(RuntimeException::class)
    fun handleRuntimeException(ex: RuntimeException): StatusException {
        val status = Status.INTERNAL.withDescription(ex.message).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }

    @GrpcExceptionHandler(BankAccountNotFoundException::class)
    fun handleBankAccountNotFoundException(ex: BankAccountNotFoundException): StatusException {
        val status = Status.INVALID_ARGUMENT.withDescription(ex.message).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }

    @GrpcExceptionHandler(MethodArgumentNotValidException::class)
    fun handleMethodArgumentNotValidException(ex: MethodArgumentNotValidException): StatusException {
        val errorMap: MutableMap<String, String> = HashMap()
        ex.bindingResult.fieldErrors.forEach { error -> error.defaultMessage?.let { errorMap[error.field] = it } }
        val status = Status.INVALID_ARGUMENT.withDescription(errorMap.toString()).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }

    @GrpcExceptionHandler(ConstraintViolationException::class)
    fun handleConstraintViolationException(ex: ConstraintViolationException): StatusException {
        val status = Status.INVALID_ARGUMENT.withDescription(ex.toString()).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }


    companion object {
        private val log = LoggerFactory.getLogger(GrpcExceptionAdvice::class.java)
    }
}

Our microservice also has an HTTP controller:

@Tag(name = "BankAccount", description = "Bank Account REST Controller")
@RestController
@RequestMapping(path = ["/api/v1/bank"])
class BankAccountController(private val bankAccountService: BankAccountService) {

    @PostMapping(produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "createBankAccount",
        summary = "Create new bank account",
        operationId = "createBankAccount",
        description = "Create new bank account for user"
    )
    suspend fun createBankAccount(@Valid @RequestBody req: CreateBankAccountDto) =
        withTimeout(timeOutMillis) {
            ResponseEntity
                .status(HttpStatus.CREATED)
                .body(bankAccountService.createBankAccount(BankAccount.of(req)).toSuccessHttpResponse())
                .also { log.info("created bank account: $it") }
        }

    @PutMapping(path = ["/deposit/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "depositBalance",
        summary = "Deposit balance",
        operationId = "depositBalance",
        description = "Deposit given amount to the bank account balance"
    )
    suspend fun depositBalance(
        @PathVariable("id") id: UUID,
        @Valid @RequestBody depositBalanceDto: DepositBalanceDto
    ) = withTimeout(timeOutMillis) {
        ResponseEntity.ok(bankAccountService.depositAmount(id, depositBalanceDto.amount).toSuccessHttpResponse())
            .also { log.info("response: $it") }
    }

    @PutMapping(path = ["/withdraw/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "withdrawBalance",
        summary = "Withdraw balance",
        operationId = "withdrawBalance",
        description = "Withdraw given amount from the bank account balance"
    )
    suspend fun withdrawBalance(
        @PathVariable("id") id: UUID,
        @Valid @RequestBody withdrawBalanceDto: WithdrawBalanceDto
    ) = withTimeout(timeOutMillis) {
        // Withdraw must call withdrawAmount, not depositAmount
        ResponseEntity.ok(bankAccountService.withdrawAmount(id, withdrawBalanceDto.amount).toSuccessHttpResponse())
            .also { log.info("response: $it") }
    }

    @GetMapping(path = ["{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "getBankAccountById",
        summary = "Get bank account by id",
        operationId = "getBankAccountById",
        description = "Get user bank account by given id"
    )
    suspend fun getBankAccountById(@PathVariable(required = true) id: UUID) = withTimeout(timeOutMillis) {
        ResponseEntity.ok(bankAccountService.getBankAccountById(id).toSuccessHttpResponse())
            .also { log.info("success get bank account: $it") }
    }


    @GetMapping(path = ["all/balance"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "findAllAccountsByBalance",
        summary = "Find all bank account with given amount range",
        operationId = "findAllAccounts",
        description = "Find all bank accounts for the given balance range with pagination"
    )
    suspend fun findAllAccountsByBalance(
        @RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
        @RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
        @RequestParam(name = "page", defaultValue = "0") page: Int,
        @RequestParam(name = "size", defaultValue = "10") size: Int,
    ) = withTimeout(timeOutMillis) {
        ResponseEntity.ok(bankAccountService.findByBalanceAmount(FindByBalanceRequestDto(min, max, PageRequest.of(page, size))))
            .also { log.info("response: $it") }
    }

    @GetMapping(path = ["all/balance/stream"])
    @Operation(
        method = "getAllByBalanceStream",
        summary = "Find all bank account with given amount range returns stream",
        operationId = "getAllByBalanceStream",
        description = "Find all bank accounts for the given balance range"
    )
    fun getAllByBalanceStream(
        @RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
        @RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
        @RequestParam(name = "page", defaultValue = "0") page: Int,
        @RequestParam(name = "size", defaultValue = "10") size: Int,
    ): Flow<SuccessBankAccountResponse> {
        return bankAccountService.findAllByBalanceBetween(FindByBalanceRequestDto(min, max, PageRequest.of(page, size)))
            .map { it -> it.toSuccessHttpResponse().also { log.info("response: $it") } }
    }


    companion object {
        private val log = LoggerFactory.getLogger(BankAccountController::class.java)
        private const val timeOutMillis = 5000L
    }
}

There are a few UI clients available for working with gRPC. Personally I like to use BloomRPC;
other useful tools are grpcurl and grpcui.

As the next step, let's deploy our microservice to k8s.
We can build a Docker image in different ways; this example uses a simple multi-stage Dockerfile:


FROM --platform=linux/arm64 azul/zulu-openjdk-alpine:17 as builder
ARG JAR_FILE=target/KotlinSpringGrpc-0.0.1-SNAPSHOT.jar
COPY ${JAR_FILE} application.jar
RUN java -Djarmode=layertools -jar application.jar extract

FROM azul/zulu-openjdk-alpine:17
COPY --from=builder dependencies/ ./
COPY --from=builder snapshot-dependencies/ ./
COPY --from=builder spring-boot-loader/ ./
COPY --from=builder application/ ./
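# JVM flags must come before the main class, otherwise they are passed to the application as program arguments
ENTRYPOINT ["java", "-XX:MaxRAMPercentage=75", "-XX:+UseG1GC", "org.springframework.boot.loader.JarLauncher"]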

For working with k8s I like to use Helm. The deployment for the microservice is simple and consists of the Deployment itself, a Service, a ConfigMap
and a ServiceMonitor.
The last one is required because monitoring uses the kube-prometheus-stack Helm chart.

The microservice Helm chart YAML file is:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.microservice.name }}
  labels:
    app: {{ .Values.microservice.name }}
spec:
  replicas: {{ .Values.microservice.replicas }}
  template:
    metadata:
      name: {{ .Values.microservice.name }}
      labels:
        app: {{ .Values.microservice.name }}
    spec:
      containers:
        - name: {{ .Values.microservice.name }}
          image: {{ .Values.microservice.image }}
          imagePullPolicy: Always
          resources:
            requests:
              memory: {{ .Values.microservice.resources.requests.memory }}
              cpu: {{ .Values.microservice.resources.requests.cpu }}
            limits:
              memory: {{ .Values.microservice.resources.limits.memory }}
              cpu: {{ .Values.microservice.resources.limits.cpu }}
          livenessProbe:
            httpGet:
              port: {{ .Values.microservice.livenessProbe.httpGet.port }}
              path: {{ .Values.microservice.livenessProbe.httpGet.path }}
            initialDelaySeconds: {{ .Values.microservice.livenessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.microservice.livenessProbe.periodSeconds }}
          readinessProbe:
            httpGet:
              port: {{ .Values.microservice.readinessProbe.httpGet.port }}
              path: {{ .Values.microservice.readinessProbe.httpGet.path }}
            initialDelaySeconds: {{ .Values.microservice.readinessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.microservice.readinessProbe.periodSeconds }}
          ports:
            - containerPort: {{ .Values.microservice.ports.http.containerPort }}
              name: {{ .Values.microservice.ports.http.name }}
            - containerPort: {{ .Values.microservice.ports.grpc.containerPort }}
              name: {{ .Values.microservice.ports.grpc.name }}
          env:
            - name: SPRING_APPLICATION_NAME
              value: microservice_k8s
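            # The JVM itself reads JAVA_TOOL_OPTIONS; JAVA_OPTS is ignored by the Dockerfile's exec-form ENTRYPOINT
            - name: JAVA_TOOL_OPTIONS
              value: "-XX:+UseG1GC -XX:MaxRAMPercentage=75"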
            - name: SERVER_PORT
              valueFrom:
                configMapKeyRef:
                  key: server_port
                  name: {{ .Values.microservice.name }}-config-map
            - name: GRPC_SERVER_PORT
              valueFrom:
                configMapKeyRef:
                  key: grpc_server_port
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_ZIPKIN_BASE_URL
              valueFrom:
                configMapKeyRef:
                  key: zipkin_base_url
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_R2DBC_URL
              valueFrom:
                configMapKeyRef:
                  key: r2dbc_url
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_FLYWAY_URL
              valueFrom:
                configMapKeyRef:
                  key: flyway_url
                  name: {{ .Values.microservice.name }}-config-map
      restartPolicy: Always
      terminationGracePeriodSeconds: {{ .Values.microservice.terminationGracePeriodSeconds }}
  selector:
    matchLabels:
      app: {{ .Values.microservice.name }}

---

apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.microservice.name }}-service
  labels:
    app: {{ .Values.microservice.name }}
spec:
  selector:
    app: {{ .Values.microservice.name }}
  ports:
    - port: {{ .Values.microservice.service.httpPort }}
      name: http
      protocol: TCP
      targetPort: http
    - port: {{ .Values.microservice.service.grpcPort }}
      name: grpc
      protocol: TCP
      targetPort: grpc
  type: ClusterIP

---

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    release: monitoring
  name: {{ .Values.microservice.name }}-service-monitor
  namespace: default
spec:
  selector:
    matchLabels:
      app: {{ .Values.microservice.name }}
  endpoints:
    - interval: 10s
      port: http
      path: /actuator/prometheus
  namespaceSelector:
    matchNames:
      - default

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.microservice.name }}-config-map
data:
  server_port: "8080"
  grpc_server_port: "8000"
  zipkin_base_url: zipkin:9411
  r2dbc_url: "r2dbc:postgresql://postgres:5432/bank_accounts"
  flyway_url: "jdbc:postgresql://postgres:5432/bank_accounts"

And the values.yaml file:

microservice:
  name: kotlin-spring-microservice
  image: alexanderbryksin/kotlin_spring_grpc_microservice:latest
  replicas: 1
  livenessProbe:
    httpGet:
      port: 8080
      path: /actuator/health/liveness
    initialDelaySeconds: 60
    periodSeconds: 5
  readinessProbe:
    httpGet:
      port: 8080
      path: /actuator/health/readiness
    initialDelaySeconds: 60
    periodSeconds: 5
  ports:
    http:
      name: http
      containerPort: 8080
    grpc:
      name: grpc
      containerPort: 8000
  terminationGracePeriodSeconds: 20
  service:
    httpPort: 8080
    grpcPort: 8000
  resources:
    requests:
      memory: "6000Mi"
      cpu: "3000m"
    limits:
      memory: "6000Mi"
      cpu: "3000m"
Enter fullscreen mode Exit fullscreen mode
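With the templates and values in place, installing everything could look roughly like the sketch below. The ServiceMonitor kind only exists once kube-prometheus-stack has installed its CRDs, so the monitoring stack goes first; the release name `monitoring` is chosen to match the `release: monitoring` label on the ServiceMonitor. The Helm release name, the chart directory `./k8s/microservice`, installing the monitoring stack into the `default` namespace, and the `run` and `deploy` helpers are all assumptions made for illustration, not taken from the project. The ConfigMap also expects `postgres` and `zipkin` services to be reachable in the cluster, which this sketch does not install.

```shell
#!/usr/bin/env sh
RELEASE="kotlin-spring-microservice"   # hypothetical Helm release name
CHART_DIR="./k8s/microservice"         # hypothetical path to the chart directory
NAMESPACE="default"                    # the ServiceMonitor template is pinned to this namespace

# Print a command, and execute it unless DRY_RUN=1 is set.
run() {
  echo "+ $*"
  if [ "${DRY_RUN:-0}" != "1" ]; then
    "$@"
  fi
}

deploy() {
  # kube-prometheus-stack provides the ServiceMonitor CRD, so install it first.
  run helm repo add prometheus-community https://prometheus-community.github.io/helm-charts || return 1
  run helm repo update || return 1
  run helm upgrade --install monitoring prometheus-community/kube-prometheus-stack --namespace "$NAMESPACE" || return 1
  # Install or upgrade the microservice chart itself.
  run helm upgrade --install "$RELEASE" "$CHART_DIR" --namespace "$NAMESPACE" || return 1
  # Wait for the Deployment, which is named after microservice.name in values.yaml.
  run kubectl rollout status deployment/kotlin-spring-microservice --namespace "$NAMESPACE"
}
```

`DRY_RUN=1 deploy` prints the commands without touching the cluster. After a real run, `kubectl port-forward svc/kotlin-spring-microservice-service 8080:8080` should make `/actuator/health/readiness` reachable on localhost for a quick check.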

Lens

As a UI tool for working with k8s, I personally like to use Lens.

You can find more details and the source code of the full project in the GitHub repository.
Of course, in real-world projects the business logic and infrastructure code are much more complicated, and we have to implement many more necessary features.
I hope this article is useful and helpful. I'm happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)
