DEV Community

Developer213
Developer213

Posted on • Edited on

Test



import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from snowflake.connector import connect

def map_snowflake_schema_to_parquet(arrow_schema):
    """Map a Snowflake Arrow schema to a Parquet-compatible Arrow schema.

    Args:
        arrow_schema: ``pyarrow.Schema`` as returned by the Snowflake
            connector's ``fetch_arrow_all()``.

    Returns:
        ``pyarrow.Schema`` with each field's type normalized for Parquet
        output. Field nullability is preserved; types without an explicit
        mapping fall back to string.
    """
    mapped_fields = []

    for field in arrow_schema:
        field_type = field.type

        # Handle different data type mappings
        if pa.types.is_timestamp(field_type):
            # Snowflake timestamps are usually microseconds; preserve both
            # the precision and the timezone. The original code dropped the
            # tz, silently turning TIMESTAMP_TZ/LTZ into naive timestamps.
            mapped_type = pa.timestamp('us', tz=field_type.tz)
        elif pa.types.is_date(field_type):
            mapped_type = pa.date32()
        elif pa.types.is_decimal(field_type):
            # Preserve decimal precision and scale
            mapped_type = pa.decimal128(field_type.precision, field_type.scale)
        elif pa.types.is_integer(field_type):
            # Map to a signed integer of the same width (Snowflake NUMBER
            # columns arrive as signed ints of varying widths).
            if field_type.bit_width == 64:
                mapped_type = pa.int64()
            elif field_type.bit_width == 32:
                mapped_type = pa.int32()
            elif field_type.bit_width == 16:
                mapped_type = pa.int16()
            else:
                mapped_type = pa.int8()
        elif pa.types.is_floating(field_type):
            if field_type.bit_width == 64:
                mapped_type = pa.float64()
            else:
                mapped_type = pa.float32()
        elif pa.types.is_boolean(field_type):
            mapped_type = pa.bool_()
        elif pa.types.is_binary(field_type):
            mapped_type = pa.binary()
        else:
            # Default to string for other types
            mapped_type = pa.string()

        # Keep the source field's nullability instead of forcing the
        # pa.field() default of nullable=True.
        mapped_fields.append(
            pa.field(field.name, mapped_type, nullable=field.nullable)
        )

    return pa.schema(mapped_fields)

# Usage
# Usage example: export a Snowflake table to Parquet with a normalized schema.
conn = connect(...)  # fill in account, user, password, warehouse, etc.
try:
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT * FROM your_table")
        arrow_table = cursor.fetch_arrow_all()
    finally:
        cursor.close()
finally:
    conn.close()

# Print original schema for debugging
print("Original Snowflake Schema:")
print(arrow_table.schema)

# Create mapped schema
mapped_schema = map_snowflake_schema_to_parquet(arrow_table.schema)
print("\nMapped Parquet Schema:")
print(mapped_schema)

# BUG FIX: Table.to_pandas() has no `schema` argument — the original call
# raised TypeError. Cast the table to the mapped schema instead, and write
# Parquet directly with pyarrow so the mapped schema is preserved exactly
# (a pandas round-trip would re-infer column types).
casted_table = arrow_table.cast(mapped_schema)
pq.write_table(casted_table, 'output.parquet')

# Optional: a pandas DataFrame for any further in-memory processing.
df = casted_table.to_pandas()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)