# Python Script for Processing CSV Files and Logging
This Python script processes CSV files containing `first_name` and `last_name` columns, extracts the unique combinations of these two columns, and saves the results to a new CSV file. It also includes robust logging to track the process and handle errors.
## What Does This Script Do?
- **Logging Setup:**
  - Logs messages to both a log file (`process.log`) and the console.
  - Supports different log levels: `info`, `warning`, `error`, and `debug`.
- **Data Processing:**
  - Reads all CSV files from a specified input directory.
  - Extracts unique `first_name` and `last_name` combinations from each file (see the short sketch after this list).
  - Tracks the source file for each unique combination.
- **Error Handling:**
  - Skips files that don't have the required columns (`first_name` and `last_name`).
  - Logs errors if any issues occur during file processing.
- **Output:**
  - Saves the combined unique data to a new CSV file in a specified output directory.
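
To make the extraction step concrete, here is a minimal, self-contained sketch (with hypothetical names, not data from any real file) of how `drop_duplicates` pulls the unique pairs out of a DataFrame:

```python
import pandas as pd

# Hypothetical input resembling one of the CSV files.
df = pd.DataFrame({
    "first_name": ["Ada", "Ada", "Grace"],
    "last_name":  ["Lovelace", "Lovelace", "Hopper"],
    "amount":     ["10", "20", "30"],
})

# Keep only the unique first_name/last_name pairs.
unique_pairs = df[["first_name", "last_name"]].drop_duplicates()
print(unique_pairs)
#   first_name last_name
# 0        Ada  Lovelace
# 2      Grace    Hopper
```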
## Code
```python
import pandas as pd
import glob
import os
import logging

# -------------------------
# Logging Setup
# -------------------------

# Define log directory and file
log_dir = r"C:\Users\YourUsername\Documents\output\logs"
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "process.log")

# Configure logger
logger = logging.getLogger("DataProcessLogger")
logger.setLevel(logging.INFO)

# File handler for logging to a file
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)

# Stream handler for logging to the console (stderr by default)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_formatter = logging.Formatter('%(message)s')
stream_handler.setFormatter(stream_formatter)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)


def log_message(message, level="info"):
    """
    Log a message.
    level options: "info", "warning", "error", "debug".
    Messages are sent to both the console and the log file.
    """
    level = level.lower()
    if level == "info":
        logger.info(message)
    elif level == "warning":
        logger.warning(message)
    elif level == "error":
        logger.error(message)
    else:
        logger.debug(message)


# -------------------------
# Data Output Helper Function
# -------------------------

def save_data(data, filename, output_dir):
    """
    Save a DataFrame to a CSV file in a specified output directory.
    Logs a message indicating where the data was saved.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, filename)
    data.to_csv(output_file, index=False)
    log_message(f"Data saved to {output_file}", level="info")


# -------------------------
# Load All CSV Files into a Dictionary
# -------------------------

# Define the input directory for CSV files
input_dir = r"C:\Users\YourUsername\Documents\data"
csv_files = glob.glob(os.path.join(input_dir, "*.csv"))

# Dictionary to hold the original DataFrames; keys will be file names
all_dfs = {}

for file in csv_files:
    try:
        file_name = os.path.basename(file)
        # Force all columns to be read as text
        df = pd.read_csv(file, dtype=str)
        # Remove columns with no name (commonly appear as 'Unnamed: ...')
        df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
        # Remove rows that are completely empty
        df.dropna(how='all', inplace=True)
        # Store the cleaned DataFrame
        all_dfs[file_name] = df
        log_message(f"Loaded file: {file_name}", level="info")
    except Exception as e:
        log_message(f"Error loading file {file}: {e}", level="error")

# -------------------------
# Process DataFrames for Unique first_name/last_name Pairs
# -------------------------

# Specify the file name prefix to search for
file_prefix = "testXXX_YYY"

# List to hold each file's unique first_name/last_name pairs
unique_dfs = []

for file_name, df in all_dfs.items():
    # Only process files with names starting with the desired prefix
    if file_name.startswith(file_prefix):
        if {'first_name', 'last_name'}.issubset(df.columns):
            # .copy() avoids a SettingWithCopyWarning when adding the column below
            unique_df = df[['first_name', 'last_name']].drop_duplicates().copy()
            # Append the source file name as a new column for traceability
            unique_df['source_file'] = file_name
            unique_dfs.append(unique_df)
            log_message(f"Processed file: {file_name}", level="info")
        else:
            log_message(f"Skipping file: {file_name} - Missing required columns ('first_name', 'last_name')", level="warning")
    else:
        log_message(f"Skipping file: {file_name} - File name does not start with '{file_prefix}'", level="info")

# Combine the unique first_name/last_name pairs from all processed files
if unique_dfs:
    combined_unique_df = pd.concat(unique_dfs, ignore_index=True)
else:
    combined_unique_df = pd.DataFrame()
    log_message("No files processed for unique first_name/last_name combinations.", level="warning")

# -------------------------
# Save the Combined Unique Data (Real Data Output)
# -------------------------

# Define the output directory for real data files
data_output_dir = r"C:\Users\YourUsername\Documents\output\data"
save_data(combined_unique_df, "unique_first_name_last_name.csv", data_output_dir)
```
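
Since the logger and both handlers are set to `INFO`, here is a quick sketch of how the `log_message` helper behaves (the console shows the bare message, while file entries follow the `%(asctime)s - %(levelname)s - %(message)s` pattern; the timestamps below are illustrative, not from a real run):

```python
log_message("Processing started")                         # console: "Processing started"
                                                          # file: "2024-01-01 12:00:00,000 - INFO - Processing started"
log_message("Column mismatch detected", level="warning")

# "debug" falls through to logger.debug(), but because the logger and both
# handlers are set to logging.INFO, debug messages are filtered out unless
# those levels are lowered to logging.DEBUG.
log_message("Row counts computed", level="debug")         # dropped at INFO level
```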
## Saving the Original DataFrame as Well as Specific Columns from a Mapping
This function creates two CSV outputs:

- **Mapped output:** if a mapping exists for the file, it selects only the specified columns and saves the result with a `_mapped.csv` suffix.
- **All-columns output:** it saves the complete DataFrame, with every column, using an `_all.csv` suffix.
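
For example, with the mapping below, `test_example.csv` yields `test_example_mapped.csv` (containing only `first_name`, `last_name`, and `amount`) alongside `test_example_all.csv` (containing every column).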
```python
# Example column mapping: keys are file names, values are lists of columns to save.
column_mapping = {
    "test_example.csv": ["first_name", "last_name", "amount"]
}


def save_data_both(data, filename, output_dir, col_mapping=None):
    """
    Save a DataFrame as two CSV files:
    1. One file with the columns defined in the mapping (if a mapping exists).
    2. One file with all columns.
    The two files are saved with the suffixes '_mapped' and '_all', respectively.
    """
    # Create the output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(filename)[0]

    # 1. Save mapped columns if a mapping exists for this file.
    if col_mapping and filename in col_mapping:
        mapped_cols = col_mapping[filename]
        # Note: this raises a KeyError if a mapped column is missing from the data.
        data_mapped = data[mapped_cols]
        # Build the output file name with the suffix '_mapped'
        output_file_mapped = os.path.join(output_dir, f"{base_name}_mapped.csv")
        data_mapped.to_csv(output_file_mapped, index=False)
        print(f"Mapped data saved to {output_file_mapped}")

    # 2. Save all columns
    output_file_all = os.path.join(output_dir, f"{base_name}_all.csv")
    data.to_csv(output_file_all, index=False)
    print(f"All data saved to {output_file_all}")


# Example usage:
# Assume we have a DataFrame `df` loaded from the file "test_example.csv".
# Define the output directory
output_dir = r"C:\Users\YourUsername\Documents\output\data"

# Save both the mapped and the all-columns outputs for the file.
save_data_both(df, "test_example.csv", output_dir, col_mapping=column_mapping)
```
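
To write both outputs for every file loaded earlier, a minimal batch sketch (assuming the `all_dfs` dictionary and the `column_mapping` defined above):

```python
# Hypothetical batch run: files without a mapping entry only get an '_all.csv' output.
for file_name, frame in all_dfs.items():
    save_data_both(frame, file_name, output_dir, col_mapping=column_mapping)
```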