leo-claude-mktplace/mcp-servers/data-platform/mcp_server/pandas_tools.py
lmiranda 4ed3ed7e14 fix(data-platform): reset index after filter to prevent extra column
The filter tool was adding an __index_level_0__ column to results
because pandas query() preserves the original index, which pyarrow
then serializes as an extra column when the DataFrame is stored as
an Arrow table.

Added .reset_index(drop=True) after query() to drop the preserved
index and create a clean sequential index.

Fixes #203

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 17:25:37 -05:00
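
A minimal repro of the issue this commit fixes (illustrative sketch; it assumes only pandas and pyarrow, mirroring how the store converts DataFrames to Arrow):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'age': [25, 35, 45]})

    filtered = df.query('age > 30')  # index is now [1, 2], no longer a plain RangeIndex
    print(pa.Table.from_pandas(filtered).column_names)
    # ['age', '__index_level_0__']  <- the extra column from #203

    fixed = df.query('age > 30').reset_index(drop=True)
    print(pa.Table.from_pandas(fixed).column_names)
    # ['age']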


"""
pandas MCP Tools.
Provides DataFrame operations with Arrow IPC data_ref persistence.
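Typical flow from an async context (illustrative; the CSV path and
column name are hypothetical):

    tools = PandasTools()
    result = await tools.read_csv('/data/sales.csv')   # -> {'data_ref': ..., 'rows': ..., ...}
    subset = await tools.filter(result['data_ref'], 'amount > 100')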
"""
import pandas as pd
import pyarrow.parquet as pq
import logging
from pathlib import Path
from typing import Dict, List, Optional, Union
from .data_store import DataStore
from .config import load_config
logger = logging.getLogger(__name__)
class PandasTools:
"""pandas data manipulation tools with data_ref persistence"""
def __init__(self):
self.store = DataStore.get_instance()
config = load_config()
self.max_rows = config.get('max_rows', 100_000)
self.store.set_max_rows(self.max_rows)
def _check_and_store(
self,
df: pd.DataFrame,
name: Optional[str] = None,
source: Optional[str] = None
) -> Dict:
"""Check row limit and store DataFrame if within limits"""
check = self.store.check_row_limit(len(df))
if check['exceeded']:
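            # Too many rows to store: report the limit check details plus a small preview instead.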
return {
'error': 'row_limit_exceeded',
**check,
'preview': df.head(100).to_dict(orient='records')
}
data_ref = self.store.store(df, name=name, source=source)
return {
'data_ref': data_ref,
'rows': len(df),
'columns': list(df.columns),
'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()}
}
async def read_csv(
self,
file_path: str,
name: Optional[str] = None,
chunk_size: Optional[int] = None,
**kwargs
) -> Dict:
"""
Load CSV file into DataFrame.
Args:
file_path: Path to CSV file
name: Optional name for data_ref
chunk_size: If provided, process in chunks
**kwargs: Additional pandas read_csv arguments
Returns:
Dict with data_ref or error info
"""
path = Path(file_path)
if not path.exists():
return {'error': f'File not found: {file_path}'}
try:
if chunk_size:
            # Chunked processing: store each chunk under its own ref and return the list
chunks = []
for i, chunk in enumerate(pd.read_csv(path, chunksize=chunk_size, **kwargs)):
chunk_ref = self.store.store(chunk, name=f"{name or 'chunk'}_{i}", source=file_path)
chunks.append({'ref': chunk_ref, 'rows': len(chunk)})
return {
'chunked': True,
'chunks': chunks,
'total_chunks': len(chunks)
}
df = pd.read_csv(path, **kwargs)
return self._check_and_store(df, name=name, source=file_path)
except Exception as e:
logger.error(f"read_csv failed: {e}")
return {'error': str(e)}
async def read_parquet(
self,
file_path: str,
name: Optional[str] = None,
columns: Optional[List[str]] = None
) -> Dict:
"""
Load Parquet file into DataFrame.
Args:
file_path: Path to Parquet file
name: Optional name for data_ref
columns: Optional list of columns to load
Returns:
Dict with data_ref or error info
"""
path = Path(file_path)
if not path.exists():
return {'error': f'File not found: {file_path}'}
try:
table = pq.read_table(path, columns=columns)
df = table.to_pandas()
return self._check_and_store(df, name=name, source=file_path)
except Exception as e:
logger.error(f"read_parquet failed: {e}")
return {'error': str(e)}
async def read_json(
self,
file_path: str,
name: Optional[str] = None,
lines: bool = False,
**kwargs
) -> Dict:
"""
Load JSON/JSONL file into DataFrame.
Args:
file_path: Path to JSON file
name: Optional name for data_ref
lines: If True, read as JSON Lines format
**kwargs: Additional pandas read_json arguments
Returns:
Dict with data_ref or error info
"""
path = Path(file_path)
if not path.exists():
return {'error': f'File not found: {file_path}'}
try:
df = pd.read_json(path, lines=lines, **kwargs)
return self._check_and_store(df, name=name, source=file_path)
except Exception as e:
logger.error(f"read_json failed: {e}")
return {'error': str(e)}
async def to_csv(
self,
data_ref: str,
file_path: str,
index: bool = False,
**kwargs
) -> Dict:
"""
Export DataFrame to CSV file.
Args:
data_ref: Reference to stored DataFrame
file_path: Output file path
index: Whether to include index
**kwargs: Additional pandas to_csv arguments
Returns:
Dict with success status
"""
df = self.store.get_pandas(data_ref)
if df is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
df.to_csv(file_path, index=index, **kwargs)
return {
'success': True,
'file_path': file_path,
'rows': len(df),
'size_bytes': Path(file_path).stat().st_size
}
except Exception as e:
logger.error(f"to_csv failed: {e}")
return {'error': str(e)}
async def to_parquet(
self,
data_ref: str,
file_path: str,
compression: str = 'snappy'
) -> Dict:
"""
Export DataFrame to Parquet file.
Args:
data_ref: Reference to stored DataFrame
file_path: Output file path
compression: Compression codec
Returns:
Dict with success status
"""
table = self.store.get(data_ref)
if table is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
pq.write_table(table, file_path, compression=compression)
return {
'success': True,
'file_path': file_path,
'rows': table.num_rows,
'size_bytes': Path(file_path).stat().st_size
}
except Exception as e:
logger.error(f"to_parquet failed: {e}")
return {'error': str(e)}
async def describe(self, data_ref: str) -> Dict:
"""
Get statistical summary of DataFrame.
Args:
data_ref: Reference to stored DataFrame
Returns:
Dict with statistical summary
"""
df = self.store.get_pandas(data_ref)
if df is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
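            # include='all' summarizes non-numeric columns too; inapplicable stats come back as NaN.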
desc = df.describe(include='all')
info = self.store.get_info(data_ref)
return {
'data_ref': data_ref,
'shape': {'rows': len(df), 'columns': len(df.columns)},
'columns': list(df.columns),
'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
'memory_mb': info.memory_bytes / (1024 * 1024) if info else None,
'null_counts': df.isnull().sum().to_dict(),
'statistics': desc.to_dict()
}
except Exception as e:
logger.error(f"describe failed: {e}")
return {'error': str(e)}
async def head(self, data_ref: str, n: int = 10) -> Dict:
"""
Get first N rows of DataFrame.
Args:
data_ref: Reference to stored DataFrame
n: Number of rows
Returns:
Dict with rows as records
"""
df = self.store.get_pandas(data_ref)
if df is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
head_df = df.head(n)
return {
'data_ref': data_ref,
'total_rows': len(df),
'returned_rows': len(head_df),
'columns': list(df.columns),
'data': head_df.to_dict(orient='records')
}
except Exception as e:
logger.error(f"head failed: {e}")
return {'error': str(e)}
async def tail(self, data_ref: str, n: int = 10) -> Dict:
"""
Get last N rows of DataFrame.
Args:
data_ref: Reference to stored DataFrame
n: Number of rows
Returns:
Dict with rows as records
"""
df = self.store.get_pandas(data_ref)
if df is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
tail_df = df.tail(n)
return {
'data_ref': data_ref,
'total_rows': len(df),
'returned_rows': len(tail_df),
'columns': list(df.columns),
'data': tail_df.to_dict(orient='records')
}
except Exception as e:
logger.error(f"tail failed: {e}")
return {'error': str(e)}
async def filter(
self,
data_ref: str,
condition: str,
name: Optional[str] = None
) -> Dict:
"""
Filter DataFrame rows by condition.
Args:
data_ref: Reference to stored DataFrame
condition: pandas query string (e.g., "age > 30 and city == 'NYC'")
name: Optional name for result data_ref
Returns:
Dict with new data_ref for filtered result
"""
df = self.store.get_pandas(data_ref)
if df is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
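            # query() keeps the original index; drop it so it isn't persisted as __index_level_0__ (see #203).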
filtered = df.query(condition).reset_index(drop=True)
result_name = name or f"{data_ref}_filtered"
return self._check_and_store(
filtered,
name=result_name,
source=f"filter({data_ref}, '{condition}')"
)
except Exception as e:
logger.error(f"filter failed: {e}")
return {'error': str(e)}
async def select(
self,
data_ref: str,
columns: List[str],
name: Optional[str] = None
) -> Dict:
"""
Select specific columns from DataFrame.
Args:
data_ref: Reference to stored DataFrame
columns: List of column names to select
name: Optional name for result data_ref
Returns:
Dict with new data_ref for selected columns
"""
df = self.store.get_pandas(data_ref)
if df is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
# Validate columns exist
missing = [c for c in columns if c not in df.columns]
if missing:
return {
'error': f'Columns not found: {missing}',
'available_columns': list(df.columns)
}
selected = df[columns]
result_name = name or f"{data_ref}_select"
return self._check_and_store(
selected,
name=result_name,
source=f"select({data_ref}, {columns})"
)
except Exception as e:
logger.error(f"select failed: {e}")
return {'error': str(e)}
async def groupby(
self,
data_ref: str,
by: Union[str, List[str]],
agg: Dict[str, Union[str, List[str]]],
name: Optional[str] = None
) -> Dict:
"""
Group DataFrame and aggregate.
Args:
data_ref: Reference to stored DataFrame
by: Column(s) to group by
            agg: Aggregation dict mapping column to function(s) (e.g., {"sales": "sum", "quantity": "mean"})
name: Optional name for result data_ref
Returns:
Dict with new data_ref for aggregated result
"""
df = self.store.get_pandas(data_ref)
if df is None:
return {'error': f'DataFrame not found: {data_ref}'}
try:
grouped = df.groupby(by).agg(agg).reset_index()
# Flatten column names if multi-level
if isinstance(grouped.columns, pd.MultiIndex):
grouped.columns = ['_'.join(col).strip('_') for col in grouped.columns]
result_name = name or f"{data_ref}_grouped"
return self._check_and_store(
grouped,
name=result_name,
source=f"groupby({data_ref}, by={by})"
)
except Exception as e:
logger.error(f"groupby failed: {e}")
return {'error': str(e)}
async def join(
self,
left_ref: str,
right_ref: str,
on: Optional[Union[str, List[str]]] = None,
left_on: Optional[Union[str, List[str]]] = None,
right_on: Optional[Union[str, List[str]]] = None,
how: str = 'inner',
name: Optional[str] = None
) -> Dict:
"""
Join two DataFrames.
Args:
left_ref: Reference to left DataFrame
right_ref: Reference to right DataFrame
on: Column(s) to join on (if same name in both)
left_on: Left join column(s)
right_on: Right join column(s)
how: Join type ('inner', 'left', 'right', 'outer')
name: Optional name for result data_ref
Returns:
Dict with new data_ref for joined result
"""
left_df = self.store.get_pandas(left_ref)
right_df = self.store.get_pandas(right_ref)
if left_df is None:
return {'error': f'DataFrame not found: {left_ref}'}
if right_df is None:
return {'error': f'DataFrame not found: {right_ref}'}
try:
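            # pd.merge raises if both on= and left_on=/right_on= are passed; that error is returned via the except below.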
joined = pd.merge(
left_df, right_df,
on=on, left_on=left_on, right_on=right_on,
how=how
)
result_name = name or f"{left_ref}_{right_ref}_joined"
return self._check_and_store(
joined,
name=result_name,
source=f"join({left_ref}, {right_ref}, how={how})"
)
except Exception as e:
logger.error(f"join failed: {e}")
return {'error': str(e)}
async def list_data(self) -> Dict:
"""
List all stored DataFrames.
Returns:
Dict with list of stored DataFrames and their info
"""
refs = self.store.list_refs()
return {
'count': len(refs),
'total_memory_mb': self.store.total_memory_mb(),
'max_rows_limit': self.max_rows,
'dataframes': refs
}
async def drop_data(self, data_ref: str) -> Dict:
"""
Remove a DataFrame from storage.
Args:
data_ref: Reference to drop
Returns:
Dict with success status
"""
if self.store.drop(data_ref):
return {'success': True, 'dropped': data_ref}
return {'error': f'DataFrame not found: {data_ref}'}