DEV Community

Mahmoud Sayed

Apigee Logger Shared Flow - Implementation Guide

Overview

This shared flow adds structured logging to Apigee API proxies and sends each log event to both ELK (Elasticsearch, Logstash, Kibana) and Apache Spark. Sensitive fields are masked before the event leaves the gateway.

Features

Core Functionality

  • Dual Destination Logging: Simultaneously logs to ELK and Spark platforms
  • Request/Response Logging: Captures complete API transaction details
  • Sensitive Data Masking: Automatically masks passwords, tokens, keys, PII data
  • Performance Metrics: Tracks latency and processing times
  • Error Handling: Comprehensive error capture and logging
  • Asynchronous Processing: Non-blocking log transmission

Data Masking Capabilities

  • Email Masking: john.doe@example.com → jo***@example.com
  • Phone Masking: 555-123-4567 → 555***4567
  • Credit Card Masking: 1234-5678-9012-3456 → 1234****3456
  • SSN Masking: 123-45-6789 → XXX-XX-6789
  • Token/Key Masking: Security tokens and API keys keep only a short prefix and suffix
  • Password Masking: Complete masking of password fields
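
To make the examples above concrete, here is a minimal standalone sketch of the four type-specific maskers, adapted from the mask-sensitive-data.js resource listed later in this guide. It assumes a JavaScript runtime that provides String.prototype.repeat, and the helper names (stars, maskEmail, and so on) are chosen for this sketch only. The fallback for unexpected formats is simplified to full masking. Note that the resource emits one asterisk per hidden character and keeps the last three phone digits, so its real output is longer than the abbreviated examples in the list.

```javascript
// Standalone copies of the type-specific maskers; no Apigee `context` object is needed.
function stars(n) {
    return '*'.repeat(Math.max(n, 0));
}

function maskEmail(value) {
    var parts = value.split('@');
    if (parts.length !== 2) return stars(value.length);
    var local = parts[0];
    // Keep the first two characters of the local part and hide the rest.
    var masked = local.length > 2
        ? local.substring(0, 2) + stars(local.length - 2)
        : stars(local.length);
    return masked + '@' + parts[1];
}

function maskPhone(value) {
    var digits = value.replace(/\D/g, '');
    if (digits.length < 10) return stars(value.length);
    // Keep the first three and last three digits.
    return digits.substring(0, 3) + stars(digits.length - 6) + digits.substring(digits.length - 3);
}

function maskCreditCard(value) {
    var digits = value.replace(/\D/g, '');
    if (digits.length < 12) return stars(value.length);
    // Keep the first four and last four digits.
    return digits.substring(0, 4) + stars(digits.length - 8) + digits.substring(digits.length - 4);
}

function maskSsn(value) {
    var digits = value.replace(/\D/g, '');
    if (digits.length !== 9) return stars(value.length);
    // Keep only the last four digits.
    return 'XXX-XX-' + digits.substring(5);
}

console.log(maskEmail('john.doe@example.com'));     // jo******@example.com
console.log(maskPhone('555-123-4567'));             // 555****567
console.log(maskCreditCard('1234-5678-9012-3456')); // 1234********3456
console.log(maskSsn('123-45-6789'));                // XXX-XX-6789
```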

Installation Steps

1. Deploy Shared Flow Components

Create the following directory structure in your Apigee project:

shared-flows/
└── logger-shared-flow/
    ├── sharedflowbundle/
    │   ├── logger-shared-flow.xml
    │   ├── policies/
    │   │   ├── Extract-Request-Data.xml
    │   │   ├── Mask-Sensitive-Request-Data.xml
    │   │   ├── Build-Request-Log-Payload.xml
    │   │   ├── Send-Request-Log-ELK.xml
    │   │   ├── Send-Request-Log-Spark.xml
    │   │   ├── Extract-Response-Data.xml
    │   │   ├── Mask-Sensitive-Response-Data.xml
    │   │   ├── Build-Response-Log-Payload.xml
    │   │   ├── Send-Response-Log-ELK.xml
    │   │   └── Send-Response-Log-Spark.xml
    │   └── resources/
    │       └── jsc/
    │           ├── mask-sensitive-data.js
    │           └── mask-sensitive-response-data.js

2. Configure Environment Variables

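Create a KVM (Key-Value Map) for your environment and add the configuration entries to it. The shared flow reads these settings as flow variables (for example elk.logging.enabled), so a KeyValueMapOperations policy, which is not among the policies listed in step 1, has to load them before the conditional steps run: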

# Using Apigee Management API
curl -X POST \
  https://api.enterprise.apigee.com/v1/organizations/{org}/environments/{env}/keyvaluemaps \
  -H "Authorization: Bearer $ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "logger-config",
    "encrypted": false
  }'

# Add configuration entries
curl -X POST \
  https://api.enterprise.apigee.com/v1/organizations/{org}/environments/{env}/keyvaluemaps/logger-config/entries \
  -H "Authorization: Bearer $ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elk.logging.enabled",
    "value": "true"
  }'

3. Deploy the Shared Flow

# Deploy using Apigee CLI
apigee sharedflows deploy -o {org} -e {env} -n logger-shared-flow -f ./shared-flows/logger-shared-flow

Configuration

ELK Configuration Parameters

Parameter | Description | Example Value
elk.logging.enabled | Enable/disable ELK logging | true
elk.endpoint.url | Elasticsearch endpoint URL | https://elasticsearch.company.com:9200
elk.index.name | Index pattern for logs | apigee-logs-{YYYY.MM.DD}
elk.auth.token | Bearer token for authentication | eyJhbGciOiJIUzI1NiIs...

Spark Configuration Parameters

Parameter | Description | Example Value
spark.logging.enabled | Enable/disable Spark logging | true
spark.endpoint.url | Spark cluster endpoint | https://spark.company.com
spark.topic.name | Kafka topic for logs | apigee-api-logs
spark.partition.key | Partitioning strategy | default
spark.auth.credentials | Base64 encoded credentials | dXNlcjpwYXNzd29yZA==

Usage in API Proxies

Request Logging (PreFlow)

Add this FlowCallout policy in your API proxy's PreFlow Request:

<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-API-Request">
    <DisplayName>Log API Request</DisplayName>
    <SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>

Response Logging (PostFlow)

Add this FlowCallout policy in your API proxy's PostFlow Response:

<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-API-Response">
    <DisplayName>Log API Response</DisplayName>
    <SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>

Complete API Proxy Integration Example

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ProxyEndpoint name="default">
    <Description/>
    <FaultRules/>
    <PreFlow name="PreFlow">
        <Request>
            <Step>
                <Name>Log-API-Request</Name>
                <Condition>request.verb != "OPTIONS"</Condition>
            </Step>
        </Request>
        <Response/>
    </PreFlow>
    <PostFlow name="PostFlow">
        <Request/>
        <Response>
            <Step>
                <Name>Log-API-Response</Name>
                <Condition>request.verb != "OPTIONS"</Condition>
            </Step>
        </Response>
    </PostFlow>
    <Flows/>
    <HTTPProxyConnection>
        <BasePath>/api/v1/users</BasePath>
        <Properties/>
        <VirtualHost>secure</VirtualHost>
    </HTTPProxyConnection>
    <RouteRule name="default">
        <TargetEndpoint>default</TargetEndpoint>
    </RouteRule>
</ProxyEndpoint>

Log Structure

Request Log Schema

{
  "eventType": "api_request",
  "timestamp": "2025-01-01T12:00:00.000Z",
  "correlationId": "12345678-1234-1234-1234-123456789012",
  "environment": "prod",
  "organization": "company",
  "apiProxy": "users-api",
  "request": {
    "method": "POST",
    "uri": "/api/v1/users",
    "headers": {
      "authorization": "Be***er token",
      "xApiKey": "ak_****5678"
    },
    "payload": {
      "username": "jo***oe",
      "email": "jo***@example.com",
      "password": "********"
    }
  },
  "client": {
    "ip": "192.168.1.100",
    "userAgent": "Mozilla/5.0..."
  },
  "masking": {
    "status": "completed",
    "timestamp": "2025-01-01T12:00:00.000Z"
  }
}

Response Log Schema

{
  "eventType": "api_response",
  "timestamp": "2025-01-01T12:00:01.500Z",
  "correlationId": "12345678-1234-1234-1234-123456789012",
  "response": {
    "statusCode": 201,
    "statusMessage": "Created",
    "payload": {
      "user_id": "user123",
      "access_token": "ey***abc",
      "refresh_token": "rt***xyz"
    }
  },
  "performance": {
    "totalLatency": 1500,
    "targetLatency": 800
  },
  "error": {
    "isError": false
  }
}
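
The performance block in the schema above can be derived from timestamps taken along the request path. The following is a minimal sketch, assuming four epoch-millisecond values are available; the property names (clientReceived, targetSent, targetReceived, responseReady) are hypothetical and would be filled from whichever Apigee timing variables your proxy exposes, such as client.received.start.timestamp and target.sent.start.timestamp.

```javascript
// Hypothetical helper: derives the "performance" block from epoch-millisecond timestamps.
function buildPerformance(t) {
    return {
        // Time from receiving the client request until the response is ready to send.
        totalLatency: t.responseReady - t.clientReceived,
        // Time spent waiting for the backend target.
        targetLatency: t.targetReceived - t.targetSent
    };
}

var perf = buildPerformance({
    clientReceived: 1000,
    targetSent: 1200,
    targetReceived: 2000,
    responseReady: 2500
});
console.log(JSON.stringify(perf)); // {"totalLatency":1500,"targetLatency":800}
```

The difference between the two values is the time spent inside the gateway itself, which includes the logging and masking steps.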

Security Considerations

Sensitive Data Protection

  • All passwords are completely masked
  • API keys and tokens show only prefix/suffix
  • Email addresses mask the local part
  • Phone numbers mask middle digits
  • Credit cards show only first 4 and last 4 digits
  • SSN shows only last 4 digits

Best Practices

  1. Regular Token Rotation: Rotate ELK and Spark authentication tokens regularly
  2. Network Security: Ensure encrypted connections (HTTPS/TLS) to logging endpoints
  3. Access Control: Implement proper IAM policies for log access
  4. Data Retention: Configure appropriate log retention policies
  5. Monitoring: Set up alerts for logging failures

Troubleshooting

Common Issues

  1. Logging Disabled: Check KVM configuration for enabled flags
  2. Authentication Failures: Verify bearer tokens and credentials
  3. Network Timeouts: Adjust timeout settings in ServiceCallout policies
  4. Masking Errors: Check JavaScript policy execution logs

Debug Mode

Enable debug logging by adding this variable to your KVM:

debug.logging.enabled = true

Performance Monitoring

Monitor these metrics:

  • Log transmission latency
  • Failed log deliveries
  • Masking operation time
  • Memory usage during processing

Advanced Configuration

Custom Masking Rules

To add custom masking rules, modify the JavaScript resources:

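// Add custom sensitive fields (placeholder names)
var customSensitiveFields = ['custom_field', 'proprietary_data'];
sensitiveFields = sensitiveFields.concat(customSensitiveFields);

// Add a custom masking pattern; maskSensitiveValue applies it to any key containing 'custom_field'
maskingPatterns['custom_field'] = function(value) {
    if (!value) return value;
    // Example rule (replace with your own): hide everything except the last two characters
    if (value.length <= 2) return maskDefault(value);
    return '*'.repeat(value.length - 2) + value.substring(value.length - 2);
};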

Environment-Specific Settings

Create separate KVM configurations for different environments:

# Development
logger-config-dev

# Testing  
logger-config-test

# Production
logger-config-prod

Integration with Monitoring Tools

The logs can be integrated with various monitoring and alerting tools:

  • ELK Alerts: Create Elasticsearch Watcher alerts for error conditions
  • Spark Monitoring: Use Spark Streaming for real-time analytics
  • Grafana Dashboards: Create visualization dashboards for log metrics
  • Splunk Integration: Forward logs to Splunk for enterprise monitoring

This comprehensive logging solution provides enterprise-grade logging capabilities while maintaining security and performance standards.

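The rest of this guide lists the shared flow definition and the policies and JavaScript resources it references. As written, every step runs on each invocation, so calling the flow from both PreFlow and PostFlow (as in the integration example) runs the request steps twice. Splitting the steps into separate request and response shared flows, or guarding them with extra conditions, avoids the duplicate entries.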

Apigee Logger Shared Flow - Main Flow

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<SharedFlow revision="1" name="logger-shared-flow">
    <DisplayName>ELK Spark Logger Shared Flow</DisplayName>
    <Description>Comprehensive logging shared flow for ELK and Spark with data masking</Description>

    <Step>
        <Name>Extract-Request-Data</Name>
        <Condition>(request.verb != "OPTIONS")</Condition>
    </Step>

    <Step>
        <Name>Mask-Sensitive-Request-Data</Name>
        <Condition>(request.verb != "OPTIONS")</Condition>
    </Step>

    <Step>
        <Name>Build-Request-Log-Payload</Name>
        <Condition>(request.verb != "OPTIONS")</Condition>
    </Step>

    <Step>
        <Name>Send-Request-Log-ELK</Name>
        <Condition>(request.verb != "OPTIONS") and (elk.logging.enabled = true)</Condition>
    </Step>

    <Step>
        <Name>Send-Request-Log-Spark</Name>
        <Condition>(request.verb != "OPTIONS") and (spark.logging.enabled = true)</Condition>
    </Step>

    <Step>
        <Name>Extract-Response-Data</Name>
        <Condition>(request.verb != "OPTIONS")</Condition>
    </Step>

    <Step>
        <Name>Mask-Sensitive-Response-Data</Name>
        <Condition>(request.verb != "OPTIONS")</Condition>
    </Step>

    <Step>
        <Name>Build-Response-Log-Payload</Name>
        <Condition>(request.verb != "OPTIONS")</Condition>
    </Step>

    <Step>
        <Name>Send-Response-Log-ELK</Name>
        <Condition>(request.verb != "OPTIONS") and (elk.logging.enabled = true)</Condition>
    </Step>

    <Step>
        <Name>Send-Response-Log-Spark</Name>
        <Condition>(request.verb != "OPTIONS") and (spark.logging.enabled = true)</Condition>
    </Step>
</SharedFlow>

Extract Request Data Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ExtractVariables async="false" continueOnError="true" enabled="true" name="Extract-Request-Data">
    <DisplayName>Extract Request Data</DisplayName>
    <Properties/>

    <!-- Extract basic request information -->
    <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>

    <!-- Request Headers -->
    <Header name="Content-Type">
        <Pattern ignoreCase="true">{request.header.content-type}</Pattern>
    </Header>

    <Header name="Authorization">
        <Pattern ignoreCase="true">{request.header.authorization}</Pattern>
    </Header>

    <Header name="User-Agent">
        <Pattern ignoreCase="true">{request.header.user-agent}</Pattern>
    </Header>

    <Header name="X-API-Key">
        <Pattern ignoreCase="true">{request.header.x-api-key}</Pattern>
    </Header>

    <!-- Query Parameters -->
    <QueryParam name="username">
        <Pattern ignoreCase="true">{request.queryparam.username}</Pattern>
    </QueryParam>

    <QueryParam name="password">
        <Pattern ignoreCase="true">{request.queryparam.password}</Pattern>
    </QueryParam>

    <QueryParam name="token">
        <Pattern ignoreCase="true">{request.queryparam.token}</Pattern>
    </QueryParam>

    <QueryParam name="api_key">
        <Pattern ignoreCase="true">{request.queryparam.api_key}</Pattern>
    </QueryParam>

    <!-- URI Path Parameters -->
    <URIPath>
        <Pattern ignoreCase="false">/v{version}/*/users/{user_id}/*</Pattern>
    </URIPath>

    <!-- JSON Path Extraction for Request Body -->
    <JSONPayload>
        <Variable name="req.username" type="string">
            <JSONPath>$.username</JSONPath>
        </Variable>
        <Variable name="req.password" type="string">
            <JSONPath>$.password</JSONPath>
        </Variable>
        <Variable name="req.email" type="string">
            <JSONPath>$.email</JSONPath>
        </Variable>
        <Variable name="req.phone" type="string">
            <JSONPath>$.phone</JSONPath>
        </Variable>
        <Variable name="req.ssn" type="string">
            <JSONPath>$.ssn</JSONPath>
        </Variable>
        <Variable name="req.credit_card" type="string">
            <JSONPath>$.credit_card</JSONPath>
        </Variable>
        <Variable name="req.api_key" type="string">
            <JSONPath>$.api_key</JSONPath>
        </Variable>
        <Variable name="req.access_token" type="string">
            <JSONPath>$.access_token</JSONPath>
        </Variable>
        <Variable name="req.refresh_token" type="string">
            <JSONPath>$.refresh_token</JSONPath>
        </Variable>
        <Variable name="req.client_secret" type="string">
            <JSONPath>$.client_secret</JSONPath>
        </Variable>
    </JSONPayload>

    <Source>request</Source>
</ExtractVariables>

Mask Sensitive Request Data Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Javascript async="false" continueOnError="true" enabled="true" name="Mask-Sensitive-Request-Data">
    <DisplayName>Mask Sensitive Request Data</DisplayName>
    <Properties/>
    <ResourceURL>jsc://mask-sensitive-data.js</ResourceURL>
</Javascript>

resources/jsc/mask-sensitive-data.js

var sensitiveFields = [
    'password', 'pwd', 'pass', 'secret', 'token', 'key', 'auth', 'authorization',
    'username', 'user', 'email', 'phone', 'ssn', 'social', 'credit_card', 'cc',
    'access_token', 'refresh_token', 'client_secret', 'api_key', 'x-api-key'
];

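var maskingPatterns = {
    // Passwords are masked completely; the generic maskDefault would reveal up to four characters
    'password': function(value) {
        return value ? '********' : value;
    },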
    'email': function(value) {
        if (!value) return value;
        var parts = value.split('@');
        if (parts.length === 2) {
            var localPart = parts[0];
            var maskedLocal = localPart.length > 2 ? 
                localPart.substring(0, 2) + '*'.repeat(localPart.length - 2) : 
                '*'.repeat(localPart.length);
            return maskedLocal + '@' + parts[1];
        }
        return maskDefault(value);
    },
    'phone': function(value) {
        if (!value) return value;
        var cleaned = value.replace(/\D/g, '');
        if (cleaned.length >= 10) {
            return cleaned.substring(0, 3) + '*'.repeat(cleaned.length - 6) + cleaned.substring(cleaned.length - 3);
        }
        return maskDefault(value);
    },
    'credit_card': function(value) {
        if (!value) return value;
        var cleaned = value.replace(/\D/g, '');
        if (cleaned.length >= 12) {
            return cleaned.substring(0, 4) + '*'.repeat(cleaned.length - 8) + cleaned.substring(cleaned.length - 4);
        }
        return maskDefault(value);
    },
    'ssn': function(value) {
        if (!value) return value;
        var cleaned = value.replace(/\D/g, '');
        if (cleaned.length === 9) {
            return 'XXX-XX-' + cleaned.substring(5);
        }
        return maskDefault(value);
    }
};

function maskDefault(value) {
    if (!value) return value;
    if (value.length <= 4) {
        return '*'.repeat(value.length);
    }
    return value.substring(0, 2) + '*'.repeat(value.length - 4) + value.substring(value.length - 2);
}

function maskSensitiveValue(key, value) {
    if (!value) return value;

    var lowerKey = key.toLowerCase();

    // Check for specific masking patterns
    for (var pattern in maskingPatterns) {
        if (lowerKey.includes(pattern)) {
            return maskingPatterns[pattern](value);
        }
    }

    // Check if it's a sensitive field
    for (var i = 0; i < sensitiveFields.length; i++) {
        if (lowerKey.includes(sensitiveFields[i])) {
            return maskDefault(value);
        }
    }

    return value;
}

function maskHeaders() {
    var headers = ['authorization', 'x-api-key', 'x-auth-token'];

    for (var i = 0; i < headers.length; i++) {
        var headerName = headers[i];
        var headerValue = context.getVariable('request.header.' + headerName);
        if (headerValue) {
            var maskedValue = maskSensitiveValue(headerName, headerValue);
            context.setVariable('masked.request.header.' + headerName, maskedValue);
        }
    }
}

function maskQueryParams() {
    var queryParams = ['username', 'password', 'token', 'api_key'];

    for (var i = 0; i < queryParams.length; i++) {
        var paramName = queryParams[i];
        var paramValue = context.getVariable('request.queryparam.' + paramName);
        if (paramValue) {
            var maskedValue = maskSensitiveValue(paramName, paramValue);
            context.setVariable('masked.request.queryparam.' + paramName, maskedValue);
        }
    }
}

function maskJsonFields() {
    var jsonFields = [
        'req.username', 'req.password', 'req.email', 'req.phone', 'req.ssn',
        'req.credit_card', 'req.api_key', 'req.access_token', 'req.refresh_token',
        'req.client_secret'
    ];

    for (var i = 0; i < jsonFields.length; i++) {
        var fieldName = jsonFields[i];
        var fieldValue = context.getVariable(fieldName);
        if (fieldValue) {
            var key = fieldName.replace('req.', '');
            var maskedValue = maskSensitiveValue(key, fieldValue);
            context.setVariable('masked.' + fieldName, maskedValue);
        }
    }
}

function maskRequestPayload() {
    try {
        var requestPayload = context.getVariable('request.content');
        if (requestPayload && context.getVariable('request.header.content-type') && 
            context.getVariable('request.header.content-type').indexOf('application/json') > -1) {

            var payload = JSON.parse(requestPayload);
            var maskedPayload = maskObjectRecursively(payload);
            context.setVariable('masked.request.payload', JSON.stringify(maskedPayload, null, 2));
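        } else {
            // Emit a JSON string so the unquoted payload slot in Build-Request-Log-Payload stays valid JSON
            context.setVariable('masked.request.payload', JSON.stringify(requestPayload || ''));
        }
    } catch (e) {
        context.setVariable('masked.request.payload', JSON.stringify('Error parsing request payload: ' + e.message));
    }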
}

function maskObjectRecursively(obj) {
    if (obj === null || obj === undefined) return obj;

    if (typeof obj !== 'object') return obj;

    if (Array.isArray(obj)) {
        return obj.map(function(item) {
            return maskObjectRecursively(item);
        });
    }

    var maskedObj = {};
    for (var key in obj) {
        if (obj.hasOwnProperty(key)) {
            var value = obj[key];
            if (typeof value === 'object' && value !== null) {
                maskedObj[key] = maskObjectRecursively(value);
            } else if (typeof value === 'string') {
                maskedObj[key] = maskSensitiveValue(key, value);
            } else {
                maskedObj[key] = value;
            }
        }
    }
    return maskedObj;
}

// Execute masking
try {
    maskHeaders();
    maskQueryParams();
    maskJsonFields();
    maskRequestPayload();

    context.setVariable('masking.request.status', 'completed');
    context.setVariable('masking.request.timestamp', new Date().toISOString());
} catch (error) {
    context.setVariable('masking.request.status', 'error');
    context.setVariable('masking.request.error', error.message);
}
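
The recursive masking above decides by substring match on the key name, which is worth seeing on a nested payload. The sketch below is a reduced, standalone version (a shortened field list, no type-specific patterns, and no Apigee context object), with helper names chosen for illustration. It shows that a key such as user_id is masked because it contains 'user', which differs from the sample response log earlier in this guide where user_id appears unmasked.

```javascript
// Reduced field list for illustration; the real resource has more entries.
var sensitiveFields = ['password', 'token', 'key', 'user', 'email'];

function maskDefault(value) {
    if (value.length <= 4) return '*'.repeat(value.length);
    return value.substring(0, 2) + '*'.repeat(value.length - 4) + value.substring(value.length - 2);
}

// A key is sensitive when it contains any entry of sensitiveFields as a substring.
function isSensitive(key) {
    var lower = key.toLowerCase();
    return sensitiveFields.some(function (field) {
        return lower.indexOf(field) !== -1;
    });
}

function maskObject(obj) {
    if (obj === null || typeof obj !== 'object') return obj;
    if (Array.isArray(obj)) return obj.map(maskObject);
    var out = {};
    Object.keys(obj).forEach(function (key) {
        var value = obj[key];
        if (typeof value === 'object') {
            out[key] = maskObject(value);          // recurse into objects and arrays
        } else if (typeof value === 'string' && isSensitive(key)) {
            out[key] = maskDefault(value);         // mask sensitive string values
        } else {
            out[key] = value;                      // leave everything else untouched
        }
    });
    return out;
}

var masked = maskObject({
    user_id: 'user123',
    profile: { nickname: 'jd', apiKey: 'ak_12345678' },
    tokens: [{ access_token: 'eyJabc' }]
});
console.log(JSON.stringify(masked));
// {"user_id":"us***23","profile":{"nickname":"jd","apiKey":"ak*******78"},"tokens":[{"access_token":"ey**bc"}]}
```

If identifiers such as user_id should stay readable for correlation, an exact-match list or an allow-list of safe keys is one way to narrow the matching.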

Build Request Log Payload Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<AssignMessage async="false" continueOnError="true" enabled="true" name="Build-Request-Log-Payload">
    <DisplayName>Build Request Log Payload</DisplayName>
    <Properties/>

    <AssignTo createNew="true" transport="http" type="request">logRequest</AssignTo>

    <AssignVariable>
        <Name>log.event.type</Name>
        <Value>api_request</Value>
    </AssignVariable>

    <!-- <Value> is a literal; <Ref> copies the value of a flow variable -->
    <AssignVariable>
        <Name>log.timestamp</Name>
        <Ref>system.timestamp</Ref>
    </AssignVariable>

    <AssignVariable>
        <Name>log.correlation.id</Name>
        <Ref>messageid</Ref>
    </AssignVariable>

    <!-- <Payload> must be a child of <Set> -->
    <Set>
        <Headers>
            <Header name="Content-Type">application/json</Header>
        </Headers>
        <Payload contentType="application/json" variablePrefix="@" variableSuffix="#">
{
  "eventType": "api_request",
  "timestamp": "@system.timestamp#",
  "correlationId": "@messageid#",
  "environment": "@environment.name#",
  "organization": "@organization.name#",
  "apiProxy": "@apiproxy.name#",
  "apiProxyRevision": "@apiproxy.revision#",
  "request": {
    "method": "@request.verb#",
    "uri": "@request.uri#",
    "path": "@request.path#",
    "queryString": "@request.querystring#",
    "protocol": "@request.scheme#",
    "headers": {
      "contentType": "@request.header.content-type#",
      "userAgent": "@request.header.user-agent#",
      "authorization": "@masked.request.header.authorization#",
      "xApiKey": "@masked.request.header.x-api-key#",
      "host": "@request.header.host#",
      "accept": "@request.header.accept#",
      "acceptEncoding": "@request.header.accept-encoding#"
    },
    "queryParams": {
      "username": "@masked.request.queryparam.username#",
      "password": "@masked.request.queryparam.password#",
      "token": "@masked.request.queryparam.token#",
      "apiKey": "@masked.request.queryparam.api_key#"
    },
    "pathParams": {
      "version": "@version#",
      "userId": "@user_id#"
    },
    "payload": @masked.request.payload#,
    "size": "@request.header.content-length#"
  },
  "client": {
    "ip": "@client.ip#",
    "host": "@client.host#",
    "port": "@client.port#",
    "userAgent": "@request.header.user-agent#"
  },
  "security": {
    "authenticated": "@is.authenticated#",
    "userId": "@user.id#",
    "clientId": "@client_id#",
    "scopes": "@oauth.scope#"
  },
  "flow": {
    "name": "logger-shared-flow",
    "step": "request-logging",
    "executionTime": "@flow.execution.time#"
  },
  "masking": {
    "status": "@masking.request.status#",
    "timestamp": "@masking.request.timestamp#",
    "fieldsCount": "@masking.fields.count#"
  },
  "metadata": {
    "source": "apigee-gateway",
    "version": "1.0",
    "logLevel": "INFO",
    "component": "request-logger"
  }
}
        </Payload>
    </Set>
</AssignMessage>
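
One caveat with the template approach: values are substituted as raw text between the quotes, so a header containing a double quote or backslash (a User-Agent string, for instance) would produce invalid JSON. A possible alternative is to assemble the event in a JavaScript step and serialize it with JSON.stringify. The sketch below is a hedged illustration only; buildRequestLog and its field names are hypothetical and cover just a few of the fields in the template.

```javascript
// Hypothetical helper: assembles a request log event from already-masked values.
function buildRequestLog(fields) {
    var payload;
    try {
        // maskedPayload is expected to be JSON text produced by the masking step.
        payload = JSON.parse(fields.maskedPayload);
    } catch (e) {
        // Fall back to the raw text so the event itself stays valid JSON.
        payload = fields.maskedPayload || '';
    }
    var event = {
        eventType: 'api_request',
        correlationId: fields.messageId,
        request: {
            method: fields.verb,
            uri: fields.uri,
            headers: { userAgent: fields.userAgent },
            payload: payload
        }
    };
    // JSON.stringify escapes quotes, backslashes and control characters.
    return JSON.stringify(event);
}

var logText = buildRequestLog({
    messageId: 'abc-123',
    verb: 'POST',
    uri: '/api/v1/users',
    userAgent: 'Test "Agent" \\ 1.0',
    maskedPayload: '{"password":"********"}'
});
console.log(logText);
```

Inside Apigee, the resulting string could be stored with context.setVariable and used as the body of the log request.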

Send Request Log to ELK Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Request-Log-ELK">
    <DisplayName>Send Request Log to ELK</DisplayName>
    <Properties/>

    <Request clearPayload="false" variable="logRequest">
        <Set>
            <Headers>
                <Header name="Content-Type">application/json</Header>
                <Header name="Authorization">Bearer {elk.auth.token}</Header>
                <Header name="X-Correlation-ID">{messageid}</Header>
            </Headers>
        </Set>
    </Request>

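    <!-- With <Response> present, the flow waits for the callout to return. Omit it for fire-and-forget logging. -->
    <Response>elkRequestResponse</Response>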

    <HTTPTargetConnection>
        <Properties>
            <Property name="keepalive">true</Property>
            <Property name="connect.timeout.millis">5000</Property>
            <Property name="io.timeout.millis">30000</Property>
        </Properties>
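        <!-- Apigee expects the scheme (https://) to be literal in <URL>; verify that a variable holding the full URL resolves in your environment. -->
        <URL>{elk.endpoint.url}/{elk.index.name}/_doc</URL>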
    </HTTPTargetConnection>
</ServiceCallout>
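
The URL above uses elk.index.name as-is, while the example value in the configuration table (apigee-logs-{YYYY.MM.DD}) reads as a daily index pattern. Nothing in the listed policies resolves that placeholder, so one option is to compute the concrete name in a JavaScript step. This is a minimal sketch; dailyIndexName is a hypothetical helper and the choice of UTC dates is an assumption.

```javascript
// Hypothetical helper: builds a daily index name such as "apigee-logs-2025.01.01".
function dailyIndexName(prefix, date) {
    var year = date.getUTCFullYear();
    var month = String(date.getUTCMonth() + 1); // getUTCMonth() is zero-based
    var day = String(date.getUTCDate());
    if (month.length < 2) month = '0' + month;
    if (day.length < 2) day = '0' + day;
    return prefix + '-' + year + '.' + month + '.' + day;
}

var indexName = dailyIndexName('apigee-logs', new Date(Date.UTC(2025, 0, 1, 12, 0, 0)));
console.log(indexName); // apigee-logs-2025.01.01
```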

Send Request Log to Spark Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Request-Log-Spark">
    <DisplayName>Send Request Log to Spark</DisplayName>
    <Properties/>

    <Request clearPayload="false" variable="logRequest">
        <Set>
            <Headers>
                <Header name="Content-Type">application/json</Header>
                <Header name="Authorization">Basic {spark.auth.credentials}</Header>
                <Header name="X-Correlation-ID">{messageid}</Header>
                <Header name="X-Spark-Topic">{spark.topic.name}</Header>
            </Headers>
        </Set>
    </Request>

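    <!-- With <Response> present, the flow waits for the callout to return. Omit it for fire-and-forget logging. -->
    <Response>sparkRequestResponse</Response>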

    <HTTPTargetConnection>
        <Properties>
            <Property name="keepalive">true</Property>
            <Property name="connect.timeout.millis">3000</Property>
            <Property name="io.timeout.millis">15000</Property>
        </Properties>
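        <!-- Apigee expects the scheme (https://) to be literal in <URL>; verify that a variable holding the full URL resolves in your environment. -->
        <URL>{spark.endpoint.url}/api/v1/logs/stream</URL>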
    </HTTPTargetConnection>
</ServiceCallout>
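
The Authorization header above expects spark.auth.credentials to hold the Base64 form of user:password. As a small sketch of how that value can be produced, assuming a Node.js runtime (Buffer is a Node.js API and is not available inside an Apigee JavaScript policy), with basicCredentials as a hypothetical helper name:

```javascript
// Hypothetical helper: encodes "user:password" for an HTTP Basic Authorization header.
function basicCredentials(user, password) {
    return Buffer.from(user + ':' + password, 'utf8').toString('base64');
}

var encoded = basicCredentials('user', 'password');
console.log(encoded); // dXNlcjpwYXNzd29yZA==

// Decoding shows that Base64 is an encoding, not encryption.
console.log(Buffer.from(encoded, 'base64').toString('utf8')); // user:password
```

Because the value is trivially reversible, it is worth storing it in an encrypted KVM rather than a plain one.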

Extract Response Data Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ExtractVariables async="false" continueOnError="true" enabled="true" name="Extract-Response-Data">
    <DisplayName>Extract Response Data</DisplayName>
    <Properties/>

    <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>

    <!-- Response Headers -->
    <Header name="Content-Type">
        <Pattern ignoreCase="true">{response.header.content-type}</Pattern>
    </Header>

    <Header name="Authorization">
        <Pattern ignoreCase="true">{response.header.authorization}</Pattern>
    </Header>

    <Header name="Set-Cookie">
        <Pattern ignoreCase="true">{response.header.set-cookie}</Pattern>
    </Header>

    <Header name="X-Auth-Token">
        <Pattern ignoreCase="true">{response.header.x-auth-token}</Pattern>
    </Header>

    <!-- JSON Path Extraction for Response Body -->
    <JSONPayload>
        <Variable name="resp.access_token" type="string">
            <JSONPath>$.access_token</JSONPath>
        </Variable>
        <Variable name="resp.refresh_token" type="string">
            <JSONPath>$.refresh_token</JSONPath>
        </Variable>
        <Variable name="resp.api_key" type="string">
            <JSONPath>$.api_key</JSONPath>
        </Variable>
        <Variable name="resp.client_secret" type="string">
            <JSONPath>$.client_secret</JSONPath>
        </Variable>
        <Variable name="resp.password" type="string">
            <JSONPath>$.password</JSONPath>
        </Variable>
        <Variable name="resp.username" type="string">
            <JSONPath>$.username</JSONPath>
        </Variable>
        <Variable name="resp.email" type="string">
            <JSONPath>$.email</JSONPath>
        </Variable>
        <Variable name="resp.phone" type="string">
            <JSONPath>$.phone</JSONPath>
        </Variable>
        <Variable name="resp.ssn" type="string">
            <JSONPath>$.ssn</JSONPath>
        </Variable>
        <Variable name="resp.credit_card" type="string">
            <JSONPath>$.credit_card</JSONPath>
        </Variable>
        <Variable name="resp.session_id" type="string">
            <JSONPath>$.session_id</JSONPath>
        </Variable>
        <Variable name="resp.user_id" type="string">
            <JSONPath>$.user_id</JSONPath>
        </Variable>
        <Variable name="resp.status" type="string">
            <JSONPath>$.status</JSONPath>
        </Variable>
        <Variable name="resp.message" type="string">
            <JSONPath>$.message</JSONPath>
        </Variable>
        <Variable name="resp.error_code" type="string">
            <JSONPath>$.error.code</JSONPath>
        </Variable>
        <Variable name="resp.error_message" type="string">
            <JSONPath>$.error.message</JSONPath>
        </Variable>
        <Variable name="resp.expires_in" type="integer">
            <JSONPath>$.expires_in</JSONPath>
        </Variable>
        <Variable name="resp.token_type" type="string">
            <JSONPath>$.token_type</JSONPath>
        </Variable>
    </JSONPayload>

    <Source>response</Source>
</ExtractVariables>

Mask Sensitive Response Data Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Javascript async="false" continueOnError="true" enabled="true" name="Mask-Sensitive-Response-Data">
    <DisplayName>Mask Sensitive Response Data</DisplayName>
    <Properties/>
    <ResourceURL>jsc://mask-sensitive-response-data.js</ResourceURL>
</Javascript>

resources/jsc/mask-sensitive-response-data.js

// Same helpers as mask-sensitive-data.js, repeated here with session and cookie fields added
var sensitiveFields = [
    'password', 'pwd', 'pass', 'secret', 'token', 'key', 'auth', 'authorization',
    'username', 'user', 'email', 'phone', 'ssn', 'social', 'credit_card', 'cc',
    'access_token', 'refresh_token', 'client_secret', 'api_key', 'x-api-key',
    'session_id', 'sessionid', 'cookie'
];

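var maskingPatterns = {
    // Passwords are masked completely; the generic maskDefault would reveal up to four characters
    'password': function(value) {
        return value ? '********' : value;
    },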
    'email': function(value) {
        if (!value) return value;
        var parts = value.split('@');
        if (parts.length === 2) {
            var localPart = parts[0];
            var maskedLocal = localPart.length > 2 ? 
                localPart.substring(0, 2) + '*'.repeat(localPart.length - 2) : 
                '*'.repeat(localPart.length);
            return maskedLocal + '@' + parts[1];
        }
        return maskDefault(value);
    },
    'phone': function(value) {
        if (!value) return value;
        var cleaned = value.replace(/\D/g, '');
        if (cleaned.length >= 10) {
            return cleaned.substring(0, 3) + '*'.repeat(cleaned.length - 6) + cleaned.substring(cleaned.length - 3);
        }
        return maskDefault(value);
    },
    'credit_card': function(value) {
        if (!value) return value;
        var cleaned = value.replace(/\D/g, '');
        if (cleaned.length >= 12) {
            return cleaned.substring(0, 4) + '*'.repeat(cleaned.length - 8) + cleaned.substring(cleaned.length - 4);
        }
        return maskDefault(value);
    },
    'ssn': function(value) {
        if (!value) return value;
        var cleaned = value.replace(/\D/g, '');
        if (cleaned.length === 9) {
            return 'XXX-XX-' + cleaned.substring(5);
        }
        return maskDefault(value);
    },
    'session': function(value) {
        if (!value) return value;
        if (value.length > 8) {
            return value.substring(0, 4) + '*'.repeat(value.length - 8) + value.substring(value.length - 4);
        }
        return maskDefault(value);
    }
};

function maskDefault(value) {
    if (!value) return value;
    if (value.length <= 4) {
        return '*'.repeat(value.length);
    }
    return value.substring(0, 2) + '*'.repeat(value.length - 4) + value.substring(value.length - 2);
}

function maskSensitiveValue(key, value) {
    if (!value) return value;

    var lowerKey = key.toLowerCase();

    // Check for specific masking patterns
    for (var pattern in maskingPatterns) {
        if (lowerKey.includes(pattern)) {
            return maskingPatterns[pattern](value);
        }
    }

    // Check if it's a sensitive field
    for (var i = 0; i < sensitiveFields.length; i++) {
        if (lowerKey.includes(sensitiveFields[i])) {
            return maskDefault(value);
        }
    }

    return value;
}

function maskResponseHeaders() {
    var headers = ['authorization', 'set-cookie', 'x-auth-token'];

    for (var i = 0; i < headers.length; i++) {
        var headerName = headers[i];
        var headerValue = context.getVariable('response.header.' + headerName);
        if (headerValue) {
            var maskedValue = maskSensitiveValue(headerName, headerValue);
            context.setVariable('masked.response.header.' + headerName, maskedValue);
        }
    }
}

function maskResponseJsonFields() {
    var jsonFields = [
        'resp.access_token', 'resp.refresh_token', 'resp.api_key', 'resp.client_secret',
        'resp.password', 'resp.username', 'resp.email', 'resp.phone', 'resp.ssn',
        'resp.credit_card', 'resp.session_id'
    ];

    for (var i = 0; i < jsonFields.length; i++) {
        var fieldName = jsonFields[i];
        var fieldValue = context.getVariable(fieldName);
        if (fieldValue) {
            var key = fieldName.replace('resp.', '');
            var maskedValue = maskSensitiveValue(key, fieldValue);
            context.setVariable('masked.' + fieldName, maskedValue);
        }
    }
}

function maskResponsePayload() {
    try {
        var responsePayload = context.getVariable('response.content');
        if (responsePayload && context.getVariable('response.header.content-type') && 
            context.getVariable('response.header.content-type').indexOf('application/json') > -1) {

            var payload = JSON.parse(responsePayload);
            var maskedPayload = maskObjectRecursively(payload);
            context.setVariable('masked.response.payload', JSON.stringify(maskedPayload, null, 2));
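        } else {
            // Encode non-JSON bodies as a JSON string so the unquoted "payload" slot in the log template stays valid JSON
            context.setVariable('masked.response.payload', JSON.stringify(responsePayload || ''));
        }
    } catch (e) {
        // Same reason: the error text must be a quoted JSON string when embedded in the log payload
        context.setVariable('masked.response.payload', JSON.stringify('Error parsing response payload: ' + e.message));
    }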
}

function maskObjectRecursively(obj) {
    if (obj === null || obj === undefined) return obj;

    if (typeof obj !== 'object') return obj;

    if (Array.isArray(obj)) {
        return obj.map(function(item) {
            return maskObjectRecursively(item);
        });
    }

    var maskedObj = {};
    for (var key in obj) {
        if (obj.hasOwnProperty(key)) {
            var value = obj[key];
            if (typeof value === 'object' && value !== null) {
                maskedObj[key] = maskObjectRecursively(value);
            } else if (typeof value === 'string') {
                maskedObj[key] = maskSensitiveValue(key, value);
            } else {
                maskedObj[key] = value;
            }
        }
    }
    return maskedObj;
}

// Execute masking
try {
    maskResponseHeaders();
    maskResponseJsonFields();
    maskResponsePayload();

    context.setVariable('masking.response.status', 'completed');
    context.setVariable('masking.response.timestamp', new Date().toISOString());
} catch (error) {
    context.setVariable('masking.response.status', 'error');
    context.setVariable('masking.response.error', error.message);
}
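
The script above depends on Apigee's `context` object, which makes it awkward to exercise outside the gateway. The following is a minimal sketch of a local harness, assuming a stand-in `createFakeContext` helper (hypothetical, not part of Apigee) and a copy of the `maskDefault` rule. `maskHeader` is an illustrative single-header variant of `maskResponseHeaders` that takes the context as a parameter so it can be tested in Node.

```javascript
// Hypothetical stand-in for Apigee's `context` object, for local testing only.
function createFakeContext(initialVars) {
    var vars = {};
    for (var name in initialVars) {
        if (Object.prototype.hasOwnProperty.call(initialVars, name)) {
            vars[name] = initialVars[name];
        }
    }
    return {
        // Returns null for unknown variables, mirroring how unset flow variables are treated here
        getVariable: function (name) {
            return Object.prototype.hasOwnProperty.call(vars, name) ? vars[name] : null;
        },
        setVariable: function (name, value) {
            vars[name] = value;
        }
    };
}

// Same rule as maskDefault in the script: keep 2 characters at each end.
function maskDefault(value) {
    if (!value) return value;
    if (value.length <= 4) {
        return '*'.repeat(value.length);
    }
    return value.substring(0, 2) + '*'.repeat(value.length - 4) + value.substring(value.length - 2);
}

// Illustrative single-header variant of maskResponseHeaders with an injected context.
function maskHeader(ctx, headerName) {
    var value = ctx.getVariable('response.header.' + headerName);
    if (value) {
        ctx.setVariable('masked.response.header.' + headerName, maskDefault(value));
    }
}

var ctx = createFakeContext({ 'response.header.x-auth-token': 'abcd1234efgh' });
maskHeader(ctx, 'x-auth-token');
var masked = ctx.getVariable('masked.response.header.x-auth-token');
```

Passing the context in as an argument, rather than reading the global, is the only design change compared with the script; it keeps the masking rules testable without deploying the shared flow.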

Build Response Log Payload Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<AssignMessage async="false" continueOnError="true" enabled="true" name="Build-Response-Log-Payload">
    <DisplayName>Build Response Log Payload</DisplayName>
    <Properties/>

    <AssignTo createNew="true" transport="http" type="request">logResponse</AssignTo>

    <!-- AssignMessage templates substitute variables but cannot evaluate arithmetic.
         Latency values (log.response.latency, and the assumed name
         log.response.target.latency) must therefore be computed beforehand,
         for example in a JavaScript policy, and are only referenced here. -->
    <AssignVariable>
        <Name>log.response.event.type</Name>
        <Value>api_response</Value>
    </AssignVariable>

    <AssignVariable>
        <Name>log.response.timestamp</Name>
        <Ref>system.timestamp</Ref>
    </AssignVariable>

    <!-- Several variables below are optional; without this flag a single
         unresolved variable makes the whole policy fail. -->
    <IgnoreUnresolvedVariables>true</IgnoreUnresolvedVariables>

    <Set>
        <Headers>
            <Header name="Content-Type">application/json</Header>
        </Headers>
        <!-- Payload must be a child of Set -->
        <Payload contentType="application/json" variablePrefix="@" variableSuffix="#">
{
  "eventType": "api_response",
  "timestamp": "@system.timestamp#",
  "correlationId": "@messageid#",
  "environment": "@environment.name#",
  "organization": "@organization.name#",
  "apiProxy": "@apiproxy.name#",
  "apiProxyRevision": "@apiproxy.revision#",
  "response": {
    "statusCode": @response.status.code#,
    "statusMessage": "@response.reason.phrase#",
    "headers": {
      "contentType": "@response.header.content-type#",
      "contentLength": "@response.header.content-length#",
      "server": "@response.header.server#",
      "cacheControl": "@response.header.cache-control#",
      "authorization": "@masked.response.header.authorization#",
      "setCookie": "@masked.response.header.set-cookie#",
      "xAuthToken": "@masked.response.header.x-auth-token#"
    },
    "payload": @masked.response.payload#,
    "size": "@response.header.content-length#"
  },
  "performance": {
    "totalLatency": "@log.response.latency#",
    "targetLatency": "@log.response.target.latency#",
    "requestProcessingTime": "@request.processing.time#",
    "responseProcessingTime": "@response.processing.time#"
  },
  "backend": {
    "url": "@target.url#",
    "host": "@target.host#",
    "port": "@target.port#",
    "ssl": "@target.ssl#",
    "statusCode": "@target.response.status.code#"
  },
  "client": {
    "ip": "@client.ip#",
    "host": "@client.host#",
    "port": "@client.port#"
  },
  "security": {
    "authenticated": "@is.authenticated#",
    "userId": "@user.id#",
    "clientId": "@client_id#",
    "scopes": "@oauth.scope#"
  },
  "flow": {
    "name": "logger-shared-flow",
    "step": "response-logging",
    "executionTime": "@flow.execution.time#"
  },
  "masking": {
    "status": "@masking.response.status#",
    "timestamp": "@masking.response.timestamp#",
    "fieldsCount": "@masking.fields.count#"
  },
  "error": {
    "isError": @is.error#,
    "code": "@error.code#",
    "message": "@error.message#",
    "source": "@error.source#"
  },
  "analytics": {
    "apiProduct": "@apiproduct.name#",
    "developer": "@developer.email#",
    "app": "@client_id#"
  },
  "metadata": {
    "source": "apigee-gateway",
    "version": "1.0",
    "logLevel": "INFO",
    "component": "response-logger"
  }
}
        </Payload>
    </Set>
</AssignMessage>
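
The `performance` section of the payload needs elapsed times, and AssignMessage templates substitute variables but do not evaluate expressions such as `a - b`. One option is a small JavaScript step placed before Build-Response-Log-Payload. The sketch below is written under assumptions: `log.response.target.latency` is a hypothetical variable name, and `system.timestamp` is used as the end of the total latency because the client-sent timestamps are not yet available in the response flow.

```javascript
// Converts a flow variable to a number; unset or empty values become NaN.
function toMillis(raw) {
    if (raw === null || raw === undefined || raw === '') {
        return NaN;
    }
    return Number(raw);
}

// Returns end - start in milliseconds, or -1 when either value is missing or inconsistent.
function elapsedMillis(startRaw, endRaw) {
    var start = toMillis(startRaw);
    var end = toMillis(endRaw);
    if (isNaN(start) || isNaN(end) || end < start) {
        return -1;
    }
    return end - start;
}

// Inside Apigee a global `context` exists; the guard lets the helpers run elsewhere too.
if (typeof context !== 'undefined') {
    context.setVariable('log.response.latency', String(elapsedMillis(
        context.getVariable('client.received.start.timestamp'),
        context.getVariable('system.timestamp'))));
    // Assumed variable name, not defined elsewhere in this guide
    context.setVariable('log.response.target.latency', String(elapsedMillis(
        context.getVariable('target.sent.start.timestamp'),
        context.getVariable('target.received.end.timestamp'))));
}
```

Returning `-1` instead of throwing keeps the logging path from failing when a timestamp is missing, for example when the request never reached the target.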

Send Response Log to ELK Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Response-Log-ELK">
    <DisplayName>Send Response Log to ELK</DisplayName>
    <Properties/>

    <Request clearPayload="false" variable="logResponse">
        <Set>
            <Headers>
                <Header name="Content-Type">application/json</Header>
                <Header name="Authorization">Bearer {elk.auth.token}</Header>
                <Header name="X-Correlation-ID">{messageid}</Header>
                <Header name="X-ELK-Index">{elk.index.name}</Header>
            </Headers>
        </Set>
    </Request>

    <Response>elkResponseResponse</Response>

    <HTTPTargetConnection>
        <Properties>
            <Property name="keepalive">true</Property>
            <Property name="connect.timeout.millis">5000</Property>
            <Property name="io.timeout.millis">30000</Property>
            <Property name="retry.count">2</Property>
            <Property name="retry.interval">1000</Property>
        </Properties>
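        <!-- Apigee expects the scheme (https://) to be literal in <URL>; if elk.endpoint.url carries the scheme, store only the host in the KVM and hardcode the scheme here -->
        <URL>{elk.endpoint.url}/{elk.index.name}/_doc</URL>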
    </HTTPTargetConnection>
</ServiceCallout>

Send Response Log to Spark Policy

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ServiceCallout async="true" continueOnError="true" enabled="true" name="Send-Response-Log-Spark">
    <DisplayName>Send Response Log to Spark</DisplayName>
    <Properties/>

    <Request clearPayload="false" variable="logResponse">
        <Set>
            <Headers>
                <Header name="Content-Type">application/json</Header>
                <Header name="Authorization">Basic {spark.auth.credentials}</Header>
                <Header name="X-Correlation-ID">{messageid}</Header>
                <Header name="X-Spark-Topic">{spark.topic.name}</Header>
                <Header name="X-Spark-Partition">{spark.partition.key}</Header>
            </Headers>
        </Set>
    </Request>

    <Response>sparkResponseResponse</Response>

    <HTTPTargetConnection>
        <Properties>
            <Property name="keepalive">true</Property>
            <Property name="connect.timeout.millis">3000</Property>
            <Property name="io.timeout.millis">15000</Property>
            <Property name="retry.count">1</Property>
            <Property name="retry.interval">500</Property>
        </Properties>
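        <!-- Apigee expects the scheme (https://) to be literal in <URL>; if spark.endpoint.url carries the scheme, store only the host in the KVM and hardcode the scheme here -->
        <URL>{spark.endpoint.url}/api/v1/logs/stream</URL>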
    </HTTPTargetConnection>
</ServiceCallout>
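
Both callouts use `continueOnError="true"`, so a failed log delivery passes silently. A minimal sketch of a follow-up JavaScript step that records the outcome is shown below. It reads the status code from the response variables named in the policies (`elkResponseResponse`, `sparkResponseResponse`); the output variables `log.elk.delivery` and `log.spark.delivery` are hypothetical names chosen for illustration.

```javascript
// Maps an HTTP status code (string or number) to a coarse delivery outcome.
function classifyDelivery(statusCode) {
    var code = parseInt(statusCode, 10);
    if (isNaN(code)) {
        return 'no_response'; // callout failed or returned nothing
    }
    if (code >= 200 && code < 300) {
        return 'delivered';
    }
    return 'rejected';
}

// Inside Apigee a global `context` exists; the guard lets the helper run elsewhere too.
if (typeof context !== 'undefined') {
    // Output variable names are assumptions, not defined elsewhere in this guide
    context.setVariable('log.elk.delivery',
        classifyDelivery(context.getVariable('elkResponseResponse.status.code')));
    context.setVariable('log.spark.delivery',
        classifyDelivery(context.getVariable('sparkResponseResponse.status.code')));
}
```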

Configuration Files and Environment Variables

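<!-- KVM Configuration for Environment Variables -->
<!-- Illustrative layout, not an Apigee bundle file: create these entries in a
     key value map named logger-config, and prefer an encrypted map for the
     token and credentials entries -->

<KVM name="logger-config">
    <!-- ELK Configuration -->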
    <Entry name="elk.logging.enabled">true</Entry>
    <Entry name="elk.endpoint.url">https://your-elasticsearch-cluster.com:9200</Entry>
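    <!-- KVM values are stored literally: the {YYYY.MM.DD} suffix is a placeholder and must be resolved at runtime -->
    <Entry name="elk.index.name">apigee-logs-{YYYY.MM.DD}</Entry>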
    <Entry name="elk.auth.token">your-elk-bearer-token</Entry>

    <!-- Spark Configuration -->
    <Entry name="spark.logging.enabled">true</Entry>
    <Entry name="spark.endpoint.url">https://your-spark-cluster.com</Entry>
    <Entry name="spark.topic.name">apigee-api-logs</Entry>
    <Entry name="spark.partition.key">default</Entry>
    <Entry name="spark.auth.credentials">base64-encoded-credentials</Entry>
</KVM>

<!-- Shared Flow Deployment Descriptor -->
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<SharedFlowBundle revision="1" name="logger-shared-flow">
    <ConfigurationVersion majorVersion="4" minorVersion="0"/>
    <CreatedAt>2025-01-01T00:00:00.000Z</CreatedAt>
    <CreatedBy>api-developer@company.com</CreatedBy>
    <Description>ELK Spark Logger Shared Flow with Data Masking</Description>
    <DisplayName>Logger Shared Flow</DisplayName>
    <LastModifiedAt>2025-01-01T00:00:00.000Z</LastModifiedAt>
    <LastModifiedBy>api-developer@company.com</LastModifiedBy>
    <ManifestVersion majorVersion="1" minorVersion="0"/>
    <Policies>
        <Policy>Extract-Request-Data</Policy>
        <Policy>Mask-Sensitive-Request-Data</Policy>
        <Policy>Build-Request-Log-Payload</Policy>
        <Policy>Send-Request-Log-ELK</Policy>
        <Policy>Send-Request-Log-Spark</Policy>
        <Policy>Extract-Response-Data</Policy>
        <Policy>Mask-Sensitive-Response-Data</Policy>
        <Policy>Build-Response-Log-Payload</Policy>
        <Policy>Send-Response-Log-ELK</Policy>
        <Policy>Send-Response-Log-Spark</Policy>
    </Policies>
    <Resources>
        <Resource>jsc://mask-sensitive-data.js</Resource>
        <Resource>jsc://mask-sensitive-response-data.js</Resource>
    </Resources>
</SharedFlowBundle>

<!-- How to use this shared flow in your API Proxy -->
<!-- 
1. Deploy the shared flow to your Apigee environment
2. Add FlowCallout policies in your API proxy to call the shared flow

Example FlowCallout for Request logging:
<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-Request">
    <DisplayName>Log Request</DisplayName>
    <SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>

Add this in the PreFlow Request section of your API proxy.

Example FlowCallout for Response logging:
<FlowCallout async="false" continueOnError="true" enabled="true" name="Log-Response">
    <DisplayName>Log Response</DisplayName>
    <SharedFlowBundle>logger-shared-flow</SharedFlowBundle>
</FlowCallout>

Add this in the PostFlow Response section of your API proxy.
-->

<!-- Environment-specific configuration -->
<!-- 
For different environments (dev, test, prod), create separate KVM entries:

DEV Environment:
- elk.endpoint.url: https://dev-elasticsearch.company.com:9200
- spark.endpoint.url: https://dev-spark.company.com

TEST Environment:
- elk.endpoint.url: https://test-elasticsearch.company.com:9200
- spark.endpoint.url: https://test-spark.company.com

PROD Environment:
- elk.endpoint.url: https://prod-elasticsearch.company.com:9200
- spark.endpoint.url: https://prod-spark.company.com
-->
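
The sample `elk.index.name` value ends in `{YYYY.MM.DD}`, which a KVM stores as literal text. One way to get a real daily index is to build the name in a JavaScript step before the ELK callout. The following is a sketch under assumptions: `buildDailyIndexName` is a hypothetical helper, the `apigee-logs-` prefix is taken from the sample entry, and the date is formatted in UTC.

```javascript
// Left-pads a month or day number to two digits.
function pad2(n) {
    return n < 10 ? '0' + n : String(n);
}

// Builds "<prefix>YYYY.MM.DD" from a Date, using UTC so all gateways agree on the day.
function buildDailyIndexName(prefix, date) {
    return prefix +
        date.getUTCFullYear() + '.' +
        pad2(date.getUTCMonth() + 1) + '.' +
        pad2(date.getUTCDate());
}

// Inside Apigee a global `context` exists; the guard lets the helper run elsewhere too.
if (typeof context !== 'undefined') {
    // Overwrites the flow variable used by the ELK callout URL (assumed to be acceptable here)
    context.setVariable('elk.index.name', buildDailyIndexName('apigee-logs-', new Date()));
}
```

For example, `buildDailyIndexName('apigee-logs-', new Date(Date.UTC(2025, 0, 5)))` returns `apigee-logs-2025.01.05`.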