Apigee Logger Shared Flow - Implementation Guide
Overview
This shared flow provides structured request and response logging for Apigee API proxies and ships each log event to both ELK (Elasticsearch, Logstash, Kibana) and an Apache Spark ingestion endpoint. Sensitive values are masked before a log event leaves the gateway.
Features
Core Functionality
- Dual Destination Logging: Simultaneously logs to ELK and Spark platforms
- Request/Response Logging: Captures complete API transaction details
- Sensitive Data Masking: Automatically masks passwords, tokens, keys, PII data
- Performance Metrics: Tracks latency and processing times
- Error Handling: Comprehensive error capture and logging
- Asynchronous Processing: Non-blocking log transmission
Data Masking Capabilities
- Email Masking: john.doe@example.com → jo******@example.com
- Phone Masking: 555-123-4567 → 555****567
- Credit Card Masking: 1234-5678-9012-3456 → 1234********3456
- SSN Masking: 123-45-6789 → XXX-XX-6789
- Token/Key Masking: Partial masking for security tokens and API keys; only a short prefix and suffix remain visible
- Password Masking: Complete masking of password fields
Installation Steps
1. Deploy Shared Flow Components
Create the following directory structure in your Apigee project:
shared-flows/
└── logger-shared-flow/
    ├── sharedflowbundle/
    │   ├── logger-shared-flow.xml
    │   ├── policies/
    │   │   ├── Extract-Request-Data.xml
    │   │   ├── Mask-Sensitive-Request-Data.xml
    │   │   ├── Build-Request-Log-Payload.xml
    │   │   ├── Send-Request-Log-ELK.xml
    │   │   ├── Send-Request-Log-Spark.xml
    │   │   ├── Extract-Response-Data.xml
    │   │   ├── Mask-Sensitive-Response-Data.xml
    │   │   ├── Build-Response-Log-Payload.xml
    │   │   ├── Send-Response-Log-ELK.xml
    │   │   └── Send-Response-Log-Spark.xml
    │   └── resources/
    │       └── jsc/
    │           ├── mask-sensitive-data.js
    │           └── mask-sensitive-response-data.js
2. Configure Environment Variables
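Create a KVM (Key-Value Map) named logger-config for your environment and add one entry per configuration parameter. KVM entries are not flow variables by themselves; a KeyValueMapOperations policy (not included in the policy list above) has to load them before conditions such as elk.logging.enabled can see them. The management API calls are: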
# Using Apigee Management API
curl -X POST \
https://api.enterprise.apigee.com/v1/organizations/{org}/environments/{env}/keyvaluemaps \
-H "Authorization: Bearer $ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "logger-config",
"encrypted": false
}'
# Add configuration entries
curl -X POST \
https://api.enterprise.apigee.com/v1/organizations/{org}/environments/{env}/keyvaluemaps/logger-config/entries \
-H "Authorization: Bearer $ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "elk.logging.enabled",
"value": "true"
}'
3. Deploy the Shared Flow
# Deploy using Apigee CLI
apigee sharedflows deploy -o {org} -e {env} -n logger-shared-flow -f ./shared-flows/logger-shared-flow
Configuration
ELK Configuration Parameters
Parameter | Description | Example Value |
---|---|---|
elk.logging.enabled | Enable/disable ELK logging | true |
elk.endpoint.url | Elasticsearch endpoint URL | https://elasticsearch.company.com:9200 |
elk.index.name | Index pattern for logs | apigee-logs-{YYYY.MM.DD} |
elk.auth.token | Bearer token for authentication | eyJhbGciOiJIUzI1NiIs... |
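The {YYYY.MM.DD} placeholder in elk.index.name suggests one index per day. The guide does not show where that placeholder is expanded, so the following is only a sketch of one way to do it, assuming UTC dates; resolveIndexName is a hypothetical helper name, not part of the shared flow.

```javascript
// Hypothetical helper: expands the {YYYY.MM.DD} placeholder using the UTC date.
function resolveIndexName(pattern, date) {
  function pad(n) { return n < 10 ? '0' + n : String(n); }
  var stamp = date.getUTCFullYear() + '.' +
    pad(date.getUTCMonth() + 1) + '.' +
    pad(date.getUTCDate());
  return pattern.replace('{YYYY.MM.DD}', stamp);
}

var indexName = resolveIndexName(
  'apigee-logs-{YYYY.MM.DD}',
  new Date(Date.UTC(2025, 0, 1, 12, 0, 0))
);
console.log(indexName); // apigee-logs-2025.01.01
```

Using UTC keeps the index boundary identical on every gateway node regardless of local time zone.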
Spark Configuration Parameters
Parameter | Description | Example Value |
---|---|---|
spark.logging.enabled | Enable/disable Spark logging | true |
spark.endpoint.url | Spark cluster endpoint | https://spark.company.com |
spark.topic.name | Kafka topic for logs | apigee-api-logs |
spark.partition.key | Partitioning strategy | default |
spark.auth.credentials | Base64 encoded credentials | dXNlcjpwYXNzd29yZA== |
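Each parameter in the two tables corresponds to one KVM entry of the form used in installation step 2. As a rough sketch, the request bodies could be generated from a settings object; buildKvmEntries is a hypothetical helper, and the sample values are the example values from the tables.

```javascript
// Hypothetical helper: turns a flat settings object into the {name, value}
// bodies used by the "Add configuration entries" call in step 2.
function buildKvmEntries(settings) {
  return Object.keys(settings).map(function (name) {
    // KVM values are stored as strings, so booleans are stringified.
    return { name: name, value: String(settings[name]) };
  });
}

var entries = buildKvmEntries({
  'elk.logging.enabled': true,
  'elk.endpoint.url': 'https://elasticsearch.company.com:9200',
  'spark.logging.enabled': true,
  'spark.topic.name': 'apigee-api-logs'
});
console.log(JSON.stringify(entries[0]));
```

Each element of entries can then be sent as the body of one POST to the logger-config entries endpoint.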
Usage in API Proxies
Request Logging (PreFlow)
Add this FlowCallout policy in your API proxy's PreFlow Request:
<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-API-Request">
<DisplayName>Log API Request</DisplayName>
<SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>
Response Logging (PostFlow)
Add this FlowCallout policy in your API proxy's PostFlow Response:
<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-API-Response">
<DisplayName>Log API Response</DisplayName>
<SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>
Complete API Proxy Integration Example
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ProxyEndpoint name="default">
<Description/>
<FaultRules/>
<PreFlow name="PreFlow">
<Request>
<Step>
<Name>Log-API-Request</Name>
<Condition>request.verb != "OPTIONS"</Condition>
</Step>
</Request>
<Response/>
</PreFlow>
<PostFlow name="PostFlow">
<Request/>
<Response>
<Step>
<Name>Log-API-Response</Name>
<Condition>request.verb != "OPTIONS"</Condition>
</Step>
</Response>
</PostFlow>
<Flows/>
<HTTPProxyConnection>
<BasePath>/api/v1/users</BasePath>
<Properties/>
<VirtualHost>secure</VirtualHost>
</HTTPProxyConnection>
<RouteRule name="default">
<TargetEndpoint>default</TargetEndpoint>
</RouteRule>
</ProxyEndpoint>
Log Structure
Request Log Schema
{
"eventType": "api_request",
"timestamp": "2025-01-01T12:00:00.000Z",
"correlationId": "12345678-1234-1234-1234-123456789012",
"environment": "prod",
"organization": "company",
"apiProxy": "users-api",
"request": {
"method": "POST",
"uri": "/api/v1/users",
"headers": {
"authorization": "Be***er token",
"xApiKey": "ak_****5678"
},
"payload": {
"username": "jo***oe",
"email": "jo***@example.com",
"password": "********"
}
},
"client": {
"ip": "192.168.1.100",
"userAgent": "Mozilla/5.0..."
},
"masking": {
"status": "completed",
"timestamp": "2025-01-01T12:00:00.000Z"
}
}
Response Log Schema
{
"eventType": "api_response",
"timestamp": "2025-01-01T12:00:01.500Z",
"correlationId": "12345678-1234-1234-1234-123456789012",
"response": {
"statusCode": 201,
"statusMessage": "Created",
"payload": {
"user_id": "user123",
"access_token": "ey***abc",
"refresh_token": "rt***xyz"
}
},
"performance": {
"totalLatency": 1500,
"targetLatency": 800
},
"error": {
"isError": false
}
}
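Both schemas carry the same correlationId, which is what allows a consumer to join the two events of one transaction. The following sketch shows such a join on the consumer side; correlate is a hypothetical helper and the input uses only fields from the schemas above.

```javascript
// Pairs request and response events by correlationId and derives the elapsed
// time between their timestamps (hypothetical helper, not part of the shared flow).
function correlate(events) {
  var byId = {};
  events.forEach(function (e) {
    var entry = byId[e.correlationId] || (byId[e.correlationId] = {});
    if (e.eventType === 'api_request') entry.request = e;
    if (e.eventType === 'api_response') entry.response = e;
  });
  return Object.keys(byId).filter(function (id) {
    return byId[id].request && byId[id].response;
  }).map(function (id) {
    var pair = byId[id];
    return {
      correlationId: id,
      statusCode: pair.response.response.statusCode,
      elapsedMs: Date.parse(pair.response.timestamp) - Date.parse(pair.request.timestamp)
    };
  });
}

var pairs = correlate([
  { eventType: 'api_request', timestamp: '2025-01-01T12:00:00.000Z',
    correlationId: '12345678-1234-1234-1234-123456789012' },
  { eventType: 'api_response', timestamp: '2025-01-01T12:00:01.500Z',
    correlationId: '12345678-1234-1234-1234-123456789012',
    response: { statusCode: 201 } }
]);
console.log(pairs[0].elapsedMs); // 1500
```

Events without a matching counterpart are dropped from the result, which makes missing response logs easy to detect by comparing counts.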
Security Considerations
Sensitive Data Protection
- All passwords are completely masked
- API keys and tokens show only prefix/suffix
- Email addresses mask the local part
- Phone numbers mask middle digits
- Credit cards show only first 4 and last 4 digits
- SSN shows only last 4 digits
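The four format-aware rules in this list can be sketched as standalone functions. This is a simplified illustration that mirrors the masking script in this guide (which keeps the original length by emitting one asterisk per hidden character); the function names are chosen here for illustration, and inputs are assumed to be well-formed.

```javascript
// Standalone versions of the four format-aware rules listed above.
function stars(n) { return new Array(Math.max(n, 0) + 1).join('*'); }

// Keeps the first two characters of the local part and the whole domain.
function maskEmail(value) {
  var parts = value.split('@');
  if (parts.length !== 2) return stars(value.length);
  var local = parts[0];
  var kept = local.length > 2 ? local.substring(0, 2) : '';
  return kept + stars(local.length - kept.length) + '@' + parts[1];
}

// Keeps the first three and last three digits.
function maskPhone(value) {
  var d = value.replace(/\D/g, '');
  return d.substring(0, 3) + stars(d.length - 6) + d.substring(d.length - 3);
}

// Keeps the first four and last four digits.
function maskCard(value) {
  var d = value.replace(/\D/g, '');
  return d.substring(0, 4) + stars(d.length - 8) + d.substring(d.length - 4);
}

// Keeps only the last four digits.
function maskSsn(value) {
  var d = value.replace(/\D/g, '');
  return 'XXX-XX-' + d.substring(d.length - 4);
}

console.log(maskEmail('john.doe@example.com')); // jo******@example.com
console.log(maskPhone('555-123-4567'));         // 555****567
console.log(maskCard('1234-5678-9012-3456'));   // 1234********3456
console.log(maskSsn('123-45-6789'));            // XXX-XX-6789
```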
Best Practices
- Regular Token Rotation: Rotate ELK and Spark authentication tokens regularly
- Network Security: Ensure encrypted connections (HTTPS/TLS) to logging endpoints
- Access Control: Implement proper IAM policies for log access
- Data Retention: Configure appropriate log retention policies
- Monitoring: Set up alerts for logging failures
Troubleshooting
Common Issues
- Logging Disabled: Check KVM configuration for enabled flags
- Authentication Failures: Verify bearer tokens and credentials
- Network Timeouts: Adjust timeout settings in ServiceCallout policies
- Masking Errors: Check JavaScript policy execution logs
Debug Mode
Enable debug logging by adding this variable to your KVM:
debug.logging.enabled = true
Performance Monitoring
Monitor these metrics:
- Log transmission latency
- Failed log deliveries
- Masking operation time
- Memory usage during processing
Advanced Configuration
Custom Masking Rules
To add custom masking rules, modify the JavaScript resources:
// Add custom sensitive fields (matched as substrings of the lower-cased key)
var customSensitiveFields = ['custom_field', 'proprietary_data'];
sensitiveFields = sensitiveFields.concat(customSensitiveFields);
// Add a custom masking pattern; this example keeps only the last character
maskingPatterns['custom_field'] = function(value) {
if (!value) return value;
return '*'.repeat(value.length - 1) + value.substring(value.length - 1);
};
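Pattern names are matched as substrings of the lower-cased field name, so a custom rule also applies to every key that merely contains the pattern name. The reduced sketch below illustrates that lookup in isolation; applyPatterns and the '[redacted]' rule are illustrative stand-ins, not code from the shared flow.

```javascript
// Reduced copy of the pattern lookup: the first pattern whose name occurs
// anywhere in the lower-cased key wins.
var maskingPatterns = {};
maskingPatterns['custom_field'] = function (value) { return '[redacted]'; };

function applyPatterns(key, value) {
  var lowerKey = key.toLowerCase();
  for (var pattern in maskingPatterns) {
    if (lowerKey.indexOf(pattern) > -1) {
      return maskingPatterns[pattern](value);
    }
  }
  return value;
}

console.log(applyPatterns('My_Custom_Field_Id', 'abc')); // [redacted]
console.log(applyPatterns('order_total', 'abc'));        // abc
```

Choosing specific pattern names reduces the chance of masking unrelated fields by accident.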
Environment-Specific Settings
Create separate KVM configurations for different environments:
# Development
logger-config-dev
# Testing
logger-config-test
# Production
logger-config-prod
Integration with Monitoring Tools
The logs can be integrated with various monitoring and alerting tools:
- ELK Alerts: Create Elasticsearch Watcher alerts for error conditions
- Spark Monitoring: Use Spark Streaming for real-time analytics
- Grafana Dashboards: Create visualization dashboards for log metrics
- Splunk Integration: Forward logs to Splunk for enterprise monitoring
This comprehensive logging solution provides enterprise-grade logging capabilities while maintaining security and performance standards.
Apigee Logger Shared Flow - Main Flow
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<SharedFlow revision="1" name="logger-shared-flow">
<DisplayName>ELK Spark Logger Shared Flow</DisplayName>
<Description>Comprehensive logging shared flow for ELK and Spark with data masking</Description>
<!-- The same shared flow is called from PreFlow (request) and PostFlow (response). response.status.code is only set once a response exists, so it selects which half of the steps runs. -->
<Step>
<Name>Extract-Request-Data</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code = null)</Condition>
</Step>
<Step>
<Name>Mask-Sensitive-Request-Data</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code = null)</Condition>
</Step>
<Step>
<Name>Build-Request-Log-Payload</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code = null)</Condition>
</Step>
<Step>
<Name>Send-Request-Log-ELK</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code = null) and (elk.logging.enabled = true)</Condition>
</Step>
<Step>
<Name>Send-Request-Log-Spark</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code = null) and (spark.logging.enabled = true)</Condition>
</Step>
<Step>
<Name>Extract-Response-Data</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code != null)</Condition>
</Step>
<Step>
<Name>Mask-Sensitive-Response-Data</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code != null)</Condition>
</Step>
<Step>
<Name>Build-Response-Log-Payload</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code != null)</Condition>
</Step>
<Step>
<Name>Send-Response-Log-ELK</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code != null) and (elk.logging.enabled = true)</Condition>
</Step>
<Step>
<Name>Send-Response-Log-Spark</Name>
<Condition>(request.verb != "OPTIONS") and (response.status.code != null) and (spark.logging.enabled = true)</Condition>
</Step>
</SharedFlow>
Extract Request Data Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ExtractVariables async="false" continueOnError="true" enabled="true" name="Extract-Request-Data">
<DisplayName>Extract Request Data</DisplayName>
<Properties/>
<!-- Extract basic request information -->
<IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
<!-- Request Headers -->
<Header name="Content-Type">
<Pattern ignoreCase="true">{request.header.content-type}</Pattern>
</Header>
<Header name="Authorization">
<Pattern ignoreCase="true">{request.header.authorization}</Pattern>
</Header>
<Header name="User-Agent">
<Pattern ignoreCase="true">{request.header.user-agent}</Pattern>
</Header>
<Header name="X-API-Key">
<Pattern ignoreCase="true">{request.header.x-api-key}</Pattern>
</Header>
<!-- Query Parameters -->
<QueryParam name="username">
<Pattern ignoreCase="true">{request.queryparam.username}</Pattern>
</QueryParam>
<QueryParam name="password">
<Pattern ignoreCase="true">{request.queryparam.password}</Pattern>
</QueryParam>
<QueryParam name="token">
<Pattern ignoreCase="true">{request.queryparam.token}</Pattern>
</QueryParam>
<QueryParam name="api_key">
<Pattern ignoreCase="true">{request.queryparam.api_key}</Pattern>
</QueryParam>
<!-- URI Path Parameters -->
<URIPath>
<Pattern ignoreCase="false">/v{version}/*/users/{user_id}/*</Pattern>
</URIPath>
<!-- JSON Path Extraction for Request Body -->
<JSONPayload>
<Variable name="req.username" type="string">
<JSONPath>$.username</JSONPath>
</Variable>
<Variable name="req.password" type="string">
<JSONPath>$.password</JSONPath>
</Variable>
<Variable name="req.email" type="string">
<JSONPath>$.email</JSONPath>
</Variable>
<Variable name="req.phone" type="string">
<JSONPath>$.phone</JSONPath>
</Variable>
<Variable name="req.ssn" type="string">
<JSONPath>$.ssn</JSONPath>
</Variable>
<Variable name="req.credit_card" type="string">
<JSONPath>$.credit_card</JSONPath>
</Variable>
<Variable name="req.api_key" type="string">
<JSONPath>$.api_key</JSONPath>
</Variable>
<Variable name="req.access_token" type="string">
<JSONPath>$.access_token</JSONPath>
</Variable>
<Variable name="req.refresh_token" type="string">
<JSONPath>$.refresh_token</JSONPath>
</Variable>
<Variable name="req.client_secret" type="string">
<JSONPath>$.client_secret</JSONPath>
</Variable>
</JSONPayload>
<Source>request</Source>
</ExtractVariables>
Mask Sensitive Request Data Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Javascript async="false" continueOnError="true" enabled="true" name="Mask-Sensitive-Request-Data">
<DisplayName>Mask Sensitive Request Data</DisplayName>
<Properties/>
<ResourceURL>jsc://mask-sensitive-data.js</ResourceURL>
</Javascript>
var sensitiveFields = [
'password', 'pwd', 'pass', 'secret', 'token', 'key', 'auth', 'authorization',
'username', 'user', 'email', 'phone', 'ssn', 'social', 'credit_card', 'cc',
'access_token', 'refresh_token', 'client_secret', 'api_key', 'x-api-key'
];
var maskingPatterns = {
'email': function(value) {
if (!value) return value;
var parts = value.split('@');
if (parts.length === 2) {
var localPart = parts[0];
var maskedLocal = localPart.length > 2 ?
localPart.substring(0, 2) + '*'.repeat(localPart.length - 2) :
'*'.repeat(localPart.length);
return maskedLocal + '@' + parts[1];
}
return maskDefault(value);
},
'phone': function(value) {
if (!value) return value;
var cleaned = value.replace(/\D/g, '');
if (cleaned.length >= 10) {
return cleaned.substring(0, 3) + '*'.repeat(cleaned.length - 6) + cleaned.substring(cleaned.length - 3);
}
return maskDefault(value);
},
'credit_card': function(value) {
if (!value) return value;
var cleaned = value.replace(/\D/g, '');
if (cleaned.length >= 12) {
return cleaned.substring(0, 4) + '*'.repeat(cleaned.length - 8) + cleaned.substring(cleaned.length - 4);
}
return maskDefault(value);
},
'ssn': function(value) {
if (!value) return value;
var cleaned = value.replace(/\D/g, '');
if (cleaned.length === 9) {
return 'XXX-XX-' + cleaned.substring(5);
}
return maskDefault(value);
}
};
function maskDefault(value) {
if (!value) return value;
if (value.length <= 4) {
return '*'.repeat(value.length);
}
return value.substring(0, 2) + '*'.repeat(value.length - 4) + value.substring(value.length - 2);
}
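function maskSensitiveValue(key, value) {
    if (!value) return value;
    var lowerKey = key.toLowerCase();
    // Passwords and secrets are masked completely with a fixed-length mask,
    // so neither their characters nor their length reach the logs
    var fullyMasked = ['password', 'pwd', 'secret'];
    for (var f = 0; f < fullyMasked.length; f++) {
        if (lowerKey.includes(fullyMasked[f])) {
            return '********';
        }
    }
    // Check for specific masking patterns
    for (var pattern in maskingPatterns) {
        if (lowerKey.includes(pattern)) {
            return maskingPatterns[pattern](value);
        }
    }
    // Check if it's a sensitive field
    for (var i = 0; i < sensitiveFields.length; i++) {
        if (lowerKey.includes(sensitiveFields[i])) {
            return maskDefault(value);
        }
    }
    return value;
}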
function maskHeaders() {
var headers = ['authorization', 'x-api-key', 'x-auth-token'];
for (var i = 0; i < headers.length; i++) {
var headerName = headers[i];
var headerValue = context.getVariable('request.header.' + headerName);
if (headerValue) {
var maskedValue = maskSensitiveValue(headerName, headerValue);
context.setVariable('masked.request.header.' + headerName, maskedValue);
}
}
}
function maskQueryParams() {
var queryParams = ['username', 'password', 'token', 'api_key'];
for (var i = 0; i < queryParams.length; i++) {
var paramName = queryParams[i];
var paramValue = context.getVariable('request.queryparam.' + paramName);
if (paramValue) {
var maskedValue = maskSensitiveValue(paramName, paramValue);
context.setVariable('masked.request.queryparam.' + paramName, maskedValue);
}
}
}
function maskJsonFields() {
var jsonFields = [
'req.username', 'req.password', 'req.email', 'req.phone', 'req.ssn',
'req.credit_card', 'req.api_key', 'req.access_token', 'req.refresh_token',
'req.client_secret'
];
for (var i = 0; i < jsonFields.length; i++) {
var fieldName = jsonFields[i];
var fieldValue = context.getVariable(fieldName);
if (fieldValue) {
var key = fieldName.replace('req.', '');
var maskedValue = maskSensitiveValue(key, fieldValue);
context.setVariable('masked.' + fieldName, maskedValue);
}
}
}
function maskRequestPayload() {
    try {
        var requestPayload = context.getVariable('request.content');
        var contentType = context.getVariable('request.header.content-type');
        if (requestPayload && contentType && contentType.indexOf('application/json') > -1) {
            var payload = JSON.parse(requestPayload);
            var maskedPayload = maskObjectRecursively(payload);
            context.setVariable('masked.request.payload', JSON.stringify(maskedPayload, null, 2));
        } else {
            // The log template embeds this variable unquoted, so non-JSON or empty
            // bodies are stored as a JSON string literal to keep the log event valid
            context.setVariable('masked.request.payload', JSON.stringify(requestPayload || ''));
        }
    } catch (e) {
        context.setVariable('masked.request.payload', JSON.stringify('Error parsing request payload: ' + e.message));
    }
}
function maskObjectRecursively(obj) {
if (obj === null || obj === undefined) return obj;
if (typeof obj !== 'object') return obj;
if (Array.isArray(obj)) {
return obj.map(function(item) {
return maskObjectRecursively(item);
});
}
var maskedObj = {};
for (var key in obj) {
if (obj.hasOwnProperty(key)) {
var value = obj[key];
if (typeof value === 'object' && value !== null) {
maskedObj[key] = maskObjectRecursively(value);
} else if (typeof value === 'string') {
maskedObj[key] = maskSensitiveValue(key, value);
} else {
maskedObj[key] = value;
}
}
}
return maskedObj;
}
// Execute masking
try {
maskHeaders();
maskQueryParams();
maskJsonFields();
maskRequestPayload();
context.setVariable('masking.request.status', 'completed');
context.setVariable('masking.request.timestamp', new Date().toISOString());
} catch (error) {
context.setVariable('masking.request.status', 'error');
context.setVariable('masking.request.error', error.message);
}
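The script above depends on the context object that Apigee injects into JavaScript policies, so it cannot run as-is outside the gateway. For local experiments, one option is a small stand-in like the sketch below. createMockContext and its dump method are hypothetical test helpers, and only getVariable and setVariable are imitated.

```javascript
// Minimal stand-in for the Apigee `context` object: only getVariable and
// setVariable, backed by a plain object. Everything else is out of scope.
function createMockContext(initial) {
  var vars = {};
  Object.keys(initial || {}).forEach(function (k) { vars[k] = initial[k]; });
  return {
    getVariable: function (name) {
      return Object.prototype.hasOwnProperty.call(vars, name) ? vars[name] : null;
    },
    setVariable: function (name, value) { vars[name] = value; },
    dump: function () { return vars; } // test-only helper, not an Apigee API
  };
}

var context = createMockContext({
  'request.header.authorization': 'Bearer abcdef123456'
});

// Same shape as maskHeaders() above, reduced to one header and the default
// rule (first two and last two characters stay visible).
var headerValue = context.getVariable('request.header.authorization');
var masked = headerValue.substring(0, 2) +
  '*'.repeat(headerValue.length - 4) +
  headerValue.substring(headerValue.length - 2);
context.setVariable('masked.request.header.authorization', masked);
console.log(masked);
```

With such a stub in scope, the functions of the masking script can be pasted into a local file and exercised against sample variables before deploying.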
Build Request Log Payload Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<AssignMessage async="false" continueOnError="true" enabled="true" name="Build-Request-Log-Payload">
<DisplayName>Build Request Log Payload</DisplayName>
<Properties/>
<AssignTo createNew="true" transport="http" type="request">logRequest</AssignTo>
<!-- Verb and Payload belong inside Set; the new request message gets an explicit POST so the ServiceCallout policies can submit it -->
<Set>
<Headers>
<Header name="Content-Type">application/json</Header>
</Headers>
<Payload contentType="application/json" variablePrefix="@" variableSuffix="#">
{
"eventType": "api_request",
"timestamp": "@system.timestamp#",
"correlationId": "@messageid#",
"environment": "@environment.name#",
"organization": "@organization.name#",
"apiProxy": "@apiproxy.name#",
"apiProxyRevision": "@apiproxy.revision#",
"request": {
"method": "@request.verb#",
"uri": "@request.uri#",
"path": "@request.path#",
"queryString": "@request.querystring#",
"protocol": "@request.scheme#",
"headers": {
"contentType": "@request.header.content-type#",
"userAgent": "@request.header.user-agent#",
"authorization": "@masked.request.header.authorization#",
"xApiKey": "@masked.request.header.x-api-key#",
"host": "@request.header.host#",
"accept": "@request.header.accept#",
"acceptEncoding": "@request.header.accept-encoding#"
},
"queryParams": {
"username": "@masked.request.queryparam.username#",
"password": "@masked.request.queryparam.password#",
"token": "@masked.request.queryparam.token#",
"apiKey": "@masked.request.queryparam.api_key#"
},
"pathParams": {
"version": "@version#",
"userId": "@user_id#"
},
"payload": @masked.request.payload#,
"size": "@request.header.content-length#"
},
"client": {
"ip": "@client.ip#",
"host": "@client.host#",
"port": "@client.port#",
"userAgent": "@request.header.user-agent#"
},
"security": {
"authenticated": "@is.authenticated#",
"userId": "@user.id#",
"clientId": "@client_id#",
"scopes": "@oauth.scope#"
},
"flow": {
"name": "logger-shared-flow",
"step": "request-logging",
"executionTime": "@flow.execution.time#"
},
"masking": {
"status": "@masking.request.status#",
"timestamp": "@masking.request.timestamp#",
"fieldsCount": "@masking.fields.count#"
},
"metadata": {
"source": "apigee-gateway",
"version": "1.0",
"logLevel": "INFO",
"component": "request-logger"
}
}
</Payload>
<Verb>POST</Verb>
</Set>
<AssignVariable>
<Name>log.event.type</Name>
<Value>api_request</Value>
</AssignVariable>
<!-- Ref copies the value of a flow variable; Value would store the text literally -->
<AssignVariable>
<Name>log.timestamp</Name>
<Ref>system.timestamp</Ref>
</AssignVariable>
<AssignVariable>
<Name>log.correlation.id</Name>
<Ref>messageid</Ref>
</AssignVariable>
<!-- Several referenced variables are optional; unresolved ones become empty strings instead of failing the policy -->
<IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
</AssignMessage>
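The variablePrefix="@" and variableSuffix="#" attributes change the placeholder delimiters so that the curly braces of the JSON body are not mistaken for variable references. The sketch below imitates that substitution outside Apigee to show why quoted placeholders yield strings while the unquoted payload placeholder must already contain valid JSON. renderTemplate is an illustrative re-implementation, and replacing unresolved names with an empty string is an assumption of this sketch.

```javascript
// Illustrative re-implementation of @name# substitution; Apigee performs this
// internally, and unresolved names are replaced with an empty string here.
function renderTemplate(template, variables) {
  return template.replace(/@([A-Za-z0-9_.\-]+)#/g, function (match, name) {
    var value = variables[name];
    return value === undefined || value === null ? '' : String(value);
  });
}

var rendered = renderTemplate(
  '{"method": "@request.verb#", "payload": @masked.request.payload#}',
  {
    'request.verb': 'POST',
    'masked.request.payload': '{"password":"********"}'
  }
);
console.log(JSON.parse(rendered).method); // POST
```

If the payload variable held plain text instead of JSON, the rendered document would no longer parse, which is the main risk of embedding a placeholder without quotes.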
Send Request Log to ELK Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Request-Log-ELK">
<DisplayName>Send Request Log to ELK</DisplayName>
<Properties/>
<Request clearPayload="false" variable="logRequest">
<Set>
<Headers>
<Header name="Content-Type">application/json</Header>
<Header name="Authorization">Bearer {elk.auth.token}</Header>
<Header name="X-Correlation-ID">{messageid}</Header>
</Headers>
</Set>
</Request>
<Response>elkRequestResponse</Response>
<HTTPTargetConnection>
<Properties>
<Property name="keepalive">true</Property>
<Property name="connect.timeout.millis">5000</Property>
<Property name="io.timeout.millis">30000</Property>
</Properties>
<URL>{elk.endpoint.url}/{elk.index.name}/_doc</URL>
</HTTPTargetConnection>
</ServiceCallout>
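The URL element above combines the endpoint and index name into an Elasticsearch document endpoint. A small sketch of the same composition is shown below; buildDocUrl is a hypothetical helper, and stripping a trailing slash is a precaution added here rather than something the policy does.

```javascript
// Composes {elk.endpoint.url}/{elk.index.name}/_doc, tolerating a trailing
// slash on the endpoint (hypothetical helper for illustration).
function buildDocUrl(endpointUrl, indexName) {
  return endpointUrl.replace(/\/+$/, '') + '/' + encodeURIComponent(indexName) + '/_doc';
}

var docUrl = buildDocUrl('https://elasticsearch.company.com:9200/', 'apigee-logs-2025.01.01');
console.log(docUrl); // https://elasticsearch.company.com:9200/apigee-logs-2025.01.01/_doc
```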
Send Request Log to Spark Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Request-Log-Spark">
<DisplayName>Send Request Log to Spark</DisplayName>
<Properties/>
<Request clearPayload="false" variable="logRequest">
<Set>
<Headers>
<Header name="Content-Type">application/json</Header>
<Header name="Authorization">Basic {spark.auth.credentials}</Header>
<Header name="X-Correlation-ID">{messageid}</Header>
<Header name="X-Spark-Topic">{spark.topic.name}</Header>
</Headers>
</Set>
</Request>
<Response>sparkRequestResponse</Response>
<HTTPTargetConnection>
<Properties>
<Property name="keepalive">true</Property>
<Property name="connect.timeout.millis">3000</Property>
<Property name="io.timeout.millis">15000</Property>
</Properties>
<URL>{spark.endpoint.url}/api/v1/logs/stream</URL>
</HTTPTargetConnection>
</ServiceCallout>
Extract Response Data Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ExtractVariables async="false" continueOnError="true" enabled="true" name="Extract-Response-Data">
<DisplayName>Extract Response Data</DisplayName>
<Properties/>
<IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
<!-- Response Headers -->
<Header name="Content-Type">
<Pattern ignoreCase="true">{response.header.content-type}</Pattern>
</Header>
<Header name="Authorization">
<Pattern ignoreCase="true">{response.header.authorization}</Pattern>
</Header>
<Header name="Set-Cookie">
<Pattern ignoreCase="true">{response.header.set-cookie}</Pattern>
</Header>
<Header name="X-Auth-Token">
<Pattern ignoreCase="true">{response.header.x-auth-token}</Pattern>
</Header>
<!-- JSON Path Extraction for Response Body -->
<JSONPayload>
<Variable name="resp.access_token" type="string">
<JSONPath>$.access_token</JSONPath>
</Variable>
<Variable name="resp.refresh_token" type="string">
<JSONPath>$.refresh_token</JSONPath>
</Variable>
<Variable name="resp.api_key" type="string">
<JSONPath>$.api_key</JSONPath>
</Variable>
<Variable name="resp.client_secret" type="string">
<JSONPath>$.client_secret</JSONPath>
</Variable>
<Variable name="resp.password" type="string">
<JSONPath>$.password</JSONPath>
</Variable>
<Variable name="resp.username" type="string">
<JSONPath>$.username</JSONPath>
</Variable>
<Variable name="resp.email" type="string">
<JSONPath>$.email</JSONPath>
</Variable>
<Variable name="resp.phone" type="string">
<JSONPath>$.phone</JSONPath>
</Variable>
<Variable name="resp.ssn" type="string">
<JSONPath>$.ssn</JSONPath>
</Variable>
<Variable name="resp.credit_card" type="string">
<JSONPath>$.credit_card</JSONPath>
</Variable>
<Variable name="resp.session_id" type="string">
<JSONPath>$.session_id</JSONPath>
</Variable>
<Variable name="resp.user_id" type="string">
<JSONPath>$.user_id</JSONPath>
</Variable>
<Variable name="resp.status" type="string">
<JSONPath>$.status</JSONPath>
</Variable>
<Variable name="resp.message" type="string">
<JSONPath>$.message</JSONPath>
</Variable>
<Variable name="resp.error_code" type="string">
<JSONPath>$.error.code</JSONPath>
</Variable>
<Variable name="resp.error_message" type="string">
<JSONPath>$.error.message</JSONPath>
</Variable>
<Variable name="resp.expires_in" type="integer">
<JSONPath>$.expires_in</JSONPath>
</Variable>
<Variable name="resp.token_type" type="string">
<JSONPath>$.token_type</JSONPath>
</Variable>
</JSONPayload>
<Source>response</Source>
</ExtractVariables>
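Each Variable element above maps a JSONPath expression to a flow variable. To illustrate the effect on a sample response body, the sketch below resolves simple dotted paths only; extractPath is a hypothetical helper and not a JSONPath implementation.

```javascript
// Resolves only simple dotted paths such as $.error.code; real JSONPath
// (filters, wildcards, array slices) is deliberately not covered.
function extractPath(doc, path) {
  var keys = path.replace(/^\$\.?/, '').split('.').filter(function (k) {
    return k.length > 0;
  });
  var current = doc;
  for (var i = 0; i < keys.length; i++) {
    if (current === null || typeof current !== 'object' || !(keys[i] in current)) {
      return undefined; // comparable to a variable that stays unset
    }
    current = current[keys[i]];
  }
  return current;
}

var body = {
  access_token: 'ey.abc',
  expires_in: 3600,
  error: { code: 'E42', message: 'bad input' }
};
var extracted = {
  'resp.access_token': extractPath(body, '$.access_token'),
  'resp.error_code': extractPath(body, '$.error.code'),
  'resp.session_id': extractPath(body, '$.session_id')
};
console.log(extracted['resp.error_code']); // E42
```

Paths that do not exist in the body simply yield no value, which matches the intent of IgnoreUnresolvedVariables in the policy.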
Mask Sensitive Response Data Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Javascript async="false" continueOnError="true" enabled="true" name="Mask-Sensitive-Response-Data">
<DisplayName>Mask Sensitive Response Data</DisplayName>
<Properties/>
<ResourceURL>jsc://mask-sensitive-response-data.js</ResourceURL>
</Javascript>
// Reuse the masking functions from request masking
var sensitiveFields = [
'password', 'pwd', 'pass', 'secret', 'token', 'key', 'auth', 'authorization',
'username', 'user', 'email', 'phone', 'ssn', 'social', 'credit_card', 'cc',
'access_token', 'refresh_token', 'client_secret', 'api_key', 'x-api-key',
'session_id', 'sessionid', 'cookie'
];
var maskingPatterns = {
'email': function(value) {
if (!value) return value;
var parts = value.split('@');
if (parts.length === 2) {
var localPart = parts[0];
var maskedLocal = localPart.length > 2 ?
localPart.substring(0, 2) + '*'.repeat(localPart.length - 2) :
'*'.repeat(localPart.length);
return maskedLocal + '@' + parts[1];
}
return maskDefault(value);
},
'phone': function(value) {
if (!value) return value;
var cleaned = value.replace(/\D/g, '');
if (cleaned.length >= 10) {
return cleaned.substring(0, 3) + '*'.repeat(cleaned.length - 6) + cleaned.substring(cleaned.length - 3);
}
return maskDefault(value);
},
'credit_card': function(value) {
if (!value) return value;
var cleaned = value.replace(/\D/g, '');
if (cleaned.length >= 12) {
return cleaned.substring(0, 4) + '*'.repeat(cleaned.length - 8) + cleaned.substring(cleaned.length - 4);
}
return maskDefault(value);
},
'ssn': function(value) {
if (!value) return value;
var cleaned = value.replace(/\D/g, '');
if (cleaned.length === 9) {
return 'XXX-XX-' + cleaned.substring(5);
}
return maskDefault(value);
},
'session': function(value) {
if (!value) return value;
if (value.length > 8) {
return value.substring(0, 4) + '*'.repeat(value.length - 8) + value.substring(value.length - 4);
}
return maskDefault(value);
}
};
function maskDefault(value) {
if (!value) return value;
if (value.length <= 4) {
return '*'.repeat(value.length);
}
return value.substring(0, 2) + '*'.repeat(value.length - 4) + value.substring(value.length - 2);
}
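function maskSensitiveValue(key, value) {
    if (!value) return value;
    var lowerKey = key.toLowerCase();
    // Passwords and secrets are masked completely with a fixed-length mask,
    // so neither their characters nor their length reach the logs
    var fullyMasked = ['password', 'pwd', 'secret'];
    for (var f = 0; f < fullyMasked.length; f++) {
        if (lowerKey.includes(fullyMasked[f])) {
            return '********';
        }
    }
    // Check for specific masking patterns
    for (var pattern in maskingPatterns) {
        if (lowerKey.includes(pattern)) {
            return maskingPatterns[pattern](value);
        }
    }
    // Check if it's a sensitive field
    for (var i = 0; i < sensitiveFields.length; i++) {
        if (lowerKey.includes(sensitiveFields[i])) {
            return maskDefault(value);
        }
    }
    return value;
}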
function maskResponseHeaders() {
var headers = ['authorization', 'set-cookie', 'x-auth-token'];
for (var i = 0; i < headers.length; i++) {
var headerName = headers[i];
var headerValue = context.getVariable('response.header.' + headerName);
if (headerValue) {
var maskedValue = maskSensitiveValue(headerName, headerValue);
context.setVariable('masked.response.header.' + headerName, maskedValue);
}
}
}
function maskResponseJsonFields() {
var jsonFields = [
'resp.access_token', 'resp.refresh_token', 'resp.api_key', 'resp.client_secret',
'resp.password', 'resp.username', 'resp.email', 'resp.phone', 'resp.ssn',
'resp.credit_card', 'resp.session_id'
];
for (var i = 0; i < jsonFields.length; i++) {
var fieldName = jsonFields[i];
var fieldValue = context.getVariable(fieldName);
if (fieldValue) {
var key = fieldName.replace('resp.', '');
var maskedValue = maskSensitiveValue(key, fieldValue);
context.setVariable('masked.' + fieldName, maskedValue);
}
}
}
function maskResponsePayload() {
    try {
        var responsePayload = context.getVariable('response.content');
        var contentType = context.getVariable('response.header.content-type');
        if (responsePayload && contentType && contentType.indexOf('application/json') > -1) {
            var payload = JSON.parse(responsePayload);
            var maskedPayload = maskObjectRecursively(payload);
            context.setVariable('masked.response.payload', JSON.stringify(maskedPayload, null, 2));
        } else {
            // The log template embeds this variable unquoted, so non-JSON or empty
            // bodies are stored as a JSON string literal to keep the log event valid
            context.setVariable('masked.response.payload', JSON.stringify(responsePayload || ''));
        }
    } catch (e) {
        context.setVariable('masked.response.payload', JSON.stringify('Error parsing response payload: ' + e.message));
    }
}
function maskObjectRecursively(obj) {
if (obj === null || obj === undefined) return obj;
if (typeof obj !== 'object') return obj;
if (Array.isArray(obj)) {
return obj.map(function(item) {
return maskObjectRecursively(item);
});
}
var maskedObj = {};
for (var key in obj) {
if (obj.hasOwnProperty(key)) {
var value = obj[key];
if (typeof value === 'object' && value !== null) {
maskedObj[key] = maskObjectRecursively(value);
} else if (typeof value === 'string') {
maskedObj[key] = maskSensitiveValue(key, value);
} else {
maskedObj[key] = value;
}
}
}
return maskedObj;
}
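// Execute masking
try {
    maskResponseHeaders();
    maskResponseJsonFields();
    maskResponsePayload();
    // Message templates cannot do arithmetic, so latencies (in milliseconds)
    // are computed here and exposed as flow variables. Total latency is
    // measured from the start of the client request until this point.
    var now = Number(context.getVariable('system.timestamp'));
    var clientStart = Number(context.getVariable('client.received.start.timestamp'));
    var targetStart = Number(context.getVariable('target.sent.start.timestamp'));
    var targetEnd = Number(context.getVariable('target.received.end.timestamp'));
    context.setVariable('log.response.total.latency', String(now - clientStart));
    context.setVariable('log.response.target.latency', String(targetEnd - targetStart));
    context.setVariable('masking.response.status', 'completed');
    context.setVariable('masking.response.timestamp', new Date().toISOString());
} catch (error) {
    context.setVariable('masking.response.status', 'error');
    context.setVariable('masking.response.error', error.message);
}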
Build Response Log Payload Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<AssignMessage async="false" continueOnError="true" enabled="true" name="Build-Response-Log-Payload">
<DisplayName>Build Response Log Payload</DisplayName>
<Properties/>
<AssignTo createNew="true" transport="http" type="request">logResponse</AssignTo>
<!-- Verb and Payload belong inside Set; the new request message gets an explicit POST so the ServiceCallout policies can submit it -->
<!-- Message templates cannot evaluate arithmetic such as a timestamp difference; log.response.total.latency and log.response.target.latency are expected to be set by a JavaScript policy that runs before this one -->
<Set>
<Headers>
<Header name="Content-Type">application/json</Header>
</Headers>
<Payload contentType="application/json" variablePrefix="@" variableSuffix="#">
{
"eventType": "api_response",
"timestamp": "@system.timestamp#",
"correlationId": "@messageid#",
"environment": "@environment.name#",
"organization": "@organization.name#",
"apiProxy": "@apiproxy.name#",
"apiProxyRevision": "@apiproxy.revision#",
"response": {
"statusCode": @response.status.code#,
"statusMessage": "@response.reason.phrase#",
"headers": {
"contentType": "@response.header.content-type#",
"contentLength": "@response.header.content-length#",
"server": "@response.header.server#",
"cacheControl": "@response.header.cache-control#",
"authorization": "@masked.response.header.authorization#",
"setCookie": "@masked.response.header.set-cookie#",
"xAuthToken": "@masked.response.header.x-auth-token#"
},
"payload": @masked.response.payload#,
"size": "@response.header.content-length#"
},
"performance": {
"totalLatency": "@log.response.total.latency#",
"targetLatency": "@log.response.target.latency#",
"requestProcessingTime": "@request.processing.time#",
"responseProcessingTime": "@response.processing.time#"
},
"backend": {
"url": "@target.url#",
"host": "@target.host#",
"port": "@target.port#",
"ssl": "@target.ssl#",
"statusCode": "@target.response.status.code#"
},
"client": {
"ip": "@client.ip#",
"host": "@client.host#",
"port": "@client.port#"
},
"security": {
"authenticated": "@is.authenticated#",
"userId": "@user.id#",
"clientId": "@client_id#",
"scopes": "@oauth.scope#"
},
"flow": {
"name": "logger-shared-flow",
"step": "response-logging",
"executionTime": "@flow.execution.time#"
},
"masking": {
"status": "@masking.response.status#",
"timestamp": "@masking.response.timestamp#",
"fieldsCount": "@masking.fields.count#"
},
"error": {
"isError": @is.error#,
"code": "@error.code#",
"message": "@error.message#",
"source": "@error.source#"
},
"analytics": {
"apiProduct": "@apiproduct.name#",
"developer": "@developer.email#",
"app": "@client_id#"
},
"metadata": {
"source": "apigee-gateway",
"version": "1.0",
"logLevel": "INFO",
"component": "response-logger"
}
}
</Payload>
<Verb>POST</Verb>
</Set>
<AssignVariable>
<Name>log.response.event.type</Name>
<Value>api_response</Value>
</AssignVariable>
<!-- Ref copies the value of a flow variable; Value would store the text literally -->
<AssignVariable>
<Name>log.response.timestamp</Name>
<Ref>system.timestamp</Ref>
</AssignVariable>
<!-- Several referenced variables are optional; unresolved ones become empty strings instead of failing the policy -->
<IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>
</AssignMessage>
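As far as the Apigee message template documentation describes, a template substitutes variable values but does not evaluate arithmetic, so an entry such as `@target.received.end.timestamp - target.sent.start.timestamp#` is looked up as one variable name rather than computed. The unquoted placeholders (`statusCode`, `payload`, `isError`) also yield invalid JSON whenever the variable is empty. One possible workaround is a JavaScript policy step that runs before Build-Response-Log-Payload and prepares these values. The sketch below is only an illustration: the `log.*` variable names and both helper functions are hypothetical and not part of the original bundle.

```javascript
// Sketch of a JavaScript step placed before Build-Response-Log-Payload.
// Assumption: every "log.*" variable name below is hypothetical.

function isBlank(value) {
  return value === null || value === undefined || value === '';
}

// Difference between two epoch-millisecond values.
// Returns null when either value is missing or not numeric.
function elapsedMs(start, end) {
  if (isBlank(start) || isBlank(end)) {
    return null;
  }
  var s = Number(start);
  var e = Number(end);
  if (isNaN(s) || isNaN(e)) {
    return null;
  }
  return e - s;
}

// Produce text that is safe to embed unquoted in a JSON template:
// blank becomes null, valid JSON is re-serialized, anything else
// becomes a quoted JSON string.
function toJsonLiteral(value) {
  if (isBlank(value)) {
    return 'null';
  }
  try {
    return JSON.stringify(JSON.parse(String(value)));
  } catch (err) {
    return JSON.stringify(String(value));
  }
}

// `context` exists only inside the Apigee JavaScript policy runtime.
if (typeof context !== 'undefined') {
  // Time elapsed since the first request byte arrived, measured at this step.
  context.setVariable('log.total.latency', toJsonLiteral(elapsedMs(
    context.getVariable('client.received.start.timestamp'),
    context.getVariable('system.timestamp'))));
  // Time spent waiting on the backend.
  context.setVariable('log.target.latency', toJsonLiteral(elapsedMs(
    context.getVariable('target.sent.start.timestamp'),
    context.getVariable('target.received.end.timestamp'))));
  context.setVariable('log.response.payload',
    toJsonLiteral(context.getVariable('masked.response.payload')));
  context.setVariable('log.is.error',
    toJsonLiteral(context.getVariable('is.error')));
}
```

With a step like this in place, the template could reference `@log.total.latency#`, `@log.target.latency#`, `@log.response.payload#` and `@log.is.error#` without quotes, since each is always a valid JSON literal.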
Send Response Log to ELK Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Response-Log-ELK">
<DisplayName>Send Response Log to ELK</DisplayName>
<Properties/>
<Request clearPayload="false" variable="logResponse">
<Set>
<Headers>
<Header name="Content-Type">application/json</Header>
<Header name="Authorization">Bearer {elk.auth.token}</Header>
<Header name="X-Correlation-ID">{messageid}</Header>
<Header name="X-ELK-Index">{elk.index.name}</Header>
</Headers>
</Set>
</Request>
<!-- No <Response> element: Apigee then does not wait for the ELK reply, so the log call stays non-blocking. -->
<HTTPTargetConnection>
<Properties>
<Property name="keepalive">true</Property>
<Property name="connect.timeout.millis">5000</Property>
<Property name="io.timeout.millis">30000</Property>
<Property name="retry.count">2</Property>
<Property name="retry.interval">1000</Property>
</Properties>
<URL>{elk.endpoint.url}/{elk.index.name}/_doc</URL>
</HTTPTargetConnection>
</ServiceCallout>
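Apigee's ServiceCallout reference notes that the protocol portion of `<URL>` cannot be supplied by a variable, so a value like `{elk.endpoint.url}` that already contains `https://` may not resolve as intended. One possible workaround is to set the documented `servicecallout.{policy-name}.target.url` flow variable from a JavaScript step that runs before the callout. A minimal sketch under that assumption; the helper name `buildElkDocUrl` is illustrative and not part of the original bundle.

```javascript
// Join an Elasticsearch base URL and an index name into a document-indexing URL.
function buildElkDocUrl(endpointUrl, indexName) {
  // Drop trailing slashes so the result never contains "//" before the index.
  var base = String(endpointUrl).replace(/\/+$/, '');
  return base + '/' + encodeURIComponent(String(indexName)) + '/_doc';
}

// `context` exists only inside the Apigee JavaScript policy runtime.
if (typeof context !== 'undefined') {
  context.setVariable(
    'servicecallout.Send-Response-Log-ELK.target.url',
    buildElkDocUrl(
      context.getVariable('elk.endpoint.url'),
      context.getVariable('elk.index.name')));
}
```

The same approach would apply to Send-Response-Log-Spark by changing the policy name in the variable and the path suffix.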
Send Response Log to Spark Policy
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Response-Log-Spark">
<DisplayName>Send Response Log to Spark</DisplayName>
<Properties/>
<Request clearPayload="false" variable="logResponse">
<Set>
<Headers>
<Header name="Content-Type">application/json</Header>
<Header name="Authorization">Basic {spark.auth.credentials}</Header>
<Header name="X-Correlation-ID">{messageid}</Header>
<Header name="X-Spark-Topic">{spark.topic.name}</Header>
<Header name="X-Spark-Partition">{spark.partition.key}</Header>
</Headers>
</Set>
</Request>
<!-- No <Response> element: Apigee then does not wait for the Spark reply, so the log call stays non-blocking. -->
<HTTPTargetConnection>
<Properties>
<Property name="keepalive">true</Property>
<Property name="connect.timeout.millis">3000</Property>
<Property name="io.timeout.millis">15000</Property>
<Property name="retry.count">1</Property>
<Property name="retry.interval">500</Property>
</Properties>
<URL>{spark.endpoint.url}/api/v1/logs/stream</URL>
</HTTPTargetConnection>
</ServiceCallout>
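The `Authorization: Basic {spark.auth.credentials}` header expects the KVM entry to hold the Base64 encoding of `username:password`. The value can be produced offline before it is stored in the KVM. A small Node.js sketch (run locally, not inside Apigee); the function name and the credentials shown are placeholders.

```javascript
// Offline helper for Node.js: builds the value to store in spark.auth.credentials.
function encodeBasicCredentials(username, password) {
  return Buffer.from(username + ':' + password, 'utf8').toString('base64');
}

// Placeholder credentials; prints dXNlcjpwYXNz
console.log(encodeBasicCredentials('user', 'pass'));
```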
Configuration Files and Environment Variables
<!-- KVM Configuration for Environment Variables -->
<!-- Create KVM entries for the following variables: -->
<!-- ELK Configuration -->
<KVM name="logger-config">
<Entry name="elk.logging.enabled">true</Entry>
<Entry name="elk.endpoint.url">https://your-elasticsearch-cluster.com:9200</Entry>
<Entry name="elk.index.name">apigee-logs-{YYYY.MM.DD}</Entry>
<Entry name="elk.auth.token">your-elk-bearer-token</Entry>
<!-- Spark Configuration -->
<Entry name="spark.logging.enabled">true</Entry>
<Entry name="spark.endpoint.url">https://your-spark-cluster.com</Entry>
<Entry name="spark.topic.name">apigee-api-logs</Entry>
<Entry name="spark.partition.key">default</Entry>
<Entry name="spark.auth.credentials">base64-encoded-credentials</Entry>
</KVM>
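KVM values are stored as literal strings, so the `{YYYY.MM.DD}` placeholder in `elk.index.name` is not expanded automatically. One way to get a daily index is to expand it in a JavaScript step after the KVM entries have been loaded into flow variables. A minimal sketch, assuming the value is available as the flow variable `elk.index.name` and that UTC dates are acceptable; `resolveIndexName` is a hypothetical helper.

```javascript
// Zero-pad a day or month number to two digits.
function pad2(n) {
  return (n < 10 ? '0' : '') + n;
}

// Replace the {YYYY.MM.DD} placeholder with the UTC date of `date`.
function resolveIndexName(pattern, date) {
  var stamp = date.getUTCFullYear() + '.' +
    pad2(date.getUTCMonth() + 1) + '.' +
    pad2(date.getUTCDate());
  return String(pattern).replace('{YYYY.MM.DD}', stamp);
}

// `context` exists only inside the Apigee JavaScript policy runtime.
if (typeof context !== 'undefined') {
  context.setVariable('elk.index.name',
    resolveIndexName(context.getVariable('elk.index.name'), new Date()));
}
```

Overwriting `elk.index.name` in place means the `X-ELK-Index` header and the callout URL both pick up the expanded name without further changes.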
<!-- Shared Flow Deployment Descriptor -->
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<SharedFlowBundle revision="1" name="logger-shared-flow">
<ConfigurationVersion majorVersion="4" minorVersion="0"/>
<CreatedAt>2025-01-01T00:00:00.000Z</CreatedAt>
<CreatedBy>api-developer@company.com</CreatedBy>
<Description>ELK Spark Logger Shared Flow with Data Masking</Description>
<DisplayName>Logger Shared Flow</DisplayName>
<LastModifiedAt>2025-01-01T00:00:00.000Z</LastModifiedAt>
<LastModifiedBy>api-developer@company.com</LastModifiedBy>
<ManifestVersion majorVersion="1" minorVersion="0"/>
<Policies>
<Policy>Extract-Request-Data</Policy>
<Policy>Mask-Sensitive-Request-Data</Policy>
<Policy>Build-Request-Log-Payload</Policy>
<Policy>Send-Request-Log-ELK</Policy>
<Policy>Send-Request-Log-Spark</Policy>
<Policy>Extract-Response-Data</Policy>
<Policy>Mask-Sensitive-Response-Data</Policy>
<Policy>Build-Response-Log-Payload</Policy>
<Policy>Send-Response-Log-ELK</Policy>
<Policy>Send-Response-Log-Spark</Policy>
</Policies>
<Resources>
<Resource>jsc://mask-sensitive-data.js</Resource>
<Resource>jsc://mask-sensitive-response-data.js</Resource>
</Resources>
</SharedFlowBundle>
<!-- How to use this shared flow in your API Proxy -->
<!--
1. Deploy the shared flow to your Apigee environment
2. Add FlowCallout policies in your API proxy to call the shared flow
Example FlowCallout for Request logging:
<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-Request">
<DisplayName>Log Request</DisplayName>
<SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>
Add this in the PreFlow Request section of your API proxy.
Example FlowCallout for Response logging:
<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-Response">
<DisplayName>Log Response</DisplayName>
<SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>
Add this in the PostFlow Response section of your API proxy.
-->
<!-- Environment-specific configuration -->
<!--
For different environments (dev, test, prod), create separate KVM entries:
DEV Environment:
- elk.endpoint.url: https://dev-elasticsearch.company.com:9200
- spark.endpoint.url: https://dev-spark.company.com
TEST Environment:
- elk.endpoint.url: https://test-elasticsearch.company.com:9200
- spark.endpoint.url: https://test-spark.company.com
PROD Environment:
- elk.endpoint.url: https://prod-elasticsearch.company.com:9200
- spark.endpoint.url: https://prod-spark.company.com
-->