Apache SeaTunnel

From TB-Scale MongoDB to Doris: 5 Critical Challenges and Fixes with Apache SeaTunnel

Recently, in several data platform projects, we have frequently used Apache SeaTunnel to synchronize data from MongoDB to Apache Doris. The task looks simple, but once you actually start, you find quite a few pitfalls. In production environments in particular, where data volumes are large and structures are complex, a small oversight can easily cause problems.

This article does not repeat the basic configuration steps, which are already well covered online. Instead, it focuses on the places where people most often stumble in real production environments. When you need to synchronize TB-level MongoDB collections to Doris reliably for real-time analysis, the following five pitfalls are almost inevitable. I will combine specific error logs, troubleshooting approaches, and the solutions our team has worked out to help you eliminate these issues one by one.

1. Data Type Mapping: BSON-to-SQL Conversion Issues

MongoDB’s BSON type system and Doris’s SQL type system appear to be automatically mappable on the surface, but in reality they hide quite a few “surprises.” The most typical examples are the handling of Decimal128 and ObjectId.

1.1 Precision Loss Issue with Decimal128

MongoDB uses Decimal128 to store high-precision numeric values, such as financial transaction amounts. SeaTunnel maps it to Doris's DECIMAL type by default, but there is a key mismatch here: Doris's DECIMAL is a fixed-point type with a maximum precision of 38 digits, while Decimal128 is a decimal floating-point type with 34 significant digits, so the number of fractional digits can vary from value to value. If you do not explicitly specify precision and scale in the SeaTunnel schema, you are likely to encounter the following error:

java.lang.ArithmeticException: Non-terminating decimal expansion; no exact representable decimal result

The solution is to explicitly declare the precision in the schema. Do not rely on automatic inference—control it manually:

source {
  MongoDB {
    uri = "mongodb://user:password@host:27017"
    database = "finance"
    collection = "transactions"
    schema = {
      fields {
        _id = string
        amount = "decimal(38, 18)"  # Explicitly specify total precision 38, scale 18
        currency = string
        timestamp = timestamp
      }
    }
  }
}

Note: If the Decimal128 values in your data contain more than 18 decimal places, you need to adjust according to your actual situation. In one of our e-commerce projects, coupon calculation required high precision, so we used decimal(38, 24).
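
Before settling on a precision and scale, it can help to check a sample of real values against the target type. The following is a minimal sketch, assuming the values have already been loaded as Python Decimal objects (for example with bson's Decimal128.to_decimal()); the helper name fits_decimal is hypothetical and not part of SeaTunnel or Doris:

```python
from decimal import Decimal


def fits_decimal(value: Decimal, precision: int = 38, scale: int = 18) -> bool:
    """Return True if value fits DECIMAL(precision, scale) without rounding."""
    if not value.is_finite():  # NaN and Infinity have no SQL DECIMAL form
        return False
    _, digits, exponent = value.as_tuple()
    digits = list(digits)
    if not any(digits):  # zero always fits
        return True
    # Drop trailing fractional zeros: 1.500 needs only one fractional digit.
    while exponent < 0 and digits[-1] == 0:
        digits.pop()
        exponent += 1
    fractional_digits = max(0, -exponent)
    integer_digits = max(0, len(digits) + exponent)
    return fractional_digits <= scale and integer_digits <= precision - scale
```

Running this over a sample of the amount field shows whether decimal(38, 18) is enough or whether a larger scale such as decimal(38, 24) is needed.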

1.2 ObjectId and Nested Document Serialization Pitfalls

The _id field in MongoDB is of type ObjectId by default, and SeaTunnel converts it to a string. This seems fine until you run into primary key conflicts in the Doris table: once ObjectId has been converted to a string, Doris's UNIQUE KEY check operates on that string value, which may cause issues.

Nested documents are even more troublesome. A common structure in MongoDB looks like this:

{
  "_id": ObjectId("507f1f77bcf86cd799439011"),
  "user": {
    "name": "Zhang San",
    "address": {
      "city": "Beijing",
      "district": "Chaoyang"
    }
  }
}

By default, SeaTunnel converts the entire user object into a JSON string and stores it in a VARCHAR field in Doris. If you want to directly query user.address.city in Doris, you have to use JSON functions to parse it, which results in poor performance.

Our approach is to flatten nested fields in advance using the transform plugin in SeaTunnel:

transform {
  # Flatten nested fields
  sql {
    query = """
      SELECT 
        _id,
        user.name as user_name,
        user.address.city as city,
        user.address.district as district
      FROM mongodb_source
    """
  }
}

sink {
  Doris {
    fenodes = "fe1:8030,fe2:8030"
    username = "admin"
    password = "***"
    database = "analytics"
    table = "user_flat"
    # The table structure is now flat, and query performance is high
  }
}

If the nesting level is too deep or uncertain, you may also consider using the MAP type in Doris, but note that it is only supported in version 2.0 and above.
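
To see which flat columns an arbitrarily nested document would produce, the flattening can be sketched in a few lines of Python. This is only an illustration of the idea, not SeaTunnel's implementation; the function name flatten and the underscore separator are assumptions:

```python
def flatten(doc: dict, parent_key: str = "", sep: str = "_") -> dict:
    """Flatten nested dicts into a single level, joining keys with sep."""
    flat = {}
    for key, value in doc.items():
        column = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into sub-documents, carrying the column prefix along
            flat.update(flatten(value, column, sep))
        else:
            flat[column] = value
    return flat


sample = {
    "_id": "507f1f77bcf86cd799439011",
    "user": {
        "name": "Zhang San",
        "address": {"city": "Beijing", "district": "Chaoyang"},
    },
}
columns = flatten(sample)
```

Running this against a sample of documents gives a quick inventory of the columns the Doris table would need before you write the transform by hand.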

2. Connection and Timeout Configuration: High-Concurrency Challenges in Production

In a test environment with dozens of records, everything works fine. Once you move to production, connection timeouts, cursor timeouts, and memory overflows all appear.

2.1 MongoDB Connection Pool and Cursor Timeout

Several key parameters in the MongoDB source plugin of SeaTunnel are easy to overlook:

| Parameter | Default Value | Production Recommendation | Description |
| --- | --- | --- | --- |
| cursor.no-timeout | true | false | Setting to true may cause excessive accumulation of server-side cursors |
| fetch.size | 2048 | 8192–16384 | Adjust based on document size to reduce network round trips |
| max.time-min | 600 | 30 | Query timeout (minutes) to prevent long-running queries from exhausting cluster resources |
| partition.split-key | _id | Depends on business logic | Partition key for parallel reads |

We once encountered a major pitfall: setting cursor.no-timeout=true together with large data queries caused hundreds of cursors to accumulate on the MongoDB server side, each consuming memory and nearly bringing the cluster down. Later, we changed it to:

source {
  MongoDB {
    uri = "mongodb://user:password@host1:27017,host2:27017/?replicaSet=rs0&readPreference=secondaryPreferred"
    database = "logs"
    collection = "access_logs"
    cursor.no-timeout = false
    fetch.size = 16384
    max.time-min = 30
    partition.split-key = "_id"
    partition.split-size = 1048576  # 1MB per partition

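    # Only synchronize data from a cutoff date onward to avoid full table scans
    # (the date below is fixed; for a rolling 7-day window, compute it when generating the config)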
    match.query = "{timestamp: {$gte: ISODate('2024-01-01T00:00:00Z')}}"
  }
}
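
The match.query above pins a fixed date, so a rolling window has to be computed when the job config is generated. A minimal sketch of that step follows. It assumes the connector accepts MongoDB Extended JSON ($date) in match.query, which you should verify against your SeaTunnel version; the helper name build_match_query is hypothetical:

```python
import json
from datetime import datetime, timedelta, timezone


def build_match_query(now: datetime, days: int = 7, field: str = "timestamp") -> str:
    """Build a filter matching documents from the last `days` days.

    `now` must be timezone-aware so the cutoff can be expressed in UTC.
    """
    cutoff = (now - timedelta(days=days)).astimezone(timezone.utc)
    iso = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")
    return json.dumps({field: {"$gte": {"$date": iso}}})


query = build_match_query(datetime(2024, 1, 8, tzinfo=timezone.utc))
```

The resulting string can then be substituted into the config template before the job is submitted.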

2.2 Stream Load Tuning in Doris

On the Doris Sink side, the core lies in Stream Load batch parameters. The default configuration is friendly for small data volumes, but production environments require tuning:

sink {
  Doris {
    fenodes = "fe1:8030,fe2:8030,fe3:8030"
    username = "sync_user"
    password = "***"
    database = "dw"
    table = "fact_table"

    sink.label-prefix = "seatunnel_sync"
    sink.enable-2pc = true  # Enable two-phase commit to ensure Exactly-Once
    sink.buffer-size = 524288  # 512KB, default 256KB is too small
    sink.buffer-count = 5  # Number of buffers
    doris.batch.size = 5000  # 5000 rows per batch, default 1024

    # Key: advanced Stream Load parameters
    doris.config = {
      format = "json"
      read_json_by_line = "true"
      strip_outer_array = "true"
      num_as_string = "true"  # Convert numbers to string to avoid type issues

      # Connection and timeout control
      connect_timeout = "10"
      socket_timeout = "30"

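      # Partial update mode (if the table uses Unique model)
      partial_columns = "true"
      # Note: in Doris Stream Load, MERGE is meant to be paired with a delete condition;
      # the default APPEND is enough for plain upserts
      merge_type = "MERGE"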
    }
  }
}

There is a detail here: sink.label-prefix must be unique for each task; otherwise, Doris will reject duplicate import labels. We use the pattern "seatunnel_${job_id}_${timestamp}".
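
One way to produce such a prefix is to generate it in the script that submits the job. The sketch below is an assumption about how that tooling might look, not a SeaTunnel feature; make_label_prefix and the extra attempt counter are hypothetical, and characters outside letters, digits, and underscores are replaced to keep the label conservative:

```python
import re
import time
from typing import Optional


def make_label_prefix(job_id: str, attempt: int = 0, now_ms: Optional[int] = None) -> str:
    """Build a label prefix that differs per job, per run, and per retry."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    # Keep only characters that are safe in a label
    safe_job_id = re.sub(r"[^A-Za-z0-9_]", "_", job_id)
    return f"seatunnel_{safe_job_id}_{now_ms}_{attempt}"


prefix = make_label_prefix("orders-sync", attempt=1, now_ms=1704067200000)
```

Including the attempt number means a retried run never reuses the prefix of the run that failed.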

3. Performance Bottleneck Identification and Optimization: From Hours to Minutes

When synchronization tasks run slowly, it is usually not caused by a single reason, but by multiple factors stacking together.

3.1 Diagnostic Toolchain

First, you need to know where the bottleneck is. Our commonly used monitoring combination:

  1. SeaTunnel’s own logs: enable DEBUG level to see the reading progress of each partition
  2. MongoDB Profiler: temporarily enable to confirm whether the query uses indexes
  3. Doris FE/BE monitoring: use show proc '/current_queries' to check import status
  4. System monitoring: CPU, memory, network IO

There was once a case where synchronization speed was stuck at 1000 records per second and could not improve. After investigation, we found:

  • On the MongoDB side: the query used the $or operator and did not use an index
  • Network: cross-availability-zone transmission with high latency
  • On the Doris side: BE node disk IO was saturated
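
For the MongoDB side, the index check can be automated by inspecting the query plan. The sketch below walks a plan document and reports whether any stage is a full collection scan. It assumes the classic explain layout (queryPlanner.winningPlan with nested inputStage or inputStages), which can differ between MongoDB versions; the function name uses_collscan and the sample plans are illustrative:

```python
def uses_collscan(plan: dict) -> bool:
    """Return True if any stage in the plan tree is a full collection scan."""
    if plan.get("stage") == "COLLSCAN":
        return True
    children = list(plan.get("inputStages", []))
    if "inputStage" in plan:
        children.append(plan["inputStage"])
    return any(uses_collscan(child) for child in children)


# Illustrative plans; with pymongo the real one would come from
# collection.find(query).explain()["queryPlanner"]["winningPlan"]
indexed_plan = {"stage": "FETCH", "inputStage": {"stage": "IXSCAN", "indexName": "ts_1"}}
or_plan = {
    "stage": "SUBPLAN",
    "inputStage": {
        "stage": "OR",
        "inputStages": [{"stage": "IXSCAN"}, {"stage": "COLLSCAN"}],
    },
}
```

A check like this is cheap to run before a large job starts, which is much better than discovering a collection scan from the profiler afterwards.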

3.2 Partition Strategy Optimization

SeaTunnel supports parallel reading based on partition.split-key. However, using _id as the default partition key is not always optimal.

If the data has a natural time dimension, such as a log table, partitioning by a time field works better:

source {
  MongoDB {
    # Assume each record has an event_time field
    partition.split-key = "event_time"
    partition.split-size = 3600000  # In bytes (about 3.4 MB per split); this is a data size, not a time interval

    # Combine with query conditions to avoid full table scans
    match.query = """
      {
        event_time: {
          $gte: ISODate("2024-01-01T00:00:00Z"),
          $lt: ISODate("2024-01-02T00:00:00Z")
        }
      }
    """
  }
}

If the data distribution is uneven, you can first analyze key distribution using an aggregation query:

// Run in mongosh (the MongoDB shell)
db.collection.aggregate([
  { $bucketAuto: { groupBy: "$shard_key", buckets: 10 } }
])
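
The same distribution check can be done offline on a sample of key values. The sketch below picks boundary keys so that each range holds roughly the same number of sampled documents; it illustrates the idea only, is not how SeaTunnel computes its splits, and the function name split_points is hypothetical:

```python
def split_points(sampled_keys, partitions: int) -> list:
    """Return partitions-1 boundary keys that divide the sample evenly."""
    if partitions < 2 or not sampled_keys:
        return []
    ordered = sorted(sampled_keys)
    step = len(ordered) / partitions
    return [ordered[int(step * i)] for i in range(1, partitions)]


even = split_points(range(100), 4)
# 90% of the sample shares one key value, so the boundaries collapse
skewed = split_points([1] * 90 + list(range(2, 12)), 4)
```

When several boundaries come out identical, as in the skewed sample, the key is a poor choice for parallel reads because most of the data falls into a single range.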

3.3 Memory and GC Tuning

SeaTunnel is based on the JVM, and GC issues are common with large data volumes. Our production JVM parameters:

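# seatunnel_env.sh or startup script
# The GC logging flags below are JDK 8 style; on JDK 9+ replace them with
# -Xlog:gc*:file=/var/log/seatunnel/gc.log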
export JAVA_OPTS="-Xmx8g -Xms8g \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:ParallelGCThreads=4 \
-XX:ConcGCThreads=2 \
-XX:+AlwaysPreTouch \
-XX:+UseStringDeduplication \
-XX:+PrintGCDetails \
-XX:+PrintGCDateStamps \
-Xloggc:/var/log/seatunnel/gc.log"

The key flag is -XX:+AlwaysPreTouch, which touches every heap page at JVM startup so the memory is actually committed up front. Startup takes longer, but you avoid page-fault jitter at runtime.

4. Data Consistency and Error Handling: Exactly-Once Implementation Details

Data synchronization must not lose data, nor duplicate data. SeaTunnel supports Exactly-Once semantics, but it must be configured correctly.

4.1 Pitfalls of Two-Phase Commit (2PC)

Setting sink.enable-2pc = true in the Doris Sink enables two-phase commit, theoretically ensuring Exactly-Once. However, we once encountered a strange issue: after a task failed and retried, data was duplicated.

The cause was label reuse: when SeaTunnel retries after a failure with the same label-prefix, Doris may treat the retry as the same import task and skip certain data.

Solution: include timestamp and attempt count in the label:

sink {
  Doris {
    sink.label-prefix = "sync_${table_name}_${now()}_${attempt_num}"
    sink.enable-2pc = true
    sink.max-retries = 3
    sink.check-interval = 5000  # Check every 5 seconds
  }
}

4.2 Dirty Data and Type Conversion Errors

MongoDB is schema-less; the same field may be a string in one row and a number in another. Doris has a strict schema and will report errors on type mismatch.

The needs_unsupported_type_casting parameter in SeaTunnel can help:

sink {
  Doris {
    # Attempt to automatically convert incompatible types, e.g., Decimal to Double
    needs_unsupported_type_casting = true

    # But it is recommended to handle this in the transform layer
  }
}

transform {
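  # Normalize types before writing; COALESCE replaces NULL names with ''
  # (comments are kept outside the query because # is not a standard SQL comment marker)
  sql {
    query = """
      SELECT 
        CAST(amount AS DOUBLE) as amount_double,
        COALESCE(name, '') as name_safe,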
        REGEXP_REPLACE(description, '[\\x00-\\x1F]', '') as description_clean
      FROM source_table
    """
  }
}
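
To find out how dirty a field really is before writing the transform, it helps to run a sample through a strict parser and count what fails. A minimal sketch, assuming sampled values arrive as plain Python objects; the helper name to_decimal is hypothetical:

```python
from decimal import Decimal, InvalidOperation
from typing import Optional


def to_decimal(value) -> Optional[Decimal]:
    """Coerce a mixed-type field to Decimal, or None if it is not a number."""
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        # Going through str avoids binary float artefacts for values like 19.99
        value = str(value)
    if isinstance(value, str):
        try:
            value = Decimal(value.strip())
        except InvalidOperation:
            return None
    if isinstance(value, Decimal) and value.is_finite():
        return value
    return None


sample = ["19.99", 19.99, 5, "N/A", None, float("nan")]
parsed = [to_decimal(v) for v in sample]
bad_count = sum(1 for p in parsed if p is None)
```

If the share of unparseable values is small, mapping them to NULL in the transform is usually acceptable; if it is large, the field probably needs to be fixed at the source.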

4.3 Resume from Breakpoint and Checkpoint

SeaTunnel supports Checkpoint, but the storage backend must be configured correctly. We use HDFS:

env {
  execution.parallelism = 8
  job.mode = "BATCH"

  # Checkpoint configuration
  checkpoint.interval = 60000  # Once per minute
  checkpoint.timeout = 600000  # 10-minute timeout
  checkpoint.max-concurrent-checkpoints = 1

  state.backend = "hdfs"
  state.checkpoints.dir = "hdfs://namenode:8020/seatunnel/checkpoints"
  state.savepoints.dir = "hdfs://namenode:8020/seatunnel/savepoints"

  # Restore from the latest checkpoint after task failure
  execution.savepoint-restore.enabled = true
}

A detail here: if the Checkpoint frequency is too high, it affects performance; if too low, too much data may be reprocessed during recovery. We generally decide based on data volume—for example, one Checkpoint per one million rows processed.
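
Since checkpoint.interval is expressed in milliseconds, a row-based target has to be converted using the observed throughput. A small worked sketch, where the 20,000 rows per second figure is an assumed example rather than a measurement from our jobs:

```python
def checkpoint_interval_ms(rows_per_checkpoint: int, rows_per_second: float) -> int:
    """Convert a rows-per-checkpoint target into a checkpoint interval in ms."""
    if rows_per_second <= 0:
        raise ValueError("rows_per_second must be positive")
    return round(rows_per_checkpoint / rows_per_second * 1000)


# One checkpoint per million rows at an assumed 20,000 rows/s
interval = checkpoint_interval_ms(1_000_000, 20_000)
```

At that throughput one million rows take 50 seconds, so the interval would be set to 50000 rather than the 60000 used in the config above.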

5. Operations Monitoring and Alerting: From Passive Firefighting to Proactive Prevention

This last one is not a technical pitfall, but it can hurt more than the technical ones: lack of monitoring. Often you only find out that the sync task failed long ago when users report incorrect data.

5.1 Key Metric Monitoring

We monitor the following metrics in Prometheus (exposed via SeaTunnel JMX):

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| seatunnel_source_records_total | Total records read from source | Zero for 5 consecutive minutes |
| seatunnel_sink_records_total | Total records written to sink | Difference from source exceeds 10% |
| seatunnel_sink_duration_seconds | Sink write duration | P95 > 10 seconds |
| seatunnel_sink_errors_total | Total sink write errors | Alert on any error |
| seatunnel_checkpoint_duration | Checkpoint duration | > 30 seconds |
| seatunnel_jvm_memory_used | JVM memory usage | > 80% |
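
The thresholds in the table can be expressed as a small rule function, which is handy for testing alert logic before wiring it into Prometheus. This is a sketch only: the dictionary keys are hypothetical names for values already scraped and aggregated, not actual SeaTunnel metric names:

```python
def evaluate_alerts(m: dict) -> list:
    """Return the names of the alert rules that fire for one metrics snapshot."""
    alerts = []
    if m["source_records_delta_5m"] == 0:
        alerts.append("source_stalled")
    source_total, sink_total = m["source_records_total"], m["sink_records_total"]
    if source_total and abs(source_total - sink_total) / source_total > 0.10:
        alerts.append("source_sink_mismatch")
    if m["sink_duration_p95_seconds"] > 10:
        alerts.append("sink_slow")
    if m["sink_errors_total"] > 0:
        alerts.append("sink_errors")
    if m["checkpoint_duration_seconds"] > 30:
        alerts.append("checkpoint_slow")
    if m["jvm_memory_used_ratio"] > 0.80:
        alerts.append("jvm_memory_high")
    return alerts


healthy = {
    "source_records_delta_5m": 120_000,
    "source_records_total": 1_000_000,
    "sink_records_total": 990_000,
    "sink_duration_p95_seconds": 2.5,
    "sink_errors_total": 0,
    "checkpoint_duration_seconds": 4,
    "jvm_memory_used_ratio": 0.55,
}
lagging = dict(healthy, sink_records_total=800_000, sink_errors_total=3)
```

Keeping the rules in one place like this also makes it easy to review the thresholds when the workload changes.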

Grafana dashboard configuration example:

-- Sync lag monitoring
-- Illustrative only: assumes both tables are reachable from one SQL data source.
-- Each subquery returns a single row, so no time bucketing or GROUP BY is needed.
SELECT 
  now() as time,
  source.source_max_timestamp - sink.sink_max_timestamp as lag_seconds
FROM (
  -- Maximum timestamp from source
  SELECT MAX(event_time) as source_max_timestamp
  FROM mongodb_source_table
  WHERE event_time > now() - interval '1 hour'
) source,
(
  -- Maximum timestamp from sink
  SELECT MAX(event_time) as sink_max_timestamp  
  FROM doris_target_table
  WHERE event_time > now() - interval '1 hour'
) sink

5.2 Automated Repair Scripts

Some common errors can be automatically fixed. For example, insufficient Doris table space:

#!/bin/bash
# auto_extend_doris.sh

ERROR_LOG=$1
TABLE_NAME=$(grep -o "table [a-zA-Z0-9_]*" "$ERROR_LOG" | head -1 | cut -d' ' -f2)

if [[ -n "$TABLE_NAME" ]]; then
  # Check table partition usage
  USAGE=$(mysql -h doris-fe -P 9030 -u admin -p'***' -e \
    "SHOW PARTITIONS FROM $TABLE_NAME WHERE UsedPercent > 90;" | wc -l)

  if [[ $USAGE -gt 0 ]]; then
    # Automatically add partition
    mysql -h doris-fe -P 9030 -u admin -p'***' <<EOF
    ALTER TABLE $TABLE_NAME ADD PARTITION p_$(date +%Y%m%d) 
    VALUES [("$(date +%Y-%m-%d)"), ("$(date -d '+7 days' +%Y-%m-%d)"));
EOF
    echo "Automatic partition extension completed, restart SeaTunnel task"
    systemctl restart seatunnel-worker
  fi
fi

5.3 Data Quality Validation

Automatically validate after synchronization completes:

# validate_sync.py
import pymongo
import pymysql
from datetime import datetime, timedelta, timezone


def send_alert(message):
    # Placeholder: replace with your own alerting channel (webhook, email, ...)
    print(f"[ALERT] {message}")


def validate_counts():
    # MongoDB count of documents updated in the last hour
    mongo_client = pymongo.MongoClient("mongodb://host:27017")
    try:
        mongo_count = mongo_client.db.collection.count_documents({
            "update_time": {"$gte": datetime.now(timezone.utc) - timedelta(hours=1)}
        })
    finally:
        mongo_client.close()

    # Doris count over the same window
    doris_conn = pymysql.connect(host="doris-fe", port=9030,
                                 user="admin", password="***", database="dw")
    try:
        with doris_conn.cursor() as cursor:
            cursor.execute("""
                SELECT COUNT(*) 
                FROM target_table 
                WHERE update_time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
            """)
            doris_count = cursor.fetchone()[0]
    finally:
        doris_conn.close()

    # Allow 1% deviation (considering deletes, updates, etc.)
    diff_ratio = abs(mongo_count - doris_count) / max(mongo_count, 1)

    if diff_ratio > 0.01:
        send_alert(f"Data inconsistency: MongoDB={mongo_count}, Doris={doris_count}, Difference={diff_ratio:.2%}")
        return False
    return True


if __name__ == "__main__":
    validate_counts()

After building this monitoring system, our team has never again been woken up by midnight alarms. That is not because there are no problems, but because issues are handled automatically before they impact the business.

These were not theoretical issues; they cost us real time in production. Data synchronization is not just about correct configuration. The real challenge lies in stable operation in production environments. Next time you encounter SeaTunnel synchronization issues, start troubleshooting from these five aspects and you will most likely find the right direction. Every environment has its own particularities, but the core problem-solving ideas are universal.
