Building a Custom DSL in Python, From Tokenizer to Interpreter

shayan holakouee

Most backend engineers reach for an existing query language or config format and call it done. That works until it doesn't. Maybe your team keeps writing thousand-line YAML files that nobody wants to touch. Maybe you need a rule engine where non-engineers can define logic without deploying code. Maybe you just want to understand what actually happens when Python parses 1 + 2 * 3.

Building a domain-specific language (DSL) forces you to confront parsing, evaluation, and language design in ways that make you a sharper engineer. This article walks through building a real one from scratch: a small expression language for defining data transformation pipelines.

No parser-combinator libraries. No ANTLR grammars. Just Python.


What We Are Building

Our DSL will let users write transformation rules like this:

FILTER age > 18
MAP name -> upper(name)
REDUCE count(*)

By the end, you will have a working interpreter that can parse this syntax, build an AST, and evaluate it against a list of dictionaries. The concepts generalize directly to anything from config languages to query engines.


Step 1: The Lexer (Tokenizer)

The lexer's job is to turn raw text into a stream of typed tokens. Think of it as the part of the pipeline that stops caring about characters and starts caring about meaning.

from dataclasses import dataclass
from enum import Enum, auto
import re

class TokenType(Enum):
    KEYWORD    = auto()
    IDENTIFIER = auto()
    NUMBER     = auto()
    STRING     = auto()
    OPERATOR   = auto()
    ARROW      = auto()
    LPAREN     = auto()
    RPAREN     = auto()
    COMMA      = auto()
    NEWLINE    = auto()
    EOF        = auto()

@dataclass
class Token:
    type: TokenType
    value: str
    line: int

KEYWORDS = {"FILTER", "MAP", "REDUCE"}

TOKEN_PATTERNS = [
    (TokenType.ARROW,      r'->'),
    (TokenType.OPERATOR,   r'[><=!]+|and\b|or\b|not\b'),  # \b stops "and" matching inside "android"
    (TokenType.NUMBER,     r'\d+(\.\d+)?'),
    (TokenType.STRING,     r'"[^"]*"'),
    (TokenType.LPAREN,     r'\('),
    (TokenType.RPAREN,     r'\)'),
    (TokenType.COMMA,      r','),
    (TokenType.NEWLINE,    r'\n'),
    (TokenType.IDENTIFIER, r'\*'),  # bare * (as in count(*)) is treated as an identifier
    (TokenType.IDENTIFIER, r'[A-Za-z_][A-Za-z0-9_]*'),
]

class Lexer:
    def __init__(self, source: str):
        self.source = source
        self.pos = 0
        self.line = 1

    def tokenize(self) -> list[Token]:
        tokens = []
        while self.pos < len(self.source):
            if self.source[self.pos] in ' \t':
                self.pos += 1
                continue

            matched = False
            for token_type, pattern in TOKEN_PATTERNS:
                regex = re.compile(pattern)
                m = regex.match(self.source, self.pos)
                if m:
                    value = m.group(0)
                    if token_type == TokenType.IDENTIFIER and value in KEYWORDS:
                        token_type = TokenType.KEYWORD
                    if token_type == TokenType.NEWLINE:
                        self.line += 1
                    tokens.append(Token(token_type, value, self.line))
                    self.pos = m.end()
                    matched = True
                    break

            if not matched:
                raise SyntaxError(
                    f"Unexpected character {self.source[self.pos]!r} at line {self.line}"
                )

        tokens.append(Token(TokenType.EOF, '', self.line))
        return tokens

A few things worth calling out here. The order of TOKEN_PATTERNS matters: longer tokens like -> must be tried before shorter ones that share characters with them, or the longer token never wins. The word boundaries on and, or, and not keep the lexer from chewing the front off identifiers like android. The keyword check happens after identifier matching, so we don't need a separate pattern for each keyword. And the bare * pattern exists only so that REDUCE count(*) can tokenize; the interpreter gives it meaning later. All of these are common lexer gotchas.
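
For example, tokenizing a single FILTER line:

lexer = Lexer("FILTER age > 18")
for tok in lexer.tokenize():
    print(tok.type.name, repr(tok.value))
# KEYWORD 'FILTER'
# IDENTIFIER 'age'
# OPERATOR '>'
# NUMBER '18'
# EOF ''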


Step 2: The AST Nodes

Before writing the parser, define the data structures it will produce. The Abstract Syntax Tree (AST) is the clean, structured representation of your program. Each node type corresponds to a concept in your language.

from dataclasses import dataclass, field
from typing import Any

@dataclass
class NumberLiteral:
    value: float

@dataclass
class StringLiteral:
    value: str

@dataclass
class Identifier:
    name: str

@dataclass
class BinaryOp:
    left: Any
    op: str
    right: Any

@dataclass
class FunctionCall:
    name: str
    args: list = field(default_factory=list)

@dataclass
class FilterStatement:
    condition: Any

@dataclass
class MapStatement:
    source: str
    target: Any

@dataclass
class ReduceStatement:
    aggregation: Any

@dataclass
class Program:
    statements: list = field(default_factory=list)

Keep AST nodes as dumb data containers. Resist the urge to put evaluation logic inside them. That belongs in the interpreter, and keeping them separate makes the whole system easier to extend and test.
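
To make that concrete, here is the tree the parser will eventually produce for FILTER age > 18, written out by hand. Because the nodes are plain dataclasses, building them manually like this in tests is painless:

tree = FilterStatement(
    condition=BinaryOp(
        left=Identifier("age"),
        op=">",
        right=NumberLiteral(18.0),
    )
)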


Step 3: The Parser

The parser consumes a flat list of tokens and produces the nested AST. We will use a recursive descent parser, which is the most approachable style and perfectly adequate for small languages.
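
Spelled out as an informal grammar (one rule per parse_* method in the class below):

# program       := NEWLINE* (statement NEWLINE*)* EOF
# statement     := filter | map | reduce
# filter        := "FILTER" expression
# map           := "MAP" IDENTIFIER "->" expression
# reduce        := "REDUCE" expression
# expression    := primary (OPERATOR primary)?
# primary       := NUMBER | STRING | IDENTIFIER | function_call
# function_call := IDENTIFIER "(" (expression ("," expression)*)? ")"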

class Parser:
    def __init__(self, tokens: list[Token]):
        self.tokens = tokens
        self.pos = 0

    def current(self) -> Token:
        return self.tokens[self.pos]

    def peek(self, offset=1) -> Token:
        idx = self.pos + offset
        return self.tokens[idx] if idx < len(self.tokens) else self.tokens[-1]

    def consume(self, expected_type: TokenType | None = None, expected_value: str | None = None) -> Token:
        token = self.current()
        if expected_type and token.type != expected_type:
            raise SyntaxError(
                f"Expected {expected_type}, got {token.type} ({token.value!r}) at line {token.line}"
            )
        if expected_value and token.value != expected_value:
            raise SyntaxError(
                f"Expected {expected_value!r}, got {token.value!r} at line {token.line}"
            )
        self.pos += 1
        return token

    def skip_newlines(self):
        while self.current().type == TokenType.NEWLINE:
            self.pos += 1

    def parse(self) -> Program:
        statements = []
        self.skip_newlines()
        while self.current().type != TokenType.EOF:
            stmt = self.parse_statement()
            statements.append(stmt)
            self.skip_newlines()
        return Program(statements)

    def parse_statement(self):
        token = self.current()
        if token.type != TokenType.KEYWORD:
            raise SyntaxError(f"Expected keyword at line {token.line}, got {token.value!r}")

        if token.value == "FILTER":
            return self.parse_filter()
        elif token.value == "MAP":
            return self.parse_map()
        elif token.value == "REDUCE":
            return self.parse_reduce()
        else:
            raise SyntaxError(f"Unknown keyword: {token.value}")

    def parse_filter(self) -> FilterStatement:
        self.consume(TokenType.KEYWORD, "FILTER")
        condition = self.parse_expression()
        return FilterStatement(condition)

    def parse_map(self) -> MapStatement:
        self.consume(TokenType.KEYWORD, "MAP")
        source = self.consume(TokenType.IDENTIFIER).value
        self.consume(TokenType.ARROW)
        target = self.parse_expression()
        return MapStatement(source, target)

    def parse_reduce(self) -> ReduceStatement:
        self.consume(TokenType.KEYWORD, "REDUCE")
        aggregation = self.parse_expression()
        return ReduceStatement(aggregation)

    def parse_expression(self):
        left = self.parse_primary()
        if self.current().type == TokenType.OPERATOR:
            op = self.consume(TokenType.OPERATOR).value
            right = self.parse_primary()
            return BinaryOp(left, op, right)
        return left

    def parse_primary(self):
        token = self.current()

        if token.type == TokenType.NUMBER:
            self.pos += 1
            return NumberLiteral(float(token.value))

        if token.type == TokenType.STRING:
            self.pos += 1
            return StringLiteral(token.value.strip('"'))

        if token.type == TokenType.IDENTIFIER:
            # Check if this is a function call
            if self.peek().type == TokenType.LPAREN:
                return self.parse_function_call()
            self.pos += 1
            return Identifier(token.value)

        raise SyntaxError(f"Unexpected token {token.value!r} at line {token.line}")

    def parse_function_call(self) -> FunctionCall:
        name = self.consume(TokenType.IDENTIFIER).value
        self.consume(TokenType.LPAREN)
        args = []
        while self.current().type != TokenType.RPAREN:
            args.append(self.parse_expression())
            if self.current().type == TokenType.COMMA:
                self.consume(TokenType.COMMA)
        self.consume(TokenType.RPAREN)
        return FunctionCall(name, args)

Recursive descent parsers mirror the grammar almost 1:1. Each parse_* method corresponds to a grammar rule. Once you see that structure, the parser stops feeling like magic.
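
You can see the shape by parsing a single MAP line and printing the result:

tokens = Lexer("MAP name -> upper(name)").tokenize()
print(Parser(tokens).parse())
# Program(statements=[MapStatement(source='name',
#   target=FunctionCall(name='upper', args=[Identifier(name='name')]))])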


Step 4: The Interpreter

Now the fun part. The interpreter walks the AST and does real work. We will use a lightweight visitor pattern, dispatching on each node's class name via getattr, which is a cleaner alternative to a giant chain of isinstance checks.

from typing import Callable

# Built-in functions available inside the DSL
BUILTINS: dict[str, Callable] = {
    "upper":  lambda s: s.upper(),
    "lower":  lambda s: s.lower(),
    "count":  lambda *args: len(args[0]) if args else 0,
    "length": lambda s: len(s),
    "str":    lambda x: str(x),
}

OPERATORS = {
    ">":  lambda a, b: a > b,
    "<":  lambda a, b: a < b,
    ">=": lambda a, b: a >= b,
    "<=": lambda a, b: a <= b,
    "==": lambda a, b: a == b,
    "!=": lambda a, b: a != b,
    "and": lambda a, b: a and b,
    "or":  lambda a, b: a or b,
}

class Interpreter:
    def __init__(self, builtins: dict | None = None):
        self.builtins = {**BUILTINS, **(builtins or {})}

    def eval_node(self, node, record: dict):
        """Dispatch to the right evaluation method based on node type."""
        method_name = f"eval_{type(node).__name__}"
        method = getattr(self, method_name, None)
        if method is None:
            raise RuntimeError(f"No evaluator for node type: {type(node).__name__}")
        return method(node, record)

    def eval_NumberLiteral(self, node: NumberLiteral, record: dict):
        return node.value

    def eval_StringLiteral(self, node: StringLiteral, record: dict):
        return node.value

    def eval_Identifier(self, node: Identifier, record: dict):
        if node.name not in record:
            raise NameError(f"Undefined field: {node.name!r}")
        return record[node.name]

    def eval_BinaryOp(self, node: BinaryOp, record: dict):
        left = self.eval_node(node.left, record)
        right = self.eval_node(node.right, record)
        op_fn = OPERATORS.get(node.op)
        if op_fn is None:
            raise RuntimeError(f"Unknown operator: {node.op!r}")
        return op_fn(left, right)

    def eval_FunctionCall(self, node: FunctionCall, record: dict):
        fn = self.builtins.get(node.name)
        if fn is None:
            raise NameError(f"Unknown function: {node.name!r}")
        args = [self.eval_node(arg, record) for arg in node.args]
        return fn(*args)

    def run(self, program: Program, data: list[dict]) -> list[dict]:
        result = list(data)
        for stmt in program.statements:
            result = self.execute_statement(stmt, result)
        return result

    def execute_statement(self, stmt, data: list[dict]) -> list[dict]:
        if isinstance(stmt, FilterStatement):
            return [
                record for record in data
                if self.eval_node(stmt.condition, record)
            ]

        if isinstance(stmt, MapStatement):
            output = []
            for record in data:
                new_record = dict(record)
                new_record[stmt.source] = self.eval_node(stmt.target, record)
                output.append(new_record)
            return output

        if isinstance(stmt, ReduceStatement):
            # For count(*), pass the whole dataset as a single record context
            count = self.eval_node(stmt.aggregation, {"*": data})
            return [{"result": count}]

        raise RuntimeError(f"Unknown statement type: {type(stmt).__name__}")
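
Because eval_node takes a node and a record, you can test expression evaluation in isolation, no lexer or parser required:

interp = Interpreter()
node = BinaryOp(Identifier("age"), ">", NumberLiteral(18.0))
print(interp.eval_node(node, {"age": 22}))  # True
print(interp.eval_node(node, {"age": 15}))  # False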

Step 5: Wiring It All Together

def run_pipeline(source: str, data: list[dict]) -> list[dict]:
    lexer = Lexer(source)
    tokens = lexer.tokenize()

    parser = Parser(tokens)
    program = parser.parse()

    interpreter = Interpreter()
    return interpreter.run(program, data)

Let's run it:

pipeline = """
FILTER age > 18
MAP name -> upper(name)
"""

records = [
    {"name": "alice", "age": 22},
    {"name": "bob", "age": 15},
    {"name": "carol", "age": 31},
]

result = run_pipeline(pipeline, records)
print(result)
# [{'name': 'ALICE', 'age': 22}, {'name': 'CAROL', 'age': 31}]

It works.
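
And the REDUCE stage from the opening example works too, since the lexer treats a bare * as an identifier and the interpreter binds it to the whole dataset:

result = run_pipeline("FILTER age > 18\nREDUCE count(*)", records)
print(result)
# [{'result': 2}]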


Where To Go From Here

This implementation handles the core pipeline, but a production-grade DSL would need a few more things:

Operator precedence. Right now age > 18 and role == "admin" would fail because parse_expression only handles one operator. Pratt parsing (top-down operator precedence) is the standard fix and worth learning.
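
A minimal precedence-climbing version, one common way to implement Pratt-style parsing, could replace parse_expression. The precedence numbers here are an assumption; pick whatever ranking fits your language:

PRECEDENCE = {"or": 1, "and": 2, "==": 3, "!=": 3, "<": 4, ">": 4, "<=": 4, ">=": 4}

def parse_expression(self, min_prec: int = 1):
    # Drop-in replacement for Parser.parse_expression.
    left = self.parse_primary()
    while (self.current().type == TokenType.OPERATOR
           and PRECEDENCE.get(self.current().value, 0) >= min_prec):
        op = self.consume(TokenType.OPERATOR).value
        # Recurse one level tighter so equal-precedence operators
        # associate to the left: a and b and c -> (a and b) and c
        right = self.parse_expression(PRECEDENCE[op] + 1)
        left = BinaryOp(left, op, right)
    return left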

Better error messages. Error messages are the UX of a programming language. Include the line, the column, and a snippet of the source. Users will thank you.
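
A sketch of what that can look like, assuming you extend the lexer to track columns; format_error is a hypothetical helper, not something defined above:

def format_error(source: str, line: int, col: int, message: str) -> str:
    # Point a caret at the offending column of the offending line.
    snippet = source.splitlines()[line - 1]
    return f"{message} (line {line}, col {col})\n  {snippet}\n  {' ' * (col - 1)}^"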

A type system or validator. An AST visitor that checks field names against a schema before evaluation catches errors early and makes the DSL much safer to expose to end users.
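
A minimal version of that visitor, checking only that field names exist in a known schema:

def validate_fields(program: Program, schema: set[str]) -> list[str]:
    # Collect every unknown field reference instead of failing on the first.
    errors = []

    def walk(node):
        if isinstance(node, Identifier) and node.name not in schema | {"*"}:
            errors.append(f"Unknown field: {node.name!r}")
        elif isinstance(node, BinaryOp):
            walk(node.left)
            walk(node.right)
        elif isinstance(node, FunctionCall):
            for arg in node.args:
                walk(arg)

    for stmt in program.statements:
        for child in vars(stmt).values():
            walk(child)
    return errors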

Extensible functions. The builtins dict already accepts custom functions. You can expose a registration decorator to make it feel like a proper plugin system:

dsl = Interpreter()

@dsl.register_function("slugify")
def slugify(s: str) -> str:
    return s.lower().replace(" ", "-")
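
The Interpreter above doesn't define register_function yet; a minimal version, added as a method on the class, might look like this:

def register_function(self, name: str):
    # Returns a decorator that installs fn under `name` in self.builtins.
    def decorator(fn):
        self.builtins[name] = fn
        return fn
    return decorator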

Caching parsed programs. Parsing is the expensive part. Cache Program objects keyed by source hash so repeated runs of the same pipeline skip the lexer and parser entirely.
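
functools.lru_cache keyed on the source string gets the same effect as an explicit hash with almost no code:

from functools import lru_cache

@lru_cache(maxsize=128)
def compile_pipeline(source: str) -> Program:
    # Repeat calls with the same source skip the lexer and parser entirely.
    return Parser(Lexer(source).tokenize()).parse()

def run_cached(source: str, data: list[dict]) -> list[dict]:
    return Interpreter().run(compile_pipeline(source), data)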


Why This Matters Beyond the Exercise

The pattern you just built -- tokenize, parse, evaluate -- shows up everywhere. Template engines, feature flag rule engines, database query builders, workflow automation tools. Once you have built one end to end, you start recognizing the same skeleton in every tool that lets users express logic without writing code.

The implementation here clocks in at around 250 lines. That is small enough to own completely, test thoroughly, and extend confidently. For a lot of internal tooling, that is exactly the right size.


Full Source

The complete working code is available as a single file. Clone it, break it on purpose, and add a feature. That is the best way to actually internalize what is happening.

https://gist.github.com/your-handle/python-dsl-from-scratch

If you end up extending this with a type checker or a Pratt parser, drop it in the comments. Curious what people build on top of it.
