import pyarrow as pa
import pandas as pd
from snowflake.connector import connect
def map_snowflake_schema_to_parquet(arrow_schema):
    """Map a Snowflake Arrow schema to a Parquet-compatible schema."""
    mapped_fields = []
    for field in arrow_schema:
        field_name = field.name
        field_type = field.type
        # Handle the different data type mappings
        if pa.types.is_timestamp(field_type):
            # Snowflake timestamps are usually microseconds; preserve that precision
            mapped_type = pa.timestamp('us')
        elif pa.types.is_date(field_type):
            mapped_type = pa.date32()
        elif pa.types.is_decimal(field_type):
            # Preserve decimal precision and scale
            mapped_type = pa.decimal128(field_type.precision, field_type.scale)
        elif pa.types.is_integer(field_type):
            # Map to the matching signed integer width
            if field_type.bit_width == 64:
                mapped_type = pa.int64()
            elif field_type.bit_width == 32:
                mapped_type = pa.int32()
            elif field_type.bit_width == 16:
                mapped_type = pa.int16()
            else:
                mapped_type = pa.int8()
        elif pa.types.is_floating(field_type):
            if field_type.bit_width == 64:
                mapped_type = pa.float64()
            else:
                mapped_type = pa.float32()
        elif pa.types.is_boolean(field_type):
            mapped_type = pa.bool_()
        elif pa.types.is_binary(field_type):
            mapped_type = pa.binary()
        else:
            # Default to string for everything else
            mapped_type = pa.string()
        mapped_fields.append(pa.field(field_name, mapped_type))
    return pa.schema(mapped_fields)
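# Quick sanity check of the mapping (a minimal sketch: the field names and
# types below are invented for illustration, not taken from a real table):
sample_schema = pa.schema([
    pa.field('ID', pa.int64()),
    pa.field('AMOUNT', pa.decimal128(38, 2)),
    pa.field('CREATED_AT', pa.timestamp('ns')),
    pa.field('NOTES', pa.large_string()),
])
print(map_snowflake_schema_to_parquet(sample_schema))
# Expected output: ID: int64, AMOUNT: decimal128(38, 2),
# CREATED_AT: timestamp[us], NOTES: string (large_string hits the fallback)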
# Usage
conn = connect(...)
cursor = conn.cursor()
cursor.execute("SELECT * FROM your_table")
arrow_table = cursor.fetch_arrow_all()
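# For result sets too large to materialize at once, the Snowflake connector
# also offers fetch_arrow_batches(), which yields PyArrow tables chunk by
# chunk (shown commented out, since the cursor above is already consumed):
# for batch in cursor.fetch_arrow_batches():
#     handle_batch(batch)  # handle_batch is a hypothetical per-chunk handler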
# Print original schema for debugging
print("Original Snowflake Schema:")
print(arrow_table.schema)
# Create mapped schema
mapped_schema = map_snowflake_schema_to_parquet(arrow_table.schema)
print("\nMapped Parquet Schema:")
print(mapped_schema)
# Convert using the mapped schema. Note that Table.to_pandas() does not
# accept a schema argument, so cast the Arrow table first, then convert.
df = arrow_table.cast(mapped_schema).to_pandas()
df.to_parquet('output.parquet')
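# Alternative (a sketch, under the same mapped_schema): write the casted
# table with pyarrow.parquet directly. This skips the pandas round trip,
# which can degrade decimals to Python objects, and keeps the Parquet
# types exactly as declared. The output filename here is arbitrary.
import pyarrow.parquet as pq
pq.write_table(arrow_table.cast(mapped_schema), 'output_direct.parquet')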