
Evan Lin for Google Developer Experts

Originally published at evanlin.com

[Gemini][Python] LINE Bot AP2 Integration Series (Part 2) - Revisiting the Spec and Implementing a Credential Provider

Agent Payments Protocol Graphic

Preface

This is the second article in the LINE Bot AP2 integration series. In the first article, we completed the basic Shopping Agent and Payment Agent integration, implementing the Cart Mandate, HMAC-SHA256 digital signatures, and an OTP verification mechanism.

However, after deploying it, I re-examined the official AP2 Spec and found that we had missed a very important component: the Credential Provider.

This article will share:

  • Why Credential Provider is needed
  • Problems found after re-examining the AP2 Spec
  • How to implement a complete three-tier payment architecture
  • Security mechanisms for Payment Tokens
  • Actual pitfalls experienced

Review: Problems with the original architecture

In the first version of the implementation, our payment flow was as follows:

User selects product → Create Cart Mandate → Initiate payment directly → OTP verification → Complete


This flow has several problems:

  1. Payment method is hardcoded: The same card is used every time, and the user cannot choose
  2. Sensitive information is exposed: The card number and other details are passed through multiple places
  3. Lack of tokenization: Without a one-time token mechanism, there is a risk of replay attacks

Re-examining the AP2 Spec

After re-reading the AP2 Spec document, I found that the complete architecture should be as follows:

┌─────────────────────────────────────────────────────┐
│ Layer 1: Shopping Agent (Shopping Layer) │
│ - Product search and shopping cart management │
│ - Create Cart Mandate │
│ - Merchant Signature │
└──────────────────────┬──────────────────────────────┘
                       ↓
┌─────────────────────────────────────────────────────┐
│ Layer 2: Credential Provider (Credential Layer) ⭐ New this time! │
│ - Securely store user's payment credentials (encrypted) │
│ - Select the best payment method based on the transaction scenario │
│ - Issue one-time Payment Token │
│ - Token binds to a specific Mandate │
└──────────────────────┬──────────────────────────────┘
                       ↓
┌─────────────────────────────────────────────────────┐
│ Layer 3: Payment Agent (Payment Layer) │
│ - Initiate payment using Token (not the card number!) │
│ - User Signature │
│ - OTP verification │
│ - Transaction completion and audit trail │
└─────────────────────────────────────────────────────┘


The key point: the Payment Agent receives a Token, not the actual card number. Because the Token is single-use, bound to one Mandate, and short-lived, an intercepted Token cannot be replayed for another transaction.

What was modified this time

Following the requirements of the Spec, this round mainly added the following:

| Item | Description |
| --- | --- |
| PaymentCredential model | Encrypted storage of payment credentials |
| PaymentToken model | One-time payment token |
| CredentialProviderService | Core service for credential management |
| get_eligible_payment_methods | Get eligible payment methods |
| issue_payment_token_for_mandate | Issue Token for Mandate |
| initiate_payment_with_token | Initiate payment using Token |

Implementation details

1. PaymentCredential - Encrypted payment credentials

First, we need a data structure to securely store payment credentials:

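# src/linebot_ap2/models/payment.py

from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel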

class CredentialStatus(str, Enum):
    """Payment credential status"""
    ACTIVE = "active" # Available
    SUSPENDED = "suspended" # Suspended
    EXPIRED = "expired" # Expired

class PaymentCredential(BaseModel):
    """Encrypted payment credentials managed by Credential Provider"""

    credential_id: str # Unique identifier for credentials
    user_id: str # User ID
    type: PaymentMethodType # card, wallet, bank_transfer

    # 🔒 Secure display information (excluding sensitive information)
    last_four: str # Last four digits of the card number
    brand: str # Visa, Mastercard
    nickname: Optional[str] = None # User-defined name

    # 🔐 Encrypted sensitive information (using Fernet encryption)
    encrypted_data: Optional[str] = None

    # User preferences
    is_default: bool = False # Default payment method
    priority: int = 0 # Priority

    # Transaction restrictions
    supported_currencies: List[str] = ["USD", "TWD"]
    max_transaction_amount: Optional[float] = None
    min_transaction_amount: float = 0.0

    # Status management
    status: CredentialStatus = CredentialStatus.ACTIVE
    created_at: datetime
    expires_at: Optional[datetime] = None

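    def is_valid(self) -> bool:
        """Assumed helper (not shown in this excerpt): active and not expired"""
        if self.status != CredentialStatus.ACTIVE:
            return False
        return self.expires_at is None or datetime.now(timezone.utc) <= self.expires_at

    def supports_transaction(self, amount: float, currency: str) -> bool:
        """Check if this credential supports the transaction"""
        if not self.is_valid():
            return False
        if currency not in self.supported_currencies:
            return False
        # Compare against None so that a limit of 0 is not treated as "no limit"
        if self.max_transaction_amount is not None and amount > self.max_transaction_amount:
            return False
        return amount >= self.min_transaction_amount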


Key point: The actual card number lives only inside encrypted_data, protected with Fernet symmetric encryption. Code outside the Credential Provider sees only display-safe fields such as last_four.

2. PaymentToken - One-time payment token

This is the most important output of the Credential Provider:

class TokenType(str, Enum):
    """Token type"""
    SINGLE_USE = "single_use" # One-time (default)
    MULTI_USE = "multi_use" # Multiple use
    RECURRING = "recurring" # Recurring payments

class PaymentToken(BaseModel):
    """One-time payment token - binds to a specific Mandate"""

    token_id: str # tok_xxxxxxxxxxxx
    credential_id: str # Corresponding credential
    user_id: str
    mandate_id: str # 🔗 Bound Mandate!

    # Token value (secure random string)
    token_value: str # secrets.token_urlsafe(32)
    token_type: TokenType = TokenType.SINGLE_USE

    # Bound transaction information
    amount: float
    currency: str

    # Validity period (default 30 minutes)
    created_at: datetime
    expires_at: datetime

    # Usage tracking
    used: bool = False
    used_at: Optional[datetime] = None

    def is_valid(self) -> bool:
        """Is the Token valid"""
        if self.used:
            return False # Already used
        if datetime.now(timezone.utc) > self.expires_at:
            return False # Expired
        return True

    def consume(self) -> None:
        """Consume this Token (mark as used)"""
        self.used = True
        self.used_at = datetime.now(timezone.utc)


Key design:

  • Token binds to a specific mandate_id, cannot be used for other transactions
  • Expires in 30 minutes by default
  • Marked used = True immediately after use
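
The three properties above can be exercised with a small stand-in. This is a minimal sketch, not the article's model: `DemoToken` and `redeem` are hypothetical names, a plain dataclass replaces the Pydantic class, and the current time is passed in explicitly so the expiry case is easy to reproduce.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class DemoToken:
    """Hypothetical stand-in for PaymentToken (not the article's Pydantic model)."""
    mandate_id: str
    expires_at: datetime
    used: bool = False

    def is_valid(self, now: datetime) -> bool:
        # A token is valid only if it is unused and not yet expired.
        return not self.used and now <= self.expires_at


def redeem(token: DemoToken, mandate_id: str, now: datetime) -> str:
    """Return "ok" and consume the token, or the reason it was rejected."""
    if token.mandate_id != mandate_id:
        return "wrong_mandate"
    if not token.is_valid(now):
        return "expired_or_used"
    token.used = True  # single use: a second redeem will fail
    return "ok"


issued_at = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
expiry = issued_at + timedelta(minutes=30)
token = DemoToken("mandate_a", expires_at=expiry)

first = redeem(token, "mandate_a", issued_at + timedelta(minutes=5))
replay = redeem(token, "mandate_a", issued_at + timedelta(minutes=6))
other = redeem(DemoToken("mandate_a", expiry), "mandate_b", issued_at)
late = redeem(DemoToken("mandate_a", expiry), "mandate_a", issued_at + timedelta(minutes=31))

print(first, replay, other, late)
```

Only the first call succeeds; the replay, the mismatched Mandate, and the late attempt are each rejected for a different reason.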

3. CredentialProviderService - Core service

# src/linebot_ap2/services/credential_provider.py

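import json
import logging
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from cryptography.fernet import Fernet

# Import path inferred from the file comments in this article
from ..models.payment import PaymentCredential, PaymentMethodType, PaymentToken

logger = logging.getLogger(__name__)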

class CredentialProviderService:
    """AP2 Credential Provider implementation"""

    def __init__(self):
        # 🔐 Encryption key (should be read from Secret Manager in a production environment)
        self._encryption_key = Fernet.generate_key()
        self._fernet = Fernet(self._encryption_key)

        # Storage structure
        self._credentials: Dict[str, Dict[str, PaymentCredential]] = {}
        self._tokens: Dict[str, PaymentToken] = {}

        # Initialize Demo data
        self._init_demo_credentials()

    def register_credential(
        self,
        user_id: str,
        credential_type: PaymentMethodType,
        credential_data: Dict[str, Any], # Contains the full card number
        brand: str,
        is_default: bool = False
    ) -> PaymentCredential:
        """Register a new payment credential"""

        # 🔐 Encrypt sensitive information
        encrypted = self._fernet.encrypt(
            json.dumps(credential_data).encode()
        ).decode()

        # Extract secure display information
        card_number = credential_data.get("card_number", "")
        last_four = card_number[-4:] if len(card_number) >= 4 else "****"

        credential = PaymentCredential(
            credential_id=f"cred_{uuid.uuid4().hex[:12]}",
            user_id=user_id,
            type=credential_type,
            last_four=last_four,
            brand=brand,
            encrypted_data=encrypted,
            is_default=is_default,
            created_at=datetime.now(timezone.utc)
        )

        # Store
        if user_id not in self._credentials:
            self._credentials[user_id] = {}
        self._credentials[user_id][credential.credential_id] = credential

        logger.info(f"Registered credential {credential.credential_id}")
        return credential

    def get_eligible_methods(
        self,
        user_id: str,
        amount: float,
        currency: str,
        merchant_accepted_types: Optional[List[PaymentMethodType]] = None
    ) -> List[PaymentCredential]:
        """Get payment methods that meet the transaction conditions"""

        eligible = []
        for cred in self._credentials.get(user_id, {}).values():
            # Check if this transaction is supported
            if not cred.supports_transaction(amount, currency):
                continue

            # Check if the merchant accepts
            if merchant_accepted_types and cred.type not in merchant_accepted_types:
                continue

            eligible.append(cred)

        # Sort: default credential first, then by descending priority
        eligible.sort(key=lambda c: (-int(c.is_default), -c.priority))
        return eligible

    def issue_payment_token(
        self,
        credential_id: str,
        mandate_id: str,
        amount: float,
        currency: str,
        expiry_minutes: int = 30
    ) -> PaymentToken:
        """Issue a one-time Token for a specific Mandate"""

        credential = self._find_credential(credential_id)
        if not credential:
            raise ValueError(f"Credential not found: {credential_id}")

        if not credential.supports_transaction(amount, currency):
            raise ValueError("Credential does not support this transaction")

        # 🎫 Issue Token
        token = PaymentToken(
            token_id=f"tok_{uuid.uuid4().hex[:12]}",
            credential_id=credential_id,
            user_id=credential.user_id,
            mandate_id=mandate_id, # Bind!
            token_value=secrets.token_urlsafe(32),
            amount=amount,
            currency=currency,
            created_at=datetime.now(timezone.utc),
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=expiry_minutes)
        )

        self._tokens[token.token_id] = token
        logger.info(f"Issued token {token.token_id} for mandate {mandate_id}")
        return token

    def consume_token(self, token_id: str) -> Dict[str, Any]:
        """Consume Token and return decrypted credential data (for payment processing only)"""

        token = self._tokens.get(token_id)
        if not token:
            raise ValueError(f"Token not found: {token_id}")

        if not token.is_valid():
            raise ValueError("Token is expired or already used")

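        # 🔓 Decrypt credential data
        credential = self._find_credential(token.credential_id)
        if not credential or not credential.encrypted_data:
            raise ValueError(f"Credential data unavailable: {token.credential_id}")
        decrypted = json.loads(
            self._fernet.decrypt(credential.encrypted_data.encode())
        )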

        # ✅ Mark Token as used
        token.consume()

        return {
            "_credential_data": decrypted, # For payment processing only
            "credential_id": token.credential_id,
            "mandate_id": token.mandate_id,
            "amount": token.amount,
            "currency": token.currency
        }

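The service excerpt calls `_find_credential`, and the payment tool later calls `validate_token` and `get_credential_for_display`, none of which are shown. Below is a minimal sketch of what such helpers might look like, written as standalone functions over the same storage shapes (`user_id -> credential_id -> credential` and `token_id -> token`). The bodies are assumptions, not the repository's code, and `SimpleNamespace` objects stand in for the real models.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional


def find_credential(credentials: Dict[str, Dict[str, Any]], credential_id: str) -> Optional[Any]:
    """Search every user's credential map for the given credential_id."""
    for user_credentials in credentials.values():
        if credential_id in user_credentials:
            return user_credentials[credential_id]
    return None


def validate_token(tokens: Dict[str, Any], token_id: str) -> bool:
    """True only if the token exists and its own is_valid() check passes."""
    token = tokens.get(token_id)
    return token is not None and token.is_valid()


def credential_for_display(credential: Any) -> Dict[str, Any]:
    """Return only the non-sensitive fields; encrypted_data is never copied."""
    return {
        "credential_id": credential.credential_id,
        "type": credential.type,
        "brand": credential.brand,
        "last_four": credential.last_four,
    }


# Stand-in objects so the sketch runs without the real models.
visa = SimpleNamespace(credential_id="cred_1", type="card", brand="Visa",
                       last_four="1234", encrypted_data="ciphertext-placeholder")
credentials = {"demo_user": {"cred_1": visa}}
tokens = {"tok_1": SimpleNamespace(is_valid=lambda: True),
          "tok_2": SimpleNamespace(is_valid=lambda: False)}

found = find_credential(credentials, "cred_1")
display = credential_for_display(found)
print(display)
```

Keeping the display view as an explicit allow-list means a new sensitive field added to the model later is not exposed by accident.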

4. New Shopping Tool: Get Payment Methods

# src/linebot_ap2/tools/shopping_tools.py

def get_eligible_payment_methods(
    user_id: str,
    amount: float,
    currency: str = "USD",
    merchant_accepted_types: str = ""
) -> str:
    """Get payment methods that meet the transaction conditions for the user

    Call this tool after creating the Mandate and before initiating the payment,
    allowing the user to select which payment method to use.
    """

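    cp = get_credential_provider() # Singleton

    # Assumed format: comma-separated type values such as "card,wallet"
    accepted_types = [
        PaymentMethodType(t.strip())
        for t in merchant_accepted_types.split(",") if t.strip()
    ] or None

    eligible = cp.get_eligible_methods(
        user_id=user_id,
        amount=amount,
        currency=currency,
        merchant_accepted_types=accepted_types
    )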

    # Convert to secure display format
    methods = [{
        "credential_id": c.credential_id,
        "type": c.type.value,
        "brand": c.brand,
        "last_four": c.last_four,
        "nickname": c.nickname,
        "is_default": c.is_default
    } for c in eligible]

    return json.dumps({
        "user_id": user_id,
        "eligible_methods": methods,
        "total": len(methods),
        "transaction_context": {
            "amount": amount,
            "currency": currency
        }
    }, ensure_ascii=False, indent=2)

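To see how the filter and the sort key interact, here is a small worked example. It is a sketch under assumptions: `DemoCredential` is a hypothetical stand-in with only the fields the check needs, and the four sample credentials are made up for illustration. The sort key is the same one used in get_eligible_methods.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class DemoCredential:
    """Hypothetical stand-in carrying only the fields the filter and sort need."""
    name: str
    currencies: List[str]
    is_default: bool = False
    priority: int = 0
    max_amount: Optional[float] = None

    def supports(self, amount: float, currency: str) -> bool:
        if currency not in self.currencies:
            return False
        if self.max_amount is not None and amount > self.max_amount:
            return False
        return True


def eligible_methods(creds: List[DemoCredential], amount: float, currency: str) -> List[str]:
    ok = [c for c in creds if c.supports(amount, currency)]
    # Same key as get_eligible_methods: default first, then higher priority.
    ok.sort(key=lambda c: (-int(c.is_default), -c.priority))
    return [c.name for c in ok]


wallet = [
    DemoCredential("debit", ["USD"], priority=5, max_amount=100.0),
    DemoCredential("visa", ["USD", "TWD"], is_default=True, priority=1),
    DemoCredential("master", ["USD"], priority=3),
    DemoCredential("twd_only", ["TWD"], priority=9),
]

print(eligible_methods(wallet, 250.0, "USD"))  # ['visa', 'master']
print(eligible_methods(wallet, 50.0, "USD"))   # ['visa', 'debit', 'master']
```

The default card wins even with the lowest priority, and the debit card only appears once the amount drops under its limit.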

5. New Shopping Tool: Issue Payment Token

def issue_payment_token_for_mandate(
    user_id: str,
    credential_id: str,
    mandate_id: str
) -> str:
    """Issue a one-time Payment Token for a specific Mandate

    After the user selects a payment method, call this tool to get the Token,
    and then use this Token to initiate the payment.
    """

    # Get Mandate information
    mandate = _cart_mandates.get(mandate_id)
    if not mandate:
        return json.dumps({"error": f"Mandate not found: {mandate_id}"})

    if mandate.get("user_id") != user_id:
        return json.dumps({"error": "Mandate does not belong to this user"})

    # 🎫 Issue Token
    cp = get_credential_provider()
    token = cp.issue_payment_token(
        credential_id=credential_id,
        mandate_id=mandate_id,
        amount=mandate["total_amount"],
        currency=mandate.get("currency", "USD")
    )

    # Record to Mandate
    mandate["payment_token_id"] = token.token_id

    return json.dumps({
        "token_id": token.token_id,
        "credential_id": credential_id,
        "mandate_id": mandate_id,
        "amount": token.amount,
        "currency": token.currency,
        "expires_at": token.expires_at.isoformat(),
        "status": "issued",
        "message": "Token has been issued, please use this Token to initiate payment"
    }, ensure_ascii=False, indent=2)


6. New Payment Tool: Initiate Payment with Token

# src/linebot_ap2/tools/payment_tools.py

def initiate_payment_with_token(
    mandate_id: str,
    token_id: str,
    user_id: str
) -> str:
    """Initiate payment using Payment Token

    This method will not touch the actual card number, only use the Token.
    The Token will be consumed after OTP verification is successful.
    """

    cp = get_credential_provider()

    # Verify Token
    if not cp.validate_token(token_id):
        return json.dumps({"error": "Invalid or expired token"})

    token = cp._tokens.get(token_id)

    # 🔐 Key check: Token must be bound to this Mandate
    if token.mandate_id != mandate_id:
        return json.dumps({
            "error": "Token is not bound to this mandate",
            "status": "invalid_token_binding"
        })

    # Get credential display information (excluding sensitive information)
    credential = cp.get_credential_for_display(token.credential_id)

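    # Sign Mandate (AP2 Step 21 - User Signature)
    mandate_data = _cart_mandates.get(mandate_id)
    if not mandate_data or mandate_data.get("user_id") != user_id:
        return json.dumps({"error": "Mandate not found for this user"})
    mandate_service = MandateService()
    user_signature = mandate_service.sign_mandate(mandate_data)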

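    # Generate OTP with the secrets module (needs `import secrets`);
    # the random module is not suitable for security codes
    otp = f"{secrets.randbelow(900000) + 100000}"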

    _otp_store[mandate_id] = {
        "otp": otp,
        "user_id": user_id,
        "token_id": token_id, # Record the Token to be consumed
        "expires_at": datetime.now(timezone.utc) + timedelta(minutes=5),
        "attempts": 0
    }

    return json.dumps({
        "mandate_id": mandate_id,
        "token_id": token_id,
        "payment_method": {
            "type": credential["type"],
            "brand": credential["brand"],
            "last_four": credential["last_four"]
        },
        "amount": token.amount,
        "currency": token.currency,
        "user_signature": user_signature,
        "status": "pending_otp",
        "demo_otp": otp,
        "message": f"Please enter the OTP verification code to confirm the ${token.amount} payment from {credential['brand']} **** {credential['last_four']}"
    }, ensure_ascii=False, indent=2)

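The tool above stores the OTP and the pending token_id, and the Token is only consumed once the OTP checks out. The verification step itself (enhanced_verify_otp) is not shown in this article, so the following is a hedged sketch of how it could work. `verify_otp`, its status strings other than "payment_successful", and the `consume` callback are assumptions; the entry layout mirrors _otp_store.

```python
import hmac
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict

MAX_ATTEMPTS = 3  # the security overview mentions locking after 3 wrong codes


def verify_otp(store: Dict[str, Dict[str, Any]], mandate_id: str, code: str,
               now: datetime, consume: Callable[[str], None]) -> str:
    """Check an OTP entry shaped like _otp_store and consume the token on success."""
    entry = store.get(mandate_id)
    if entry is None:
        return "no_pending_payment"
    if now > entry["expires_at"]:
        del store[mandate_id]
        return "otp_expired"
    if entry["attempts"] >= MAX_ATTEMPTS:
        return "locked"
    # compare_digest avoids leaking how many characters matched through timing.
    if not hmac.compare_digest(entry["otp"].encode(), code.encode()):
        entry["attempts"] += 1
        return "wrong_otp"
    consume(entry["token_id"])  # the token is spent only after the OTP checks out
    del store[mandate_id]
    return "payment_successful"


now = datetime(2025, 1, 1, tzinfo=timezone.utc)
consumed = []
store = {"m1": {"otp": "583926", "token_id": "tok_1", "attempts": 0,
                "expires_at": now + timedelta(minutes=5)}}

wrong = verify_otp(store, "m1", "000000", now, consumed.append)
right = verify_otp(store, "m1", "583926", now, consumed.append)
print(wrong, right, consumed)
```

In the real service the `consume` callback would be the Credential Provider's consume_token, so a failed OTP never burns the Token.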

Complete flow Demo

I wrote a complete Demo script to demonstrate the four-stage flow:

# scripts/demo_purchase_flow.py

# === Phase 1: Product browsing ===
result = enhanced_search_products(query="phone", category="electronics")
result = enhanced_add_to_cart(user_id="demo_user", product_id="demo_001", quantity=1)

# === Phase 2: Create Mandate ===
result = enhanced_create_cart_mandate(user_id="demo_user", expires_in_minutes=30)
mandate_id = json.loads(result)["mandate_id"]
# Output: mandate_id = "mandate_abc123", merchant_signature = "..."

# === Phase 3: Credential Provider (New!) ===
# Step 1: Get available payment methods
result = get_eligible_payment_methods(
    user_id="demo_user",
    amount=250.0,
    currency="USD"
)
# Output: [{"brand": "Visa", "last_four": "1234", "is_default": true}, ...]

# Step 2: Issue Payment Token
result = issue_payment_token_for_mandate(
    user_id="demo_user",
    credential_id="cred_demo_visa",
    mandate_id=mandate_id
)
token_id = json.loads(result)["token_id"]
# Output: token_id = "tok_xyz789", expires_at = "..."

# === Phase 4: Payment verification ===
# Step 3: Initiate payment using Token
result = initiate_payment_with_token(
    mandate_id=mandate_id,
    token_id=token_id,
    user_id="demo_user"
)
otp_code = json.loads(result)["demo_otp"]
# Output: demo_otp = "583926"

# Step 4: OTP verification
result = enhanced_verify_otp(
    mandate_id=mandate_id,
    otp_code=otp_code,
    user_id="demo_user"
)
# Output: status = "payment_successful", transaction_id = "txn_..."


Pitfalls experienced

1. Persistence of encryption keys

The Demo environment generates a new encryption key every time it restarts, causing previously stored credentials to be unreadable:

# ❌ Error: New key every restart
class CredentialProviderService:
    def __init__(self):
        self._encryption_key = Fernet.generate_key() # All data is broken after restart!

# ✅ Correct: Read from environment variables or Secret Manager
class CredentialProviderService:
    def __init__(self):
        key_base64 = os.environ.get("CREDENTIAL_ENCRYPTION_KEY")
        if key_base64:
            # Fernet expects the URL-safe base64-encoded key itself, so do not decode it
            self._encryption_key = key_base64.encode()
        else:
            self._encryption_key = Fernet.generate_key()
            logger.warning("Using generated key - credentials will not persist!")
        self._fernet = Fernet(self._encryption_key)

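A Fernet key is 32 random bytes in URL-safe base64 form, and Fernet takes that encoded form as-is. A loader can therefore validate the environment value up front and fail fast on a malformed key. The sketch below uses only the standard library; `load_fernet_key` is a hypothetical helper, and the key is generated by hand here only to keep the example free of third-party imports.

```python
import base64
import os
from typing import Mapping


def load_fernet_key(env: Mapping[str, str], name: str = "CREDENTIAL_ENCRYPTION_KEY") -> bytes:
    """Return the key in the form Fernet expects: still base64-encoded."""
    raw = env.get(name)
    if not raw:
        raise RuntimeError(f"{name} is not set")
    key = raw.encode()
    # Decode only to validate the length; the decoded bytes are discarded.
    if len(base64.urlsafe_b64decode(key)) != 32:
        raise ValueError(f"{name} must decode to exactly 32 bytes")
    return key


# Same format as a Fernet key: 32 random bytes, URL-safe base64 encoded.
demo_key = base64.urlsafe_b64encode(os.urandom(32)).decode()
key = load_fernet_key({"CREDENTIAL_ENCRYPTION_KEY": demo_key})
print(len(key))  # 44
```

In the service this would be called as `load_fernet_key(os.environ)` and the result passed straight to `Fernet(...)`.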

2. Token and Mandate binding verification

I initially forgot to check if the Token was bound to the correct Mandate, which is a serious security vulnerability:

# ❌ Error: No binding verification
def initiate_payment_with_token(mandate_id, token_id, user_id):
    token = cp._tokens.get(token_id)
    if token.is_valid():
        ...  # Directly process... An attacker can use Token A to pay Mandate B!

# ✅ Correct: Must verify the Mandate bound by the Token
def initiate_payment_with_token(mandate_id, token_id, user_id):
    token = cp._tokens.get(token_id)
    if token is None:
        return {"error": "Token not found"}

    if token.mandate_id != mandate_id:
        return {"error": "Token is not bound to this mandate"}

    if token.is_valid():
        ...  # Secure processing


3. Singleton pattern

Credential Provider needs to share state, but creating a new instance on every tool call loses that state:

# ❌ Error: New instance every time
def get_eligible_payment_methods(...):
    cp = CredentialProviderService() # New instance, no previous credentials!

# ✅ Correct: Use singleton
_credential_provider_instance = None

def get_credential_provider():
    global _credential_provider_instance
    if _credential_provider_instance is None:
        _credential_provider_instance = CredentialProviderService()
    return _credential_provider_instance

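If the bot ever handles requests on several threads (an assumption about deployment, not something this article states), two first calls could race and each create an instance. A lock with a second check closes that gap. This is a sketch with a hypothetical `DemoProvider` standing in for CredentialProviderService.

```python
import threading
from typing import List, Optional


class DemoProvider:
    """Hypothetical stand-in for CredentialProviderService."""
    instances_created = 0

    def __init__(self) -> None:
        DemoProvider.instances_created += 1
        self.tokens: dict = {}


_instance: Optional[DemoProvider] = None
_lock = threading.Lock()


def get_provider() -> DemoProvider:
    global _instance
    if _instance is None:          # fast path, no lock once initialized
        with _lock:
            if _instance is None:  # re-check: another thread may have won the race
                _instance = DemoProvider()
    return _instance


results: List[DemoProvider] = []
threads = [threading.Thread(target=lambda: results.append(get_provider()))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(DemoProvider.instances_created)  # 1
```

Every thread receives the same object, so tokens issued through one call are visible to the next.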

Security mechanism overview

After integrating Credential Provider, our security mechanism is more complete:

┌────────────────────────────────────────────────────────────┐
│ Security mechanism overview │
├────────────────────────────────────────────────────────────┤
│ 🔐 Encryption │
│ • Fernet symmetric encryption - Encrypted storage of sensitive credential data │
│ • HMAC-SHA256 - Mandate digital signature │
├────────────────────────────────────────────────────────────┤
│ 🎫 Tokenization (New!) │
│ • One-time Payment Token - Invalidated after use │
│ • Mandate binding - Token can only be used for specified transactions │
│ • 30-minute expiration - Time limit │
├────────────────────────────────────────────────────────────┤
│ 🛡️ Verification │
│ • OTP two-factor authentication - Verify every transaction │
│ • Dual signature - Merchant signature + User signature │
│ • Attempt limit - Locked after 3 failed OTP attempts │
├────────────────────────────────────────────────────────────┤
│ 📊 Auditing │
│ • Complete transaction records - All operations are traceable │
│ • Token usage tracking - Record usage time │
└────────────────────────────────────────────────────────────┘

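For readers who skipped the first article, the HMAC-SHA256 Mandate signature listed above can be illustrated with the standard library. This is a generic sketch, not the repository's MandateService: the function names, the canonical JSON form, and the demo secret are all assumptions.

```python
import hashlib
import hmac
import json


def sign_mandate(mandate: dict, secret: bytes) -> str:
    """Sign a canonical JSON form so key order does not change the signature."""
    payload = json.dumps(mandate, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(secret, payload, hashlib.sha256).hexdigest()


def verify_mandate(mandate: dict, signature: str, secret: bytes) -> bool:
    # Constant-time comparison of the recomputed and the presented signature.
    return hmac.compare_digest(sign_mandate(mandate, secret), signature)


secret = b"demo-only-secret"  # placeholder; a real key belongs in a secret store
mandate = {"mandate_id": "mandate_abc123", "total_amount": 250.0, "currency": "USD"}
signature = sign_mandate(mandate, secret)

tampered = dict(mandate, total_amount=1.0)
print(verify_mandate(mandate, signature, secret))   # True
print(verify_mandate(tampered, signature, secret))  # False
```

Any change to the signed fields, such as the amount, invalidates the signature, which is what lets the Payment Agent trust the Mandate it receives.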

Series article progress

| Stage | Content | Status |
| --- | --- | --- |
| First article | Basic architecture (Shopping + Payment Agent) | ✅ Completed |
| Second article | Credential Provider integration (this article) | ✅ Completed |
| Third article | Production environment deployment (Secret Manager, Firestore) | 📝 Planned |
| Fourth article | LINE Pay integration | 📝 Planned |

Conclusion

Re-examining the AP2 Spec made me realize that a complete payment system is not just about "being able to pay", but also needs to consider:

  1. Credential security: Sensitive data must be stored encrypted
  2. Tokenization: Reduce the scope of exposure of sensitive data
  3. Transaction binding: Each Token can only be used for a specific transaction
  4. Time control: Token must have an expiration time

The AP2 Spec requires all of these, but they are easy to overlook when you're just doing quick Vibe Coding. So you still have to go back and read the documents carefully!

Example code: https://github.com/kkdai/linebot-ap2

If you find it helpful, please give it a ⭐ Star!
