# data-platform Plugin Implementation Plan (v4.0.0)

> **Origin:** [Change V04.0.0: Proposal](Change-V04.0.0:-Proposal)
> **Status:** Implemented
> **Date:** 2026-01-25

---

## Overview

Implement a new `data-platform` plugin for leo-claude-mktplace that addresses data workflow issues encountered in the personal-portfolio project:

- Lost data after multiple interactions (solved by Arrow IPC `data_ref` passing)
- dbt 1.9+ syntax deprecation (solved by pre-execution validation with `dbt parse`)
- Ungraceful PostgreSQL error handling (solved by a SessionStart hook with warnings)

## Architecture Decisions

| Decision | Choice |
|----------|--------|
| Data Passing | Arrow IPC with `data_ref` |
| DB Auth | Environment variables (`~/.config/claude/postgres.env`) |
| dbt Discovery | Auto-detect + explicit override |
| dbt Validation | Pre-execution (`dbt parse`) |
| Plugin Structure | Single plugin, 3 MCP servers |
| Server Location | Root `mcp-servers/` |
| Memory Management | 100k row limit with chunking |
| PostGIS Support | Yes, with geoalchemy2 |
| Agent Model | 2 agents (Ingestion + Analysis) |
| Commands | Core 6 |
| Startup Hook | Graceful DB warning (non-blocking) |
| MCP Framework | Manual SDK (following gitea pattern) |

## File Structure

```
mcp-servers/
└── data-platform/
    ├── mcp_server/
    │   ├── __init__.py
    │   ├── server.py          # Main MCP server with routing
    │   ├── config.py          # Hybrid config (system + project)
    │   ├── data_store.py      # Arrow IPC DataFrame registry
    │   ├── pandas_tools.py    # pandas tool implementations
    │   ├── postgres_tools.py  # PostgreSQL/PostGIS tools
    │   └── dbt_tools.py       # dbt CLI wrapper tools
    ├── requirements.txt
    ├── pyproject.toml
    └── README.md

plugins/
└── data-platform/
    ├── .claude-plugin/
    │   └── plugin.json
    ├── .mcp.json
    ├── mcp-servers/
    │   └── data-platform -> ../../../mcp-servers/data-platform  # symlink
    ├── commands/
    │   ├── ingest.md          # /ingest command
    │   ├── profile.md         # /profile command
    │   ├── schema.md          # /schema command
    │   ├── explain.md         # /explain command
    │   ├── lineage.md         # /lineage command
    │   └── run.md             # /run command
    ├── agents/
    │   ├── data-ingestion.md  # Data loading and transformation
    │   └── data-analysis.md   # Exploration and profiling
    ├── hooks/
    │   └── hooks.json         # SessionStart DB check
    ├── README.md
    └── claude-md-integration.md
```

## Implementation Phases

### Phase 1: Foundation (Issues #1-2)

**Files to create:**
- `mcp-servers/data-platform/mcp_server/__init__.py`
- `mcp-servers/data-platform/mcp_server/config.py`
- `mcp-servers/data-platform/mcp_server/data_store.py`
- `mcp-servers/data-platform/mcp_server/server.py` (skeleton; routing sketch at the end of this phase)
- `mcp-servers/data-platform/requirements.txt`
- `mcp-servers/data-platform/pyproject.toml`

**config.py pattern** (from gitea):
```python
import os
from pathlib import Path

from dotenv import load_dotenv


def load_config() -> dict:
    """Hybrid config: system-level credentials, overridden by project-level settings."""
    # System-level credentials
    system_env = Path.home() / ".config/claude/postgres.env"
    if system_env.exists():
        load_dotenv(system_env)

    # Project-level settings
    project_env = Path.cwd() / ".env"
    if project_env.exists():
        load_dotenv(project_env, override=True)

    return {
        "postgres_url": os.getenv("POSTGRES_URL"),
        "dbt_project_dir": os.getenv("DBT_PROJECT_DIR"),
        "dbt_profiles_dir": os.getenv("DBT_PROFILES_DIR"),
    }
```
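
For reference, a minimal sketch of what `~/.config/claude/postgres.env` might contain; the variable names are the ones `load_config()` reads, and the values are placeholders only:

```
# ~/.config/claude/postgres.env (placeholder values)
POSTGRES_URL=postgresql://USER:PASSWORD@localhost:5432/DATABASE
DBT_PROJECT_DIR=/path/to/dbt/project
DBT_PROFILES_DIR=/home/USER/.dbt
```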

**data_store.py** (Arrow IPC registry):
```python
import pyarrow as pa
import uuid
from typing import Dict, Optional


class DataStore:
    _instance = None
    _dataframes: Dict[str, pa.Table] = {}

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def store(self, df: pa.Table, name: Optional[str] = None) -> str:
        data_ref = name or f"df_{uuid.uuid4().hex[:8]}"
        self._dataframes[data_ref] = df
        return data_ref

    def get(self, data_ref: str) -> Optional[pa.Table]:
        return self._dataframes.get(data_ref)

    def list_refs(self) -> list:
        return [{"ref": k, "rows": v.num_rows, "cols": v.num_columns}
                for k, v in self._dataframes.items()]
```
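
Finally, `server.py` is only stubbed in this phase. A minimal routing skeleton is sketched below, assuming the low-level `mcp` Python SDK (the "Manual SDK" choice from the architecture table); only the `list_data` tool is wired up here as an example, and the module path for `DataStore` is an assumption from the file structure:

```python
import asyncio
import json

import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server

from mcp_server.data_store import DataStore  # module path assumed from the file structure

server = Server("data-platform")


@server.list_tools()
async def list_tools() -> list[types.Tool]:
    # The real server registers every pandas/postgres/dbt tool; one entry shown here.
    return [
        types.Tool(
            name="list_data",
            description="List all stored DataFrames",
            inputSchema={"type": "object", "properties": {}},
        )
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    # Route by tool name to pandas_tools / postgres_tools / dbt_tools;
    # only the list_data example is handled in this sketch.
    if name == "list_data":
        result = DataStore.get_instance().list_refs()
    else:
        result = {"error": f"Unknown tool: {name}"}
    return [types.TextContent(type="text", text=json.dumps(result, default=str))]


async def main() -> None:
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())
```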

### Phase 2: pandas-mcp Tools (Issue #3)

**Tools to implement in pandas_tools.py:**

| Tool | Description |
|------|-------------|
| `read_csv` | Load CSV with optional chunking |
| `read_parquet` | Load Parquet files |
| `read_json` | Load JSON/JSONL files |
| `to_csv` | Export DataFrame to CSV |
| `to_parquet` | Export DataFrame to Parquet |
| `describe` | Statistical summary |
| `head` | First N rows |
| `tail` | Last N rows |
| `filter` | Filter rows by condition |
| `select` | Select columns |
| `groupby` | Group and aggregate |
| `join` | Join two DataFrames |
| `list_data` | List all stored DataFrames |
| `drop_data` | Remove DataFrame from store |
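
Most of these tools share the same shape: resolve a `data_ref` in the `DataStore`, operate in pandas, and return a small JSON-friendly payload (or a new `data_ref`). A hedged sketch of `describe` reusing the Phase 1 registry; the import path is an assumption from the file structure:

```python
from mcp_server.data_store import DataStore  # module path assumed from the file structure


def describe(data_ref: str) -> dict:
    # Resolve the stored Arrow table, compute summary statistics in pandas,
    # and return a JSON-serializable result rather than the full DataFrame.
    table = DataStore.get_instance().get(data_ref)
    if table is None:
        return {"error": f"Unknown data_ref: {data_ref}"}
    df = table.to_pandas()
    return {
        "data_ref": data_ref,
        "rows": len(df),
        "summary": df.describe(include="all").to_dict(),
    }
```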

**Memory management:**
```python
import pandas as pd
import pyarrow as pa

from mcp_server.data_store import DataStore  # module path assumed from the file structure

MAX_ROWS = 100_000


def read_csv(file_path: str, chunk_size: int = None) -> dict:
    # Chunked loading (chunk_size set) is sketched separately below.
    df = pd.read_csv(file_path)
    if len(df) > MAX_ROWS:
        return {
            "warning": f"DataFrame has {len(df)} rows, exceeds {MAX_ROWS} limit",
            "suggestion": f"Use chunk_size={MAX_ROWS} for chunked processing",
            "preview": df.head(100).to_dict(),
        }
    # Convert to Arrow and store
    table = pa.Table.from_pandas(df)
    data_ref = DataStore.get_instance().store(table)
    return {"data_ref": data_ref, "rows": len(df), "columns": list(df.columns)}
```
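
The plan does not pin down how `chunk_size` should behave. One possible reading, sketched here as a continuation of `pandas_tools.py` (so `pd`, `pa`, `DataStore`, and `MAX_ROWS` from the block above are in scope), is to stream the file and register each chunk under its own `data_ref` so no single table exceeds the limit; the function name `read_csv_chunked` is illustrative:

```python
def read_csv_chunked(file_path: str, chunk_size: int = MAX_ROWS) -> dict:
    # Stream the CSV in chunk_size-row pieces; each chunk becomes its own data_ref.
    store = DataStore.get_instance()
    refs = []
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        refs.append(store.store(pa.Table.from_pandas(chunk)))
    return {"data_refs": refs, "chunks": len(refs)}
```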

### Phase 3: postgres-mcp Tools (Issue #4)

**Tools to implement in postgres_tools.py:**

| Tool | Description |
|------|-------------|
| `pg_connect` | Test connection and return status |
| `pg_query` | Execute SELECT, return as data_ref |
| `pg_execute` | Execute INSERT/UPDATE/DELETE |
| `pg_tables` | List all tables in schema |
| `pg_columns` | Get column info for table |
| `pg_schemas` | List all schemas |
| `st_tables` | List PostGIS-enabled tables |
| `st_geometry_type` | Get geometry type of column |
| `st_srid` | Get SRID of geometry column |
| `st_extent` | Get bounding box of geometries |

**asyncpg implementation:**
```python
import asyncpg
import pandas as pd
import pyarrow as pa
from geoalchemy2 import Geometry  # noqa: F401 - PostGIS type support (see architecture decisions)

# Module paths below are assumed from the file structure
from mcp_server.config import load_config
from mcp_server.data_store import DataStore
from mcp_server.pandas_tools import MAX_ROWS


async def pg_query(query: str, params: list = None) -> dict:
    config = load_config()
    conn = await asyncpg.connect(config["postgres_url"])
    try:
        rows = await conn.fetch(query, *(params or []))
        df = pd.DataFrame([dict(r) for r in rows])
        if len(df) > MAX_ROWS:
            # store_truncated(): helper (not shown) that registers only the first MAX_ROWS rows
            return {"warning": "Result truncated", "data_ref": store_truncated(df)}
        table = pa.Table.from_pandas(df)
        data_ref = DataStore.get_instance().store(table)
        return {"data_ref": data_ref, "rows": len(df)}
    finally:
        await conn.close()
```
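
The PostGIS helpers can lean on the standard `geometry_columns` catalog view. A sketch of `st_tables` under that assumption, continuing in `postgres_tools.py` (so `asyncpg` and `load_config` from the block above are in scope):

```python
async def st_tables(schema: str = "public") -> dict:
    # List tables with a registered geometry column, via the PostGIS geometry_columns view.
    config = load_config()
    conn = await asyncpg.connect(config["postgres_url"])
    try:
        rows = await conn.fetch(
            """
            SELECT f_table_schema, f_table_name, f_geometry_column, srid, type
            FROM geometry_columns
            WHERE f_table_schema = $1
            ORDER BY f_table_name
            """,
            schema,
        )
        return {"schema": schema, "tables": [dict(r) for r in rows]}
    finally:
        await conn.close()
```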

### Phase 4: dbt-mcp Tools (Issue #5)

**Tools to implement in dbt_tools.py:**

| Tool | Description |
|------|-------------|
| `dbt_parse` | Validate project (pre-execution) |
| `dbt_run` | Run models with selection |
| `dbt_test` | Run tests |
| `dbt_build` | Run + test |
| `dbt_compile` | Compile SQL without executing |
| `dbt_ls` | List resources |
| `dbt_docs_generate` | Generate documentation |
| `dbt_lineage` | Get model dependencies |

**Pre-execution validation pattern:**
```python
import subprocess
import json

from mcp_server.config import load_config  # module path assumed from the file structure


def dbt_run(select: str = None, exclude: str = None) -> dict:
    config = load_config()
    project_dir = config.get("dbt_project_dir") or find_dbt_project()

    # ALWAYS validate first
    parse_result = subprocess.run(
        ["dbt", "parse", "--project-dir", project_dir],
        capture_output=True, text=True
    )
    if parse_result.returncode != 0:
        return {
            "error": "dbt parse failed - fix issues before running",
            "details": parse_result.stderr,
            "suggestion": "Check for deprecated syntax (dbt 1.9+)"
        }

    # Execute run
    cmd = ["dbt", "run", "--project-dir", project_dir]
    if select:
        cmd.extend(["--select", select])
    if exclude:
        cmd.extend(["--exclude", exclude])
    result = subprocess.run(cmd, capture_output=True, text=True)
    return {"success": result.returncode == 0, "output": result.stdout}
```
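
`find_dbt_project()` is the auto-detect half of the "auto-detect + explicit override" decision. A minimal sketch, assuming detection simply means walking up from the working directory until a `dbt_project.yml` is found (DBT_PROJECT_DIR, via `load_config`, always wins when set):

```python
from pathlib import Path


def find_dbt_project(start: Path = None) -> str:
    # Walk from the working directory toward the filesystem root looking for dbt_project.yml.
    current = (start or Path.cwd()).resolve()
    for candidate in (current, *current.parents):
        if (candidate / "dbt_project.yml").exists():
            return str(candidate)
    raise FileNotFoundError(
        "No dbt_project.yml found upward from the working directory; "
        "set DBT_PROJECT_DIR explicitly"
    )
```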

### Phase 5: Plugin Wrapper (Issue #6)

**plugins/data-platform/.claude-plugin/plugin.json:**
```json
{
  "name": "data-platform",
  "version": "1.0.0",
  "description": "Data engineering tools with pandas, PostgreSQL/PostGIS, and dbt integration",
  "author": "Leo Miranda",
  "license": "MIT",
  "hooks": "hooks/hooks.json",
  "commands": "commands/",
  "agents": "agents/",
  "mcp": ".mcp.json"
}
```

**plugins/data-platform/.mcp.json:**
```json
{
  "mcpServers": {
    "data-platform": {
      "type": "stdio",
      "command": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform/.venv/bin/python",
      "args": ["-m", "mcp_server.server"],
      "cwd": "${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform"
    }
  }
}
```

**plugins/data-platform/hooks/hooks.json:**
```json
{
  "hooks": [
    {
      "event": "SessionStart",
      "type": "command",
      "command": ["${CLAUDE_PLUGIN_ROOT}/mcp-servers/data-platform/.venv/bin/python", "-c", "from mcp_server.postgres_tools import check_connection; check_connection()"],
      "timeout": 5000,
      "onError": "warn"
    }
  ]
}
```
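
The hook command above imports `check_connection` from `postgres_tools`. A sketch of the non-blocking check it might perform: report status, print a warning on failure, and never fail the session. The config module path is an assumption from the file structure:

```python
import asyncio
import sys

import asyncpg

from mcp_server.config import load_config  # module path assumed from the file structure


def check_connection() -> None:
    # SessionStart helper: warn about missing or unreachable PostgreSQL, never block.
    url = load_config().get("postgres_url")
    if not url:
        print("data-platform: POSTGRES_URL not set; PostgreSQL tools will be unavailable", file=sys.stderr)
        return

    async def _ping() -> None:
        conn = await asyncpg.connect(url, timeout=3)
        await conn.close()

    try:
        asyncio.run(_ping())
        print("data-platform: PostgreSQL connection OK")
    except Exception as exc:  # any failure is only a warning
        print(f"data-platform: PostgreSQL unavailable ({exc}); continuing without DB", file=sys.stderr)
```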

**Agents:**

`agents/data-ingestion.md`:
```markdown
# Data Ingestion Agent

You are a data ingestion specialist. Your role is to help users load, transform, and prepare data for analysis.

## Available Tools
- pandas: read_csv, read_parquet, read_json, filter, select, groupby, join
- postgres: pg_query, pg_execute

## Workflow
1. Understand the data source and format
2. Load data with appropriate chunking for large files
3. Transform as needed (filter, select, aggregate)
4. Store results with meaningful data_ref names
```

`agents/data-analysis.md`:
```markdown
# Data Analysis Agent

You are a data analysis specialist. Your role is to help users explore, profile, and understand their data.

## Available Tools
- pandas: describe, head, tail, list_data
- postgres: pg_tables, pg_columns
- dbt: dbt_lineage, dbt_docs_generate

## Workflow
1. List available data (list_data or pg_tables)
2. Profile data structure and statistics
3. Identify patterns and anomalies
4. Provide insights and recommendations
```

**Commands:**

| Command | File | Description |
|---------|------|-------------|
| `/ingest` | commands/ingest.md | Load data from files or database |
| `/profile` | commands/profile.md | Generate data profile and statistics |
| `/schema` | commands/schema.md | Show database/DataFrame schema |
| `/explain` | commands/explain.md | Explain dbt model lineage |
| `/lineage` | commands/lineage.md | Visualize data dependencies |
| `/run` | commands/run.md | Execute dbt models |

### Phase 6: Documentation & Integration

**Files to update:**
- `.claude-plugin/marketplace.json` - Add data-platform plugin entry
- `CHANGELOG.md` - Add v4.0.0 section under [Unreleased]
- `README.md` - Update plugin table

**Files to create:**
- `plugins/data-platform/README.md`
- `plugins/data-platform/claude-md-integration.md`
- `mcp-servers/data-platform/README.md`

## Sprint Structure (projman)

**Milestone:** Sprint 1 - data-platform Plugin (v4.0.0)

### Gitea Issues to Create

| # | Title | Labels | Effort |
|---|-------|--------|--------|
| 1 | [Sprint 01] feat: MCP server foundation and config | Type/Feature, Priority/High, Complexity/Medium, Effort/M, Tech/Python, Component/Backend | 1-2 days |
| 2 | [Sprint 01] feat: Arrow IPC data registry with memory limits | Type/Feature, Priority/High, Complexity/Medium, Effort/M, Tech/Python, Component/Backend | 1-2 days |
| 3 | [Sprint 01] feat: pandas-mcp core data operations (14 tools) | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Component/Backend | 3-5 days |
| 4 | [Sprint 01] feat: postgres-mcp database tools with PostGIS | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Tech/PostgreSQL, Component/Database | 3-5 days |
| 5 | [Sprint 01] feat: dbt-mcp build tools with pre-validation | Type/Feature, Priority/High, Complexity/Complex, Effort/L, Tech/Python, Component/Backend | 3-5 days |
| 6 | [Sprint 01] feat: Plugin wrapper, commands, and agents | Type/Feature, Priority/Medium, Complexity/Medium, Effort/M, Component/Docs | 1-2 days |
| 7 | [Sprint 01] docs: Documentation and marketplace integration | Type/Documentation, Priority/Medium, Complexity/Simple, Effort/S, Component/Docs | 2-4 hours |

### Issue Dependencies

```
#1 (foundation) ─┬─> #2 (data registry)
                 │
                 ├─> #3 (pandas-mcp) ──┐
                 │                     │
                 ├─> #4 (postgres-mcp) ├─> #6 (plugin wrapper) ─> #7 (docs)
                 │                     │
                 └─> #5 (dbt-mcp) ─────┘
```

**Parallel Execution Batches:**
1. Batch 1: #1 (foundation)
2. Batch 2: #2, #3, #4, #5 (can run in parallel after foundation)
3. Batch 3: #6 (plugin wrapper - needs all tools complete)
4. Batch 4: #7 (docs - final)

## Verification Steps

1. **MCP Server starts:**
   ```bash
   cd mcp-servers/data-platform
   python -m venv .venv
   source .venv/bin/activate
   pip install -r requirements.txt
   python -m mcp_server.server
   ```

2. **Tools are registered:**
   - Start Claude Code in a test project
   - Run the `/ingest` command
   - Verify MCP tools appear in the tool list

3. **Data persistence:**
   - Load a CSV file with `/ingest`
   - Run multiple commands referencing the data_ref
   - Verify data persists across tool calls

4. **PostgreSQL connection:**
   - Configure `~/.config/claude/postgres.env`
   - Start a new session
   - Verify the SessionStart hook shows connection status (warning if unavailable)

5. **dbt validation:**
   - Run `/run` on a dbt project with deprecated syntax
   - Verify pre-execution validation catches the issues
   - Fix the syntax and re-run successfully

6. **Validation script:**
   ```bash
   ./scripts/validate-marketplace.sh
   ```

## Dependencies

```
# requirements.txt
mcp>=1.0.0
pandas>=2.0.0
pyarrow>=14.0.0
asyncpg>=0.29.0
geoalchemy2>=0.14.0
python-dotenv>=1.0.0
dbt-core>=1.9.0
dbt-postgres>=1.9.0
```

## Out of Scope (v4.1.0+)

- Integration with projman sprint tracking
- Cross-plugin DataFrame sharing
- Visualization components (deferred to v5.0.0)
- Advanced dbt features (seeds, snapshots, exposures)