import httpx
import asyncio
import re
async def execute_sql_with_review(query, database="production"):
"""Execute SQL query with human review for risky operations."""
async with httpx.AsyncClient() as client:
# Analyze query for risk
risk_analysis = analyze_sql_risk(query)
# Determine if review is needed
if risk_analysis["requires_review"]:
# Request human review
response = await client.post(
"https://api.humancheck.dev/reviews",
headers={
"Authorization": "Bearer your-api-key-here",
"Content-Type": "application/json"
},
json={
"task_type": "sql_execution",
"proposed_action": f"Execute SQL query on {database} database",
"agent_reasoning": f"Query contains {risk_analysis['risk_level']} risk operations: {', '.join(risk_analysis['risks'])}",
"confidence_score": 1.0 - risk_analysis["risk_score"], # Lower confidence = higher risk
"urgency": "high" if risk_analysis["risk_level"] == "critical" else "medium",
"metadata": {
"query": query,
"database": database,
"risk_level": risk_analysis["risk_level"],
"risk_score": risk_analysis["risk_score"],
"risks": risk_analysis["risks"],
"estimated_affected_rows": risk_analysis.get("affected_rows"),
"query_type": risk_analysis["query_type"]
},
"blocking": True # Wait for approval before executing
}
)
review = response.json()
# Get decision
decision = review.get("decision")
if not decision:
decision_response = await client.get(
f"https://api.humancheck.dev/reviews/{review['id']}/decision"
)
decision = decision_response.json()
# Process decision
if decision["decision_type"] == "approve":
# Execute query
result = await execute_sql(query, database)
return {"status": "executed", "result": result}
elif decision["decision_type"] == "modify":
# Execute modified query
modified_query = decision.get("modified_action") or extract_query_from_action(decision.get("modified_action", ""))
result = await execute_sql(modified_query, database)
return {"status": "executed", "result": result, "modified": True}
else:
return {"status": "rejected", "reason": decision.get("notes")}
else:
# Safe to execute without review
result = await execute_sql(query, database)
return {"status": "executed", "result": result, "auto_approved": True}
def analyze_sql_risk(query):
"""Analyze SQL query for risk factors."""
query_upper = query.upper().strip()
risks = []
risk_score = 0.0
query_type = "SELECT"
# Check for destructive operations
destructive_ops = {
"DELETE": 1.0,
"DROP": 1.0,
"TRUNCATE": 0.9,
"ALTER": 0.8,
"UPDATE": 0.7,
"INSERT": 0.3
}
for op, risk in destructive_ops.items():
if op in query_upper:
risks.append(op)
risk_score = max(risk_score, risk)
query_type = op
# Check for WHERE clause (affects row count)
if "WHERE" not in query_upper and query_type in ["DELETE", "UPDATE"]:
risks.append("NO_WHERE_CLAUSE")
risk_score = min(1.0, risk_score + 0.3)
# Check for LIMIT clause (safety measure)
if "LIMIT" not in query_upper and query_type in ["DELETE", "UPDATE"]:
risks.append("NO_LIMIT_CLAUSE")
risk_score = min(1.0, risk_score + 0.2)
# Check for schema changes
if any(keyword in query_upper for keyword in ["DROP TABLE", "DROP DATABASE", "ALTER TABLE"]):
risks.append("SCHEMA_CHANGE")
risk_score = 1.0
# Determine risk level
if risk_score >= 0.8:
risk_level = "critical"
elif risk_score >= 0.5:
risk_level = "high"
elif risk_score >= 0.3:
risk_level = "medium"
else:
risk_level = "low"
# Estimate affected rows (simplified)
affected_rows = estimate_affected_rows(query)
return {
"requires_review": risk_score >= 0.3,
"risk_level": risk_level,
"risk_score": risk_score,
"risks": risks,
"query_type": query_type,
"affected_rows": affected_rows
}
def estimate_affected_rows(query):
"""Estimate number of affected rows (simplified)."""
# In production, you'd run EXPLAIN or similar
# This is a simplified example
if "WHERE" not in query.upper():
return "potentially all rows"
return "unknown (depends on WHERE conditions)"
def extract_query_from_action(action_string):
"""Extract SQL query from modified action string."""
# Extract query from action string
# This is a simplified example
match = re.search(r'```sql\n(.*?)\n```', action_string, re.DOTALL)
if match:
return match.group(1)
return action_string