Preface
This is the second article in the LINE Bot AP2 integration series. In the first article, we completed the basic Shopping Agent and Payment Agent integration, implementing the Cart Mandate, HMAC-SHA256 signing, and OTP verification.
However, after actual deployment, I re-examined the AP2 official Spec and found that we missed a very important component: Credential Provider.
This article will share:
- Why Credential Provider is needed
- Problems found after re-examining the AP2 Spec
- How to implement a complete three-tier payment architecture
- Security mechanisms for Payment Tokens
- Actual pitfalls experienced
Review: Problems with the original architecture
In the first version of the implementation, our payment flow was as follows:
User selects product → Create Cart Mandate → Initiate payment directly → OTP verification → Complete
This flow has several problems:
- Payment method is hardcoded: The same card is used every time, and the user cannot choose
- Sensitive information exposed: Card numbers and other sensitive data are passed between multiple components
- Lack of tokenization: There is no one-time token mechanism, so replay attacks are possible
Re-examining the AP2 Spec
After re-reading the AP2 Spec document, I found that the complete architecture should be as follows:
┌─────────────────────────────────────────────────────┐
│ Layer 1: Shopping Agent (Shopping Layer) │
│ - Product search and shopping cart management │
│ - Create Cart Mandate │
│ - Merchant Signature │
└──────────────────────┬──────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ Layer 2: Credential Provider (Credential Layer) ⭐ New this time! │
│ - Securely store user's payment credentials (encrypted) │
│ - Select the best payment method based on the transaction scenario │
│ - Issue one-time Payment Token │
│ - Token binds to a specific Mandate │
└──────────────────────┬──────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ Layer 3: Payment Agent (Payment Layer) │
│ - Initiate payment using Token (not the card number!) │
│ - User Signature │
│ - OTP verification │
│ - Transaction completion and audit trail │
└─────────────────────────────────────────────────────┘
The key point: the Payment Agent receives a Token, not the actual card number. Even if a Token is intercepted, it is single-use, bound to one Mandate, and short-lived, so it cannot be replayed for another transaction.
What was modified this time
To meet the Spec's requirements, this round mainly added the following:
| Item | Description |
|---|---|
| PaymentCredential model | Encrypted storage of payment credentials |
| PaymentToken model | One-time payment token |
| CredentialProviderService | Core service for credential management |
| get_eligible_payment_methods | Get eligible payment methods |
| issue_payment_token_for_mandate | Issue Token for Mandate |
| initiate_payment_with_token | Initiate payment using Token |
Implementation details
1. PaymentCredential - Encrypted payment credentials
First, we need a data structure to securely store payment credentials:
# src/linebot_ap2/models/payment.py
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel


class CredentialStatus(str, Enum):
    """Payment credential status"""
    ACTIVE = "active"        # Available
    SUSPENDED = "suspended"  # Suspended
    EXPIRED = "expired"      # Expired


class PaymentCredential(BaseModel):
    """Encrypted payment credentials managed by Credential Provider"""
    credential_id: str              # Unique identifier for credentials
    user_id: str                    # User ID
    type: PaymentMethodType         # card, wallet, bank_transfer

    # 🔒 Secure display information (excluding sensitive information)
    last_four: str                  # Last four digits of the card number
    brand: str                      # Visa, Mastercard
    nickname: Optional[str] = None  # User-defined name

    # 🔐 Encrypted sensitive information (using Fernet encryption)
    encrypted_data: Optional[str] = None

    # User preferences
    is_default: bool = False        # Default payment method
    priority: int = 0               # Priority

    # Transaction restrictions
    supported_currencies: List[str] = ["USD", "TWD"]
    max_transaction_amount: Optional[float] = None
    min_transaction_amount: float = 0.0

    # Status management
    status: CredentialStatus = CredentialStatus.ACTIVE
    created_at: datetime
    expires_at: Optional[datetime] = None

    def supports_transaction(self, amount: float, currency: str) -> bool:
        """Check if this credential supports the transaction"""
        # is_valid() is not shown in this excerpt
        if not self.is_valid():
            return False
        if currency not in self.supported_currencies:
            return False
        if self.max_transaction_amount is not None and amount > self.max_transaction_amount:
            return False
        return amount >= self.min_transaction_amount
Key point: The actual card number lives only inside encrypted_data, which is encrypted with Fernet symmetric encryption. Only display-safe fields such as last_four are visible outside the Credential Provider.
2. PaymentToken - One-time payment token
This is the most important output of the Credential Provider:
class TokenType(str, Enum):
    """Token type"""
    SINGLE_USE = "single_use"  # One-time (default)
    MULTI_USE = "multi_use"    # Multiple use
    RECURRING = "recurring"    # Recurring payments


class PaymentToken(BaseModel):
    """One-time payment token - binds to a specific Mandate"""
    token_id: str       # tok_xxxxxxxxxxxx
    credential_id: str  # Corresponding credential
    user_id: str
    mandate_id: str     # 🔗 Bound Mandate!

    # Token value (secure random string)
    token_value: str    # secrets.token_urlsafe(32)
    token_type: TokenType = TokenType.SINGLE_USE

    # Bound transaction information
    amount: float
    currency: str

    # Validity period (default 30 minutes)
    created_at: datetime
    expires_at: datetime

    # Usage tracking
    used: bool = False
    used_at: Optional[datetime] = None

    def is_valid(self) -> bool:
        """Is the Token valid"""
        if self.used:
            return False  # Already used
        if datetime.now(timezone.utc) > self.expires_at:
            return False  # Expired
        return True

    def consume(self) -> None:
        """Consume this Token (mark as used)"""
        self.used = True
        self.used_at = datetime.now(timezone.utc)
Key design:
- Token binds to a specific mandate_id, so it cannot be used for other transactions
- Expires in 30 minutes by default
- Marked used = True immediately after use
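The three properties above can be exercised in isolation. The following is a minimal, standard-library-only sketch, not the project's pydantic model: `DemoToken` and its fields are hypothetical stand-ins, and passing `now` explicitly is an assumption made to keep the example deterministic.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional
import secrets


@dataclass
class DemoToken:
    """Hypothetical stand-in for PaymentToken (illustration only)."""
    mandate_id: str
    expires_at: datetime
    token_value: str = field(default_factory=lambda: secrets.token_urlsafe(32))
    used: bool = False
    used_at: Optional[datetime] = None

    def is_valid(self, now: datetime) -> bool:
        # Valid only while unused and not yet expired.
        return not self.used and now <= self.expires_at

    def consume(self, now: datetime) -> None:
        # Refuse a second use instead of silently overwriting used_at.
        if not self.is_valid(now):
            raise ValueError("Token is expired or already used")
        self.used = True
        self.used_at = now


issued_at = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
token = DemoToken(mandate_id="mandate_abc123",
                  expires_at=issued_at + timedelta(minutes=30))

fresh = token.is_valid(issued_at + timedelta(minutes=5))     # within the window
expired = token.is_valid(issued_at + timedelta(minutes=31))  # past expiry
token.consume(issued_at + timedelta(minutes=5))
reused = token.is_valid(issued_at + timedelta(minutes=6))    # already consumed
```

Having consume raise on a second call is a design choice of this sketch; it makes accidental double-spending of a Token fail loudly rather than pass unnoticed.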
3. CredentialProviderService - Core service
# src/linebot_ap2/services/credential_provider.py
import json
import logging
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from cryptography.fernet import Fernet

from ..models.payment import PaymentCredential, PaymentMethodType, PaymentToken

logger = logging.getLogger(__name__)


class CredentialProviderService:
    """AP2 Credential Provider implementation"""

    def __init__(self):
        # 🔐 Encryption key (should be read from Secret Manager in a production environment)
        self._encryption_key = Fernet.generate_key()
        self._fernet = Fernet(self._encryption_key)
        # Storage structure
        self._credentials: Dict[str, Dict[str, PaymentCredential]] = {}
        self._tokens: Dict[str, PaymentToken] = {}
        # Initialize Demo data
        self._init_demo_credentials()
    def register_credential(
        self,
        user_id: str,
        credential_type: PaymentMethodType,
        credential_data: Dict[str, Any],  # Contains the full card number
        brand: str,
        is_default: bool = False
    ) -> PaymentCredential:
        """Register a new payment credential"""
        # 🔐 Encrypt sensitive information
        encrypted = self._fernet.encrypt(
            json.dumps(credential_data).encode()
        ).decode()

        # Extract secure display information
        card_number = credential_data.get("card_number", "")
        last_four = card_number[-4:] if len(card_number) >= 4 else "****"

        credential = PaymentCredential(
            credential_id=f"cred_{uuid.uuid4().hex[:12]}",
            user_id=user_id,
            type=credential_type,
            last_four=last_four,
            brand=brand,
            encrypted_data=encrypted,
            is_default=is_default,
            created_at=datetime.now(timezone.utc)
        )

        # Store
        if user_id not in self._credentials:
            self._credentials[user_id] = {}
        self._credentials[user_id][credential.credential_id] = credential
        logger.info(f"Registered credential {credential.credential_id}")
        return credential
    def get_eligible_methods(
        self,
        user_id: str,
        amount: float,
        currency: str,
        merchant_accepted_types: Optional[List[PaymentMethodType]] = None
    ) -> List[PaymentCredential]:
        """Get payment methods that meet the transaction conditions"""
        eligible = []
        for cred in self._credentials.get(user_id, {}).values():
            # Check if this transaction is supported
            if not cred.supports_transaction(amount, currency):
                continue
            # Check if the merchant accepts
            if merchant_accepted_types and cred.type not in merchant_accepted_types:
                continue
            eligible.append(cred)

        # Sort: default method first, then by descending priority
        eligible.sort(key=lambda c: (-int(c.is_default), -c.priority))
        return eligible
    def issue_payment_token(
        self,
        credential_id: str,
        mandate_id: str,
        amount: float,
        currency: str,
        expiry_minutes: int = 30
    ) -> PaymentToken:
        """Issue a one-time Token for a specific Mandate"""
        credential = self._find_credential(credential_id)
        if not credential:
            raise ValueError(f"Credential not found: {credential_id}")
        if not credential.supports_transaction(amount, currency):
            raise ValueError("Credential does not support this transaction")

        # 🎫 Issue Token
        now = datetime.now(timezone.utc)
        token = PaymentToken(
            token_id=f"tok_{uuid.uuid4().hex[:12]}",
            credential_id=credential_id,
            user_id=credential.user_id,
            mandate_id=mandate_id,  # Bind!
            token_value=secrets.token_urlsafe(32),
            amount=amount,
            currency=currency,
            created_at=now,
            expires_at=now + timedelta(minutes=expiry_minutes)
        )
        self._tokens[token.token_id] = token
        logger.info(f"Issued token {token.token_id} for mandate {mandate_id}")
        return token
    def consume_token(self, token_id: str) -> Dict[str, Any]:
        """Consume Token and return decrypted credential data (for payment processing only)"""
        token = self._tokens.get(token_id)
        if not token:
            raise ValueError(f"Token not found: {token_id}")
        if not token.is_valid():
            raise ValueError("Token is expired or already used")

        # 🔓 Decrypt credential data
        credential = self._find_credential(token.credential_id)
        if not credential or not credential.encrypted_data:
            raise ValueError(f"Credential not found: {token.credential_id}")
        decrypted = json.loads(
            self._fernet.decrypt(credential.encrypted_data.encode())
        )

        # ✅ Mark Token as used
        token.consume()
        return {
            "_credential_data": decrypted,  # For payment processing only
            "credential_id": token.credential_id,
            "mandate_id": token.mandate_id,
            "amount": token.amount,
            "currency": token.currency
        }
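The ordering used in get_eligible_methods relies on Python comparing tuples element by element. A small sketch with hypothetical `Method` records (not the real PaymentCredential) shows the resulting order: the default method first, then the rest by descending priority.

```python
from dataclasses import dataclass


@dataclass
class Method:
    """Hypothetical, trimmed-down credential used only for this illustration."""
    name: str
    is_default: bool
    priority: int


methods = [
    Method("wallet", is_default=False, priority=5),
    Method("visa", is_default=True, priority=1),
    Method("bank", is_default=False, priority=9),
]

# Same key as in get_eligible_methods: negate both parts so that an ascending
# sort puts is_default=True first and higher priority before lower.
methods.sort(key=lambda m: (-int(m.is_default), -m.priority))
order = [m.name for m in methods]
print(order)  # ['visa', 'bank', 'wallet']
```

Note that the default method wins even when its priority is lower than that of the others.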
4. New Shopping Tool: Get Payment Methods
# src/linebot_ap2/tools/shopping_tools.py
def get_eligible_payment_methods(
    user_id: str,
    amount: float,
    currency: str = "USD",
    merchant_accepted_types: str = ""
) -> str:
    """Get payment methods that meet the transaction conditions for the user

    Call this tool after creating the Mandate and before initiating the payment,
    allowing the user to select which payment method to use.
    """
    cp = get_credential_provider()  # Singleton

    # Assumption: merchant_accepted_types is a comma-separated list such as
    # "card,wallet", and an empty string means no restriction. This requires
    # PaymentMethodType to be imported from the models module.
    accepted = [
        PaymentMethodType(t.strip())
        for t in merchant_accepted_types.split(",") if t.strip()
    ] or None

    eligible = cp.get_eligible_methods(
        user_id=user_id,
        amount=amount,
        currency=currency,
        merchant_accepted_types=accepted
    )

    # Convert to secure display format
    methods = [{
        "credential_id": c.credential_id,
        "type": c.type.value,
        "brand": c.brand,
        "last_four": c.last_four,
        "nickname": c.nickname,
        "is_default": c.is_default
    } for c in eligible]

    return json.dumps({
        "user_id": user_id,
        "eligible_methods": methods,
        "total": len(methods),
        "transaction_context": {
            "amount": amount,
            "currency": currency
        }
    }, ensure_ascii=False, indent=2)
5. New Shopping Tool: Issue Payment Token
def issue_payment_token_for_mandate(
    user_id: str,
    credential_id: str,
    mandate_id: str
) -> str:
    """Issue a one-time Payment Token for a specific Mandate

    After the user selects a payment method, call this tool to get the Token,
    and then use this Token to initiate the payment.
    """
    # Get Mandate information
    mandate = _cart_mandates.get(mandate_id)
    if not mandate:
        return json.dumps({"error": f"Mandate not found: {mandate_id}"})
    if mandate.get("user_id") != user_id:
        return json.dumps({"error": "Mandate does not belong to this user"})

    # 🎫 Issue Token
    cp = get_credential_provider()
    token = cp.issue_payment_token(
        credential_id=credential_id,
        mandate_id=mandate_id,
        amount=mandate["total_amount"],
        currency=mandate.get("currency", "USD")
    )

    # Record to Mandate
    mandate["payment_token_id"] = token.token_id

    return json.dumps({
        "token_id": token.token_id,
        "credential_id": credential_id,
        "mandate_id": mandate_id,
        "amount": token.amount,
        "currency": token.currency,
        "expires_at": token.expires_at.isoformat(),
        "status": "issued",
        "message": "Token has been issued, please use this Token to initiate payment"
    }, ensure_ascii=False, indent=2)
6. New Payment Tool: Initiate Payment with Token
# src/linebot_ap2/tools/payment_tools.py
import secrets


def initiate_payment_with_token(
    mandate_id: str,
    token_id: str,
    user_id: str
) -> str:
    """Initiate payment using Payment Token

    This method will not touch the actual card number, only use the Token.
    The Token will be consumed after OTP verification is successful.
    """
    cp = get_credential_provider()

    # Verify Token
    if not cp.validate_token(token_id):
        return json.dumps({"error": "Invalid or expired token"})
    token = cp._tokens.get(token_id)

    # 🔐 Key check: Token must be bound to this Mandate
    if token.mandate_id != mandate_id:
        return json.dumps({
            "error": "Token is not bound to this mandate",
            "status": "invalid_token_binding"
        })

    # Get credential display information (excluding sensitive information)
    credential = cp.get_credential_for_display(token.credential_id)

    # Sign Mandate (AP2 Step 21 - User Signature)
    mandate_service = MandateService()
    mandate_data = _cart_mandates.get(mandate_id, {})
    user_signature = mandate_service.sign_mandate(mandate_data)

    # Generate a 6-digit OTP with a CSPRNG (the random module is not meant for secrets)
    otp = f"{secrets.randbelow(900000) + 100000}"
    _otp_store[mandate_id] = {
        "otp": otp,
        "user_id": user_id,
        "token_id": token_id,  # Record the Token to be consumed
        "expires_at": datetime.now(timezone.utc) + timedelta(minutes=5),
        "attempts": 0
    }

    return json.dumps({
        "mandate_id": mandate_id,
        "token_id": token_id,
        "payment_method": {
            "type": credential["type"],
            "brand": credential["brand"],
            "last_four": credential["last_four"]
        },
        "amount": token.amount,
        "currency": token.currency,
        "user_signature": user_signature,
        "status": "pending_otp",
        "demo_otp": otp,  # Demo only: never return the OTP in a production response
        "message": f"Please enter the OTP verification code to confirm the ${token.amount} payment from {credential['brand']} **** {credential['last_four']}"
    }, ensure_ascii=False, indent=2)
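The OTP record stored above carries an expiry time and an attempt counter, but the verification step itself (enhanced_verify_otp) is not shown in this article. The following is a hedged sketch of how such a check might use those fields. `verify_otp`, `MAX_ATTEMPTS`, and the returned status strings are assumptions for illustration, not the repository's implementation; the limit of 3 attempts follows the security overview in this article.

```python
import hmac
from datetime import datetime, timedelta, timezone

MAX_ATTEMPTS = 3  # assumed limit, taken from the article's security overview


def verify_otp(store: dict, mandate_id: str, otp_code: str, now: datetime) -> str:
    """Return a status string for one OTP attempt (hypothetical helper)."""
    record = store.get(mandate_id)
    if record is None:
        return "not_found"
    if now > record["expires_at"]:
        del store[mandate_id]
        return "expired"
    if record["attempts"] >= MAX_ATTEMPTS:
        return "locked"
    # Constant-time comparison avoids leaking how many digits matched.
    if hmac.compare_digest(record["otp"], otp_code):
        del store[mandate_id]  # one OTP confirms one payment
        return "verified"
    record["attempts"] += 1
    return "locked" if record["attempts"] >= MAX_ATTEMPTS else "invalid"


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
store = {"mandate_abc123": {"otp": "583926", "attempts": 0,
                            "expires_at": now + timedelta(minutes=5)}}

# Three wrong guesses lock the record, so the correct code is rejected too.
results = [verify_otp(store, "mandate_abc123", guess, now)
           for guess in ("000000", "111111", "222222", "583926")]
```

In the real flow, a "verified" result would be the point at which the Token recorded in the OTP entry is consumed.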
Complete flow Demo
I wrote a complete Demo script to demonstrate the four-stage flow:
# scripts/demo_purchase_flow.py

# === Phase 1: Product browsing ===
result = enhanced_search_products(query="phone", category="electronics")
result = enhanced_add_to_cart(user_id="demo_user", product_id="demo_001", quantity=1)

# === Phase 2: Create Mandate ===
result = enhanced_create_cart_mandate(user_id="demo_user", expires_in_minutes=30)
mandate_id = json.loads(result)["mandate_id"]
# Output: mandate_id = "mandate_abc123", merchant_signature = "..."

# === Phase 3: Credential Provider (New!) ===
# Step 1: Get available payment methods
result = get_eligible_payment_methods(
    user_id="demo_user",
    amount=250.0,
    currency="USD"
)
# Output: [{"brand": "Visa", "last_four": "1234", "is_default": true}, ...]

# Step 2: Issue Payment Token
result = issue_payment_token_for_mandate(
    user_id="demo_user",
    credential_id="cred_demo_visa",
    mandate_id=mandate_id
)
token_id = json.loads(result)["token_id"]
# Output: token_id = "tok_xyz789", expires_at = "..."

# === Phase 4: Payment verification ===
# Step 3: Initiate payment using Token
result = initiate_payment_with_token(
    mandate_id=mandate_id,
    token_id=token_id,
    user_id="demo_user"
)
otp_code = json.loads(result)["demo_otp"]
# Output: demo_otp = "583926"

# Step 4: OTP verification
result = enhanced_verify_otp(
    mandate_id=mandate_id,
    otp_code=otp_code,
    user_id="demo_user"
)
# Output: status = "payment_successful", transaction_id = "txn_..."
Pitfalls experienced
1. Persistence of encryption keys
The Demo environment generates a new encryption key every time it restarts, causing previously stored credentials to be unreadable:
# ❌ Error: New key every restart
class CredentialProviderService:
    def __init__(self):
        self._encryption_key = Fernet.generate_key()  # All data is broken after restart!

# ✅ Correct: Read from environment variables or Secret Manager
class CredentialProviderService:
    def __init__(self):
        # Fernet expects the URL-safe base64-encoded key itself (the format
        # produced by Fernet.generate_key()), so pass it through without decoding.
        key_base64 = os.environ.get("CREDENTIAL_ENCRYPTION_KEY")
        if key_base64:
            self._encryption_key = key_base64.encode()
        else:
            self._encryption_key = Fernet.generate_key()
            logger.warning("Using generated key - credentials will not persist!")
        self._fernet = Fernet(self._encryption_key)
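A Fernet key is 32 random bytes encoded as URL-safe base64, so it helps to validate the environment variable at startup rather than discover a malformed value later. The following is a standard-library sketch of such a check; `load_fernet_key` is a hypothetical helper, and the variable name CREDENTIAL_ENCRYPTION_KEY is taken from the snippet above.

```python
import base64
import binascii
import os
from typing import Mapping, Optional


def load_fernet_key(env: Mapping[str, str]) -> Optional[bytes]:
    """Return the key in the encoded form Fernet accepts, or None if unset.

    Hypothetical helper: raises ValueError when the value is not
    URL-safe base64 that decodes to exactly 32 bytes.
    """
    raw = env.get("CREDENTIAL_ENCRYPTION_KEY")
    if not raw:
        return None
    try:
        decoded = base64.urlsafe_b64decode(raw.encode())
    except (binascii.Error, ValueError) as exc:
        raise ValueError("CREDENTIAL_ENCRYPTION_KEY is not valid base64") from exc
    if len(decoded) != 32:
        raise ValueError("CREDENTIAL_ENCRYPTION_KEY must decode to 32 bytes")
    return raw.encode()  # hand this to Fernet(...) unchanged


# Same shape as the output of Fernet.generate_key(): 44 base64 characters.
sample_key = base64.urlsafe_b64encode(os.urandom(32)).decode()
key = load_fernet_key({"CREDENTIAL_ENCRYPTION_KEY": sample_key})
missing = load_fernet_key({})
```

In the service, `load_fernet_key(os.environ)` could replace the direct `os.environ.get` call, with the generated-key fallback kept for the demo environment only.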
2. Token and Mandate binding verification
I initially forgot to check if the Token was bound to the correct Mandate, which is a serious security vulnerability:
# ❌ Error: No binding verification
def initiate_payment_with_token(mandate_id, token_id, user_id):
    token = cp._tokens.get(token_id)
    if token.is_valid():
        ...  # Directly process... An attacker can use Token A to pay Mandate B!

# ✅ Correct: Must verify the Mandate bound by the Token
def initiate_payment_with_token(mandate_id, token_id, user_id):
    token = cp._tokens.get(token_id)
    if token is None:
        return {"error": "Token not found"}
    if token.mandate_id != mandate_id:
        return {"error": "Token is not bound to this mandate"}
    if token.is_valid():
        ...  # Secure processing
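The binding check can be tested without any payment infrastructure. The sketch below uses a hypothetical `check_token` helper and plain dictionaries in place of the real token store, and shows an attempt to pay Mandate B with a Token issued for Mandate A being rejected.

```python
from typing import Dict, Optional


def check_token(tokens: Dict[str, dict], token_id: str,
                mandate_id: str, user_id: str) -> Optional[str]:
    """Return an error status, or None when the token may be used.

    Hypothetical helper; the user_id comparison is an extra check
    assumed here, not something shown in the article's code.
    """
    token = tokens.get(token_id)
    if token is None:
        return "token_not_found"
    if token["mandate_id"] != mandate_id:
        return "invalid_token_binding"
    if token["user_id"] != user_id:
        return "invalid_token_owner"
    if token["used"]:
        return "token_already_used"
    return None


tokens = {
    "tok_A": {"mandate_id": "mandate_A", "user_id": "demo_user", "used": False},
}

ok = check_token(tokens, "tok_A", "mandate_A", "demo_user")      # None: allowed
attack = check_token(tokens, "tok_A", "mandate_B", "demo_user")  # wrong mandate
unknown = check_token(tokens, "tok_missing", "mandate_A", "demo_user")
```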
3. Singleton pattern
The Credential Provider needs to share state across tools, but creating a new instance on every tool call throws that state away:
# ❌ Error: New instance every time
def get_eligible_payment_methods(user_id, amount, currency="USD"):
    cp = CredentialProviderService()  # New instance, no previous credentials!

# ✅ Correct: Use singleton
_credential_provider_instance = None

def get_credential_provider():
    global _credential_provider_instance
    if _credential_provider_instance is None:
        _credential_provider_instance = CredentialProviderService()
    return _credential_provider_instance
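The module-level global works well for a demo. As an alternative sketch, functools.lru_cache gives the same one-instance behavior with less code; `DemoProvider` and `get_provider` are hypothetical stand-ins for CredentialProviderService and get_credential_provider.

```python
from functools import lru_cache


class DemoProvider:
    """Hypothetical stand-in for CredentialProviderService."""

    def __init__(self) -> None:
        self.tokens: dict = {}


@lru_cache(maxsize=None)
def get_provider() -> DemoProvider:
    # The body runs on the first call; later calls return the cached instance.
    return DemoProvider()


first = get_provider()
first.tokens["tok_1"] = "issued"
second = get_provider()

same_instance = first is second
shared_state = second.tokens
```

In tests, `get_provider.cache_clear()` resets the cached instance so each test can start from an empty provider.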
Security mechanism overview
After integrating Credential Provider, our security mechanism is more complete:
┌────────────────────────────────────────────────────────────┐
│ Security mechanism overview │
├────────────────────────────────────────────────────────────┤
│ 🔐 Encryption │
│ • Fernet symmetric encryption - Encrypted storage of sensitive credential data │
│ • HMAC-SHA256 - Mandate digital signature │
├────────────────────────────────────────────────────────────┤
│ 🎫 Tokenization (New!) │
│ • One-time Payment Token - Expires after use │
│ • Mandate binding - Token can only be used for specified transactions │
│ • 30-minute expiration - Time limit │
├────────────────────────────────────────────────────────────┤
│ 🛡️ Verification │
│ • OTP two-factor authentication - Verify every transaction │
│ • Dual signature - Merchant signature + User signature │
│ • Attempt limit - Locked after 3 failed OTP attempts │
├────────────────────────────────────────────────────────────┤
│ 📊 Auditing │
│ • Complete transaction records - All operations are traceable │
│ • Token usage tracking - Record usage time │
└────────────────────────────────────────────────────────────┘
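For reference, the HMAC-SHA256 Mandate signature listed above can be sketched with the standard library. The key, the field names, and the choice of sorted-key JSON as the canonical form are assumptions for illustration; the actual MandateService from the first article may serialize the Mandate differently.

```python
import hashlib
import hmac
import json

SECRET_KEY = b"demo-signing-key"  # assumption: loaded from a secret store in practice


def sign_mandate(mandate: dict, key: bytes = SECRET_KEY) -> str:
    """Return a hex HMAC-SHA256 tag over a canonical JSON encoding."""
    payload = json.dumps(mandate, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()


def verify_mandate(mandate: dict, signature: str, key: bytes = SECRET_KEY) -> bool:
    # compare_digest avoids timing differences that depend on the contents.
    return hmac.compare_digest(sign_mandate(mandate, key), signature)


mandate = {"mandate_id": "mandate_abc123", "total_amount": 250.0, "currency": "USD"}
signature = sign_mandate(mandate)

untouched = verify_mandate(mandate, signature)
tampered = verify_mandate({**mandate, "total_amount": 1.0}, signature)
```

Sorting the keys matters here: without a canonical encoding, two dictionaries with the same content but different key order would produce different signatures.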
Series article progress
| Stage | Content | Status |
|---|---|---|
| First article | Basic architecture (Shopping + Payment Agent) | ✅ Completed |
| Second article | Credential Provider integration (this article) | ✅ Completed |
| Third article | Production environment deployment (Secret Manager, Firestore) | 📝 Planned |
| Fourth article | LINE Pay integration | 📝 Planned |
Conclusion
Re-examining the AP2 Spec made me realize that a complete payment system is not just about "being able to pay", but also needs to consider:
- Credential security: Sensitive data must be encrypted at rest
- Tokenization: Reduce the scope of exposure of sensitive data
- Transaction binding: Each Token can only be used for a specific transaction
- Time control: Token must have an expiration time
These are all required by the AP2 Spec, but if you're just doing quick Vibe Coding, it's easy to overlook. So you still have to go back and read the documents carefully!
Example code: https://github.com/kkdai/linebot-ap2
If you find it helpful, please give it a ⭐ Star!
