Based on the meeting notes and code review, here are the changes needed in MerchantProcessorService.java:
Required Changes
- Add New Kafka Listeners for Visa Biweekly Fraud Topic
You need to add two new listeners:
A. Main Fraud Listener (similar to intake/BIS listeners)
@PerfLog(message = "listenVisaBiweeklyFraud")
@KafkaListener(
        topics = "${kafka.topic.consumer.visabiweeklyCFFraud}",
        id = "visaBiweeklyFraudListener",
        autoStartup = "${kafka.visabiweekly.fraud.listener.enable}",
        groupId = "#{'${kafka.consumerGroup.visabiweeklyFraud}'}",
        containerFactory = "kafkaListenerContainerFactory_visabiweekly_fraud"
)
void listenVisaBiweeklyFraudRequest(
        // The value is the raw fraud string; the String key type is an assumption.
        final ConsumerRecord<String, String> consumerRecord,
        @Header(HEADER_TRACEABILITY_ID) String traceabilityId,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    processVisaBiweeklyFraudRequest(consumerRecord, "listenVisaBiweeklyFraud");
}
B. Separate Retry Listener for Visa Biweekly
@PerfLog(message = "listenVisaBiweeklyRetryRequest")
@KafkaListener(
        topics = "${kafka.topic.retry.visabiweekly}",
        id = "visaBiweeklyRetryListener",
        autoStartup = "${kafka.visabiweekly.retry.listener.enable}",
        groupId = "#{'${kafka.retry.consumerGroup.visabiweekly}'}",
        containerFactory = "retryKafkaListenerContainerFactory_visabiweekly"
)
void listenVisaBiweeklyRetryRequest(
        // The value is the TransactionDecision published on failure; the String key type is an assumption.
        ConsumerRecord<String, TransactionDecision> inputRecord,
        @Header(HEADER_TRACEABILITY_ID) String traceabilityId,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        Acknowledgment acknowledgment) {
    processVisaBiweeklyRetry(inputRecord, acknowledgment);
}
- Add Processing Method for Visa Biweekly Fraud
private void processVisaBiweeklyFraudRequest(
        // The String key type is an assumption; the value is the raw fraud string.
        @NotNull final ConsumerRecord<String, String> inputRecord,
        String transactionType) {
    TransactionDecision transactionDecision = null;
    final var rawFraudString = inputRecord.value(); // Raw string from debatcher
    var recordHeaders = inputRecord.headers();
    try {
        setMDCHeadersValues(
                new String(CommonUtils.getHeaderVal(recordHeaders, HEADER_TRACEABILITY_ID), StandardCharsets.UTF_8),
                "N"
        );
        getTimeElapsed(recordHeaders);
        log.info(transactionType + TOPIC_POLL_MSG, inputRecord.topic(), inputRecord.partition());
        // Parse the raw string and apply manual padding/validation
        var merchantRequest = new MerchantProcessRequest(rawFraudString);
        visaBiweeklyTransformer.performStringParsing(merchantRequest); // New transformer
        visaBiweeklyTransformer.performCommonTransformation(merchantRequest);
        transactionDecision = merchantRequest.getTransactionDecision();
        /* Step 1: Get decision from FICO */
        var ficoResponse = ficoDecisionManager.getFicoDecision(transactionDecision);
        /* Step 2: NO DSAP publish - only handle retry on failure */
        log.info("Visa Biweekly Fraud processed successfully - no DSAP publish");
    } catch (ApiException e) {
        log.error(MERCHANT_ERROR_MSG, getStatusCode(e), e.getStatus());
        var isRetryRequest = containsAny(
                getServerStatusCode(e),
                SERVICE_UNAVAILABLE_ERROR_CODE,
                TOKEN_GENERATION_FAILURE_ERROR_CODE,
                INVALID_TOKEN_ERROR_CODE
        );
        if (isRetryRequest) {
            log.info(PUBLISH_RETRY_START_MSG);
            // Publish to the VISA BIWEEKLY retry topic (NOT the standard retry topic)
            kafkaService.publishVisaBiweeklyMessageToRetry(transactionDecision, recordHeaders);
        }
    } catch (Exception e) {
        log.error("VisaBiweekly Fraud listen: Exception: {}: Error Details: {}",
                getExceptionName(e), ExceptionUtils.getStackTrace(e));
    }
}
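The retry decision in the ApiException branch comes down to checking the server status code against three retryable codes. Below is a minimal, self-contained sketch of that check. The class name `RetryClassifier` and the constant values are placeholders, since the real values live in the service's own constants. It also uses exact set membership rather than `containsAny`, which may match differently depending on which `containsAny` the service imports.

```java
import java.util.Set;

/** Illustrative stand-in for the retry check; not the service's actual code. */
final class RetryClassifier {
    // Placeholder values: the real constants are defined in the service and may differ.
    static final String SERVICE_UNAVAILABLE_ERROR_CODE = "503";
    static final String TOKEN_GENERATION_FAILURE_ERROR_CODE = "TOKEN_GENERATION_FAILURE";
    static final String INVALID_TOKEN_ERROR_CODE = "INVALID_TOKEN";

    private static final Set<String> RETRYABLE_CODES = Set.of(
            SERVICE_UNAVAILABLE_ERROR_CODE,
            TOKEN_GENERATION_FAILURE_ERROR_CODE,
            INVALID_TOKEN_ERROR_CODE);

    private RetryClassifier() {
    }

    /** Returns true only when the code exactly matches one of the retryable codes. */
    static boolean isRetryable(String serverStatusCode) {
        return serverStatusCode != null && RETRYABLE_CODES.contains(serverStatusCode);
    }
}
```

Keeping the check in one place like this makes it easy to unit test which failures go to the Visa Biweekly retry topic and which are only logged.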
- Add Separate Retry Processing Method
private void processVisaBiweeklyRetry(
        // The String key type is an assumption.
        ConsumerRecord<String, TransactionDecision> inputRecord,
        Acknowledgment acknowledgment) {
    var recordHeaders = inputRecord.headers();
    // Traceability ID is read from the record headers, as in processVisaBiweeklyFraudRequest
    setMDCHeadersValues(
            new String(CommonUtils.getHeaderVal(recordHeaders, HEADER_TRACEABILITY_ID), StandardCharsets.UTF_8),
            "Y"
    );
    var transactionDecision = inputRecord.value();
    try {
        // IMPORTANT: Reset external record ID before retry
        resetExternalRecordId(transactionDecision);
        TransactionDecision ficoResponse = ficoRetryServiceInvoker.retryDecision(transactionDecision);
        verifyFicoDecisionError(transactionDecision);
        if (ficoResponse != null) {
            acknowledgment.acknowledge();
        }
        // NO DSAP publish for Visa Biweekly
    } catch (RetryableFailureException ex) {
        // Handle retry logic
    } catch (ApiException e) {
        acknowledgment.acknowledge();
        log.error("VisaBiweekly Retry: ApiException: {}", getStatusCode(e));
    }
}
- Modify TransactionDecision Model
The TransactionDecision class needs to accept a TransactionDisposition field:
// In TransactionDecision.java or wherever the model is defined
private TransactionDisposition transactionDisposition;
- Add Helper Method to Reset External Record ID
private void resetExternalRecordId(TransactionDecision transactionDecision) {
    if (transactionDecision != null && transactionDecision.getHeader() != null) {
        // Reset or regenerate external record ID for retry
        transactionDecision.getHeader().setExternalRecordId(generateNewRecordId());
    }
}
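The helper above calls `generateNewRecordId()`, which is not defined anywhere in these notes. As a rough sketch only, the version below assumes a random UUID is an acceptable external record ID; the real format may be dictated by FICO or by existing conventions in the service. The `Header` and `TransactionDecision` classes here are minimal stand-ins for the real model, included only so the example compiles on its own.

```java
import java.util.UUID;

/** Illustrative sketch; the nested types are stand-ins for the real model classes. */
final class ExternalRecordIdSketch {

    /** Minimal stand-in for the real header type. */
    static final class Header {
        private String externalRecordId;

        String getExternalRecordId() {
            return externalRecordId;
        }

        void setExternalRecordId(String externalRecordId) {
            this.externalRecordId = externalRecordId;
        }
    }

    /** Minimal stand-in for the real TransactionDecision model. */
    static final class TransactionDecision {
        private final Header header;

        TransactionDecision(Header header) {
            this.header = header;
        }

        Header getHeader() {
            return header;
        }
    }

    private ExternalRecordIdSketch() {
    }

    /** Hypothetical generator: assumes a random UUID string is a valid record ID. */
    static String generateNewRecordId() {
        return UUID.randomUUID().toString();
    }

    /** Replaces the external record ID; does nothing when the decision or header is null. */
    static void resetExternalRecordId(TransactionDecision transactionDecision) {
        if (transactionDecision != null && transactionDecision.getHeader() != null) {
            transactionDecision.getHeader().setExternalRecordId(generateNewRecordId());
        }
    }
}
```

If the downstream system needs retries to be correlated with the original request, deriving the new ID from the old one (for example with a retry suffix) may be a better fit than a fresh UUID.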
- Key Differences from Existing Listeners
∙ Input format: String (not FraudCheckEventRequest or BusinessInformation)
∙ String parsing: Manual validation, padding before transformation
∙ NO DSAP publish: Skip publishSuccessMessageToDsap() entirely
∙ Separate retry topic: Not the standard retry queue
∙ Reset external record ID on retry
∙ TransactionDisposition handling: Ensure retry logic handles this new field properly
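The string-parsing difference listed above (manual validation and padding before transformation) could look roughly like the sketch below. The field names and widths are invented for illustration, because the actual Visa biweekly record layout is not given in these notes. The class name `FixedWidthFraudParser` is also hypothetical and is not the `visaBiweeklyTransformer` itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative fixed-width parser; the layout below is hypothetical. */
final class FixedWidthFraudParser {
    // Hypothetical layout (field name -> width); insertion order defines field order.
    private static final Map<String, Integer> LAYOUT = new LinkedHashMap<>();

    static {
        LAYOUT.put("merchantId", 10);
        LAYOUT.put("amount", 8);
        LAYOUT.put("fraudType", 2);
    }

    // Total record length is the sum of all field widths (20 for this layout).
    static final int RECORD_LENGTH =
            LAYOUT.values().stream().mapToInt(Integer::intValue).sum();

    private FixedWidthFraudParser() {
    }

    /** Right-pads the record with spaces so every field can be sliced safely. */
    static String padRight(String raw, int length) {
        StringBuilder padded = new StringBuilder(raw);
        while (padded.length() < length) {
            padded.append(' ');
        }
        return padded.toString();
    }

    /** Validates, pads, and slices a raw record into trimmed field values. */
    static Map<String, String> parse(String raw) {
        if (raw == null || raw.isEmpty()) {
            throw new IllegalArgumentException("Record is empty");
        }
        if (raw.length() > RECORD_LENGTH) {
            throw new IllegalArgumentException(
                    "Record is longer than " + RECORD_LENGTH + " characters");
        }
        String padded = padRight(raw, RECORD_LENGTH);
        Map<String, String> fields = new LinkedHashMap<>();
        int offset = 0;
        for (Map.Entry<String, Integer> field : LAYOUT.entrySet()) {
            int end = offset + field.getValue();
            fields.put(field.getKey(), padded.substring(offset, end).trim());
            offset = end;
        }
        return fields;
    }
}
```

With this layout, a short record such as `"MERCH0001 00012550"` is padded to 20 characters, so the trailing `fraudType` field comes back as an empty string instead of causing an index error. Whether a missing field should be accepted or rejected is a validation decision for the real transformer.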
Summary of New Components Needed
1. ✅ New Kafka listener for Visa Biweekly Fraud topic
2. ✅ Separate retry listener for Visa Biweekly
3. ✅ String parsing/validation logic (in new transformer)
4. ✅ Modified retry logic (reset external record ID)
5. ✅ Updated TransactionDecision model
6. ✅ No DSAP publishing for this flow
Would you like me to create the complete transformer class for string parsing as well?