Skip to main content
This is an example implementation of a SQL execution review workflow using Humancheck. This demonstrates how to integrate human review for database operations, but Humancheck can be used for any system operation that requires human approval.

Overview

The SQL execution review workflow:
  1. Agent generates SQL query
  2. System analyzes query for risk (destructive operations, data scope, etc.)
  3. If risky, request human review
  4. Reviewer approves, rejects, or modifies query
  5. System executes approved query

Implementation

Basic SQL Execution Review

import httpx
import asyncio
import re

async def execute_sql_with_review(query, database="production"):
    """Execute SQL query with human review for risky operations."""
    async with httpx.AsyncClient() as client:
        # Analyze query for risk
        risk_analysis = analyze_sql_risk(query)
        
        # Determine if review is needed
        if risk_analysis["requires_review"]:
            # Request human review
            response = await client.post(
        "https://api.humancheck.dev/reviews",
        headers={
            "Authorization": "Bearer your-api-key-here",
            "Content-Type": "application/json"
        },
        json={
                    "task_type": "sql_execution",
                    "proposed_action": f"Execute SQL query on {database} database",
                    "agent_reasoning": f"Query contains {risk_analysis['risk_level']} risk operations: {', '.join(risk_analysis['risks'])}",
                    "confidence_score": 1.0 - risk_analysis["risk_score"],  # Lower confidence = higher risk
                    "urgency": "high" if risk_analysis["risk_level"] == "critical" else "medium",
                    "metadata": {
                        "query": query,
                        "database": database,
                        "risk_level": risk_analysis["risk_level"],
                        "risk_score": risk_analysis["risk_score"],
                        "risks": risk_analysis["risks"],
                        "estimated_affected_rows": risk_analysis.get("affected_rows"),
                        "query_type": risk_analysis["query_type"]
                    },
                    "blocking": True  # Wait for approval before executing
                }
            )
            review = response.json()
            
            # Get decision
            decision = review.get("decision")
            if not decision:
                decision_response = await client.get(
                    f"https://api.humancheck.dev/reviews/{review['id']}/decision"
                )
                decision = decision_response.json()
            
            # Process decision
            if decision["decision_type"] == "approve":
                # Execute query
                result = await execute_sql(query, database)
                return {"status": "executed", "result": result}
            elif decision["decision_type"] == "modify":
                # Execute modified query
                modified_query = decision.get("modified_action") or extract_query_from_action(decision.get("modified_action", ""))
                result = await execute_sql(modified_query, database)
                return {"status": "executed", "result": result, "modified": True}
            else:
                return {"status": "rejected", "reason": decision.get("notes")}
        else:
            # Safe to execute without review
            result = await execute_sql(query, database)
            return {"status": "executed", "result": result, "auto_approved": True}

def analyze_sql_risk(query):
    """Analyze SQL query for risk factors."""
    query_upper = query.upper().strip()
    
    risks = []
    risk_score = 0.0
    query_type = "SELECT"
    
    # Check for destructive operations
    destructive_ops = {
        "DELETE": 1.0,
        "DROP": 1.0,
        "TRUNCATE": 0.9,
        "ALTER": 0.8,
        "UPDATE": 0.7,
        "INSERT": 0.3
    }
    
    for op, risk in destructive_ops.items():
        if op in query_upper:
            risks.append(op)
            risk_score = max(risk_score, risk)
            query_type = op
    
    # Check for WHERE clause (affects row count)
    if "WHERE" not in query_upper and query_type in ["DELETE", "UPDATE"]:
        risks.append("NO_WHERE_CLAUSE")
        risk_score = min(1.0, risk_score + 0.3)
    
    # Check for LIMIT clause (safety measure)
    if "LIMIT" not in query_upper and query_type in ["DELETE", "UPDATE"]:
        risks.append("NO_LIMIT_CLAUSE")
        risk_score = min(1.0, risk_score + 0.2)
    
    # Check for schema changes
    if any(keyword in query_upper for keyword in ["DROP TABLE", "DROP DATABASE", "ALTER TABLE"]):
        risks.append("SCHEMA_CHANGE")
        risk_score = 1.0
    
    # Determine risk level
    if risk_score >= 0.8:
        risk_level = "critical"
    elif risk_score >= 0.5:
        risk_level = "high"
    elif risk_score >= 0.3:
        risk_level = "medium"
    else:
        risk_level = "low"
    
    # Estimate affected rows (simplified)
    affected_rows = estimate_affected_rows(query)
    
    return {
        "requires_review": risk_score >= 0.3,
        "risk_level": risk_level,
        "risk_score": risk_score,
        "risks": risks,
        "query_type": query_type,
        "affected_rows": affected_rows
    }

def estimate_affected_rows(query):
    """Estimate number of affected rows (simplified)."""
    # In production, you'd run EXPLAIN or similar
    # This is a simplified example
    if "WHERE" not in query.upper():
        return "potentially all rows"
    return "unknown (depends on WHERE conditions)"

def extract_query_from_action(action_string):
    """Extract SQL query from modified action string."""
    # Extract query from action string
    # This is a simplified example
    match = re.search(r'```sql\n(.*?)\n```', action_string, re.DOTALL)
    if match:
        return match.group(1)
    return action_string

Advanced Features

Query Validation

async def validate_and_execute_sql(query, database):
    """Validate query before review."""
    # Syntax validation
    syntax_valid = await validate_sql_syntax(query)
    if not syntax_valid:
        return {"error": "Invalid SQL syntax"}
    
    # Explain query (dry run)
    explain_result = await explain_query(query, database)
    
    # Check estimated cost/impact
    if explain_result["cost"] > 1000:  # High cost threshold
        # Request review even for SELECT queries
        return await execute_sql_with_review(query, database)
    
    return await execute_sql_with_review(query, database)

Read-Only Mode

async def execute_sql_safe(query, database):
    """Execute SQL in read-only mode for SELECT queries."""
    query_upper = query.upper().strip()
    
    # Allow SELECT queries without review
    if query_upper.startswith("SELECT"):
        return await execute_sql(query, database, read_only=True)
    
    # All other queries require review
    return await execute_sql_with_review(query, database)

Transaction Support

async def execute_transaction_with_review(queries, database):
    """Execute multiple queries as a transaction with review."""
    # Analyze all queries
    risk_analysis = analyze_transaction_risk(queries)
    
    if risk_analysis["requires_review"]:
        response = await client.post(
        "https://api.humancheck.dev/reviews",
        headers={
            "Authorization": "Bearer your-api-key-here",
            "Content-Type": "application/json"
        },
        json={
                "task_type": "sql_execution",
                "proposed_action": f"Execute transaction with {len(queries)} queries",
                "agent_reasoning": f"Transaction contains {risk_analysis['risk_level']} operations",
                "metadata": {
                    "queries": queries,
                    "query_count": len(queries),
                    "database": database,
                    "risk_level": risk_analysis["risk_level"],
                    "is_transaction": True
                },
                "blocking": True
            }
        )
        review = response.json()
        decision = review.get("decision")
        
        if decision["decision_type"] == "approve":
            # Execute as transaction
            return await execute_transaction(queries, database)

Routing Rules

Route SQL reviews to database administrators:
# Route destructive operations to DBAs
await client.post(
        "https://api.humancheck.dev/reviews",
        headers={
            "Authorization": "Bearer your-api-key-here",
            "Content-Type": "application/json"
        },
        json={
        "name": "Destructive SQL to DBA team",
        "organization_id": 1,
        "priority": 200,
        "conditions": {
            "task_type": {"operator": "=", "value": "sql_execution"},
            "metadata.risk_level": {"operator": "in", "value": ["high", "critical"]}
        },
        "assign_to_team_id": 4,  # DBA team
        "is_active": True
    }
)

Dashboard Integration

The SQL execution request appears in the dashboard with:
  • SQL query (with syntax highlighting)
  • Database name
  • Risk level and score
  • Risk factors identified
  • Estimated affected rows
  • Query type (SELECT, DELETE, UPDATE, etc.)
Reviewers can:
  • ✅ Approve query for execution
  • ❌ Reject with reason
  • ✏️ Modify query (add WHERE clause, LIMIT, etc.) before approving

Best Practices

  1. Always review destructive operations: DELETE, DROP, TRUNCATE should always require review
  2. Check for WHERE clauses: Queries without WHERE clauses are dangerous
  3. Add LIMIT clauses: Encourage LIMIT clauses for safety
  4. Provide context: Show why the query is needed
  5. Use read-only mode: For SELECT queries, use read-only database connections
  6. Log all executions: Maintain audit trail of all SQL executions
  7. Test queries first: Use EXPLAIN or similar to show query plan

Security Considerations

  • Principle of least privilege: Use database users with minimal permissions
  • Parameterized queries: Always use parameterized queries to prevent SQL injection
  • Query timeouts: Set timeouts to prevent long-running queries
  • Connection limits: Limit number of concurrent connections
  • Backup before destructive ops: Consider requiring backups before destructive operations

Next Steps