File-Based ETL Agent Blueprint
AI agent that builds ETL pipelines from CSV and JSON files: infers schemas, validates data, transforms and cleans, validates output, and generates documentation. Self-contained, no external DB needed.
File-Based ETL Agent
An AI agent that builds end-to-end ETL pipelines from flat files. It infers schemas, validates input, cleans and transforms data, validates output quality, and generates pipeline documentation. Entirely file-based — reads CSVs/JSONs in, writes transformed files out. No database or orchestrator required.
Note:
This agent is designed for data analysts and engineers who need repeatable transformations on file-based data. Point it at a directory of CSVs, describe the target schema, and it builds the pipeline. Works with local files only — no cloud or database dependencies.
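Agent File Structure
- agent.py: agent loop, tool schemas, and system prompt
- tools.py: tool implementations (file listing, sampling, schema inference, pipeline writing and execution, output validation)
- config.json: API key, model, iteration limit, and directory settings
- pipeline.py: the ETL script the agent generates (path set by pipeline_file)
- data/raw/: input CSV, JSON, and JSONL files
- data/processed/: transformed output files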
Setup
Install Dependencies
Install the OpenAI client plus pandas for data operations.
```bash
pip install openai pandas
```
Create config.json
Configure input and output directories.
```json
{
  "openai_api_key": "sk-...",
  "model": "gpt-4o",
  "max_iterations": 10,
  "input_directory": "./data/raw",
  "output_directory": "./data/processed",
  "pipeline_file": "./pipeline.py",
  "max_input_rows": 1000
}
```
Note:
The agent writes transformation code to pipeline_file. Point it at a new file or one you're comfortable overwriting.
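If pipeline_file already holds a script you want to keep, one option is to copy it aside before starting the agent. A minimal sketch; the helper name `backup_pipeline` and the timestamped `.bak` naming are assumptions, not part of the blueprint:

```python
import os
import shutil
import time
from typing import Optional


def backup_pipeline(path: str) -> Optional[str]:
    """Hypothetical helper: copy an existing pipeline file to <path>.<timestamp>.bak.

    Returns the backup path, or None when there is nothing to back up.
    """
    if not os.path.exists(path):
        return None
    stamp = time.strftime("%Y%m%d-%H%M%S")
    backup = f"{path}.{stamp}.bak"
    shutil.copy2(path, backup)  # copy2 also preserves the file's modification time
    return backup
```

Calling `backup_pipeline("./pipeline.py")` before `python agent.py ...` leaves the previous version next to the new one.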
Verify
Run the agent on a sample data directory.
```bash
python agent.py --task "Clean and deduplicate sales data, output as CSV with consistent date format, remove rows with null revenue"
```
The agent should analyze input files, generate a pipeline script, run it, and validate the output.
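To try the agent without real data, you can first generate a small, deliberately messy input file. A sketch assuming the default input_directory of ./data/raw; the column names mirror the walkthrough's sales_2024.csv, and the values are made-up test data:

```python
import os
import pandas as pd


def make_sample_sales(input_dir: str = "./data/raw") -> str:
    """Write a tiny sales CSV containing the issues the agent should fix (test data only)."""
    os.makedirs(input_dir, exist_ok=True)
    df = pd.DataFrame({
        # Mixed date formats: MM/DD/YYYY and YYYY-MM-DD
        "date": ["01/15/2024", "2024-01-16", "01/15/2024", "2024-02-01"],
        "product": ["Widget", "Gadget", "Widget", "Widget"],
        # One null revenue value
        "revenue": [120.0, None, 120.0, 99.5],
        "region": ["West", "East", "West", "North"],
        "rep": ["ana", "raj", "ana", "lee"],
    })
    # Rows 0 and 2 are exact duplicates
    path = os.path.join(input_dir, "sales_2024.csv")
    df.to_csv(path, index=False)
    return path
```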
System Prompt
You are a data engineer specializing in file-based ETL pipelines. Your job
is to build, run, and validate data transformation pipelines. Follow this protocol:
1. THOUGHT: What data is available? What does the user need?
2. ACTION: List input files, sample data, infer schemas
3. Design the pipeline: which files to read, what transformations to apply,
how to validate output
4. Write the pipeline as a Python script using pandas
5. Run the pipeline — if it fails, analyze the error and fix the script
6. Validate output: check row counts, null percentages, schema consistency
7. FINAL_PIPELINE: The working pipeline script + validation report + documentation
Rules:
- Use pandas for all transformations — no Spark, no Dask, no external DBs
- The pipeline must be idempotent (running it twice produces the same output)
- Handle common data issues: nulls, duplicates, type mismatches, encoding errors
- Validate that the output matches the user's stated requirements
- Write clear comments in the pipeline script explaining each step
- If input data is > 1000 rows, sample it for analysis but process the full dataset
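One way for a generated script to meet the idempotency rule is to make every output a full overwrite with a deterministic row order, written to a temporary file and then swapped in, so a rerun never appends and never leaves a half-written file. A sketch under those assumptions; `write_csv_atomic` is a hypothetical helper name:

```python
import os
import tempfile
import pandas as pd


def write_csv_atomic(df: pd.DataFrame, path: str, sort_by: list) -> None:
    """Hypothetical helper: overwrite `path` with `df` in a fixed row order via an atomic rename."""
    out_dir = os.path.dirname(path) or "."
    os.makedirs(out_dir, exist_ok=True)
    # Sorting means the same input always produces the same row order
    ordered = df.sort_values(sort_by).reset_index(drop=True)
    # Write to a temp file in the target directory, then replace the target in one step
    fd, tmp = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    os.close(fd)
    try:
        ordered.to_csv(tmp, index=False)
        os.replace(tmp, path)  # atomic when source and target are on the same filesystem
    finally:
        # Only still present if writing or replacing failed
        if os.path.exists(tmp):
            os.remove(tmp)
```

Creating the temp file in the output directory itself, rather than the system temp directory, keeps `os.replace` on one filesystem.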
### Tool Definitions
<ParameterGrid title="Agent Tools" items={[
{ param: "list_input_files", description: "List all CSV and JSON files in the input directory with file sizes and row counts.", values: "none" },
{ param: "sample_data", description: "Read the first N rows of a file plus schema info (column names, dtypes, null counts).", values: "path: string, n_rows?: int (default 50)" },
{ param: "infer_schema", description: "Infer the schema of a file: column names, inferred types, unique value counts, null percentages, sample values.", values: "path: string" },
{ param: "write_pipeline", description: "Write the ETL pipeline script to the configured pipeline file. Overwrites existing content.", values: "code: string" },
{ param: "run_pipeline", description: "Execute the pipeline script and capture stdout, stderr, and exit code.", values: "none" },
{ param: "validate_output", description: "Check output files: row count, null %, column list, and duplicate rows.", values: "none" }
]} />
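These six tools correspond one-to-one to the OpenAI function-calling schemas in agent.py. If you add tools, generating those schemas from a compact spec keeps the two lists in sync. A sketch: the `TOOL_SPEC` layout and the `build_schemas` helper are assumptions, and only the shape of the resulting dicts follows agent.py:

```python
# Hypothetical compact spec: name -> (description, {param: (json_type, required)})
TOOL_SPEC = {
    "list_input_files": ("List CSV/JSON files in the input directory with sizes and row counts", {}),
    "sample_data": ("Read first N rows of a file with schema and dtype info",
                    {"path": ("string", True), "n_rows": ("integer", False)}),
    "infer_schema": ("Infer schema: column types, null %, unique counts, sample values",
                     {"path": ("string", True)}),
    "write_pipeline": ("Write the ETL pipeline script to the configured pipeline file",
                       {"code": ("string", True)}),
    "run_pipeline": ("Execute the pipeline script and return stdout/stderr", {}),
    "validate_output": ("Check output files: row count, null %, duplicates, schema", {}),
}


def build_schemas(spec: dict) -> list:
    """Expand the compact spec into the {"type": "function", ...} dicts passed as tools=."""
    schemas = []
    for name, (description, params) in spec.items():
        schemas.append({
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": {p: {"type": t} for p, (t, _) in params.items()},
                    "required": [p for p, (_, req) in params.items() if req],
                },
            },
        })
    return schemas
```

Unlike the hand-written schemas, this sketch drops the `"default": 50` hint on n_rows; add a default slot to the tuple if you want to keep it.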
### Tool Implementation
```python
# tools.py
import os
import sys
import json
import subprocess
import pandas as pd

# Set by agent.py from config.json before the agent loop starts
INPUT_DIR = None
OUTPUT_DIR = None
PIPELINE_FILE = None
MAX_INPUT_ROWS = 1000

DATA_EXTENSIONS = (".csv", ".json", ".jsonl")


def list_input_files():
    if not os.path.exists(INPUT_DIR):
        return f"ERROR: Input directory not found: {INPUT_DIR}"
    files = []
    for f in sorted(os.listdir(INPUT_DIR)):
        if not f.endswith(DATA_EXTENSIONS):
            continue
        full = os.path.join(INPUT_DIR, f)
        size_kb = os.path.getsize(full) / 1024
        try:
            # A few rows are enough for column names; rows are counted separately
            cols = [str(c) for c in _read_file(full, nrows=5).columns]
            files.append(f"{f} | {size_kb:.0f}KB | {_count_rows(full)} rows | columns: {', '.join(cols[:10])}")
        except Exception as e:
            files.append(f"{f} | {size_kb:.0f}KB | ERROR: {str(e)[:80]}")
    return "\n".join(files) if files else f"No CSV/JSON files in {INPUT_DIR}"


def sample_data(path, n_rows=50):
    full = _resolve(path)
    if not os.path.exists(full):
        return f"ERROR: File not found: {path}"
    try:
        total_rows = _count_rows(full)                # row count of the whole file
        df = _read_file(full, nrows=MAX_INPUT_ROWS)   # capped sample for inspection
        sample = df.head(n_rows)
        return (
            f"Shape: ({total_rows} rows, {len(df.columns)} columns)\n"
            f"Columns: {list(df.columns)}\n"
            f"Dtypes:\n{df.dtypes.to_string()}\n\n"
            f"First {len(sample)} rows:\n{sample.to_string()}"
        )
    except Exception as e:
        return f"ERROR reading {path}: {e}"


def infer_schema(path):
    full = _resolve(path)
    if not os.path.exists(full):
        return f"ERROR: File not found: {path}"
    try:
        # Statistics cover the first MAX_INPUT_ROWS rows, not the whole file
        df = _read_file(full, nrows=MAX_INPUT_ROWS)
        n = len(df)
        info = []
        for col in df.columns:
            null_count = int(df[col].isnull().sum())
            info.append({
                "column": col,
                "dtype": str(df[col].dtype),
                "null_pct": round(null_count / n * 100, 1) if n else 0.0,
                "unique_values": int(df[col].nunique()),
                "sample": df[col].dropna().head(3).tolist()
            })
        return json.dumps(info, indent=2, default=str)
    except Exception as e:
        return f"ERROR: {e}"


def write_pipeline(code):
    os.makedirs(os.path.dirname(PIPELINE_FILE) or ".", exist_ok=True)
    with open(PIPELINE_FILE, "w", encoding="utf-8") as f:
        f.write(code)
    return f"Pipeline written to {PIPELINE_FILE} ({len(code)} characters)"


def run_pipeline():
    script = os.path.abspath(PIPELINE_FILE)
    try:
        # sys.executable reuses the agent's interpreter (and its pandas install);
        # the absolute path still resolves once cwd is the script's directory
        result = subprocess.run(
            [sys.executable, script],
            capture_output=True, text=True, timeout=120,
            cwd=os.path.dirname(script)
        )
    except subprocess.TimeoutExpired:
        return "Pipeline timed out after 120s. Check for infinite loops or very large datasets."
    # Keep only the tail of long output so it fits in the model's context
    return f"Exit code: {result.returncode}\n\nSTDOUT:\n{result.stdout[-4000:]}\n\nSTDERR:\n{result.stderr[-2000:]}"


def validate_output():
    if not os.path.exists(OUTPUT_DIR):
        return "ERROR: Output directory does not exist. Pipeline may have failed to create it."
    files = [f for f in os.listdir(OUTPUT_DIR) if f.endswith(DATA_EXTENSIONS)]
    if not files:
        return "WARNING: No output files found. Pipeline may have produced no data."
    report = []
    for f in sorted(files):
        full = os.path.join(OUTPUT_DIR, f)
        try:
            df = _read_file(full)  # no row cap: output checks must see every row
            n = len(df)
            nulls = {str(c): round(float(df[c].isnull().sum()) / n * 100, 1) if n else 0.0 for c in df.columns}
            report.append({
                "file": f,
                "rows": n,
                "columns": [str(c) for c in df.columns],
                "null_percentages": nulls,
                "duplicate_rows": int(df.duplicated().sum()),
                "size_kb": round(os.path.getsize(full) / 1024, 1)
            })
        except Exception as e:
            report.append({"file": f, "error": str(e)})
    return json.dumps(report, indent=2, default=str)


def _resolve(path):
    """Treat relative paths as relative to INPUT_DIR."""
    return path if os.path.isabs(path) else os.path.join(INPUT_DIR, path)


def _read_file(path, nrows=None):
    """Read CSV, JSONL, or JSON; nrows caps the rows returned (None reads everything)."""
    if path.endswith(".csv"):
        return pd.read_csv(path, nrows=nrows)
    if path.endswith(".jsonl"):
        return pd.read_json(path, lines=True, nrows=nrows)
    # read_json only accepts nrows with lines=True, so JSON arrays are loaded, then truncated
    df = pd.read_json(path)
    return df if nrows is None else df.head(nrows)


def _count_rows(path):
    """Count data rows without holding a whole CSV/JSONL file in memory."""
    if path.endswith(".csv"):
        # Chunked parsing counts records, so quoted fields containing newlines aren't overcounted
        return sum(len(chunk) for chunk in pd.read_csv(path, chunksize=100_000, usecols=[0]))
    if path.endswith(".jsonl"):
        with open(path, "r", encoding="utf-8") as f:
            return sum(1 for line in f if line.strip())
    # JSON arrays can't be streamed, so counting them requires a full load
    return len(pd.read_json(path))
```
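Counting a CSV's rows by counting physical lines over-reports when quoted fields contain newlines, because one record then spans several lines. Letting pandas parse the file in chunks counts records instead while keeping memory bounded. A standalone sketch; `count_csv_records` and `count_csv_lines` are hypothetical names used only to show the difference:

```python
import io
import pandas as pd


def count_csv_records(source, chunksize: int = 100_000) -> int:
    """Count data records (header excluded) by parsing the CSV in chunks."""
    # usecols=[0] keeps only the first column of each chunk to save memory
    return sum(len(chunk) for chunk in pd.read_csv(source, chunksize=chunksize, usecols=[0]))


def count_csv_lines(text: str) -> int:
    """Naive approach: physical lines minus the header line."""
    return len(text.splitlines()) - 1


if __name__ == "__main__":
    # One record's quoted "note" field spans two physical lines
    csv_text = 'id,note\n1,"line one\nline two"\n2,plain\n'
    print(count_csv_lines(csv_text))                 # 3 (overcounts)
    print(count_csv_records(io.StringIO(csv_text)))  # 2
```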
Agent Initialization
```python
# agent.py
import os
import json
import argparse
from openai import OpenAI
import tools as agent_tools

TOOL_SCHEMAS = [
    {
        "type": "function",
        "function": {
            "name": "list_input_files",
            "description": "List CSV/JSON files in the input directory with sizes and row counts",
            "parameters": {"type": "object", "properties": {}, "required": []}
        }
    },
    {
        "type": "function",
        "function": {
            "name": "sample_data",
            "description": "Read first N rows of a file with schema and dtype info",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "n_rows": {"type": "integer", "default": 50}
                },
                "required": ["path"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "infer_schema",
            "description": "Infer schema: column types, null %, unique counts, sample values",
            "parameters": {
                "type": "object",
                "properties": {"path": {"type": "string"}},
                "required": ["path"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "write_pipeline",
            "description": "Write the ETL pipeline script to the configured pipeline file",
            "parameters": {
                "type": "object",
                "properties": {"code": {"type": "string"}},
                "required": ["code"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "run_pipeline",
            "description": "Execute the pipeline script and return stdout/stderr",
            "parameters": {"type": "object", "properties": {}, "required": []}
        }
    },
    {
        "type": "function",
        "function": {
            "name": "validate_output",
            "description": "Check output files: row count, null %, duplicates, schema",
            "parameters": {"type": "object", "properties": {}, "required": []}
        }
    }
]

# Only these names may be dispatched to tools.py (blocks calls to private helpers)
TOOL_NAMES = {schema["function"]["name"] for schema in TOOL_SCHEMAS}

SYSTEM_PROMPT = """You are a data engineer specializing in file-based ETL pipelines.
Your job is to build, run, and validate data transformation pipelines.
Protocol:
1. THOUGHT: What data is available? What does the user need?
2. ACTION: List input files, sample data, infer schemas
3. Design the pipeline: which files to read, what transformations,
   how to validate output
4. Write the pipeline as a Python script using pandas
5. Run the pipeline — fix errors until it works
6. Validate output against user requirements
7. FINAL_PIPELINE: working pipeline script + validation report + documentation
Rules:
- Use pandas for all transformations — no external DBs or orchestrators
- Pipeline must be idempotent
- Handle: nulls, duplicates, type mismatches, encoding errors
- Validate output matches user requirements
- Write clear comments in the pipeline
- Sample data for analysis, process full dataset for final run"""


def run_agent(task: str, config: dict):
    client = OpenAI(api_key=config["openai_api_key"])
    model = config.get("model", "gpt-4o")

    # Configure the tools module before the loop starts
    agent_tools.INPUT_DIR = config.get("input_directory", "./data/raw")
    agent_tools.OUTPUT_DIR = config.get("output_directory", "./data/processed")
    agent_tools.PIPELINE_FILE = config.get("pipeline_file", "./pipeline.py")
    agent_tools.MAX_INPUT_ROWS = config.get("max_input_rows", 1000)
    os.makedirs(agent_tools.OUTPUT_DIR, exist_ok=True)

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Build an ETL pipeline for this task: {task}"}
    ]

    for _ in range(config.get("max_iterations", 10)):
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=TOOL_SCHEMAS,
            temperature=0.1
        )
        msg = response.choices[0].message
        messages.append(msg)

        if msg.content and "FINAL_PIPELINE:" in msg.content:
            return msg.content.split("FINAL_PIPELINE:", 1)[1].strip()

        if not msg.tool_calls:
            messages.append({
                "role": "user",
                "content": "Continue. Use tools to analyze data, write the pipeline, run it, and validate. End with FINAL_PIPELINE."
            })
            continue

        for tool_call in msg.tool_calls:
            func_name = tool_call.function.name
            try:
                if func_name not in TOOL_NAMES:
                    raise ValueError(f"unknown tool {func_name!r}")
                func_args = json.loads(tool_call.function.arguments or "{}")
                result = getattr(agent_tools, func_name)(**func_args)
            except Exception as e:
                # Return the error to the model so it can retry instead of crashing the loop
                result = f"ERROR calling {func_name}: {e}"
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result)
            })

    return "Agent reached max iterations."


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", required=True, help="ETL task description")
    parser.add_argument("--config", default="config.json")
    args = parser.parse_args()

    with open(args.config) as f:
        config = json.load(f)

    result = run_agent(args.task, config)
    print(result)
    print(f"\nPipeline saved to: {config.get('pipeline_file', './pipeline.py')}")
    print(f"Output files in: {config.get('output_directory', './data/processed')}")
```
Walkthrough
Cleaning and deduplicating a messy sales dataset.
Agent surveys input files
list_input_files() returns:
- sales_2024.csv | 854KB | 12,430 rows | columns: date, product, revenue, region, rep
- customers.json | 112KB | 2,840 rows | columns: id, name, segment, since
infer_schema("sales_2024.csv") reveals issues: 8% null revenue, mixed date formats (MM/DD/YYYY and YYYY-MM-DD), duplicate rows by product+date+region.
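The three issues surfaced here can also be checked directly with pandas. A sketch that assumes the walkthrough's column names (date, revenue, product, region); the `profile_sales` name is hypothetical, and its regular expressions recognize only the two date formats mentioned, so they are illustrative rather than general:

```python
import pandas as pd

# Patterns for the two formats seen in the walkthrough (assumed; extend as needed)
DATE_PATTERNS = {
    "MM/DD/YYYY": r"^\d{2}/\d{2}/\d{4}$",
    "YYYY-MM-DD": r"^\d{4}-\d{2}-\d{2}$",
}


def profile_sales(df: pd.DataFrame) -> dict:
    """Hypothetical check: null revenue %, date formats present, duplicates on the business key."""
    dates = df["date"].dropna().astype(str)
    formats = {name: int(dates.str.match(pat).sum()) for name, pat in DATE_PATTERNS.items()}
    return {
        "null_revenue_pct": round(df["revenue"].isnull().mean() * 100, 1),
        "date_formats": {k: v for k, v in formats.items() if v},
        # Compares raw date strings, so the same day written two ways is not caught here
        "duplicate_keys": int(df.duplicated(subset=["product", "date", "region"]).sum()),
    }
```

Because the duplicate check runs on unparsed strings, it undercounts when the same date appears in both formats; parsing dates before deduplicating avoids that.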
Agent writes the pipeline
write_pipeline(code) creates a script that:
- Parses dates with pd.to_datetime(..., errors='coerce'), handling mixed formats
- Drops rows with null revenue
- Deduplicates on ['date', 'product', 'region']
- Joins with customers.json on customer_id
- Outputs clean_sales.csv and summary_stats.json
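The first three steps in that list might look like this in a generated script. This is a sketch, not the agent's actual output, and it assumes pandas 2.x. In that version `pd.to_datetime` infers a single format from the first value unless told otherwise, so values in the other format can silently become NaT under `errors='coerce'`. Passing `format="mixed"` parses each value separately. Deduplication runs after date parsing so the same day written in two formats counts as a duplicate. `clean_sales` is a hypothetical name, and the customer join is left out:

```python
import pandas as pd


def clean_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical cleaning step: parse dates, drop null revenue, deduplicate on the business key."""
    out = df.copy()
    # format="mixed" (pandas >= 2.0) parses each value on its own;
    # anything unparseable becomes NaT instead of raising
    out["date"] = pd.to_datetime(out["date"], format="mixed", errors="coerce")
    # Rows without revenue are dropped, per the task description
    out = out.dropna(subset=["revenue"])
    # Deduplicate after parsing so "01/15/2024" and "2024-01-15" compare equal
    out = out.drop_duplicates(subset=["date", "product", "region"], keep="first")
    return out.reset_index(drop=True)
```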
Runs and fixes the pipeline
run_pipeline() fails on the first attempt: pd.merge() raises a KeyError because the CSV column is rep but the script references customer_id. The agent reads the error, corrects the column name, rewrites the script, and runs it again. It passes on the second attempt.
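A bare KeyError from `pd.merge()` names only the missing key. Checking join keys before merging produces a message that also lists the columns actually available, which gives the agent more to work with on its next iteration. A sketch; `merge_checked` is a hypothetical helper. `validate="many_to_one"` is a standard `pd.merge` option that additionally raises if the right-hand key is not unique:

```python
import pandas as pd


def merge_checked(left: pd.DataFrame, right: pd.DataFrame,
                  left_on: str, right_on: str, how: str = "left") -> pd.DataFrame:
    """Hypothetical helper: merge only after confirming both join keys exist."""
    for side, frame, key in (("left", left, left_on), ("right", right, right_on)):
        if key not in frame.columns:
            raise KeyError(
                f"{side} join key {key!r} not found; available columns: {list(frame.columns)}"
            )
    # many_to_one: each left row may match at most one right row (e.g. one customer per id)
    return left.merge(right, left_on=left_on, right_on=right_on, how=how, validate="many_to_one")
```

The error text ends up in the STDERR section of run_pipeline()'s result, where the model can read the available column names directly.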
Validates output
validate_output() confirms:
- clean_sales.csv: 11,238 rows (1,192 duplicates removed), 0% null revenue, consistent date format
- summary_stats.json: revenue by region, top products, monthly trend
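validate_output() reports these numbers after the fact. Turning the task's requirements into hard checks at the end of the pipeline itself makes a bad run fail with a nonzero exit code, which run_pipeline() passes back to the agent. A sketch assuming the walkthrough's requirements (no null revenue, no duplicate date/product/region keys, parsed dates); `check_output` is a hypothetical name:

```python
import pandas as pd


def check_output(df: pd.DataFrame) -> list:
    """Hypothetical check: return requirement violations (an empty list means the output passes)."""
    problems = []
    if df.empty:
        problems.append("output is empty")
    null_revenue = int(df["revenue"].isnull().sum())
    if null_revenue:
        problems.append(f"{null_revenue} rows with null revenue")
    dups = int(df.duplicated(subset=["date", "product", "region"]).sum())
    if dups:
        problems.append(f"{dups} duplicate date/product/region keys")
    if not pd.api.types.is_datetime64_any_dtype(df["date"]):
        problems.append(f"date column has dtype {df['date'].dtype}, expected datetime")
    return problems
```

In a generated pipeline this would run on the cleaned DataFrame before writing (after a CSV round trip the date column is text again). A non-empty list can then end the script with `raise SystemExit("Validation failed: " + "; ".join(problems))`, which exits with status 1 and shows up in the `Exit code:` line of run_pipeline()'s result.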
Delivers FINAL_PIPELINE
The agent returns the working pipeline script plus a validation report:
FINAL_PIPELINE:
- Input: 12,430 raw rows from sales_2024.csv + 2,840 customers
- Output: 11,238 clean rows in clean_sales.csv
- Removed: 1,192 duplicates, 0 null-revenue rows
- Fixed: mixed date formats, column name mismatch (rep → customer_id)
- Pipeline is idempotent and documented with inline comments
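The idempotency claim can be checked mechanically: run the pipeline twice and compare a digest of the output directory. A sketch assuming the default ./pipeline.py and ./data/processed paths; `output_digest` and `check_idempotent` are hypothetical helpers, not part of tools.py:

```python
import hashlib
import os
import subprocess
import sys


def output_digest(output_dir: str) -> str:
    """Hypothetical helper: SHA-256 over every file's relative path and bytes, in sorted order."""
    h = hashlib.sha256()
    for root, dirs, files in os.walk(output_dir):
        dirs.sort()  # in-place sort fixes the walk order across runs
        for name in sorted(files):
            path = os.path.join(root, name)
            h.update(os.path.relpath(path, output_dir).encode() + b"\0")
            with open(path, "rb") as f:
                h.update(f.read())
            h.update(b"\0")
    return h.hexdigest()


def check_idempotent(pipeline: str = "./pipeline.py", output_dir: str = "./data/processed") -> bool:
    """Hypothetical helper: run the pipeline twice; True if the outputs are byte-identical."""
    digests = []
    for _ in range(2):
        subprocess.run([sys.executable, pipeline], check=True)
        digests.append(output_digest(output_dir))
    return digests[0] == digests[1]
```

A byte-level comparison is strict: an output that embeds a run timestamp (for example a "generated_at" field in summary_stats.json) will fail it even if the data is identical.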
Customization
Pipeline Settings
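- input_directory: path relative to the working directory you run agent.py from
- output_directory: path relative to the working directory you run agent.py from
- pipeline_file: path to a .py file
- max_input_rows: 100-10000 (default 1000)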
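The defaults and the 100-10000 range above can be enforced when config.json is loaded, instead of trusting whatever the file contains. A sketch: the default values come from config.json and run_agent(), while the `load_config` name, the clamping behavior, and the fallback to the OPENAI_API_KEY environment variable are assumptions:

```python
import json
import os

DEFAULTS = {
    "model": "gpt-4o",
    "max_iterations": 10,
    "input_directory": "./data/raw",
    "output_directory": "./data/processed",
    "pipeline_file": "./pipeline.py",
    "max_input_rows": 1000,
}


def load_config(path: str = "config.json") -> dict:
    """Hypothetical loader: merge config.json over defaults and keep max_input_rows in 100-10000."""
    with open(path, encoding="utf-8") as f:
        config = {**DEFAULTS, **json.load(f)}
    # Assumed fallback: read the key from the environment when config.json omits it
    config.setdefault("openai_api_key", os.environ.get("OPENAI_API_KEY"))
    if not config["openai_api_key"]:
        raise ValueError("Set openai_api_key in config.json or the OPENAI_API_KEY environment variable")
    # Clamp to the documented range rather than failing
    config["max_input_rows"] = max(100, min(10_000, int(config["max_input_rows"])))
    if not str(config["pipeline_file"]).endswith(".py"):
        raise ValueError("pipeline_file must be a path to a .py file")
    return config
```

Replacing the `json.load` call in agent.py's `__main__` block with `load_config(args.config)` would apply these checks before the first model call.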
Note:
Reusable pipeline. The generated pipeline script is a standalone Python file. After the agent creates it, you can run it directly with python pipeline.py — no agent needed for subsequent runs. This is the key difference from other blueprints: the output is a reusable asset.
Key Takeaway
A file-based ETL agent is most valuable when you have messy datasets that need repeatable cleaning. The agent generates a documented, idempotent pipeline you can commit to your repo. For one-off data analysis, a direct pandas script is faster. Use the agent when the pipeline will run regularly or when non-technical team members need to understand what the transformations do.