DEV Community

Aman Vaths


Mempool Monitoring for MEV Bots: Technical Implementation Guide

The blockchain mempool represents a constantly churning pool of unconfirmed transactions waiting for block inclusion. For MEV (Maximal Extractable Value) bots, the mempool is a goldmine of opportunity — every pending swap, liquidation, and protocol interaction creates potential profit for those fast enough to exploit it.

This technical guide provides a comprehensive implementation roadmap for mempool monitoring systems powering MEV extraction. We'll cover everything from node configuration and WebSocket subscriptions through transaction parsing and opportunity classification to bundle construction and submission via Flashbots.

Whether you're building arbitrage bots, liquidation systems, or exploring MEV bot development including sandwich strategies, understanding mempool monitoring fundamentals is essential for profitable operation.


Table of Contents

  1. Understanding MEV and Mempool Dynamics
  2. Node Infrastructure and Configuration
  3. Mempool Subscription and Transaction Streaming
  4. Transaction Decoding and Analysis
  5. Opportunity Detection Algorithms
  6. Gas Optimization and Priority Fees
  7. Flashbots and Private Transaction Pools
  8. Transaction Simulation and State Prediction
  9. Liquidation Bot Implementation
  10. Sandwich Attack Mechanics
  11. Advanced Bundle Construction Strategies
  12. Multi-Chain MEV Strategies
  13. MEV Risk Management
  14. Common Implementation Mistakes

Understanding MEV and Mempool Dynamics

Maximal Extractable Value (MEV) represents profit available to block producers through including, excluding, or reordering transactions within blocks. Originally called "Miner Extractable Value" in proof-of-work systems, the concept extends to validators in proof-of-stake networks.

MEV arises from the gap between transaction broadcast and block confirmation. During this window, pending transactions reveal profitable opportunities to observers who can act before confirmation. A large swap on Uniswap will move the price; knowing this in advance enables front-running (trading before) or back-running (trading after) for profit. A position approaching liquidation threshold will trigger a liquidation call; monitoring the victim's transactions and protocol state enables capturing the liquidation bounty.
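To make the price-movement claim concrete, here is a minimal sketch of how a pending swap shifts the spot price in a Uniswap V2-style constant-product pool. The pool sizes are illustrative assumptions, and the helper names are hypothetical; only the 0.3% fee formula is taken from the V2 design.

```python
def get_amount_out(amount_in: int, reserve_in: int, reserve_out: int) -> int:
    """Uniswap V2 output for an exact input, including the 0.3% fee."""
    amount_in_with_fee = amount_in * 997
    return (amount_in_with_fee * reserve_out) // (reserve_in * 1000 + amount_in_with_fee)


def price_after_swap(amount_in: int, reserve_in: int, reserve_out: int):
    """Return (spot price before, spot price after) as reserve_out per reserve_in."""
    amount_out = get_amount_out(amount_in, reserve_in, reserve_out)
    before = reserve_out / reserve_in
    after = (reserve_out - amount_out) / (reserve_in + amount_in)
    return before, after


# Illustrative pool: 1,000 ETH against 2,000,000 USDC (whole units for readability)
before, after = price_after_swap(50, 1_000, 2_000_000)
print(f"spot price before: {before:.2f}, after: {after:.2f}")
```

Whoever trades immediately after the pending swap faces the `after` price, which is the gap a back-run arbitrage closes against other venues.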

MEV Types Overview

| Type | Description | Competition | Avg Profit |
|------|-------------|-------------|------------|
| DEX Arbitrage | Price discrepancies across exchanges after pending swaps | Very High | $50-500 |
| Liquidations | Under-collateralized lending positions in Aave, Compound | High | $100-10K |
| Sandwich Attacks | Front-run and back-run large swaps with high slippage | High | $20-200 |
| JIT Liquidity | Provide concentrated liquidity just before large swaps | Medium | $10-100 |

Detailed MEV Categories

DEX Arbitrage is the most common MEV type. When a pending transaction will move the price on one DEX, check if that creates arbitrage against other exchanges like SushiSwap, Curve, or Balancer. These opportunities require fast multi-DEX price monitoring and optimal path calculation.

Liquidations involve monitoring lending protocols for positions with health factors approaching 1.0. Protocols like Aave and Compound offer 5-15% liquidation bonuses. Success requires predicting when price movements will trigger liquidation and positioning your transaction optimally.
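As a rough illustration of the health-factor check, the sketch below follows the Aave-style definition (collateral value weighted by liquidation threshold, divided by debt value). The function names and the example position are assumptions for illustration, not protocol code.

```python
def health_factor(collateral, debt_value):
    """collateral: iterable of (value, liquidation_threshold) pairs in one base currency."""
    if debt_value == 0:
        return float("inf")
    weighted = sum(value * threshold for value, threshold in collateral)
    return weighted / debt_value


def price_drop_to_liquidation(collateral_value, threshold, debt_value):
    """Fractional collateral price drop at which the health factor reaches 1.0.

    Assumes a single collateral asset and a debt value that stays constant.
    """
    hf = health_factor([(collateral_value, threshold)], debt_value)
    return max(0.0, 1 - 1 / hf)


# Hypothetical position: $10,000 collateral, 80% liquidation threshold, $7,000 debt
hf = health_factor([(10_000, 0.80)], 7_000)
drop = price_drop_to_liquidation(10_000, 0.80, 7_000)
print(f"health factor {hf:.3f}, liquidatable after a {drop:.1%} price drop")
```

Ranking monitored positions by the required price drop is one simple way to decide which accounts deserve per-block attention.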

Sandwich Attacks extract value by placing your transaction before and after a victim's swap. The front-run moves the price against them, they execute at a worse rate, and your back-run captures the difference. While controversial, this remains a significant MEV source.

Just-In-Time (JIT) Liquidity involves providing concentrated liquidity immediately before a large swap, capturing the fees, then withdrawing. This requires precise timing and position management in protocols like Uniswap V3.

MEV Extraction Flow

Transaction Broadcast → Mempool Detection → Opportunity Analysis → Bundle Construction → Block Inclusion

The mempool (memory pool) is a waiting area where unconfirmed transactions reside after broadcast until miners or validators include them in blocks. Each node maintains its own mempool view, and transactions propagate through the P2P network with slight timing differences between nodes. This propagation delay creates opportunities — a bot connected to well-positioned nodes sees transactions milliseconds before others, enabling first-mover advantage in capturing MEV.

The Economics of MEV

Understanding MEV economics helps set realistic expectations. In the current MEV-Boost ecosystem:

  • Searchers typically retain 10-50% of extracted value
  • Remaining value goes to builders and validators as tips
  • Highly competitive opportunities see margins compress
  • Less competitive niches maintain better margins but offer fewer opportunities

Infrastructure costs form the baseline for profitability calculations:

  • Node infrastructure: $500-2,000/month
  • Data feeds: $100-500/month
  • RPC services: $50-300/month
  • Failed transaction gas: Variable

Calculate break-even extraction levels before committing to infrastructure investment.
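A back-of-the-envelope version of that break-even calculation might look like the following. The cost figures are picked from within the ranges above and the 25% retention rate is an assumption; failed-transaction gas is left out because it varies.

```python
def break_even_extraction(monthly_costs_usd: dict, retention_rate: float) -> float:
    """Gross value that must be extracted per month so the retained share covers fixed costs."""
    if not 0 < retention_rate <= 1:
        raise ValueError("retention_rate must be in (0, 1]")
    return sum(monthly_costs_usd.values()) / retention_rate


# Illustrative monthly costs (USD)
costs = {"node": 1_000, "data_feeds": 300, "rpc": 150}
required = break_even_extraction(costs, retention_rate=0.25)
print(f"Break-even gross extraction: ${required:,.0f}/month")
```

Because searchers keep only a fraction of extracted value, the gross figure is several times the infrastructure bill.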


Node Infrastructure and Configuration

Competitive MEV extraction begins with infrastructure. Running your own Ethereum node provides:

  • Complete mempool visibility — See all pending transactions without filtering
  • Lowest latency — No round-trip to external services
  • Independence — No reliance on third-party service limitations
  • Customization — Configure specifically for MEV workloads

Public RPC endpoints filter mempool data, add latency, and may be unreliable during high-activity periods. For serious MEV operations, dedicated node infrastructure is non-negotiable.

Client Selection

Geth (Go Ethereum) is the most common client for MEV operations due to its robust txpool implementation and extensive RPC support. Configure Geth with txpool options that maximize mempool capacity and adjust peer connections for optimal transaction propagation.

Erigon offers advantages for archive data access but has different mempool characteristics. Some searchers run both clients for redundancy and different perspectives on the mempool.

Consensus layer clients (Lighthouse, Prysm, Teku, Nimbus) handle proof-of-stake duties while execution clients manage the mempool. Choose based on stability and resource requirements.

Geth Configuration for MEV

geth \
  --http \
  --http.api eth,net,web3,txpool,debug \
  --http.addr 0.0.0.0 \
  --http.port 8545 \
  --ws \
  --ws.api eth,net,web3,txpool \
  --ws.addr 0.0.0.0 \
  --ws.port 8546 \
  --txpool.globalslots 50000 \
  --txpool.globalqueue 10000 \
  --txpool.accountslots 512 \
  --txpool.accountqueue 256 \
  --txpool.lifetime 3h \
  --maxpeers 100 \
  --cache 8192 \
  --syncmode snap \
  --datadir /data/geth

Hardware Requirements

| Component | Minimum | Recommended | Impact |
|-----------|---------|-------------|--------|
| CPU | 8 cores | 16+ cores | Parallel tx processing |
| RAM | 32 GB | 64+ GB | State caching, mempool size |
| Storage | 2 TB NVMe | 4+ TB NVMe | State access speed |
| Network | 1 Gbps | 10 Gbps | Tx propagation latency |

Mempool Subscription and Transaction Streaming

Mempool monitoring begins with subscribing to pending transaction streams via WebSocket. Ethereum nodes expose pending transactions through the eth_subscribe method with "newPendingTransactions" parameter. This provides a real-time stream of transaction hashes as they enter the node's mempool.

Subscription Methods

WebSocket Subscriptions provide real-time transaction hashes with minimal latency. This is the primary method for production MEV systems.

txpool_content RPC provides complete mempool snapshot including all pending and queued transactions with full details. While useful for initial state and periodic synchronization, polling adds latency compared to streaming.

Alternative Data Sources like bloXroute, Chainbound, and Blocknative provide aggregated mempool streams from globally distributed nodes. These can surface transactions milliseconds before they reach your local node through peer propagation.
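For the txpool_content snapshot, a small helper can flatten the nested result into a stream of transactions. This sketch assumes the Geth response shape (status → sender → nonce → transaction, with nonces as decimal string keys); `flatten_txpool` is a hypothetical name, and the sample data is made up.

```python
def flatten_txpool(content):
    """Yield (status, sender, nonce, tx) for every transaction in a txpool_content result."""
    for status in ("pending", "queued"):
        for sender, by_nonce in content.get(status, {}).items():
            # Sort numerically so each sender's transactions come out in nonce order
            for nonce, tx in sorted(by_nonce.items(), key=lambda kv: int(kv[0])):
                yield status, sender, int(nonce), tx


# Made-up snapshot in the assumed shape
snapshot = {
    "pending": {"0xabc": {"11": {"hash": "0x02"}, "9": {"hash": "0x01"}}},
    "queued": {"0xdef": {"3": {"hash": "0x03"}}},
}
for status, sender, nonce, tx in flatten_txpool(snapshot):
    print(status, sender, nonce, tx["hash"])
```

With web3.py the snapshot would typically come from `w3.geth.txpool.content()`; seeding the monitor's state from it at startup avoids missing transactions that arrived before the subscription began.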

Optimal Implementation

The best implementations combine:

  1. WebSocket subscriptions for real-time transaction hashes
  2. Batched eth_getTransactionByHash calls to retrieve full data
  3. Premium mempool feeds for additional coverage
  4. Multiple geographically distributed nodes
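Step 2 in the list above can be sketched as a JSON-RPC batch: several eth_getTransactionByHash requests sent as one array in a single HTTP round trip. The helper names are hypothetical, and the transport (an HTTP POST of the serialized batch to your node) is left out.

```python
import json


def build_batch(tx_hashes, start_id=1):
    """Build one JSON-RPC batch containing an eth_getTransactionByHash call per hash."""
    return [
        {
            "jsonrpc": "2.0",
            "id": start_id + i,
            "method": "eth_getTransactionByHash",
            "params": [tx_hash],
        }
        for i, tx_hash in enumerate(tx_hashes)
    ]


def match_responses(batch, responses):
    """Map tx hash -> result. Batch responses may arrive in any order, so match by id."""
    by_id = {resp["id"]: resp.get("result") for resp in responses}
    return {req["params"][0]: by_id.get(req["id"]) for req in batch}


batch = build_batch(["0xaa", "0xbb"])
payload = json.dumps(batch)  # body of a single POST to the node's HTTP endpoint
```

A `None` result for a hash usually means the transaction was dropped or replaced between the notification and the lookup.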

Basic Mempool Monitor

import asyncio
import json
import websockets
from web3 import Web3

class MempoolMonitor:
    def __init__(self, ws_url, http_url):
        self.ws_url = ws_url
        self.w3 = Web3(Web3.HTTPProvider(http_url))
        self.pending_txs = {}
        self.callbacks = []

    async def subscribe_pending_transactions(self):
        """Subscribe to pending transactions via WebSocket"""
        async with websockets.connect(self.ws_url) as ws:
            # Subscribe to pending transactions
            subscribe_msg = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "eth_subscribe",
                "params": ["newPendingTransactions"]
            }
            await ws.send(json.dumps(subscribe_msg))

            # Handle subscription confirmation
            response = await ws.recv()
            subscription_id = json.loads(response)["result"]
            print(f"Subscribed with ID: {subscription_id}")

            # Process incoming transactions
            async for message in ws:
                data = json.loads(message)
                if "params" in data:
                    tx_hash = data["params"]["result"]
                    await self._process_transaction(tx_hash)

    async def _process_transaction(self, tx_hash):
        """Fetch and process transaction details"""
        try:
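            # get_transaction is a blocking HTTP call; run it in a worker thread
            # so the WebSocket reader keeps draining while the lookup completes
            tx = await asyncio.to_thread(self.w3.eth.get_transaction, tx_hash)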
            if tx:
                tx_data = {
                    'hash': tx_hash,
                    'from': tx['from'],
                    'to': tx['to'],
                    'value': tx['value'],
                    'input': tx['input'],
                    'gas': tx['gas'],
                    'gasPrice': tx.get('gasPrice'),
                    'maxFeePerGas': tx.get('maxFeePerGas'),
                    'maxPriorityFeePerGas': tx.get('maxPriorityFeePerGas'),
                    'nonce': tx['nonce']
                }

                # Notify all registered callbacks
                for callback in self.callbacks:
                    await callback(tx_data)

        except Exception:
            pass  # Most often TransactionNotFound: the tx was dropped or replaced before the lookup

    def register_callback(self, callback):
        """Register callback for new transactions"""
        self.callbacks.append(callback)


# Usage
monitor = MempoolMonitor(
    ws_url="ws://localhost:8546",
    http_url="http://localhost:8545"
)

async def on_transaction(tx):
    print(f"New pending tx: {tx['hash']}")

monitor.register_callback(on_transaction)
asyncio.run(monitor.subscribe_pending_transactions())

Transaction Decoding and Analysis

Raw transaction data contains encoded function calls that must be decoded to understand transaction intent. The input field contains the function selector (first 4 bytes, derived from the function signature hash) followed by ABI-encoded parameters.

Understanding Transaction Structure

Every Ethereum transaction contains:

  • from — Sender address
  • to — Recipient address (a contract or an externally owned account; empty for contract creation)
  • value — ETH amount sent
  • input — Encoded function call data
  • gas — Gas limit
  • gasPrice/maxFeePerGas — Gas pricing
  • nonce — Sender's transaction sequence number

The input field structure:

0x38ed1739 + [encoded parameters]
   ↑
   Function selector (4 bytes = keccak256("swapExactTokensForTokens(uint256,uint256,address[],address,uint256)")[:4])
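The layout above can be inspected without any ABI library by slicing the calldata into the 4-byte selector and 32-byte words. The sketch below builds a synthetic swapExactTokensForTokens payload (all values are made up) and reads the two static amounts; dynamic types such as the address[] path are referenced by an offset word, which is why a full decoder is still needed for them.

```python
def split_calldata(input_hex: str):
    """Split calldata into (selector, list of 32-byte words as hex strings)."""
    data = input_hex[2:] if input_hex.startswith("0x") else input_hex
    selector, body = "0x" + data[:8].lower(), data[8:]
    words = [body[i:i + 64] for i in range(0, len(body), 64)]
    return selector, words


def as_uint(word: str) -> int:
    return int(word, 16)


# Synthetic calldata: amountIn, amountOutMin, offset of path, to, deadline,
# then the path array (length followed by two addresses). Values are illustrative.
fields = (10**18, 1_900 * 10**6, 0xA0, 0x1111, 1_700_000_000, 2, 0xAAAA, 0xBBBB)
example = "0x38ed1739" + "".join(f"{value:064x}" for value in fields)

selector, words = split_calldata(example)
amount_in, amount_out_min = as_uint(words[0]), as_uint(words[1])
path_offset = as_uint(words[2])  # byte offset (from the start of the params) to the path array
```

Here the offset is 0xa0 (160 bytes) because the head of the encoding holds five 32-byte slots before the array data begins.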

Building an ABI Database

Building a comprehensive ABI database enables decoding transactions to known DeFi protocols:

Standard Interfaces:

  • ERC-20 (transfer, approve, transferFrom)
  • Uniswap V2 Router (swap functions)
  • Uniswap V3 Router (exactInput, exactOutput)
  • Aave Pool (supply, borrow, repay, liquidationCall)
  • Compound (mint, redeem, borrow, repay)
  • Curve (exchange, exchange_underlying)

Store these in a lookup structure keyed by function selector for O(1) decoding. For unknown contracts, Etherscan API or 4byte.directory can provide function signatures.

What Decoded Transactions Reveal

Swap Transactions reveal:

  • Token paths (what's being traded)
  • Amounts (input and minimum output)
  • Slippage tolerance (amountOutMin vs expected)
  • Deadline (time-sensitivity)

Lending Operations reveal:

  • Asset being deposited/borrowed/repaid
  • Amount
  • User position being affected

Liquidation Calls reveal:

  • User being liquidated
  • Collateral and debt assets
  • Amount to cover
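For swaps, the slippage tolerance and time-sensitivity listed above fall out of simple arithmetic on the decoded parameters. In this sketch `expected_out` is assumed to come from your own pool quote at current reserves; the function names are hypothetical.

```python
import time


def implied_slippage(amount_out_min: int, expected_out: int) -> float:
    """Fraction of the expected output the sender has agreed to give up."""
    if expected_out <= 0:
        raise ValueError("expected_out must be positive")
    return max(0.0, 1 - amount_out_min / expected_out)


def seconds_until_deadline(deadline: int, now=None) -> float:
    """Time left before the router would reject the swap (negative if already expired)."""
    current = time.time() if now is None else now
    return deadline - current


# Illustrative values: quote says 2,000 units out, sender accepts as little as 1,980
tolerance = implied_slippage(1_980, 2_000)
remaining = seconds_until_deadline(1_700_000_600, now=1_700_000_000)
```

A wide tolerance on a large swap is the signal that most opportunity classifiers key on, while a near deadline limits how many blocks the opportunity can survive.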

Transaction Decoder Implementation

from web3 import Web3
from eth_abi import decode

class TransactionDecoder:
    # Common DEX function selectors
    SELECTORS = {
        '0x38ed1739': {'name': 'swapExactTokensForTokens', 'protocol': 'uniswap_v2'},
        '0x8803dbee': {'name': 'swapTokensForExactTokens', 'protocol': 'uniswap_v2'},
        '0x7ff36ab5': {'name': 'swapExactETHForTokens', 'protocol': 'uniswap_v2'},
        '0x18cbafe5': {'name': 'swapExactTokensForETH', 'protocol': 'uniswap_v2'},
        '0xc04b8d59': {'name': 'exactInput', 'protocol': 'uniswap_v3'},
        '0xdb3e2198': {'name': 'exactOutputSingle', 'protocol': 'uniswap_v3'},
    }

    def decode_transaction(self, tx_data):
        """Decode transaction input data"""
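        input_data = tx_data['input']

        # web3.py returns `input` as HexBytes; normalise to a 0x-prefixed hex string
        if isinstance(input_data, (bytes, bytearray)):
            input_data = '0x' + bytes(input_data).hex()

        if len(input_data) < 10:
            return None  # No function call data

        # Extract function selector (first 4 bytes = "0x" + 8 hex characters)
        selector = input_data[:10].lower()
        params_data = input_data[10:]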

        if selector in self.SELECTORS:
            func_info = self.SELECTORS[selector]
            decoded = self._decode_params(func_info['name'], params_data)

            return {
                'selector': selector,
                'function': func_info['name'],
                'protocol': func_info['protocol'],
                'params': decoded,
                'to': tx_data['to'],
                'value': tx_data['value'],
                'gas_price': tx_data.get('gasPrice') or tx_data.get('maxFeePerGas')
            }

        return None

    def _decode_params(self, func_name, params_hex):
        """Decode function parameters based on known signatures"""
        params_bytes = bytes.fromhex(params_hex)

        if func_name == 'swapExactTokensForTokens':
            # (uint256 amountIn, uint256 amountOutMin, address[] path, address to, uint256 deadline)
            try:
                decoded = decode(
                    ['uint256', 'uint256', 'address[]', 'address', 'uint256'],
                    params_bytes
                )
                return {
                    'amountIn': decoded[0],
                    'amountOutMin': decoded[1],
                    'path': decoded[2],
                    'recipient': decoded[3],
                    'deadline': decoded[4]
                }
            except Exception:
                return None  # Malformed or truncated calldata

        return None

    def is_swap_transaction(self, decoded):
        """Check if transaction is a DEX swap"""
        if not decoded:
            return False
        # Lowercase entries: they are matched against the lowercased function name
        swap_functions = ['swap', 'exactinput', 'exactoutput']
        return any(f in decoded['function'].lower() for f in swap_functions)

Opportunity Detection Algorithms

Opportunity detection transforms decoded transactions into actionable MEV opportunities. Each type requires specialized logic evaluating profitability after gas costs.
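A minimal sketch of the "profitability after gas costs" check, with everything in wei. The function names, the optional builder tip, and the example figures are assumptions for illustration.

```python
GWEI = 10**9


def net_profit_wei(gross_profit_wei, gas_used, base_fee_wei, priority_fee_wei, builder_tip_wei=0):
    """Gross profit minus execution gas and any direct payment to the builder."""
    gas_cost = gas_used * (base_fee_wei + priority_fee_wei)
    return gross_profit_wei - gas_cost - builder_tip_wei


def is_worth_executing(gross_profit_wei, gas_used, base_fee_wei, priority_fee_wei,
                       min_profit_wei, builder_tip_wei=0):
    """Filter step: keep only opportunities that clear a minimum net profit."""
    net = net_profit_wei(gross_profit_wei, gas_used, base_fee_wei, priority_fee_wei, builder_tip_wei)
    return net >= min_profit_wei


# Illustrative: 0.05 ETH gross, 200k gas, 20 gwei base fee, 2 gwei priority fee
net = net_profit_wei(5 * 10**16, 200_000, 20 * GWEI, 2 * GWEI)
```

Running this filter before simulation keeps the more expensive steps of the pipeline for candidates that can plausibly pay for themselves.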

Detection Pipeline Architecture

┌─────────────┐     ┌──────────────┐     ┌─────────────┐     ┌──────────────┐
│  Mempool    │ →   │  Decoder     │ →   │  Classifier │ →   │  Profitability│
│  Stream     │     │  (ABI Parse) │     │  (Type ID)  │     │  Calculator   │
└─────────────┘     └──────────────┘     └─────────────┘     └──────────────┘
                                                                    ↓
┌─────────────┐     ┌──────────────┐     ┌─────────────┐     ┌──────────────┐
│  Execute    │ ←   │  Bundle      │ ←   │  Simulation │ ←   │  Filter      │
│  (Flashbots)│     │  Builder     │     │  (Verify)   │     │  (Min Profit)│
└─────────────┘     └──────────────┘     └─────────────┘     └──────────────┘

Arbitrage Detection Deep Dive

When a pending swap will move the price on DEX A, check other venues for arbitrage:

class ArbitrageScanner:
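    def __init__(self, min_profit):
        # find_arbitrage compares each candidate's profit against this threshold
        self.min_profit = min_profit
        # The *Client classes below are placeholders for per-DEX wrappers (not shown)
        self.dexes = {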
            'uniswap_v2': UniswapV2Client(),
            'uniswap_v3': UniswapV3Client(),
            'sushiswap': SushiswapClient(),
            'curve': CurveClient(),
            'balancer': BalancerClient(),
        }

    async def find_arbitrage(self, pending_swap):
        """Find cross-DEX arbitrage from pending swap"""
        token_in = pending_swap['path'][0]
        token_out = pending_swap['path'][-1]
        amount = pending_swap['amountIn']

        # Calculate post-swap price on source DEX
        source_dex = self.identify_dex(pending_swap['to'])
        impact = await self.dexes[source_dex].simulate_swap(
            token_in, token_out, amount
        )

        # Check all other DEXes
        opportunities = []
        for name, dex in self.dexes.items():
            if name == source_dex:
                continue

            # Get current price on this DEX
            price = await dex.get_price(token_in, token_out)

            # Calculate spread
            spread = abs(impact.new_price - price) / price

            if spread > 0.001:  # 0.1% minimum spread
                # Calculate optimal arb size
                optimal = await self.optimize_arb_size(
                    source_dex, name, 
                    token_in, token_out,
                    impact.new_price, price
                )

                if optimal['profit'] > self.min_profit:
                    opportunities.append({
                        'buy_dex': name if price < impact.new_price else source_dex,
                        'sell_dex': source_dex if price < impact.new_price else name,
                        'amount': optimal['amount'],
                        'expected_profit': optimal['profit']
                    })

        return opportunities

Multi-Hop Arbitrage

Sometimes profitable paths span multiple DEXes:

ETH → USDC on Uniswap → USDC → WBTC on Curve → WBTC → ETH on Sushi

async def find_multi_hop_arb(self, token, max_hops=3):
    """Find profitable multi-hop cycles starting and ending with token"""
    paths = self.generate_paths(token, max_hops)

    profitable = []
    for path in paths:
        profit = await self.simulate_path(path)

        if profit > self.min_profit:
            profitable.append({
                'path': path,
                'profit': profit
            })

    return sorted(profitable, key=lambda x: x['profit'], reverse=True)
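The path-generation step is not shown in the snippet above. One possible shape for it, assuming pools are described as a token → [(dex, next_token)] adjacency map, is a bounded depth-first search for cycles; `generate_cycles` and the pool map are hypothetical.

```python
def generate_cycles(pools, start, max_hops=3):
    """Enumerate swap cycles that start and end at `start` using at most `max_hops` swaps.

    pools: dict mapping a token to a list of (dex, next_token) edges.
    Each cycle is a list of (dex, token_in, token_out) hops.
    """
    cycles = []

    def walk(token, path, visited):
        for dex, next_token in pools.get(token, []):
            hop = (dex, token, next_token)
            if next_token == start and path:
                cycles.append(path + [hop])  # closed the loop
            elif next_token not in visited and len(path) + 1 < max_hops:
                walk(next_token, path + [hop], visited | {next_token})

    walk(start, [], {start})
    return cycles


# Illustrative pool map matching the example path
pools = {
    "ETH": [("uniswap", "USDC")],
    "USDC": [("curve", "WBTC"), ("sushi", "ETH")],
    "WBTC": [("sushi", "ETH")],
}
cycles = generate_cycles(pools, "ETH", max_hops=3)
```

The number of cycles grows quickly with hop count and pool count, so the candidate set is usually precomputed and only re-simulated when a pending swap touches one of its pools.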

Real-Time Price Feeds

Maintain current prices for fast opportunity detection:

class PriceCache:
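    def __init__(self, tracked_pairs=()):
        self.tracked_pairs = list(tracked_pairs)  # (token_a, token_b) pairs refreshed each block
        self.prices = {}
        self.reserves = {}  # pair -> pool reserves, needed to model swap impact
        self.last_updated = {}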

    async def update_on_block(self, block_number):
        """Update all prices when new block arrives"""
        for pair in self.tracked_pairs:
            self.prices[pair] = await self.fetch_price(pair)
            self.last_updated[pair] = block_number

    async def update_on_swap(self, swap_tx):
        """Update affected pair prices after swap"""
        pair = (swap_tx['token_in'], swap_tx['token_out'])

        # Simulate swap effect
        new_price = self.calculate_post_swap_price(
            self.prices[pair],
            swap_tx['amount'],
            self.reserves[pair]
        )

        self.prices[pair] = new_price

    def get_price(self, token_a, token_b):
        """Get cached price instantly"""
        return self.prices.get((token_a, token_b))

DEX Arbitrage Detection

class ArbitrageDetector:
    def __init__(self, dex_clients, min_profit_wei):
        self.dexes = dex_clients  # Dict of DEX price fetchers
        self.min_profit = min_profit_wei

    async def detect_arbitrage(self, pending_swap):
        """Detect arbitrage opportunity from pending swap"""
        token_in = pending_swap['params']['path'][0]
        token_out = pending_swap['params']['path'][-1]
        amount_in = pending_swap['params']['amountIn']

        # Calculate price impact on source DEX
        source_dex = self._identify_dex(pending_swap['to'])
        post_swap_price = await self._simulate_price_impact(
            source_dex, token_in, token_out, amount_in
        )

        # Check prices on other DEXes
        opportunities = []
        for dex_name, dex_client in self.dexes.items():
            if dex_name == source_dex:
                continue

            current_price = await dex_client.get_price(token_in, token_out)

            # Calculate arbitrage profit
            price_diff = abs(post_swap_price - current_price) / current_price

            if price_diff > 0.001:  # 0.1% minimum
                profit = await self._calculate_optimal_arb(
                    source_dex, dex_name, token_in, token_out,
                    post_swap_price, current_price
                )

                if profit > self.min_profit:
                    opportunities.append({
                        'type': 'arbitrage',
                        'buy_dex': dex_name if current_price < post_swap_price else source_dex,
                        'sell_dex': source_dex if current_price < post_swap_price else dex_name,
                        'token_pair': (token_in, token_out),
                        'expected_profit': profit,
                        'trigger_tx': pending_swap['hash']
                    })

        return opportunities

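    async def _calculate_optimal_arb(self, buy_dex, sell_dex, token_in, token_out,
                                      buy_price, sell_price):
        """Calculate optimal arbitrage amount and profit"""
        async def profit_at(amount):
            # Round trip: buy token_out on one venue, sell it back on the other
            bought = await self.dexes[buy_dex].get_amount_out(amount, token_in, token_out)
            sold = await self.dexes[sell_dex].get_amount_out(bought, token_out, token_in)
            return sold - amount

        # Profit rises with trade size, peaks, then falls as slippage dominates,
        # so bisecting on "profit improved" can miss the peak. Ternary search
        # narrows in on the maximum, assuming the curve has a single peak.
        low, high = 1, 10 ** 20  # Search space in token_in base units
        for _ in range(60):
            third = (high - low) // 3
            if third == 0:
                break
            m1, m2 = low + third, high - third
            if await profit_at(m1) < await profit_at(m2):
                low = m1
            else:
                high = m2

        best_profit = await profit_at((low + high) // 2)
        return max(best_profit, 0)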

Gas Optimization and Priority Fees

Gas strategy determines MEV profitability and execution priority. Post-EIP-1559, transactions include:

  • Base Fee — Burned, algorithmically determined by network congestion
  • Priority Fee (Tip) — Paid to validators, determines inclusion priority
  • Max Fee — Maximum total gas price you're willing to pay

Higher priority fees increase inclusion likelihood and position within blocks. For MEV, transaction ordering within the block matters — being one position too late means another bot captures the opportunity.

EIP-1559 Base Fee Dynamics

Base fee adjusts by up to 12.5% per block based on gas usage:

  • If the previous block used more gas than the target (50% of the gas limit) → base fee increases
  • If it used less gas than the target → base fee decreases

This creates predictable short-term base fees, enabling better gas estimation.
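The adjustment rule can be written out exactly in integer arithmetic, following the update formula in the EIP-1559 specification. The constant names mirror the spec; the function name and the example block are illustrative.

```python
BASE_FEE_MAX_CHANGE_DENOMINATOR = 8  # caps the change at 1/8 = 12.5% per block
ELASTICITY_MULTIPLIER = 2            # gas target = gas limit / 2


def next_base_fee(parent_base_fee: int, parent_gas_used: int, parent_gas_limit: int) -> int:
    """Base fee of the next block, given the parent block's header fields (all in wei/gas units)."""
    target = parent_gas_limit // ELASTICITY_MULTIPLIER
    if parent_gas_used == target:
        return parent_base_fee
    if parent_gas_used > target:
        delta = parent_base_fee * (parent_gas_used - target) // target // BASE_FEE_MAX_CHANGE_DENOMINATOR
        return parent_base_fee + max(delta, 1)
    delta = parent_base_fee * (target - parent_gas_used) // target // BASE_FEE_MAX_CHANGE_DENOMINATOR
    return parent_base_fee - delta


# Illustrative parent block: 100 gwei base fee, completely full 30M-gas block
predicted = next_base_fee(100 * 10**9, 30_000_000, 30_000_000)
```

Because the next block's base fee is fully determined by the parent header, a bundle targeting block N+1 can set maxFeePerGas against a known value rather than an estimate.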

Priority Fee Strategy by Opportunity Type

| Urgency | % of Profit to Gas | Use Case |
|---------|--------------------|----------|
| Low | 10% | Non-time-sensitive arbitrage |
| Normal | 30% | Standard opportunities |
| High | 50% | Competitive opportunities |
| Critical | 80% | High-value, time-sensitive |

Gas Estimation Best Practices

  1. Simulate locally — Use eth_estimateGas with state overrides
  2. Add 10% buffer — Account for on-chain variance
  3. Consider pending txs — State may differ at execution
  4. Track historical accuracy — Calibrate your estimations

Dynamic Gas Pricing

class GasOptimizer:
    def __init__(self, w3, max_priority_fee_gwei=50):
        self.w3 = w3
        self.max_priority_fee = max_priority_fee_gwei * 10**9
        self.recent_base_fees = []
        self.recent_priority_fees = []

    async def get_optimal_gas_params(self, opportunity_profit, urgency='normal'):
        """Calculate optimal gas parameters for opportunity"""
        # Get current base fee
        latest_block = self.w3.eth.get_block('latest')
        base_fee = latest_block['baseFeePerGas']

        # Predict next block base fee
        gas_used_ratio = latest_block['gasUsed'] / latest_block['gasLimit']
        if gas_used_ratio > 0.5:
            predicted_base_fee = int(base_fee * (1 + 0.125 * (gas_used_ratio - 0.5) / 0.5))
        else:
            predicted_base_fee = int(base_fee * (1 - 0.125 * (0.5 - gas_used_ratio) / 0.5))

        # Calculate priority fee based on urgency and profit
        priority_fee = self._calculate_priority_fee(
            opportunity_profit, urgency, predicted_base_fee
        )

        # Set max fee with buffer
        max_fee = predicted_base_fee + priority_fee + (base_fee // 10)

        return {
            'maxFeePerGas': max_fee,
            'maxPriorityFeePerGas': priority_fee,
            'estimated_base_fee': predicted_base_fee
        }

    def _calculate_priority_fee(self, profit, urgency, base_fee):
        """Calculate priority fee based on profit and urgency"""
        urgency_multipliers = {
            'low': 0.1,       # 10% of profit to gas
            'normal': 0.3,   # 30% of profit to gas
            'high': 0.5,     # 50% of profit to gas
            'critical': 0.8  # 80% of profit to gas
        }

        gas_budget = int(profit * urgency_multipliers.get(urgency, 0.3))
        estimated_gas = 200000  # Typical MEV tx

        base_cost = base_fee * estimated_gas
        remaining_budget = gas_budget - base_cost

        if remaining_budget <= 0:
            return 0

        # Keep this an integer (wei per gas); fee fields must be ints when the tx is signed
        priority_fee = remaining_budget // estimated_gas
        return min(priority_fee, self.max_priority_fee)

Key Gas Metrics:

| Metric | Value | Description |
|--------|-------|-------------|
| Gas Budget | 30-50% | Typical profit allocation to gas |
| Gas Units | ~200K | Typical MEV transaction |
| Max Change | 12.5% | Base fee change per block |
| Safety Buffer | 10% | Gas estimation margin |

Flashbots and Private Transaction Pools

Flashbots revolutionized MEV by providing a private transaction pool that bypasses the public mempool. Traditional MEV transactions broadcast to the public mempool are visible to everyone, enabling competitors to front-run your opportunity.

How Flashbots Works

  1. Bundle Creation — Package your MEV transactions together
  2. Direct Submission — Send to Flashbots relay, not public mempool
  3. Simulation — Relay simulates bundle for validity
  4. Block Building — Builder includes bundle in block proposal
  5. Atomic Execution — Bundle executes completely or not at all

Key Flashbots Concepts

Bundles are ordered lists of transactions that execute atomically:

bundle = [
    {"signed_transaction": victim_tx_bytes},        # Target tx to follow (raw signed bytes)
    {"signer": my_account, "transaction": arb_tx},  # Our MEV tx, signed at submission
]

Coinbase Transfer — Payment to block builder for inclusion:

block.coinbase.transfer(tip_amount);

Target Block — Bundles specify which block they're valid for. If not included, they expire without cost.
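At the wire level, a bundle and its target block travel in an eth_sendBundle JSON-RPC request. The sketch below only builds the request body; the helper name is hypothetical, the raw transaction bytes are placeholders, and the authentication header that the Flashbots relay requires (a signature over the body) is not shown.

```python
def build_send_bundle_request(signed_txs, target_block: int, request_id: int = 1) -> dict:
    """Build an eth_sendBundle request body for one target block.

    signed_txs: raw signed transactions as bytes, in the order they should execute.
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_sendBundle",
        "params": [{
            "txs": ["0x" + bytes(raw).hex() for raw in signed_txs],
            "blockNumber": hex(target_block),  # the bundle is only valid for this block
        }],
    }


# Because each bundle targets a single block, a common pattern is to resubmit
# the same transactions for the next few blocks.
current_block = 18_000_000  # illustrative
requests = [
    build_send_bundle_request([b"\x02\xf8"], block, request_id=i + 1)
    for i, block in enumerate(range(current_block + 1, current_block + 4))
]
```

Retargeting is what makes "expire without cost" workable in practice: a bundle that misses block N+1 is simply offered again for N+2 if the opportunity still exists.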

Post-Merge Architecture

After Ethereum's merge, MEV-Boost introduced proposer-builder separation:

Searchers → Block Builders → MEV-Boost Relay → Validators

  • Searchers submit bundles to builders
  • Builders aggregate bundles into optimal blocks
  • Relays facilitate trust between builders and validators
  • Validators select highest-value block

Major Block Builders

| Builder | Market Share | Notes |
|---------|--------------|-------|
| beaverbuild | ~25% | Aggressive optimization |
| Flashbots | ~20% | Original MEV infrastructure |
| rsync | ~15% | Low latency, geographic diversity |
| builder0x69 | ~10% | Consistent, fair pricing |

Pro tip: Submit to multiple builders simultaneously for higher inclusion rates.
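Submitting to several builders is latency-sensitive, so the requests are usually sent concurrently rather than one after another. A minimal sketch with asyncio follows; `submit` is a hypothetical coroutine you would supply (for example, a signed HTTP POST per endpoint), and the example.invalid URLs are placeholders.

```python
import asyncio


async def submit_to_all(bundle, target_block, endpoints, submit, timeout=2.0):
    """Send the same bundle to every endpoint concurrently.

    Returns {endpoint: result or exception}; one slow or failing builder
    does not delay or cancel the others.
    """
    async def one(url):
        try:
            return url, await asyncio.wait_for(submit(url, bundle, target_block), timeout)
        except Exception as exc:  # includes timeouts
            return url, exc

    return dict(await asyncio.gather(*(one(url) for url in endpoints)))


# Stand-in for a real submission coroutine, for demonstration only
async def fake_submit(url, bundle, target_block):
    if "bad" in url:
        raise RuntimeError("rejected")
    return {"bundleHash": "0x01"}


results = asyncio.run(
    submit_to_all([], 1, ["https://good.example.invalid", "https://bad.example.invalid"], fake_submit)
)
```

Keeping the per-endpoint outcome makes it easy to track which builders accept your bundles and which consistently time out.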

Flashbots Bundle Submission

from flashbots import flashbot
from eth_account import Account
from web3 import Web3

class FlashbotsSubmitter:
    def __init__(self, w3, private_key, flashbots_key):
        self.w3 = w3
        self.account = Account.from_key(private_key)
        self.flashbots_account = Account.from_key(flashbots_key)

        # Initialize Flashbots provider
        flashbot(w3, self.flashbots_account)

    async def submit_bundle(self, transactions, target_block, tip_percentage=0.9):
        """Submit bundle to Flashbots"""
        # Build bundle
        bundle = []
        for tx in transactions:
            if 'signed' in tx:
                # Pre-signed transaction (e.g., victim tx)
                bundle.append({'signed_transaction': tx['signed']})
            else:
                # Our transaction to sign
                bundle.append({
                    'signer': self.account,
                    'transaction': tx
                })

        # Simulate bundle
        simulation = self.w3.flashbots.simulate(bundle, target_block)

        if 'error' in simulation:
            return {'success': False, 'error': simulation['error']}

        # coinbaseDiff is the total value the bundle pays the block builder
        # (gas fees plus direct coinbase transfers), not the searcher's profit
        coinbase_diff = simulation['coinbaseDiff']
        gas_used = simulation['totalGasUsed']

        # Send bundle
        result = self.w3.flashbots.send_bundle(
            bundle,
            target_block_number=target_block
        )

        # Wait for inclusion
        result.wait()

        try:
            receipts = result.receipts()
            return {
                'success': True,
                'block': target_block,
                'receipts': receipts,
                'coinbase_diff': coinbase_diff,
                'gas_used': gas_used
            }
        except Exception:
            return {'success': False, 'error': 'Bundle not included'}

    async def submit_to_multiple_builders(self, bundle, target_block):
        """Submit to multiple block builders"""
        builders = [
            'https://relay.flashbots.net',
            'https://builder0x69.io',
            'https://rpc.beaverbuild.org',
            'https://rsync-builder.xyz'
        ]

        results = []
        for builder in builders:
            try:
                result = await self._submit_to_builder(bundle, target_block, builder)
                results.append(result)
            except Exception:  # Skip builders that reject the bundle or are unreachable
                continue

        return results

Flashbots Benefits

No Failed Transaction Costs — Bundles execute atomically or not at all

Front-running Protection — Transactions hidden until block inclusion

Precise Ordering — Exact transaction ordering within bundles


Transaction Simulation and State Prediction

Accurate simulation predicts transaction outcomes before execution. This is essential for profitability calculation and avoiding failed transactions.

Why Simulation Matters

Without simulation, you're flying blind:

  • Profit miscalculation — Actual profit may differ from estimated
  • Failed transactions — State may have changed since detection
  • Wasted gas — Failed MEV transactions still cost gas in public mempool

Simulation Methods

Method 1: eth_call with State Overrides

async def simulate_with_overrides(self, tx, state_changes):
    """Simulate transaction with modified state"""
    result = self.w3.eth.call(
        {
            'from': tx['from'],
            'to': tx['to'],
            'data': tx['data'],
            'value': tx.get('value', 0)
        },
        'latest',
        state_changes  # Override specific storage slots
    )

    return self.decode_result(result)

Method 2: debug_traceCall

async def trace_transaction(self, tx):
    """Get detailed execution trace"""
    trace = self.w3.manager.request_blocking(
        'debug_traceCall',
        [
            {
                'from': tx['from'],
                'to': tx['to'],
                'data': tx['data'],
                'gas': hex(tx['gas']),
                'value': hex(tx.get('value', 0))
            },
            'latest',
            {'tracer': 'callTracer'}  # Or 'prestateTracer'
        ]
    )

    return trace

Method 3: Local Fork Simulation

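# NOTE: `anvil` is assumed to be a thin local wrapper you write around
# Foundry's `anvil` process; it is not a published Python package.
from anvil import Anvil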

class ForkSimulator:
    def __init__(self, fork_url):
        self.anvil = Anvil(fork_url)

    async def simulate_sequence(self, transactions):
        """Simulate multiple transactions in sequence"""
        results = []

        for tx in transactions:
            # Execute on fork
            receipt = await self.anvil.send_transaction(tx)

            results.append({
                'success': receipt['status'] == 1,
                'gas_used': receipt['gasUsed'],
                'logs': receipt['logs']
            })

        return results

    async def reset(self):
        """Reset fork to original state"""
        await self.anvil.reset()

Simulating Pending Transaction Effects

The key insight: your transaction executes AFTER pending transactions, not at current state.

class StatePrediction:
    def __init__(self):
        self.pending_effects = {}

    async def predict_state(self, pending_txs, my_tx):
        """Predict state after pending txs, then simulate my tx"""

        # Step 1: Simulate each pending tx
        for pending in pending_txs:
            effects = await self.simulate_single(pending)
            self.apply_effects(effects)

        # Step 2: Now simulate my tx against predicted state
        result = await self.simulate_with_state(my_tx, self.pending_effects)

        return result

    def apply_effects(self, effects):
        """Apply transaction effects to predicted state"""
        for address, storage in effects.items():
            if address not in self.pending_effects:
                self.pending_effects[address] = {}

            self.pending_effects[address].update(storage)

Simulation Accuracy Optimization

class AccuracyTracker:
    def __init__(self):
        self.predictions = []

    def record_prediction(self, predicted_profit, actual_profit):
        self.predictions.append({
            'predicted': predicted_profit,
            'actual': actual_profit,
            'accuracy': actual_profit / predicted_profit if predicted_profit > 0 else 0
        })

    def get_accuracy_stats(self):
        accuracies = [p['accuracy'] for p in self.predictions]
        if not accuracies:
            return {}  # nothing recorded yet; avoids division by zero below

        return {
            'mean_accuracy': sum(accuracies) / len(accuracies),
            'min_accuracy': min(accuracies),
            'max_accuracy': max(accuracies),
            'within_10pct': len([a for a in accuracies if 0.9 < a < 1.1]) / len(accuracies)
        }

Target: >90% of predictions within 10% of actual profit.

State Simulation Implementation

class StateSimulator:
    def __init__(self, w3, fork_block='latest'):
        self.w3 = w3
        self.fork_block = fork_block
        self.state_overrides = {}

    async def simulate_transaction(self, tx, pending_txs=None):
        """Simulate transaction with optional pending transaction effects"""
        # Build state overrides from pending transactions
        if pending_txs:
            for pending in pending_txs:
                state_changes = await self._simulate_single(pending)
                self._apply_state_changes(state_changes)

        # Simulate our transaction with modified state
        result = await self._simulate_with_overrides(tx)
        return result

    async def _simulate_single(self, tx):
        """Simulate single transaction and return state changes"""
        try:
            trace = self.w3.manager.request_blocking(
                'debug_traceCall',
                [{
                    'from': tx['from'],
                    'to': tx['to'],
                    'data': tx['input'],
                    'value': hex(tx.get('value', 0)),
                    'gas': hex(tx['gas'])
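                }, 'latest', {
                    'tracer': 'prestateTracer',
                    # diffMode also returns the post-transaction state, not only the pre-state
                    'tracerConfig': {'diffMode': True}
                }]
            )
            return self._parse_trace(trace)
        except Exception:
            return {}  # treat a transaction that cannot be traced as having no effect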

    async def simulate_arbitrage_profit(self, buy_tx, sell_tx, pending_txs):
        """Simulate complete arbitrage and calculate profit"""
        # Apply pending transactions to the predicted state
        for tx in pending_txs:
            self._apply_state_changes(await self._simulate_single(tx))

        # Simulate buy
        buy_result = await self._simulate_with_overrides(buy_tx)
        if not buy_result['success']:
            return {'profitable': False, 'reason': 'buy_failed'}

        # Apply buy state changes
        self._apply_state_changes(buy_result.get('state_changes', {}))

        # Simulate sell
        sell_result = await self._simulate_with_overrides(sell_tx)
        if not sell_result['success']:
            return {'profitable': False, 'reason': 'sell_failed'}

        # Calculate profit
        total_gas = buy_result['gas_used'] + sell_result['gas_used']
        profit = self._calculate_profit(buy_result, sell_result, total_gas)

        return {
            'profitable': profit > 0,
            'profit': profit,
            'gas_cost': total_gas  # gas units; multiply by the gas price to get wei
        }

Liquidation Bot Implementation

Liquidation bots monitor lending protocols for under-collateralized positions. Major protocols offer 5-15% liquidation incentives.

Understanding Liquidations

Health Factor Formula:

Health Factor = (Collateral Value × Liquidation Threshold) / Borrow Value

If Health Factor < 1.0 → Position is liquidatable

Example:

  • Collateral: 1 ETH @ $2,000 = $2,000
  • Liquidation Threshold: 80%
  • Borrow: 1,500 USDC
  • Health Factor: ($2,000 × 0.80) / $1,500 = 1.07

If ETH drops to $1,800:

  • Health Factor: ($1,800 × 0.80) / $1,500 = 0.96 → Liquidatable!
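
The arithmetic above can be sketched in a few lines of Python. This is a minimal illustration rather than any protocol's actual accounting: `health_factor` and `liquidation_price` are hypothetical helper names, and all values are assumed to share one unit (USD here).

```python
def health_factor(collateral_value: float, liquidation_threshold: float,
                  borrow_value: float) -> float:
    """Health factor as defined above; all values in the same unit."""
    if borrow_value <= 0:
        return float("inf")  # no debt means nothing to liquidate
    return collateral_value * liquidation_threshold / borrow_value


def liquidation_price(collateral_amount: float, liquidation_threshold: float,
                      borrow_value: float) -> float:
    """Collateral price at which the health factor equals exactly 1.0."""
    return borrow_value / (collateral_amount * liquidation_threshold)


# The worked example: 1 ETH of collateral, 80% threshold, 1,500 USDC borrowed
hf_now = health_factor(2_000, 0.80, 1_500)
hf_after_drop = health_factor(1_800, 0.80, 1_500)
trigger = liquidation_price(1, 0.80, 1_500)
```

With these numbers the position becomes liquidatable once ETH trades below roughly $1,875.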

Position Monitoring Strategy

class PositionMonitor:
    def __init__(self, protocol):
        self.protocol = protocol
        self.positions = {}
        self.price_feeds = {}

    async def index_positions(self):
        """Index all positions from protocol events"""
        # Get all borrow events
        borrows = await self.protocol.get_events('Borrow')

        for borrow in borrows:
            user = borrow['user']

            # Get current position
            position = await self.protocol.get_position(user)

            if position['debt'] > 0:
                self.positions[user] = {
                    'collateral': position['collateral'],
                    'debt': position['debt'],
                    'collateral_asset': position['collateral_asset'],
                    'debt_asset': position['debt_asset'],
                    'health_factor': position['health_factor']
                }

    async def find_liquidatable(self, price_change_threshold=0.05):
        """Find positions that are or will be liquidatable"""
        liquidatable = []
        at_risk = []

        for user, position in self.positions.items():
            hf = position['health_factor']

            if hf < 1.0:
                liquidatable.append(user)
            elif hf < 1.0 + price_change_threshold:
                # Would be liquidatable with X% price drop
                at_risk.append({
                    'user': user,
                    'current_hf': hf,
                    'price_drop_to_liquidate': (hf - 1.0) / hf
                })

        return liquidatable, at_risk

Flash Loan Liquidation

Execute liquidations without holding inventory:

async def flash_liquidate(self, position):
    """Liquidate using flash loan"""
    debt_to_cover = position['debt'] // 2  # Close factor: commonly capped at 50% of the debt per call

    # Build flash loan transaction
    flash_loan_tx = self.build_flash_loan({
        'asset': position['debt_asset'],
        'amount': debt_to_cover,
        'callback': self.liquidator_contract,
        'params': encode_liquidation_params(position)
    })

    # Liquidator contract will:
    # 1. Receive flash loaned debt token
    # 2. Call liquidationCall on lending protocol
    # 3. Receive collateral with bonus
    # 4. Swap collateral to debt token
    # 5. Repay flash loan
    # 6. Keep profit

    # Calculate expected profit
    collateral_received = self.calculate_collateral(
        position, debt_to_cover
    )

    swap_output = await self.dex.get_amount_out(
        collateral_received,
        position['collateral_asset'],
        position['debt_asset']
    )

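    # Assumption: the fee is charged in basis points on the borrowed amount
    # (self.flash_loan_fee_bps is a hypothetical config value, e.g. 5 = 0.05%)
    flash_loan_fee = debt_to_cover * self.flash_loan_fee_bps // 10_000
    profit = swap_output - debt_to_cover - flash_loan_fee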

    if profit > self.min_profit:
        return await self.flashbots.submit_bundle([flash_loan_tx])
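
To get a feel for the numbers before wiring up contracts, the profit estimate can be reduced to plain arithmetic. The sketch below works in USD values and uses placeholder parameters (5% liquidation bonus, 0.30% swap cost, 0.05% flash loan fee); these are assumptions for illustration, so check the actual values for the protocol, DEX, and flash loan source you use. `estimate_liquidation_profit` is a hypothetical helper.

```python
def estimate_liquidation_profit(debt_to_cover_usd: float,
                                bonus_bps: int = 500,
                                swap_cost_bps: int = 30,
                                flash_fee_bps: int = 5,
                                gas_cost_usd: float = 0.0) -> float:
    """Rough liquidation profit in USD under the stated assumptions."""
    # Collateral seized: the repaid debt plus the liquidation bonus
    collateral_usd = debt_to_cover_usd * (10_000 + bonus_bps) / 10_000
    # Selling the collateral back into the debt asset loses fees and slippage
    swap_output_usd = collateral_usd * (10_000 - swap_cost_bps) / 10_000
    # The flash loan fee is charged on the borrowed amount
    flash_fee_usd = debt_to_cover_usd * flash_fee_bps / 10_000
    return swap_output_usd - debt_to_cover_usd - flash_fee_usd - gas_cost_usd


# Covering half of the 1,500 USDC debt from the earlier example
profit_usd = estimate_liquidation_profit(750)
```

Under these assumptions the liquidation clears about $34.76 before gas, so gas and the builder tip quickly decide whether a small position is worth taking.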

Liquidation Timing

async def predict_liquidation_time(self, position, price_feed):
    """Predict when position will become liquidatable"""
    current_hf = position['health_factor']

    if current_hf < 1.0:
        return 0  # Already liquidatable

    # Calculate price drop needed
    required_drop = (current_hf - 1.0) / current_hf

    # Use historical volatility to estimate time
    daily_volatility = price_feed.get_volatility(days=30)

    # Time to X% drop (rough estimate)
    # Assumes price follows random walk
    expected_days = (required_drop / daily_volatility) ** 2

    return expected_days

Aave Liquidation Bot

class LiquidationBot:
    LIQUIDATION_CALL_SELECTOR = '0x00a718a9'

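    def __init__(self, w3, pool_address, price_oracle, flashbots, liquidator_contract):
        self.w3 = w3
        self.pool = w3.eth.contract(address=pool_address, abi=AAVE_POOL_ABI)
        self.oracle = price_oracle
        self.flashbots = flashbots  # bundle submitter used in _execute_liquidation
        self.liquidator_contract = liquidator_contract  # address of the flash-loan liquidator
        self.monitored_positions = {}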

    async def monitor_positions(self):
        """Continuously monitor positions for liquidation opportunities"""
        while True:
            # Get all positions with health factor < 1.1
            at_risk = await self._get_at_risk_positions()

            for position in at_risk:
                health = await self._get_health_factor(position['user'])

                if health < 1.0:
                    # Liquidatable! Execute immediately
                    await self._execute_liquidation(position)
                elif health < 1.05:
                    # Prepare transaction, monitor closely
                    await self._prepare_liquidation(position)

            await asyncio.sleep(0.5)  # Check every 500ms

    async def _get_health_factor(self, user):
        """Get user's current health factor"""
        account_data = self.pool.functions.getUserAccountData(user).call()
        return account_data[5] / 10**18

    async def _execute_liquidation(self, position):
        """Execute liquidation with flash loan"""
        user = position['user']
        collateral = position['collateral_asset']
        debt = position['debt_asset']
        debt_amount = position['debt_to_cover']

        # Calculate expected profit
        collateral_received = await self._calculate_collateral_received(
            position, debt_amount
        )
        profit = await self._calculate_profit(
            collateral, collateral_received, debt, debt_amount
        )

        if profit > 0:
            # Submit via Flashbots
            await self.flashbots.submit_bundle([{
                'to': self.liquidator_contract,
                'data': self._encode_flash_liquidation(position),
                'gas': 500000
            }], target_block=self.w3.eth.block_number + 1)

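    async def _calculate_collateral_received(self, position, debt_amount):
        """Calculate collateral received from liquidation"""
        # Pool.getReserveData() returns the packed configuration bitmap and
        # rate indexes, so the bonus is read from the protocol data provider.
        # Assumption: self.data_provider is an AaveProtocolDataProvider contract.
        config = self.data_provider.functions.getReserveConfigurationData(
            position['collateral_asset']
        ).call()
        liquidation_bonus = config[3]  # Basis points incl. principal, e.g. 10500 = 5% bonus

        debt_price = await self.oracle.get_price(position['debt_asset'])
        collateral_price = await self.oracle.get_price(position['collateral_asset'])

        # Assumes both tokens use the same decimals; rescale otherwise
        collateral_amount = (
            debt_amount * debt_price * liquidation_bonus //
            (10000 * collateral_price)
        )

        return collateral_amount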

Sandwich Attack Mechanics

Sandwich attacks extract value by wrapping a victim's swap between two attacker transactions. While controversial, understanding this helps both MEV extraction and protection.

How Sandwiching Works

Block N:
1. [ATTACKER] Buy token → Price increases
2. [VICTIM]   Buy token → Gets worse price due to step 1
3. [ATTACKER] Sell token → Price decreases, keeps profit from spread

Profit Calculation:

Profit = Backrun_output - Frontrun_input - Gas_costs

Finding Sandwich Targets

Ideal targets have:

  • High slippage tolerance — More room to extract
  • Large trade size — More price impact to capture
  • Low liquidity pool — Smaller trades move price more

class SandwichScanner:
    def __init__(self, min_profit_wei, min_trade_size=0):
        self.min_profit = min_profit_wei
        self.min_trade_size = min_trade_size  # used by is_sandwichable()

    def calculate_slippage(self, amount_in, amount_out_min, pool_reserves):
        """Calculate victim's slippage tolerance"""
        expected_out = self.calculate_expected_output(
            amount_in, pool_reserves
        )

        slippage = (expected_out - amount_out_min) / expected_out
        return slippage

    def is_sandwichable(self, tx):
        """Quick filter for potential targets"""
        decoded = self.decode_swap(tx)

        if not decoded:
            return False

        slippage = self.calculate_slippage(
            decoded['amount_in'],
            decoded['amount_out_min'],
            decoded['pool_reserves']
        )

        # Need at least 0.5% slippage to be worth it
        if slippage < 0.005:
            return False

        # Need meaningful size (adjust based on gas costs)
        if decoded['amount_in'] < self.min_trade_size:
            return False

        return True

AMM Math for Sandwiching

Understanding the constant-product AMM formula is essential:

def calculate_amm_output(amount_in, reserve_in, reserve_out, fee_bps=30):
    """
    Uniswap V2 formula:
    amount_out = (amount_in * (1-fee) * reserve_out) / (reserve_in + amount_in * (1-fee))

    Computed in integer math (fee in basis points) so wei-sized values
    are not rounded through floats.
    """
    amount_in_with_fee = amount_in * (10_000 - fee_bps)

    numerator = amount_in_with_fee * reserve_out
    denominator = reserve_in * 10_000 + amount_in_with_fee

    return numerator // denominator

def calculate_new_reserves(amount_in, amount_out, reserve_in, reserve_out):
    """Calculate reserves after swap"""
    new_reserve_in = reserve_in + amount_in
    new_reserve_out = reserve_out - amount_out

    return new_reserve_in, new_reserve_out

def sandwich_profit(frontrun_amount, victim_amount, victim_min_out, r0, r1):
    """
    Calculate sandwich profit:
    1. Frontrun: We buy
    2. Victim: They buy at worse price
    3. Backrun: We sell at elevated price
    """
    # Step 1: Our frontrun
    frontrun_out = calculate_amm_output(frontrun_amount, r0, r1)
    r0_after_frontrun, r1_after_frontrun = calculate_new_reserves(
        frontrun_amount, frontrun_out, r0, r1
    )

    # Step 2: Victim's swap
    victim_out = calculate_amm_output(
        victim_amount, r0_after_frontrun, r1_after_frontrun
    )

    # Check if victim tx would revert
    if victim_out < victim_min_out:
        return 0  # Can't sandwich, victim would revert

    r0_after_victim, r1_after_victim = calculate_new_reserves(
        victim_amount, victim_out, r0_after_frontrun, r1_after_frontrun
    )

    # Step 3: Our backrun (selling what we bought)
    backrun_out = calculate_amm_output(
        frontrun_out, r1_after_victim, r0_after_victim
    )

    # Profit = what we got back - what we put in
    profit = backrun_out - frontrun_amount

    return profit

Optimal Frontrun Size

A bisection-style search for the most profitable frontrun size (it assumes net profit rises to a single peak as the frontrun grows):

def find_optimal_frontrun(victim_in, victim_min_out, r0, r1, gas_price):
    """Find frontrun amount that maximizes profit"""

    low = 0
    high = r0 // 5  # Max 20% of reserves
    best_profit = 0
    best_amount = 0

    for _ in range(40):  # Binary search iterations
        mid = (low + high) // 2

        profit = sandwich_profit(mid, victim_in, victim_min_out, r0, r1)

        # Subtract gas costs (roughly 300k gas for sandwich)
        net_profit = profit - (300000 * gas_price)

        if net_profit > best_profit:
            best_profit = net_profit
            best_amount = mid
            low = mid
        else:
            high = mid

    return best_amount, best_profit
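
Profit is not monotonic in the frontrun size, so bisecting on profit directly can walk in the wrong direction, for example when a small frontrun does not yet cover gas. One quantity that is monotonic is the victim's output: it can only fall as the frontrun grows. A different technique from the search above, sketched here under the same Uniswap V2 assumptions (0.3% fee, integer reserves), is to binary-search for the largest frontrun that still leaves the victim at or above `amountOutMin`. The function names are hypothetical.

```python
def amm_out(amount_in: int, reserve_in: int, reserve_out: int) -> int:
    """Uniswap V2 style output with the 0.3% fee, in integer math."""
    amount_in_with_fee = amount_in * 997
    return amount_in_with_fee * reserve_out // (reserve_in * 1000 + amount_in_with_fee)


def victim_output(frontrun: int, victim_in: int, r0: int, r1: int) -> int:
    """Victim's output if `frontrun` of token0 is swapped in first."""
    frontrun_out = amm_out(frontrun, r0, r1)
    return amm_out(victim_in, r0 + frontrun, r1 - frontrun_out)


def max_feasible_frontrun(victim_in: int, victim_min_out: int, r0: int, r1: int) -> int:
    """Largest frontrun for which the victim still receives >= victim_min_out."""
    if victim_output(0, victim_in, r0, r1) < victim_min_out:
        return 0  # the victim would revert even without a frontrun
    low, high = 0, r0  # invariant: `low` is always feasible
    while low < high:
        mid = (low + high + 1) // 2
        if victim_output(mid, victim_in, r0, r1) >= victim_min_out:
            low = mid  # still feasible, try larger
        else:
            high = mid - 1  # victim would revert, try smaller
    return low


# Illustrative pool: 1,000,000 units on each side, victim swaps 10,000
r0 = r1 = 1_000_000
bound = max_feasible_frontrun(victim_in=10_000, victim_min_out=9_700, r0=r0, r1=r1)
```

The returned bound is an upper limit, not necessarily the optimum. A reasonable next step is to evaluate `sandwich_profit` at and below this bound and subtract gas before deciding whether to submit.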

Ethical Considerations

Sandwiching is controversial because it extracts value from regular users. Many MEV searchers choose to focus on:

  • Pure arbitrage — Benefits ecosystem by maintaining price consistency
  • Liquidations — Maintains protocol health
  • Backrunning — Doesn't harm the target transaction

Consider the ethics of your MEV strategy carefully.

Sandwich Opportunity Detection

class SandwichDetector:
    def __init__(self, w3, min_profit_wei):
        self.w3 = w3
        self.min_profit = min_profit_wei
        self.pool_cache = {}

    async def analyze_swap(self, decoded_swap):
        """Analyze swap for sandwich profitability"""
        if not decoded_swap:
            return None

        amount_in = decoded_swap['params']['amountIn']
        amount_out_min = decoded_swap['params']['amountOutMin']
        path = decoded_swap['params']['path']

        # Calculate slippage tolerance
        expected_out = await self._get_expected_output(amount_in, path)
        slippage = (expected_out - amount_out_min) / expected_out

        if slippage < 0.005:  # Less than 0.5% slippage
            return None

        # Get pool reserves
        pool = await self._get_pool(path[0], path[1])
        reserves = await self._get_reserves(pool)

        # Calculate optimal sandwich size
        optimal = await self._optimize_sandwich(
            amount_in, amount_out_min, reserves, path
        )

        if optimal['profit'] > self.min_profit:
            return {
                'type': 'sandwich',
                'victim_tx': decoded_swap['hash'],
                'frontrun_amount': optimal['frontrun_size'],
                'expected_profit': optimal['profit'],
                'path': path
            }

        return None

    async def _optimize_sandwich(self, victim_in, victim_min_out, reserves, path):
        """Find optimal frontrun size using binary search"""
        r0, r1 = reserves

        low = 0
        high = r0 // 10  # Max 10% of reserves
        best_profit = 0
        best_size = 0

        for _ in range(30):
            mid = (low + high) // 2
            profit = await self._calculate_sandwich_profit(
                mid, victim_in, victim_min_out, r0, r1
            )

            if profit > best_profit:
                best_profit = profit
                best_size = mid
                low = mid
            else:
                high = mid

        return {'frontrun_size': best_size, 'profit': best_profit}

    async def _calculate_sandwich_profit(self, frontrun, victim_in, victim_min, r0, r1):
        """Calculate profit for given frontrun size"""
        # AMM constant product formula
        # After frontrun
        frontrun_out = (frontrun * 997 * r1) // (r0 * 1000 + frontrun * 997)
        new_r0 = r0 + frontrun
        new_r1 = r1 - frontrun_out

        # Victim swap at new price
        victim_out = (victim_in * 997 * new_r1) // (new_r0 * 1000 + victim_in * 997)

        if victim_out < victim_min:
            return 0  # Would revert

        # After victim
        post_r0 = new_r0 + victim_in
        post_r1 = new_r1 - victim_out

        # Backrun
        backrun_out = (frontrun_out * 997 * post_r0) // (post_r1 * 1000 + frontrun_out * 997)

        profit = backrun_out - frontrun
        gas_cost = 300000 * await self._get_gas_price()

        return profit - gas_cost

Advanced Bundle Construction Strategies

Multi-Block Bundle Strategy

class AdvancedBundleStrategy:
    def __init__(self, w3, builders):
        self.w3 = w3
        self.builders = builders
        self.historical_tips = {}

    async def calculate_optimal_tip(self, opportunity_profit, builder, urgency):
        """Calculate optimal tip for specific builder"""
        tips = self.historical_tips.get(builder, {})
        p50_tip = tips.get('p50', 0)
        p90_tip = tips.get('p90', 0)

        urgency_multiplier = {
            'low': 0.5,
            'normal': 0.7,
            'high': 0.9,
            'critical': 1.0
        }

        base_tip_rate = urgency_multiplier.get(urgency, 0.7)
        target_tip_rate = p50_tip + (p90_tip - p50_tip) * base_tip_rate
        max_tip = opportunity_profit * 0.9

        return min(target_tip_rate, max_tip)

    async def submit_multi_block(self, bundle, blocks_ahead=3):
        """Submit bundle targeting multiple consecutive blocks"""
        current_block = self.w3.eth.block_number
        results = []

        for offset in range(blocks_ahead):
            target_block = current_block + 1 + offset

            adjusted_bundle = await self._adjust_for_block(
                bundle, target_block, offset
            )

            for builder in self.builders:
                result = await self._submit_to_builder(
                    adjusted_bundle, target_block, builder
                )
                results.append({
                    'builder': builder,
                    'block': target_block,
                    'result': result
                })

        return results

    async def merge_opportunities(self, opportunities):
        """Merge multiple opportunities into single bundle"""
        if len(opportunities) < 2:
            return opportunities[0] if opportunities else None

        if self._has_conflicts(opportunities):
            return max(opportunities, key=lambda x: x['profit'])

        merged_txs = []
        total_profit = 0

        for opp in opportunities:
            merged_txs.extend(opp['transactions'])
            total_profit += opp['profit']

        return {
            'transactions': merged_txs,
            'profit': total_profit,
            'merged': True
        }

Multi-Chain MEV Strategies

MEV opportunities exist across all EVM-compatible chains. While Ethereum remains the largest MEV venue, alternative chains often present less competition.

Chain Comparison

| Chain | Block Time | Avg Gas Cost | Competition | MEV Infrastructure |
|---|---|---|---|---|
| Ethereum | 12s | $5-50 | Very High | Flashbots, MEV-Boost |
| Arbitrum | 0.25s | $0.10-1 | Medium | Sequencer-based |
| Polygon | 2s | $0.01-0.1 | High | Public mempool |
| BSC | 3s | $0.05-0.5 | High | Public mempool |
| Base | 2s | $0.01-0.5 | Medium | Sequencer-based |
| Optimism | 2s | $0.01-0.5 | Medium | Sequencer-based |

Chain-Specific Considerations

Block Times affect opportunity windows:

  • Ethereum's 12s blocks = longer detection window
  • Arbitrum's 0.25s blocks = must be extremely fast
  • Faster blocks = more opportunities but less time each

Gas Costs affect minimum profitable trade:

  • High gas (Ethereum) = need large opportunities
  • Low gas (L2s) = can profit on smaller trades
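
To make the gas point concrete, the break-even threshold can be computed directly. The sketch below is an illustration with assumed figures (300k gas, a 10% builder tip, and example gas prices), not measured values; `break_even_gross_profit_wei` is a hypothetical helper.

```python
GWEI = 10**9


def break_even_gross_profit_wei(gas_units: int, base_fee_wei: int,
                                priority_fee_wei: int, builder_tip_bps: int = 0) -> int:
    """Smallest gross profit (wei) that leaves a non-negative net profit.

    net = gross * (1 - tip) - gas_cost, so gross >= gas_cost / (1 - tip).
    """
    if not 0 <= builder_tip_bps < 10_000:
        raise ValueError("builder_tip_bps must be in [0, 10000)")
    gas_cost = gas_units * (base_fee_wei + priority_fee_wei)
    keep_bps = 10_000 - builder_tip_bps
    return -(-gas_cost * 10_000 // keep_bps)  # ceiling division on integers


# Assumed example: 300k gas at 30 gwei base fee plus 2 gwei priority fee
mainnet_min = break_even_gross_profit_wei(300_000, 30 * GWEI, 2 * GWEI, builder_tip_bps=1_000)
# Same gas at an assumed 0.05 gwei on an L2
l2_min = break_even_gross_profit_wei(300_000, GWEI // 20, 0, builder_tip_bps=1_000)
```

With these inputs the mainnet threshold is a little over 0.0106 ETH, while the L2 threshold is several hundred times smaller, which is why smaller trades become viable there.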

Consensus Mechanisms affect MEV extraction:

  • L2 sequencers control ordering directly
  • Some L2s are implementing fair-ordering schemes
  • Public mempools allow traditional MEV

Cross-Chain MEV

Opportunities spanning multiple chains:

  • Bridge Arbitrage — Price differences between chains
  • Cross-Chain Liquidations — Monitor positions across chains
  • Latency Arbitrage — Exploit bridge/oracle delays

Cross-chain execution adds complexity but faces less competition.


Infrastructure and Latency Optimization

MEV extraction is fundamentally a latency competition. Every millisecond matters.

Latency Breakdown

| Pipeline Stage | Typical | Optimized | Strategy |
|---|---|---|---|
| Tx Propagation | 100-500ms | 10-50ms | Multi-region nodes, premium feeds |
| Tx Parsing | 1-5ms | <0.1ms | Compiled decoder, selector cache |
| Opportunity Detection | 5-50ms | <1ms | Pre-computed state, parallel eval |
| Bundle Construction | 5-20ms | <1ms | Pre-signed txs, template reuse |
| Bundle Submission | 50-200ms | 10-30ms | Direct builder connections |
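
Before optimizing any stage, it helps to measure where the time actually goes. A minimal sketch of a per-stage timer using only the standard library is shown here; `StageTimer` and the stage names are hypothetical, and a production system would likely export these samples to a metrics backend instead.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class StageTimer:
    """Collects wall-clock samples (nanoseconds) per pipeline stage."""

    def __init__(self):
        self.samples_ns = defaultdict(list)

    @contextmanager
    def stage(self, name):
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            # Record the sample even if the stage raised
            self.samples_ns[name].append(time.perf_counter_ns() - start)

    def p50_ms(self, name):
        """Median latency of a stage in milliseconds."""
        samples = sorted(self.samples_ns.get(name, []))
        if not samples:
            raise ValueError(f"no samples recorded for stage {name!r}")
        return samples[len(samples) // 2] / 1e6


timer = StageTimer()
with timer.stage("parse"):
    sum(range(1000))  # stand-in for real decoding work
```

Comparing the measured medians against the table shows which stage is furthest from its optimized target.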

Code Optimization Tips

Language Choice:

  • ✅ Rust, Go, C++ for latency-critical paths
  • ⚠️ Python acceptable for strategy logic only
  • ❌ Avoid interpreted languages in hot paths

Memory Management:

# Bad: Allocations in hot path
def process_tx(tx):
    result = {}  # Allocation every call
    result['decoded'] = decode(tx)
    return result

# Good: Pre-allocated buffers
# (the returned dict is reused, so consume it before the next call)
class TxProcessor:
    def __init__(self):
        self.result_buffer = {}  # Reused

    def process_tx(self, tx):
        self.result_buffer.clear()
        self.result_buffer['decoded'] = decode(tx)
        return self.result_buffer

Caching Strategy:

  • Cache pool reserves, update incrementally per block
  • Pre-compute function selectors → ABI mappings
  • Maintain hot contract state in memory
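
The first two caching ideas can be sketched as follows. This is a minimal illustration: the selector table lists three Uniswap V2 router swap functions, and `ReserveCache` is a hypothetical class that would be fed from each pair's `Sync(reserve0, reserve1)` events.

```python
# Selector -> function name, built once at startup instead of per transaction
SWAP_SELECTORS = {
    "0x38ed1739": "swapExactTokensForTokens",
    "0x7ff36ab5": "swapExactETHForTokens",
    "0x18cbafe5": "swapExactTokensForETH",
}


def classify(calldata_hex: str):
    """Return the function name for a known swap selector, else None."""
    return SWAP_SELECTORS.get(calldata_hex[:10].lower())


class ReserveCache:
    """Keeps the latest known reserves per pool, updated from Sync events."""

    def __init__(self):
        self._reserves = {}  # pool address -> (reserve0, reserve1, block_number)

    def apply_sync(self, pool, reserve0, reserve1, block_number):
        cached = self._reserves.get(pool)
        # Ignore events older than what is already cached
        if cached is None or block_number >= cached[2]:
            self._reserves[pool] = (reserve0, reserve1, block_number)

    def get(self, pool):
        """Return (reserve0, reserve1), or None if the pool is not cached."""
        cached = self._reserves.get(pool)
        return None if cached is None else cached[:2]


cache = ReserveCache()
cache.apply_sync("0xPool", 1_000, 2_000, block_number=100)
cache.apply_sync("0xPool", 900, 2_100, block_number=99)  # stale, ignored
```

A cache miss falls back to an RPC call, so the hot path only pays for a dictionary lookup on pools it has already seen.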

Network Optimization

  1. Geographic Distribution — Nodes in AWS us-east-1, Frankfurt, Singapore
  2. Peer Connections — Maximize connections to major node operators
  3. Premium Feeds — BloxRoute, Blocknative for early transaction access
  4. Co-location — Servers near major builders/validators

Testing and Development Workflow

Rigorous testing prevents costly failures. MEV bugs can drain funds instantly.

Testing Pyramid

                    ┌─────────────┐
                    │  Mainnet    │  ← Staged rollout
                    │  (Live)     │
                  ┌─┴─────────────┴─┐
                  │    Testnet      │  ← Integration testing
                  │   (Sepolia)     │
                ┌─┴─────────────────┴─┐
                │   Mainnet Fork      │  ← Realistic simulation
                │    (Anvil)          │
              ┌─┴─────────────────────┴─┐
              │      Unit Tests         │  ← Individual functions
              └─────────────────────────┘

Local Development with Anvil

# Fork mainnet at specific block
anvil --fork-url https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY \
      --fork-block-number 18500000

# Run tests against fork
python test_arbitrage.py --rpc http://localhost:8545

Historical Replay Testing

Test against known MEV opportunities:

async def replay_historical_mev(block_number):
    """Test if our system would have captured known MEV"""
    # Get historical mempool state
    pending_txs = get_historical_pending(block_number)

    # Run detection
    opportunities = await detector.analyze(pending_txs)

    # Compare to actual extracted MEV
    actual_mev = get_block_mev(block_number)

    print(f"Detected: {len(opportunities)}")
    print(f"Actual: {len(actual_mev)}")
    if actual_mev:  # avoid dividing by zero on blocks with no known MEV
        print(f"Capture Rate: {len(opportunities)/len(actual_mev)*100:.1f}%")
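
Comparing raw counts can overstate the capture rate, because a detected opportunity only counts if it corresponds to MEV that was actually extracted. A sketch that matches on target transaction hashes instead is shown here; `capture_stats` is a hypothetical helper and assumes both sides can be reduced to the hash of the transaction each opportunity was built around.

```python
def capture_stats(detected_hashes, actual_hashes):
    """Compare detected opportunities with known MEV by target tx hash."""
    detected, actual = set(detected_hashes), set(actual_hashes)
    hits = detected & actual
    return {
        # Share of real MEV that the detector found
        "capture_rate": len(hits) / len(actual) if actual else None,
        # Share of detections that matched real MEV
        "precision": len(hits) / len(detected) if detected else None,
        "missed": sorted(actual - detected),
        "false_positives": sorted(detected - actual),
    }


stats = capture_stats(["0xaa", "0xbb", "0xcc"], ["0xbb", "0xcc", "0xdd", "0xee"])
```

The `missed` list is the most useful output during development, since each entry is a concrete transaction to replay against the detector.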

Staged Mainnet Deployment

  1. Week 1: $100 capital, monitor only, no execution
  2. Week 2: $500 capital, low-competition opportunities only
  3. Week 3: $2,000 capital, expand to medium competition
  4. Week 4+: Gradually increase based on performance

MEV Risk Management

MEV extraction carries significant financial and technical risks that must be actively managed.

Risk Categories

| Risk Type | Severity | Mitigation |
|---|---|---|
| Smart Contract | Critical | Audit, test, limit balances, emergency pause |
| Execution | High | Simulate thoroughly, use Flashbots |
| Competition | Medium | Invest in latency, develop unique strategies |
| Capital | Manageable | Position sizing, loss limits, diversification |

Smart Contract Security

Your MEV contracts hold and process funds. Security is paramount:

// Essential security patterns
contract MEVExecutor {
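    address public owner;
    bool public paused;

    constructor() {
        owner = msg.sender;  // otherwise owner stays address(0) and onlyOwner always reverts
    }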

    modifier onlyOwner() {
        require(msg.sender == owner, "Not owner");
        _;
    }

    modifier whenNotPaused() {
        require(!paused, "Paused");
        _;
    }

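    // Emergency pause
    function pause() external onlyOwner {
        paused = true;
    }

    // Resume once the incident has been handled
    function unpause() external onlyOwner {
        paused = false;
    }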

    // Emergency withdrawal
    function emergencyWithdraw(address token) external onlyOwner {
        uint256 balance = IERC20(token).balanceOf(address(this));
        IERC20(token).transfer(owner, balance);
    }
}

Capital Management Rules

class RiskManager:
    def __init__(self, max_position_pct=0.05, daily_loss_limit_pct=0.03):
        self.max_position_pct = max_position_pct
        self.daily_loss_limit_pct = daily_loss_limit_pct
        self.daily_pnl = 0

    def check_opportunity(self, opportunity, capital):
        """Validate opportunity against risk limits"""
        max_position = capital * self.max_position_pct

        if opportunity['required_capital'] > max_position:
            return False, "Exceeds position limit"

        daily_loss_limit = capital * self.daily_loss_limit_pct
        if self.daily_pnl < -daily_loss_limit:
            return False, "Daily loss limit reached"

        return True, "Approved"
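
The `RiskManager` above reads `daily_pnl`, but nothing in the snippet updates or resets it. One possible way to maintain that figure is sketched here; `DailyPnLTracker` is a hypothetical helper, and the 3% default simply mirrors the loss limit used above.

```python
from datetime import date


class DailyPnLTracker:
    """Accumulates realized PnL and resets when the calendar day changes."""

    def __init__(self):
        self._day = None
        self.daily_pnl = 0

    def record(self, pnl, today=None):
        """Add a realized result; returns the running total for the day."""
        today = today or date.today()
        if today != self._day:
            # New day: start the running total from zero
            self._day = today
            self.daily_pnl = 0
        self.daily_pnl += pnl
        return self.daily_pnl

    def limit_hit(self, capital, daily_loss_limit_pct=0.03):
        """True once the day's losses exceed the configured share of capital."""
        return self.daily_pnl < -capital * daily_loss_limit_pct


tracker = DailyPnLTracker()
tracker.record(-20, today=date(2024, 1, 1))
tracker.record(-15, today=date(2024, 1, 1))
```

Recording every executed bundle's result, including losses from reverted or mispriced trades, keeps the loss limit meaningful.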

Backrunning Strategies

Backrunning extracts value by executing immediately after a target transaction rather than before. This is generally less competitive than front-running.

Types of Backrun MEV

Pure Arbitrage Backrun:

Large Swap → Price moves on DEX A → Your arbitrage restores price

Oracle Update Backrun:

Chainlink Update → Price changes → Liquidations become possible → You liquidate

Governance Backrun:

Proposal Passes → New feature enabled → You're first to use it

Implementation Pattern

async def backrun_opportunity(target_tx, our_tx, our_account, target_block):
    """Create backrun bundle"""
    bundle = [
        {'signed_transaction': target_tx.raw},  # Target first
        {
            'signer': our_account,
            'transaction': our_tx
        }
    ]

    # Submit bundle - if it is included, our tx lands directly after the target
    await flashbots.send_bundle(bundle, target_block)

Why Backrunning is Attractive

  1. Lower competition — Don't need to outbid target
  2. Ethical — Pure backrun arbitrage benefits ecosystem
  3. Simpler — Only need to follow, not front-run
  4. Accessible — Good entry point for new searchers

Common Implementation Mistakes

Learning from common failures accelerates development.

❌ Insufficient Simulation

Problem: Executing without thorough simulation

# BAD: No simulation
async def execute_opportunity(opp):
    return await send_transaction(opp.tx)

# GOOD: Full simulation with pending tx effects
async def execute_opportunity(opp):
    # Simulate with current pending txs
    result = await simulate_with_pending(opp.tx, pending_pool)

    if not result.success:
        return None

    if result.profit < min_profit:
        return None

    return await send_transaction(opp.tx)

❌ Ignoring Gas Costs

Problem: Calculating profit without gas

# BAD: Gross profit only
profit = sell_amount - buy_amount

# GOOD: Net profit after all costs
base_fee = get_base_fee()
priority_fee = calculate_priority_fee(opportunity_value)
gas_units = estimate_gas(tx)

gas_cost = (base_fee + priority_fee) * gas_units
builder_tip = opportunity_value * 0.1  # 10% tip

net_profit = sell_amount - buy_amount - gas_cost - builder_tip
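
As a worked example of the net-profit formula above, the sketch below plugs in concrete numbers. The function name, the basis-point tip parameter, and the sample fee values are illustrative assumptions; gross profit stands in for `opportunity_value` when sizing the builder tip.

```python
GWEI = 10**9
ETHER = 10**18


def net_profit_wei(
    gross_profit_wei: int,
    base_fee_wei: int,
    priority_fee_wei: int,
    gas_units: int,
    builder_tip_bps: int = 1_000,  # 10% of gross profit, as in the text above
) -> int:
    """Net profit after EIP-1559 gas cost and the builder tip, all in wei."""
    gas_cost = (base_fee_wei + priority_fee_wei) * gas_units
    builder_tip = gross_profit_wei * builder_tip_bps // 10_000
    return gross_profit_wei - gas_cost - builder_tip


# 0.05 ETH gross, 30 gwei base fee, 2 gwei priority fee, 200k gas
big = net_profit_wei(ETHER // 20, 30 * GWEI, 2 * GWEI, 200_000)

# Same gas, but only 0.005 ETH gross: gas alone exceeds the opportunity
small = net_profit_wei(ETHER // 200, 30 * GWEI, 2 * GWEI, 200_000)
```

Integer wei arithmetic avoids the rounding drift that floats introduce when values are compared against a minimum-profit threshold.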

❌ Public Mempool Submission

Problem: Broadcasting MEV tx to public mempool

# BAD: Visible to everyone
w3.eth.send_raw_transaction(signed_tx)

# GOOD: Private via Flashbots
flashbots.send_bundle(bundle, target_block)

❌ Single Point of Failure

Problem: Relying on single node/service

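from web3 import Web3, HTTPProvider

# BAD: Single node
class MEVBot:
    def __init__(self):
        self.w3 = Web3(HTTPProvider("http://node1:8545"))

# GOOD: Redundant infrastructure
class MEVBot:
    def __init__(self):
        self.nodes = [
            Web3(HTTPProvider("http://node1:8545")),
            Web3(HTTPProvider("http://node2:8545")),
            Web3(HTTPProvider("http://backup:8545")),
        ]

    async def get_transaction(self, tx_hash):
        # Try each node in order and fall through to the next on any error.
        # Note: these web3 calls are synchronous and block the event loop.
        last_error = None
        for node in self.nodes:
            try:
                return node.eth.get_transaction(tx_hash)
            except Exception as exc:
                last_error = exc
                continue
        raise RuntimeError("All nodes failed") from last_error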
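
A slow node can be as harmful as a dead one, so a failover loop usually wants a per-call timeout as well. The helper below is a minimal sketch under that assumption; `first_successful` and its `timeout_s` parameter are hypothetical names, and the providers are passed as plain async callables so the pattern does not depend on any particular client library.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, TypeVar

T = TypeVar("T")


async def first_successful(
    calls: Sequence[Callable[[], Awaitable[T]]],
    timeout_s: float = 0.5,
) -> T:
    """Try each provider call in order; return the first result.

    A call that raises or exceeds timeout_s is skipped. If every call
    fails, a RuntimeError listing the collected errors is raised.
    """
    errors: List[Exception] = []
    for call in calls:
        try:
            return await asyncio.wait_for(call(), timeout=timeout_s)
        except Exception as exc:  # includes asyncio.TimeoutError
            errors.append(exc)
    raise RuntimeError(f"all {len(calls)} providers failed: {errors!r}")
```

With a synchronous client, each entry could wrap the blocking call in a worker thread, for example `lambda: asyncio.to_thread(node.eth.get_transaction, tx_hash)` on Python 3.9+.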

❌ Over-Complex Strategies

Problem: Building complex multi-leg strategies first

Solution: Start simple:

  1. Week 1-4: Simple two-hop arbitrage
  2. Month 2: Add more pairs
  3. Month 3: Multi-hop paths
  4. Month 4+: Complex strategies

❌ Inadequate Testing

Problem: Deploying without mainnet fork testing

Minimum Testing Requirements:

  • [ ] Unit tests for all functions
  • [ ] Mainnet fork simulation
  • [ ] Historical replay testing
  • [ ] Testnet deployment
  • [ ] Paper trading (detection without execution)
  • [ ] Minimal capital live testing

NFT MEV Opportunities

NFT markets present unique MEV opportunities distinct from DeFi.

NFT MEV Types

Listing Snipe:
Purchase NFTs listed significantly below fair value before others notice. Requires:

  • Real-time marketplace monitoring
  • ML valuation models for fair price estimation
  • Fast execution infrastructure
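
Before investing in an ML model, a crude baseline helps validate the rest of the pipeline. The sketch below uses the median of recent sales as a stand-in for a real valuation model; the function names, the five-sale minimum, and the 10% discount threshold are assumptions for illustration.

```python
from statistics import median
from typing import Optional, Sequence


def estimate_fair_value(
    recent_sales_eth: Sequence[float], min_samples: int = 5
) -> Optional[float]:
    """Median of recent sale prices, or None if there is too little data."""
    if len(recent_sales_eth) < min_samples:
        return None
    return median(recent_sales_eth)


def is_snipe_candidate(
    list_price_eth: float,
    recent_sales_eth: Sequence[float],
    discount: float = 0.10,
) -> bool:
    """True if the listing sits at least `discount` below estimated fair value."""
    fair_value = estimate_fair_value(recent_sales_eth)
    if fair_value is None:
        return False  # refuse to trade on thin data
    return list_price_eth < fair_value * (1 - discount)
```

A median ignores traits and rarity, so it only makes sense for floor-level items in liquid collections.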

Mint Racing:
Secure allocation in popular NFT launches:

import asyncio

async def mint_snipe(contract_address, mint_function):
    # Pre-build transaction
    tx = {
        'to': contract_address,
        'data': encode_mint_function(mint_function),
        'gas': 300000,
        'maxFeePerGas': aggressive_gas_price,
        'maxPriorityFeePerGas': high_priority
    }

    # Watch for mint enable
    while True:
        if await is_mint_live(contract_address):
            return await send_immediately(tx)
        await asyncio.sleep(0.1)

Cross-Marketplace Arbitrage:
NFT listed on OpenSea at 1 ETH, Blur at 0.9 ETH:

  1. Buy on Blur for 0.9 ETH
  2. List/sell on OpenSea for 1 ETH
  3. Profit: 0.1 ETH minus fees
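
The "minus fees" part often decides whether the trade is worth taking. Here is a small worked sketch; the fee and royalty rates and the gas figure are made-up inputs, not actual marketplace terms, and `nft_arb_profit` is a hypothetical helper.

```python
from decimal import Decimal


def nft_arb_profit(
    buy_price: Decimal,
    sell_price: Decimal,
    sell_fee_bps: int = 0,   # marketplace fee on the sale (assumed input)
    royalty_bps: int = 0,    # creator royalty on the sale (assumed input)
    gas_cost: Decimal = Decimal("0"),
) -> Decimal:
    """Net profit in ETH from buying on one marketplace and selling on another."""
    deductions_bps = sell_fee_bps + royalty_bps
    proceeds = sell_price * Decimal(10_000 - deductions_bps) / Decimal(10_000)
    return proceeds - buy_price - gas_cost


# Buy at 0.9 ETH, sell at 1 ETH with a 2.5% fee, 5% royalty, 0.01 ETH gas
profit = nft_arb_profit(
    Decimal("0.9"), Decimal("1"),
    sell_fee_bps=250, royalty_bps=500, gas_cost=Decimal("0.01"),
)
```

Under these assumed rates the 0.1 ETH gross spread shrinks to 0.015 ETH, and the position also carries inventory risk until the sale completes.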

NFT Detection System

class NFTMempoolMonitor:
    MARKETPLACE_SELECTORS = {
        '0xab834bab': 'opensea_atomic_match',
        '0x9a1fc3a7': 'blur_buy',
        '0x00000000': 'looksrare_execute',  # placeholder; replace with the real selector
    }

    async def analyze_nft_tx(self, tx):
        selector = tx['input'][:10]

        if selector in self.MARKETPLACE_SELECTORS:
            decoded = self.decode_nft_tx(tx)

            # Check if underpriced
            fair_value = await self.estimate_value(
                decoded['collection'],
                decoded['token_id']
            )

            if decoded['price'] < fair_value * 0.9:
                return {
                    'type': 'nft_snipe',
                    'collection': decoded['collection'],
                    'token_id': decoded['token_id'],
                    'price': decoded['price'],
                    'fair_value': fair_value,
                    'profit': fair_value - decoded['price']
                }

        return None

Performance Monitoring and Analytics

Comprehensive monitoring enables optimization and rapid problem detection.

Key Performance Indicators

Track these metrics for your MEV system:

Detection Metrics:
| Metric | Description | Target |
|--------|-------------|--------|
| Opportunities/hour | Raw detection volume | Varies by strategy |
| Detection latency | Time from tx to detection | <10ms |
| False positive rate | Invalid opportunities | <20% |

Execution Metrics:
| Metric | Description | Target |
|--------|-------------|--------|
| Win rate | Successful extractions | >20% |
| Bundle inclusion rate | Bundles included vs sent | >30% |
| Profit accuracy | Actual vs predicted | >85% |

Infrastructure Metrics:
| Metric | Description | Target |
|--------|-------------|--------|
| Node sync status | Blocks behind tip | 0 |
| WebSocket uptime | Connection availability | >99.9% |
| Processing throughput | Txs processed/second | >1000 |
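
The execution metrics above can be derived from a simple log of bundle attempts. The sketch below is one possible definition, not a standard: the `Attempt` fields are illustrative, win rate is taken as profitable inclusions over all submitted bundles, and profit accuracy as total actual over total predicted profit for included bundles.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Sequence


@dataclass
class Attempt:
    """One submitted bundle; field names are illustrative."""
    included: bool
    predicted_profit_wei: int
    actual_profit_wei: Optional[int] = None  # known only once included


def execution_metrics(attempts: Sequence[Attempt]) -> Dict[str, float]:
    """Compute inclusion rate, win rate, and profit accuracy."""
    included = [a for a in attempts if a.included]
    wins = [a for a in included if (a.actual_profit_wei or 0) > 0]
    predicted = sum(a.predicted_profit_wei for a in included)
    actual = sum(a.actual_profit_wei or 0 for a in included)
    submitted = len(attempts)
    return {
        "inclusion_rate": len(included) / submitted if submitted else 0.0,
        "win_rate": len(wins) / submitted if submitted else 0.0,
        "profit_accuracy": actual / predicted if predicted else 0.0,
    }
```

Whatever definitions are chosen, keeping them fixed over time matters more than the exact formula, since the targets are only meaningful as trends.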

Monitoring Implementation

import prometheus_client as prom

# Define metrics
opportunities_detected = prom.Counter(
    'mev_opportunities_detected_total',
    'Total opportunities detected',
    ['type']
)

bundle_latency = prom.Histogram(
    'mev_bundle_latency_seconds',
    'Bundle construction and submission latency'
)

profit_actual = prom.Gauge(
    'mev_profit_actual_wei',
    'Actual profit from last extraction'
)

class MEVMonitor:
    def __init__(self):
        # Start Prometheus server
        prom.start_http_server(8000)

    def record_opportunity(self, opp_type):
        opportunities_detected.labels(type=opp_type).inc()

    async def submit_bundle(self, bundle, target_block):
        # Context-manager form so the timer spans the awaited call
        with bundle_latency.time():
            return await flashbots.send_bundle(bundle, target_block)

    def record_profit(self, profit_wei):
        profit_actual.set(profit_wei)

Alerting Rules

Set up alerts for critical conditions:

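# Prometheus alerting rules
# Assumes the bot also exports mev_bundles_failed_total,
# mev_bundles_submitted_total and mev_node_blocks_behind
groups:
  - name: mev_alerts
    rules:
      - alert: HighFailureRate
        # Ratio of failed to submitted bundles, not failures per second
        expr: rate(mev_bundles_failed_total[5m]) / rate(mev_bundles_submitted_total[5m]) > 0.5
        for: 2m
        annotations:
          summary: "Bundle failure rate above 50%"

      - alert: NodeOutOfSync
        expr: mev_node_blocks_behind > 2
        for: 1m
        annotations:
          summary: "Node is behind chain tip"

      - alert: NoOpportunities
        # Sum across the `type` label so one quiet strategy does not fire the alert
        expr: sum(rate(mev_opportunities_detected_total[10m])) == 0
        for: 10m
        annotations:
          summary: "No opportunities detected in 10 minutes"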

Dashboard Components

Build dashboards showing:

  1. Real-time P&L — Rolling profit/loss over time
  2. Opportunity funnel — Detected → Simulated → Submitted → Included
  3. Latency distribution — Detection and submission timing
  4. Competition analysis — Win rate by opportunity type
  5. Gas efficiency — Gas spent vs profit earned

# Example: Real-time P&L tracking
from datetime import datetime, timedelta

class PnLTracker:
    def __init__(self):
        self.trades = []
        self.cumulative_pnl = 0

    def record_trade(self, profit, gas_cost, opportunity_type):
        net_pnl = profit - gas_cost
        self.cumulative_pnl += net_pnl

        self.trades.append({
            'timestamp': datetime.now(),
            'profit': profit,
            'gas_cost': gas_cost,
            'net_pnl': net_pnl,
            'cumulative': self.cumulative_pnl,
            'type': opportunity_type
        })

    def get_hourly_summary(self):
        hour_ago = datetime.now() - timedelta(hours=1)
        recent = [t for t in self.trades if t['timestamp'] > hour_ago]

        return {
            'trades': len(recent),
            'gross_profit': sum(t['profit'] for t in recent),
            'gas_spent': sum(t['gas_cost'] for t in recent),
            'net_pnl': sum(t['net_pnl'] for t in recent),
            'win_rate': len([t for t in recent if t['net_pnl'] > 0]) / len(recent) if recent else 0
        }
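
For the latency distribution panel, tail percentiles are usually more informative than averages. The sketch below computes a nearest-rank percentile over raw samples; `percentile` is a hypothetical helper and the sample values are made up.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% at or below it."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


# Detection latencies in milliseconds (illustrative values)
latencies_ms = [4.1, 3.8, 5.0, 4.4, 48.0, 4.0, 3.9, 4.2, 4.3, 4.6]
p50 = percentile(latencies_ms, 50)
p95 = percentile(latencies_ms, 95)
```

Here a single 48 ms outlier leaves the median near 4 ms but dominates P95, which is the kind of stall an average would hide.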

MEV Protection for Users

Understanding MEV attacks helps design protective systems.

Protection Methods

1. Private Transaction Pools

Route transactions through Flashbots Protect or similar:

# Instead of a public RPC endpoint
https://eth-mainnet.g.alchemy.com/v2/KEY

# Use the Flashbots Protect RPC endpoint in your wallet or client
https://rpc.flashbots.net

2. Tight Slippage Settings

Reduce extractable value:

// Bad: 5% slippage = sandwichable
const looseSlippage = 0.05;

// Better: 0.5% slippage = minimal extraction
const tightSlippage = 0.005;
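
To see why the tolerance matters, it helps to turn it into the `min_out` value a swap actually carries. The sketch below is a simplified model with hypothetical helper names: it treats the gap between the quoted output and `min_out` as a rough ceiling on what a sandwich can take before the swap reverts.

```python
def min_amount_out(expected_out: int, slippage_bps: int) -> int:
    """Smallest output the swap will accept, given a tolerance in basis points."""
    if not 0 <= slippage_bps <= 10_000:
        raise ValueError("slippage_bps must be between 0 and 10000")
    return expected_out * (10_000 - slippage_bps) // 10_000


def max_extractable(expected_out: int, slippage_bps: int) -> int:
    """Rough ceiling on value lost to a sandwich before the swap would revert."""
    return expected_out - min_amount_out(expected_out, slippage_bps)


quoted = 1_000_000  # quoted output in token base units (illustrative)
loose = max_extractable(quoted, 500)  # 5% tolerance
tight = max_extractable(quoted, 50)   # 0.5% tolerance
```

The tighter setting cuts the exposed amount tenfold in this model, at the cost of more reverted swaps when the price moves for ordinary reasons.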

3. MEV-Aware DEX Aggregators

  • CoW Swap — Batch auctions prevent front-running
  • 1inch Fusion — Solver competition for best execution
  • UniswapX — Dutch auctions with MEV protection

4. Limit Orders

Avoid market orders that reveal intent:

# Market order: MEV bot sees your price tolerance
swap_exact_tokens_for_tokens(amount_in, min_out)

# Limit order: Execute only at specific price
place_limit_order(price, amount)

Protocol-Level MEV Resistance

Protocols can design MEV resistance:

  1. Batch Auctions — Aggregate orders, execute at uniform price
  2. Commit-Reveal — Hide order details until execution
  3. Encrypted Mempools — Prevent pre-execution visibility
  4. Fair Ordering — Time-based rather than gas-based ordering
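
Of these, commit-reveal is the easiest to illustrate off-chain. The sketch below shows the core idea under simplifying assumptions: SHA-256 stands in for the keccak256 hash a contract would typically use, and the order encoding and function names are hypothetical.

```python
import hashlib
import secrets


def commit(order: bytes, salt: bytes) -> str:
    """Commitment published first: hides the order but binds the user to it."""
    return hashlib.sha256(salt + order).hexdigest()


def verify_reveal(commitment: str, order: bytes, salt: bytes) -> bool:
    """Check that a revealed (order, salt) pair matches the earlier commitment."""
    return secrets.compare_digest(commitment, commit(order, salt))


# Phase 1: the user publishes only the commitment
salt = secrets.token_bytes(32)  # random salt prevents guessing common orders
order = b"swap 10 ETH -> USDC, min 30000"
commitment = commit(order, salt)

# Phase 2: after ordering is fixed, the user reveals order and salt
revealed_ok = verify_reveal(commitment, order, salt)
tampered_ok = verify_reveal(commitment, b"swap 10 ETH -> USDC, min 1", salt)
```

Observers see only the hash during the commit phase, so there is nothing to front-run until ordering is already fixed.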

Future of MEV

The MEV landscape continues evolving rapidly.

Emerging Trends

Intent-Based Protocols

Protocols like CoW Swap, UniswapX, and 1inch Fusion enable users to specify desired outcomes while solvers compete to execute optimally. This captures MEV for users rather than external searchers.

Traditional: User → Mempool → MEV Bot extracts value → Execution
Intent-Based: User → Intent Pool → Solvers compete → Best execution for user
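
The solver competition in the intent-based flow can be reduced to a small selection rule. This is a deliberately simplified sketch: real protocols add auctions, bonding, and settlement checks, and the `Intent` and `Quote` types here are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Intent:
    """What the user wants: sell a fixed amount, receive at least a minimum."""
    sell_amount: int
    min_buy_amount: int


@dataclass(frozen=True)
class Quote:
    """A solver's offer for filling the intent."""
    solver: str
    buy_amount: int


def select_winner(intent: Intent, quotes: Sequence[Quote]) -> Optional[Quote]:
    """Pick the quote that gives the user the most, if any meets the minimum."""
    valid = [q for q in quotes if q.buy_amount >= intent.min_buy_amount]
    return max(valid, key=lambda q: q.buy_amount, default=None)


intent = Intent(sell_amount=10**18, min_buy_amount=2_950)
quotes = [Quote("solver_a", 2_940), Quote("solver_b", 2_990), Quote("solver_c", 2_975)]
winner = select_winner(intent, quotes)
```

Because solvers win by returning more to the user, the surplus that a searcher would otherwise extract becomes the thing they compete away.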

Encrypted Mempools

Threshold encryption prevents pre-execution transaction visibility. Projects exploring this approach aim to prevent front-running by keeping transaction contents hidden until ordering is committed.

Cross-Chain MEV

As DeFi spreads across chains, opportunities exist in:

  • Bridge arbitrage
  • Cross-chain liquidations
  • Latency exploitation between chains

Higher complexity = lower competition.

What This Means for Searchers

  1. Adapt or die — Static strategies will stop working
  2. Become a solver — Compete on execution quality, not speed
  3. Go cross-chain — Early mover advantage on new frontiers
  4. Build moats — Proprietary infrastructure, unique strategies

Summary

Building successful MEV systems requires deep technical knowledge and significant infrastructure investment. For teams looking to accelerate development, partnering with an experienced trading bot development company can provide the expertise and infrastructure foundation needed for competitive MEV extraction.

Core Requirements:

Infrastructure — Dedicated nodes, low-latency networking, multi-region deployment

Detection — Fast transaction decoding, ABI databases, opportunity classification

Execution — Flashbots bundles for atomic, private, guaranteed execution

Gas Optimization — Dynamic pricing based on opportunity value and urgency

Simulation — Accurate state prediction accounting for pending transactions

Risk Management — Position limits, loss controls, infrastructure redundancy

Key Metrics to Track

| Metric | Target | Why It Matters |
|--------|--------|----------------|
| Detection Rate | >50% | Finding opportunities |
| Win Rate | >20% | Competitive execution |
| Latency P95 | <50ms | Speed advantage |
| Gas Efficiency | >60% | Profit retention |
| Profit Accuracy | >90% | Simulation quality |

Getting Started Checklist

  • [ ] Set up Geth node with MEV configuration
  • [ ] Implement WebSocket mempool subscription
  • [ ] Build transaction decoder with common ABIs
  • [ ] Create opportunity detection for one MEV type
  • [ ] Integrate Flashbots bundle submission
  • [ ] Test extensively on mainnet fork
  • [ ] Deploy with minimal capital
  • [ ] Monitor and iterate

MEV extraction is fundamentally a latency competition. The searcher who detects opportunities first and submits transactions fastest captures the profit. Continuous optimization across every component of the pipeline compounds into competitive advantage.


What MEV strategies are you exploring? Drop a comment below! 👇
