Introduction
Stellar Web Authentication, also referred to as SEP-0010, is based on JSON Web Token (JWT, RFC 7519), a widely used self-contained format for securely transmitting information between parties as a JSON object. One of the use cases for JWT is authentication, which is the focus of this article. Stellar Web Authentication uses a wallet's keypair, which comprises the public key (G...) and the secret key (S...), as the base credentials for authentication.
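To make the "self-contained" part concrete, here is a minimal sketch of how an HS256-signed JWT is put together and checked, using only the standard library. The helper names (b64url, sign_jwt, verify_jwt) and the demo key are made up for illustration; the rest of this article relies on the PyJWT library instead, and this sketch skips claim checks such as expiry.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments are encoded."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_jwt(payload: dict, key: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(key.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_jwt(token: str, key: str) -> dict:
    """Return the payload if the signature matches, otherwise raise ValueError."""
    try:
        signing_input, signature = token.rsplit(".", 1)
        payload_part = signing_input.split(".")[1]
    except (ValueError, IndexError):
        raise ValueError("malformed token")
    expected = hmac.new(key.encode(), signing_input.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), signature):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(payload_part))


# "G..." stands in for a Stellar public key; "demo-key" is a placeholder secret.
token = sign_jwt({"sub": "G...", "iat": 1700000000}, "demo-key")
print(verify_jwt(token, "demo-key"))
```

Anyone holding the token can read the payload, but only a party that knows the key can produce a signature that verifies, which is why the server key must stay private.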
Stellar Web Authentication
This walk-through is made up of several components, and the key terms are introduced up front for ease of understanding.
Note: This implementation of SEP-0010 does not feature client attribution, which entails the verification of the client_domain and the verify_challenge_transaction_signed_by_client_master_key operation.
Assuming you have already set up your Django project, let's go ahead and define an app that will handle our auth:
python manage.py startapp sep10
Add the created app to the INSTALLED_APPS list in the settings.py file in the root folder of your project:
INSTALLED_APPS = [
    # ... your existing apps ...
    "rest_framework",  # Django REST framework, used by the views below
    "sep10",
]
Define the variables below in the settings.py file in the root folder of your project:
from stellar_sdk import Server
STELLAR_NETWORK = "Test SDF Network ; September 2015"
HORIZON_URI = "https://horizon-testnet.stellar.org"
SERVER_JWT_KEY = "key_used_to_hash_the_jwt_str"
HORIZON_SERVER = Server(horizon_url=HORIZON_URI)
SIGNING_KEY = "server_account_public_key"
SIGNING_SEED = "server_account_secret_seed"
HOST_URL = "http://localhost:8000"
# A list, so that membership checks and the SDK match whole domains rather than substrings
SEP10_HOME_DOMAINS = ["foo.com", "bar.io"]
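A note on SEP10_HOME_DOMAINS: the views later test a requested home domain with Python's in operator, which matches whole items on a list but substrings on a plain string. If the value reaches your settings as a comma-separated string (for example from an environment variable, which is an assumption here), a small helper can split it first. parse_home_domains is a hypothetical name, not part of Django or the stellar-sdk.

```python
from typing import List


def parse_home_domains(raw: str) -> List[str]:
    """Split a comma-separated string into a clean list of domains."""
    return [domain.strip() for domain in raw.split(",") if domain.strip()]


domains = parse_home_domains("foo.com, bar.io")
print(domains)                      # ['foo.com', 'bar.io']
print("foo.com" in domains)         # True: whole-domain match
print("foo" in domains)             # False: partial names are rejected
print("foo" in "foo.com, bar.io")   # True: the raw string matches substrings
```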
Define a custom JWT token class
In the sep10 app folder add the JWT token class below in the token.py file:
from jwt import decode
from datetime import datetime, timezone
from jwt.exceptions import InvalidTokenError
from typing import Optional, Union, Dict
from stellar_sdk import Keypair, MuxedAccount
from stellar_sdk.strkey import StrKey
from stellar_sdk.exceptions import (
Ed25519PublicKeyInvalidError,
MuxedEd25519AccountInvalidError,
)
from project_name import settings # replace project_name with actual Django project name
class SEP10Token:
    """An object that represents the authenticated session of the client."""

    _CLAIMS = {"iss", "sub", "exp", "iat"}

    def __init__(self, jwt: Union[str, Dict]):
        if isinstance(jwt, str):
            try:
                jwt = decode(jwt, settings.SERVER_JWT_KEY, algorithms=["HS256"])
            except InvalidTokenError:
                raise ValueError("jwt token has expired or is invalid")
        elif not isinstance(jwt, dict):
            raise ValueError(
                "invalid type for 'jwt' parameter: must be a string or dictionary"
            )
        if not self._CLAIMS.issubset(set(jwt.keys())):
            raise ValueError(
                f"jwt is missing one of the required claims: {', '.join(sorted(self._CLAIMS))}"
            )
        # `memo` is a read-only property on this class, so assigning `self.memo`
        # would raise AttributeError; a local variable is used for validation.
        memo = None
        self.stellar_account = None
        if jwt["sub"].startswith("M"):
            try:
                StrKey.decode_muxed_account(jwt["sub"])
            except (ValueError, MuxedEd25519AccountInvalidError):
                raise ValueError(f"invalid muxed account: {jwt['sub']}")
        elif ":" in jwt["sub"]:
            try:
                self.stellar_account, memo = jwt["sub"].split(":")
            except ValueError:
                raise ValueError(f"improperly formatted 'sub' value: {jwt['sub']}")
        else:
            self.stellar_account = jwt["sub"]
        if self.stellar_account:
            try:
                Keypair.from_public_key(self.stellar_account)
            except Ed25519PublicKeyInvalidError:
                raise ValueError(f"invalid Stellar public key: {jwt['sub']}")
        if memo:
            try:
                int(memo)
            except ValueError:
                raise ValueError(
                    f"invalid memo in 'sub' value, expected 64-bit integer: {memo}"
                )
        try:
            iat = datetime.fromtimestamp(jwt["iat"], tz=timezone.utc)
        except (OSError, ValueError, OverflowError):
            raise ValueError("invalid iat value")
        try:
            exp = datetime.fromtimestamp(jwt["exp"], tz=timezone.utc)
        except (OSError, ValueError, OverflowError):
            raise ValueError("invalid exp value")
        now = datetime.now(tz=timezone.utc)
        if now < iat or now > exp:
            raise ValueError("jwt is no longer valid")
        self._payload = jwt

    @property
    def account(self) -> str:
        """
        The Stellar account (`G...`) authenticated. Note that a muxed account
        could have been authenticated, in which case `SEP10Token.muxed_account`
        should be used.
        """
        if self._payload["sub"].startswith("M"):
            return MuxedAccount.from_account(self._payload["sub"]).account_id
        elif ":" in self._payload["sub"]:
            return self._payload["sub"].split(":")[0]
        else:
            return self._payload["sub"]

    @property
    def muxed_account(self) -> Optional[str]:
        """
        The M-address specified in the payload's ``sub`` value, if present
        """
        if self._payload["sub"].startswith("M"):
            return self._payload["sub"]
        else:
            return None

    @property
    def memo(self) -> Optional[int]:
        """
        The memo included with the payload's ``sub`` value, if present
        """
        return (
            int(self._payload["sub"].split(":")[1])
            if ":" in self._payload["sub"]
            else None
        )

    @property
    def issuer(self) -> str:
        """
        The principal that issued a token, RFC7519, Section 4.1.1, as a Uniform
        Resource Identifier (URI) for the issuer
        (https://example.com or https://example.com/G...)
        """
        return self._payload["iss"]

    @property
    def issued_at(self) -> datetime:
        """
        The time at which the JWT was issued, RFC7519, Section 4.1.6,
        represented as a UTC datetime object
        """
        return datetime.fromtimestamp(self._payload["iat"], tz=timezone.utc)

    @property
    def expires_at(self) -> datetime:
        """
        The expiration time on or after which the JWT will not be accepted for
        processing, RFC7519, Section 4.1.4, represented as a UTC datetime object
        """
        return datetime.fromtimestamp(self._payload["exp"], tz=timezone.utc)

    @property
    def payload(self) -> dict:
        """
        The decoded contents of the JWT string
        """
        return self._payload
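The two less obvious checks in the token class are the shape of the sub claim (a plain G... account, a G...:memo pair, or a muxed M... account) and the iat/exp validity window. The sketch below isolates just that logic with the standard library. Subject, parse_sub and is_within_window are hypothetical names, the addresses are placeholders, and no strkey validation is done here, unlike in the class above.

```python
from datetime import datetime, timezone
from typing import NamedTuple, Optional


class Subject(NamedTuple):
    account: Optional[str]  # G... address when given directly
    muxed: Optional[str]    # M... address when a muxed account was used
    memo: Optional[int]     # integer memo from the "G...:memo" form


def parse_sub(sub: str) -> Subject:
    """Classify a `sub` value without validating the address itself."""
    if sub.startswith("M"):
        return Subject(account=None, muxed=sub, memo=None)
    if ":" in sub:
        account, _, memo = sub.partition(":")
        if not memo.isdigit():
            raise ValueError(f"improperly formatted 'sub' value: {sub}")
        return Subject(account=account, muxed=None, memo=int(memo))
    return Subject(account=sub, muxed=None, memo=None)


def is_within_window(iat: int, exp: int, now: Optional[datetime] = None) -> bool:
    """True when `now` falls between the issued-at and expiry timestamps."""
    now = now or datetime.now(tz=timezone.utc)
    issued = datetime.fromtimestamp(iat, tz=timezone.utc)
    expires = datetime.fromtimestamp(exp, tz=timezone.utc)
    return issued <= now <= expires


print(parse_sub("GEXAMPLE:42"))
print(parse_sub("MEXAMPLE"))
print(is_within_window(100, 200, datetime.fromtimestamp(150, tz=timezone.utc)))
```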
Define a custom authentication scheme
The authentication scheme inherits from the BaseAuthentication class and overrides the authenticate method, which performs checks on the request object such as:
- Verifying the format of the authorization header in the request, i.e. Bearer <token>.
- Verifying and decoding the JWT token provided in the authorization header.
- Verifying that the credentials provided are tied to an existing User object, assuming the User model contains the Stellar account as a field.
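The first check in the list can be sketched on its own. This is a minimal, framework-free illustration of parsing an Authorization header value; extract_bearer_token is a hypothetical helper name and is not part of Django REST framework.

```python
from typing import Optional


def extract_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value.

    Returns None when no header was sent, so other authentication
    schemes get a chance, and raises ValueError on a malformed header.
    """
    if not auth_header:
        return None
    scheme, _, token = auth_header.partition(" ")
    if scheme != "Bearer" or not token.strip():
        raise ValueError("Invalid auth header format.")
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token(None))                  # None
```

Returning None for a missing header and raising for a malformed one mirrors how the authenticate method distinguishes "no credentials supplied" from "bad credentials".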
In the sep10 app folder add the authentication scheme class below in the authentication.py file:
from typing import Optional, Tuple
from django.contrib.auth import get_user_model
from rest_framework.request import Request
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed as SEP10AuthenticationFailed
from sep10.token import SEP10Token

User = get_user_model()


class SEP10Authentication(BaseAuthentication):
    def authenticate(self, request: Request) -> Optional[Tuple[User, SEP10Token]]:
        """
        This function authenticates a user by verifying and decoding
        a JWT token from the authorization header of a request.
        :param request: The HTTP request object that contains information
            about the incoming request, such as headers, query
            parameters, and request body
        :return: a tuple containing the authenticated User object and
            the SEP10 token object, or None if no authorization header
            was provided.
        """
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return None
        if not auth_header.startswith("Bearer "):
            raise SEP10AuthenticationFailed("Invalid auth header format.")
        jwt = auth_header.split(" ", 1)[1]
        # SEP10Token raises ValueError for invalid or expired tokens; convert it
        # so the client receives an authentication error instead of a 500.
        try:
            token = SEP10Token(jwt)
        except ValueError as e:
            raise SEP10AuthenticationFailed(str(e))
        try:
            # token.account resolves G..., G...:memo and muxed M... subjects
            user = User.objects.get(stellar_account=token.account)
        except User.DoesNotExist:
            raise SEP10AuthenticationFailed(
                "No active account found with the given credentials."
            )
        return (user, token)
The custom authentication scheme can be set globally using the DEFAULT_AUTHENTICATION_CLASSES setting:
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "sep10.authentication.SEP10Authentication",
    ]
}
Or on a per-view or per-viewset basis, using the APIView class-based views:
from sep10.authentication import SEP10Authentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
class ExampleView(APIView):
    authentication_classes = [SEP10Authentication]
    permission_classes = [IsAuthenticated]

    def get(self, request, format=None):
        content = {}
        return Response(content)
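Or, if you're using the @api_view decorator with function-based views:
from rest_framework.decorators import (
    api_view,
    authentication_classes,
    permission_classes,
)
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from sep10.authentication import SEP10Authentication


@api_view(['GET'])
@authentication_classes([SEP10Authentication])
@permission_classes([IsAuthenticated])
def example_view(request, format=None):
    content = {}
    return Response(content)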
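From the client's side, calling a protected view means sending the token in the Authorization header. A minimal sketch with the standard library follows; the URL path and the token value are placeholders, build_authenticated_request is a hypothetical helper, and the request is only constructed here, not sent.

```python
from urllib.request import Request


def build_authenticated_request(url: str, token: str) -> Request:
    """Create a GET request carrying the SEP-10 JWT as a Bearer token."""
    return Request(url, headers={"Authorization": f"Bearer {token}"})


# Placeholder URL and token, for illustration only.
req = build_authenticated_request("http://localhost:8000/example", "eyJ...")
print(req.get_method(), req.full_url)
print(req.get_header("Authorization"))
```

Passing the request to urllib.request.urlopen would send it to a running server.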
Define helper functions
These helper functions wrap the SEP-10 authentication logic so that it can be used from views or viewsets. They rely on the following operations: build_challenge_transaction, read_challenge_transaction and verify_challenge_transaction_threshold, which are documented in detail in the stellar-sdk.
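Conceptually, a threshold check adds up the weights of the account signers whose signatures appear on the challenge and compares the total with the account's threshold. The sketch below shows only that idea; it is not the stellar-sdk implementation, it performs no signature verification, and meets_threshold and the signer names are hypothetical.

```python
from typing import Dict, Set


def meets_threshold(
    signer_weights: Dict[str, int], signed_by: Set[str], threshold: int
) -> bool:
    """Sum the weights of the signers that signed and compare to the threshold."""
    total = sum(
        weight for key, weight in signer_weights.items() if key in signed_by
    )
    return total >= threshold


# Two signers of weight 1 each on an account that requires a total weight of 2.
weights = {"G_SIGNER_A": 1, "G_SIGNER_B": 1}
print(meets_threshold(weights, {"G_SIGNER_A"}, 2))                # False
print(meets_threshold(weights, {"G_SIGNER_A", "G_SIGNER_B"}, 2))  # True
```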
In the sep10 app folder add the helper functions below in the helpers.py file:
import jwt
import logging
from urllib.parse import urlparse
from stellar_sdk.sep.stellar_web_authentication import (
    build_challenge_transaction,
    read_challenge_transaction,
    InvalidSep10ChallengeError,
    verify_challenge_transaction_threshold,
)
from stellar_sdk import MuxedAccount, exceptions
from project_name import settings

logger = logging.getLogger(__name__)


def _challenge_transaction(
    client_account_id,
    home_domain,
    client_domain=None,
    client_signing_key=None,
    memo=None,
):
    """
    Generates a challenge transaction for a client account, as per SEP-10.
    :param client_account_id: The Stellar account ID of the client requesting authentication
    :param home_domain: The domain of the Stellar account issuing the challenge transaction.
        This is typically the domain of the server that is providing the authentication service
    :param client_domain: The domain of the client requesting the challenge transaction. It is an
        optional parameter
    :param client_signing_key: The public key of the client account's signing key. This is used to
        verify the signature of the challenge transaction
    :param memo: A memo to include in the challenge transaction. This is an optional parameter and can
        be used to include additional information in the transaction
    :return: The XDR encoding of the challenge transaction for a client account.
    """
    return build_challenge_transaction(
        server_secret=settings.SIGNING_SEED,
        client_account_id=client_account_id,
        home_domain=home_domain,
        web_auth_domain=urlparse(settings.HOST_URL).netloc,
        network_passphrase=settings.STELLAR_NETWORK,
        client_domain=client_domain,
        client_signing_key=client_signing_key,
        memo=memo,
    )


def _generate_jwt(envelope_xdr: str) -> str:
    """
    Generates the JSON web token from the challenge transaction XDR.
    See: https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0010.md#token
    """
    try:
        challenge = read_challenge_transaction(
            challenge_transaction=envelope_xdr,
            server_account_id=settings.SIGNING_KEY,
            home_domains=settings.SEP10_HOME_DOMAINS,
            web_auth_domain=urlparse(settings.HOST_URL).netloc,
            network_passphrase=settings.STELLAR_NETWORK,
        )
    except (InvalidSep10ChallengeError, TypeError) as e:
        logger.error(f"Invalid challenge transaction: {e}")
        raise ValueError(f"Invalid challenge transaction: {e}")

    # extract the Stellar account from the muxed account to check for its existence
    stellar_account = challenge.client_account_id
    if challenge.client_account_id.startswith("M"):
        stellar_account = MuxedAccount.from_account(
            challenge.client_account_id
        ).account_id

    with settings.HORIZON_SERVER as server:
        try:
            account = server.load_account(stellar_account)
        except exceptions.NotFoundError:
            raise ValueError(f"Account {stellar_account} not found")

    signers = account.load_ed25519_public_key_signers()
    threshold = account.thresholds.med_threshold
    try:
        signers_found = verify_challenge_transaction_threshold(
            challenge_transaction=envelope_xdr,
            server_account_id=settings.SIGNING_KEY,
            home_domains=settings.SEP10_HOME_DOMAINS,
            web_auth_domain=urlparse(settings.HOST_URL).netloc,
            network_passphrase=settings.STELLAR_NETWORK,
            threshold=threshold,
            signers=signers,
        )
        logger.info(
            f"Generating SEP-10 token for account {challenge.client_account_id}"
        )
    except InvalidSep10ChallengeError as e:
        logger.error(f"Invalid challenge transaction: {e}")
        raise ValueError(f"Invalid challenge transaction: {e}")
    logger.info(
        "Challenge verified using account signers: "
        f"{[s.account_id for s in signers_found]}"
    )

    # set iat value to minimum timebound of the challenge so that the JWT returned
    # for a given challenge is always the same.
    # https://github.com/stellar/stellar-protocol/pull/982
    issued_at = challenge.transaction.transaction.preconditions.time_bounds.min_time

    # format sub value based on muxed account or memo
    if challenge.client_account_id.startswith("M") or not challenge.memo:
        sub = challenge.client_account_id
    else:
        sub = f"{challenge.client_account_id}:{challenge.memo}"

    jwt_dict = {
        # build the issuer URL with string formatting; os.path.join is meant for
        # filesystem paths and would insert a backslash on Windows
        "iss": f"{settings.HOST_URL.rstrip('/')}/auth",
        "sub": sub,
        "iat": issued_at,
        "exp": issued_at + 24 * 60 * 60,
        "jti": challenge.transaction.hash().hex(),
    }
    return jwt.encode(jwt_dict, settings.SERVER_JWT_KEY, algorithm="HS256")
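The reason for taking iat from the challenge's minimum time bound rather than the current time is that the claims then depend only on the challenge itself, so the same signed challenge always produces the same token. A minimal, SDK-free sketch of that claim-building step follows; build_claims, its parameters and the sample values are hypothetical.

```python
from typing import Optional

ONE_DAY = 24 * 60 * 60  # token lifetime in seconds, as in the helper above


def build_claims(
    host_url: str,
    client_account_id: str,
    min_time: int,
    tx_hash_hex: str,
    memo: Optional[int] = None,
) -> dict:
    """Derive JWT claims purely from values carried by the challenge."""
    if client_account_id.startswith("M") or memo is None:
        sub = client_account_id
    else:
        sub = f"{client_account_id}:{memo}"
    return {
        "iss": f"{host_url.rstrip('/')}/auth",
        "sub": sub,
        "iat": min_time,
        "exp": min_time + ONE_DAY,
        "jti": tx_hash_hex,
    }


# Placeholder account, timestamp and hash, for illustration only.
first = build_claims("http://localhost:8000", "GEXAMPLE", 1700000000, "ab12", memo=7)
second = build_claims("http://localhost:8000", "GEXAMPLE", 1700000000, "ab12", memo=7)
print(first)
print(first == second)  # True: identical inputs give identical claims
```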
Define views to handle the authentication flow
The views follow the SEP-10 auth flow: the client requests a challenge transaction, signs it with its keypair, and submits the signed challenge in exchange for a JWT.
Going with the single responsibility principle, we'll separate the auth flow into three class-based views: GetChallengeTransaction, SignChallengeTransaction and GetJWTToken.
In the sep10 app folder add the view classes below in the views.py file:
import logging
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework import status
from stellar_sdk import Keypair, exceptions, helpers
from sep10.helpers import _challenge_transaction, _generate_jwt
from project_name import settings
logger = logging.getLogger(__name__)
class GetChallengeTransaction(APIView):
    def get(self, request: Request) -> Response:
        with settings.HORIZON_SERVER as server:
            client_account = request.query_params.get("client_account")
            home_domain = request.query_params.get("home_domain")
            if not client_account:
                return Response(
                    {"error": "'client_account' is required."},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            try:
                server.load_account(client_account)
            except exceptions.NotFoundError:
                return Response(
                    {"error": "Client account not found."},
                    status=status.HTTP_404_NOT_FOUND,
                )
            if home_domain not in settings.SEP10_HOME_DOMAINS:
                return Response(
                    {"error": "invalid 'home_domain' value."},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            try:
                transaction = _challenge_transaction(client_account, home_domain)
            except ValueError as e:
                return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
            logger.info(f"Returning SEP-10 challenge for account {client_account}")
            return Response(
                {
                    "transaction": transaction,
                    "network_passphrase": settings.STELLAR_NETWORK,
                },
                status=status.HTTP_200_OK,
            )


class SignChallengeTransaction(APIView):
    # NOTE: this view receives the client's secret seed, which is only suitable
    # for local testing. In SEP-10 the client signs the challenge itself and
    # never shares its secret key with the server.
    def post(self, request: Request) -> Response:
        with settings.HORIZON_SERVER as server:
            transaction = request.data.get("challenge_transaction_xdr")
            client_account_seed = request.data.get("client_account_seed")
            signer_keypair = Keypair.from_secret(client_account_seed)
            source = signer_keypair.public_key
            # load source account. fail-safe if source account is not found
            try:
                server.load_account(source)
            except exceptions.NotFoundError:
                return Response(
                    {"error": "Client account not found."},
                    status=status.HTTP_404_NOT_FOUND,
                )
            # sign challenge transaction
            try:
                # Parse xdr obj to transaction envelope
                envelope = helpers.parse_transaction_envelope_from_xdr(
                    transaction, settings.STELLAR_NETWORK
                )
                envelope.sign(signer_keypair)
                logger.info(f"Signed SEP-10 challenge for client account {source}")
                return Response(
                    {
                        "transaction": envelope.to_xdr(),
                        "network_passphrase": settings.STELLAR_NETWORK,
                    },
                    status=status.HTTP_200_OK,
                )
            except (exceptions.SignatureExistError, ValueError) as e:
                logger.error("%s", str(e))
                return Response(
                    {"error": str(e)}, status=status.HTTP_400_BAD_REQUEST
                )


class GetJWTToken(APIView):
    def post(self, request: Request) -> Response:
        transaction = request.data.get("signed_challenge_transaction_xdr")
        # Parse xdr obj to transaction envelope to validate
        try:
            helpers.parse_transaction_envelope_from_xdr(
                transaction, settings.STELLAR_NETWORK
            )
        except ValueError as e:
            logger.error("%s", str(e))
            return Response(
                {"error": str(e)}, status=status.HTTP_400_BAD_REQUEST
            )
        try:
            jwt_token = _generate_jwt(transaction)
            return Response(
                {
                    "token": jwt_token,
                },
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            logger.error("%s", str(e))
            return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
Finally, in the urls.py file in the sep10 app folder add the following paths:
from sep10 import views
from django.urls import path

app_name = "sep10"
urlpatterns = [
    path(
        "challenge",
        views.GetChallengeTransaction.as_view(),
        name="sep10-challenge",
    ),
    path(
        "sign",
        views.SignChallengeTransaction.as_view(),
        name="sign-challenge",
    ),
    path(
        "token",
        views.GetJWTToken.as_view(),
        name="jwt-token",
    ),
]
Add the sep10 app URLs in the project_name/urls.py file:
from django.urls import include, re_path

urlpatterns = [
    re_path(r"^v1/auth/", include("sep10.urls", namespace="v1:sep10")),
]
Response snippets:
GET auth/challenge
data: {
    "transaction": "AAAAAgAAAAAOzp8SYe...",
    "network_passphrase": "Test SDF Network ; September 2015"
}
POST auth/sign
data: {
    "transaction": "AAAAAgAAAAAOzp8SYe...",
    "network_passphrase": "Test SDF Network ; September 2015"
}
POST auth/token
data: {
    "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...."
}
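The token in the last response is three base64url segments separated by dots, and the first segment shown in the snippet is the JWT header. A small sketch for inspecting a segment with the standard library is shown here; decode_segment is a hypothetical helper, and decoding a segment this way does not verify the signature.

```python
import base64
import json


def decode_segment(segment: str) -> dict:
    """Decode one base64url JWT segment (header or payload) into a dict."""
    padded = segment + "=" * (-len(segment) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


# Header segment taken from the token snippet in the response.
header = decode_segment("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9")
print(header)  # {'alg': 'HS256', 'typ': 'JWT'}
```

The same function applied to the second segment of a full token shows the iss, sub, iat, exp and jti claims.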
Conclusion
The Stellar network has a rich set of ecosystem proposals, and Stellar Web Authentication (SEP-0010) is one of them. You can find other ecosystem proposals in detail here. Thank you for reading to the end of this article. You can find a live implementation of the SEP-0010 protocol here.