How to Design Self-Reflective Dual-Agent Governance Systems with Constitutional AI for Secure and Compliant Financial Operations


Source: MarkTechPost

In this tutorial, we implement a dual-agent governance system that applies Constitutional AI principles to financial operations. We demonstrate how we separate execution and oversight by pairing a Worker Agent that performs financial actions with an Auditor Agent that enforces policy, safety, and compliance. By encoding governance rules directly into a formal constitution and combining rule-based checks with AI-assisted reasoning, we can build systems that are self-reflective, auditable, and resilient to risky or non-compliant behavior in high-stakes financial workflows. Check out the FULL CODES here.

!pip install -q pydantic anthropic python-dotenv

import json
import re
from typing import List, Dict, Any, Optional, Literal
from pydantic import BaseModel, Field, validator
from enum import Enum
from datetime import datetime
import os

We install and import the core libraries required to structure, validate, and govern our agent-based system. We rely on Pydantic for strongly typed data models and validation and on the standard-library enum module for violation types, while standard Python utilities handle timestamps, parsing, and environment configuration. The anthropic and python-dotenv packages are installed but not imported here, since the tutorial runs against a mock client.

class PolicyViolationType(str, Enum):
    """Types of policy violations"""
    PII_EXPOSURE = "pii_exposure"
    BUDGET_EXCEEDED = "budget_exceeded"
    UNAUTHORIZED_ACTION = "unauthorized_action"
    MISSING_JUSTIFICATION = "missing_justification"
    SUSPICIOUS_PATTERN = "suspicious_pattern"


class SafetyPolicy(BaseModel):
    """Individual safety policy rule"""
    name: str
    description: str
    severity: Literal["low", "medium", "high", "critical"]
    check_function: str


class Constitution(BaseModel):
    """The 'Constitution' - A set of rules that govern agent behavior"""
    policies: List[SafetyPolicy]
    max_transaction_amount: float = 10000.0
    require_approval_above: float = 5000.0
    allowed_pii_fields: List[str] = ["name", "account_id"]

    def get_policy_by_name(self, name: str) -> Optional[SafetyPolicy]:
        return next((p for p in self.policies if p.name == name), None)


FINANCIAL_CONSTITUTION = Constitution(
    policies=[
        SafetyPolicy(
            name="PII Protection",
            description="Must not expose sensitive PII (SSN, full credit card, passwords)",
            severity="critical",
            check_function="Scan for SSN patterns, credit card numbers, passwords"
        ),
        SafetyPolicy(
            name="Budget Limits",
            description="Transactions must not exceed predefined budget limits",
            severity="high",
            check_function="Check if transaction amount exceeds max_transaction_amount"
        ),
        SafetyPolicy(
            name="Action Authorization",
            description="Only pre-approved action types are allowed",
            severity="high",
            check_function="Verify action type is in approved list"
        ),
        SafetyPolicy(
            name="Justification Required",
            description="All transactions above threshold must have justification",
            severity="medium",
            check_function="Check for justification field in high-value transactions"
        ),
        SafetyPolicy(
            name="Pattern Detection",
            description="Detect suspicious patterns (multiple rapid transactions, round numbers)",
            severity="medium",
            check_function="Analyze transaction patterns for anomalies"
        )
    ],
    max_transaction_amount=10000.0,
    require_approval_above=5000.0
)

We define the core constitutional framework that governs agent behavior by formalizing policy types, severities, and enforcement rules. We encode financial safety constraints such as PII protection, budget limits, authorization checks, and justification requirements as first-class, machine-readable policies.
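The check_function field in each SafetyPolicy holds a human-readable description rather than executable code. One possible way to make policies directly enforceable is a small registry that maps policy names to plain Python callables. The sketch below assumes that design; CHECKS, register_check, run_checks, and the approved action list are hypothetical and not part of the tutorial's code.

```python
from typing import Any, Callable, Dict, List

# Hypothetical registry: policy name -> callable that returns True when
# the worker output complies. None of these names come from the tutorial.
CHECKS: Dict[str, Callable[[Dict[str, Any]], bool]] = {}


def register_check(policy_name: str):
    """Decorator that files a check function under a policy name."""
    def decorator(fn):
        CHECKS[policy_name] = fn
        return fn
    return decorator


@register_check("Budget Limits")
def within_budget(details: Dict[str, Any]) -> bool:
    # 10000.0 mirrors max_transaction_amount in the constitution.
    return details.get("amount", 0) <= 10000.0


@register_check("Action Authorization")
def action_allowed(details: Dict[str, Any]) -> bool:
    # The approved list is an assumption; the tutorial does not define one.
    return details.get("action_taken") in {"payment", "transfer", "report"}


def run_checks(details: Dict[str, Any]) -> List[str]:
    """Return the names of all policies that the output fails."""
    return [name for name, check in CHECKS.items() if not check(details)]


print(run_checks({"action_taken": "transfer", "amount": 15000.0}))
# ['Budget Limits']
```

Keeping checks as callables means a new policy can be enforced by registering one function, without touching the auditor's control flow.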

class FinancialRequest(BaseModel):
    """Input request to the Worker Agent"""
    action: str
    amount: Optional[float] = None
    recipient: Optional[str] = None
    description: str
    justification: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)


class WorkerOutput(BaseModel):
    """Output from the Worker Agent"""
    request_id: str
    action_taken: str
    details: Dict[str, Any]
    raw_response: str
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())


class PolicyViolation(BaseModel):
    """Detected policy violation"""
    policy_name: str
    violation_type: PolicyViolationType
    severity: str
    description: str
    suggested_fix: Optional[str] = None


class AuditResult(BaseModel):
    """Result from the Auditor Agent"""
    approved: bool
    violations: List[PolicyViolation] = Field(default_factory=list)
    risk_score: float  # 0-100
    feedback: str
    revision_needed: bool

    # Register the method as a validator for risk_score; without the
    # decorator it is never called and the score is not clamped.
    @validator("risk_score")
    def validate_risk_score(cls, v):
        if isinstance(v, (int, float)):
            return max(0.0, min(100.0, v))
        return v

We define strongly typed data models that structure how financial requests, agent outputs, and audit findings flow through the system. We use these schemas to ensure every action, decision, and violation is captured in a consistent, machine-validated format with full traceability.
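The intent of the risk-score validation, keeping the value inside the 0-100 range, can be illustrated without Pydantic. The following is a minimal, dependency-free sketch; SimpleAuditResult is a hypothetical stand-in for AuditResult and is not used elsewhere in the tutorial.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SimpleAuditResult:
    """Hypothetical stand-in for AuditResult using only the stdlib."""
    approved: bool
    risk_score: float
    feedback: str = ""
    violations: List[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Clamp to the documented 0-100 range.
        self.risk_score = max(0.0, min(100.0, float(self.risk_score)))


print(SimpleAuditResult(approved=False, risk_score=140).risk_score)  # 100.0
print(SimpleAuditResult(approved=True, risk_score=-5).risk_score)    # 0.0
```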

class MockAIClient:
    """Simulates the Anthropic API for this tutorial"""

    def __init__(self):
        self.call_count = 0

    def messages_create(self, model: str, max_tokens: int, messages: List[Dict]) -> Any:
        """Simulate API call"""
        self.call_count += 1
        user_msg = messages[-1]["content"]

        if "WORKER AGENT" in user_msg or "financial request" in user_msg.lower():
            return self._worker_response(user_msg)

        elif "AUDITOR AGENT" in user_msg or "audit" in user_msg.lower():
            return self._auditor_response(user_msg)

        return self._default_response()

    def _worker_response(self, msg: str) -> Any:
        """Simulate worker agent processing a request"""

        # First currency-like number in the prompt, e.g. "$7,500.00"
        amount_match = re.search(r'\$?(\d+(?:,\d{3})*(?:\.\d{2})?)', msg)
        amount = float(amount_match.group(1).replace(',', '')) if amount_match else 0

        if 'transfer' in msg.lower():
            action = 'transfer'
        elif 'payment' in msg.lower() or 'pay' in msg.lower():
            action = 'payment'
        elif 'report' in msg.lower():
            action = 'report'
        else:
            action = 'general_query'

        response = {
            "action_taken": action,
            "amount": amount,
            "status": "completed",
            "recipient": "John Doe" if amount > 0 else None,
            "account_id": "ACC-12345",
            "timestamp": datetime.now().isoformat()
        }

        # Deliberately leak PII on larger amounts to exercise the auditor
        if amount > 5000:
            response["ssn"] = "123-45-6789"

        if amount > 8000:
            response["credit_card"] = "4532-1234-5678-9010"

        class MockResponse:
            def __init__(self, content):
                self.content = [type('obj', (object,), {
                    'type': 'text',
                    'text': json.dumps(content, indent=2)
                })]

        return MockResponse(response)

    def _auditor_response(self, msg: str) -> Any:
        """Simulate auditor agent checking policies"""

        violations = []

        # Audit only the worker-output section of the prompt. The embedded
        # constitution text mentions "SSN" and "justification" itself, which
        # would otherwise trigger or mask findings on every request.
        section = msg.split("Worker Output to Audit:", 1)[-1]
        section = section.split("Already Detected Violations:", 1)[0]

        if 'ssn' in section.lower() or re.search(r'\d{3}-\d{2}-\d{4}', section):
            violations.append({
                "policy": "PII Protection",
                "type": "pii_exposure",
                "severity": "critical",
                "detail": "SSN detected in output"
            })

        if 'credit_card' in section.lower() or re.search(r'\d{4}-\d{4}-\d{4}-\d{4}', section):
            violations.append({
                "policy": "PII Protection",
                "type": "pii_exposure",
                "severity": "critical",
                "detail": "Credit card number detected"
            })

        amount_match = re.search(r'"amount":\s*(\d+(?:\.\d+)?)', section)
        if amount_match:
            amount = float(amount_match.group(1))
            if amount > 10000:
                violations.append({
                    "policy": "Budget Limits",
                    "type": "budget_exceeded",
                    "severity": "high",
                    "detail": f"Amount ${amount} exceeds limit of $10,000"
                })
            elif amount > 5000 and 'justification' not in section.lower():
                violations.append({
                    "policy": "Justification Required",
                    "type": "missing_justification",
                    "severity": "medium",
                    "detail": "High-value transaction lacks justification"
                })

        audit_result = {
            "approved": len(violations) == 0,
            "violations": violations,
            "risk_score": min(len(violations) * 30, 100),
            "feedback": "Transaction approved" if len(violations) == 0 else "Violations detected - revision required"
        }

        class MockResponse:
            def __init__(self, content):
                self.content = [type('obj', (object,), {
                    'type': 'text',
                    'text': json.dumps(content, indent=2)
                })]

        return MockResponse(audit_result)

    def _default_response(self) -> Any:
        class MockResponse:
            def __init__(self):
                self.content = [type('obj', (object,), {
                    'type': 'text',
                    'text': '{"status": "acknowledged"}'
                })]
        return MockResponse()

We simulate the behavior of a large language model by implementing a mock AI client that differentiates between worker and auditor roles. We intentionally inject PII leaks into the worker's output (an SSN for amounts above $5,000 and a credit card number for amounts above $8,000) to stress-test the governance logic under realistic failure conditions.
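The mock worker pulls the transaction amount out of the prompt text with a regular expression. The sketch below isolates that step so its behavior is easy to inspect; extract_amount and AMOUNT_RE are hypothetical helper names introduced only for illustration.

```python
import re
from typing import Optional

# Optional "$", digits with optional thousands separators, optional cents.
AMOUNT_RE = re.compile(r"\$?(\d+(?:,\d{3})*(?:\.\d{2})?)")


def extract_amount(text: str) -> Optional[float]:
    """Return the first currency-like number in text, or None."""
    match = AMOUNT_RE.search(text)
    if match is None:
        return None
    return float(match.group(1).replace(",", ""))


print(extract_amount("- Amount: $7,500.00"))   # 7500.0
print(extract_amount("- Amount: $15,000.00"))  # 15000.0
print(extract_amount("no numbers here"))       # None
```

Because the dollar sign is optional, the pattern matches the first digit run anywhere in the text: extract_amount("Q4 bonus of $7,500.00") returns 4.0. This works in the tutorial only because the amount is the first number in the worker prompt, so a stricter pattern would be advisable outside a mock.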

class WorkerAgent:
    """Agent A - The Worker that processes financial requests"""

    def __init__(self, client: MockAIClient):
        self.client = client
        self.role = "Financial Operations Worker"
        self.processed_requests = []

    def process_request(self, request: FinancialRequest) -> WorkerOutput:
        """Process a financial request"""
        print(f"\n{'='*60}")
        print(f"🔧 WORKER AGENT: Processing request...")
        print(f"{'='*60}")
        print(f"Action: {request.action}")
        if request.amount:
            print(f"Amount: ${request.amount:,.2f}")
        else:
            print("Amount: N/A")
        print(f"Description: {request.description}")

        prompt = self._build_worker_prompt(request)

        response = self.client.messages_create(
            model="claude-sonnet-4-20250514",
            max_tokens=1000,
            messages=[{"role": "user", "content": prompt}]
        )

        raw_response = response.content[0].text

        try:
            details = json.loads(raw_response)
        except json.JSONDecodeError:
            details = {"raw": raw_response}

        output = WorkerOutput(
            request_id=f"REQ-{len(self.processed_requests)+1:04d}",
            action_taken=request.action,
            details=details,
            raw_response=raw_response
        )

        self.processed_requests.append(output)
        print(f"\n✅ Worker completed processing (ID: {output.request_id})")

        return output

    def _build_worker_prompt(self, request: FinancialRequest) -> str:
        """Build prompt for worker agent"""
        amount_str = f"${request.amount:,.2f}" if request.amount else "$0.00"
        return f"""You are a WORKER AGENT processing a financial request.

Request Details:
- Action: {request.action}
- Amount: {amount_str}
- Recipient: {request.recipient or 'N/A'}
- Description: {request.description}
- Justification: {request.justification or 'None provided'}

Process this request and return a JSON response with:
- action_taken
- amount
- status
- recipient
- account_id
- timestamp
- Any other relevant details

Return ONLY valid JSON."""


class AuditorAgent:
    """Agent B - The Auditor that validates worker output"""

    def __init__(self, client: MockAIClient, constitution: Constitution):
        self.client = client
        self.constitution = constitution
        self.role = "Governance Auditor"
        self.audit_history = []

    def audit(self, worker_output: WorkerOutput) -> AuditResult:
        """Audit the worker's output against the constitution"""
        print(f"\n{'='*60}")
        print(f"🔍 AUDITOR AGENT: Auditing output...")
        print(f"{'='*60}")

        # Deterministic checks run first; the model adds a second opinion
        violations = self._check_rules(worker_output)

        prompt = self._build_auditor_prompt(worker_output, violations)

        response = self.client.messages_create(
            model="claude-sonnet-4-20250514",
            max_tokens=1000,
            messages=[{"role": "user", "content": prompt}]
        )

        raw_audit = response.content[0].text
        try:
            audit_data = json.loads(raw_audit)
        except json.JSONDecodeError:
            audit_data = {"approved": False, "violations": violations, "risk_score": 50}

        # Approval requires both the model's approval and zero rule violations
        result = AuditResult(
            approved=audit_data.get("approved", False) and len(violations) == 0,
            violations=violations,
            risk_score=audit_data.get("risk_score", len(violations) * 25),
            feedback=audit_data.get("feedback", "Audit completed"),
            revision_needed=not audit_data.get("approved", False) or len(violations) > 0
        )

        self.audit_history.append(result)

        self._display_audit_result(result)

        return result

    def _check_rules(self, output: WorkerOutput) -> List[PolicyViolation]:
        """Perform rule-based constitutional checks"""
        violations = []
        details_str = json.dumps(output.details)

        # SSN pattern: 123-45-6789
        if re.search(r'\d{3}-\d{2}-\d{4}', details_str):
            violations.append(PolicyViolation(
                policy_name="PII Protection",
                violation_type=PolicyViolationType.PII_EXPOSURE,
                severity="critical",
                description="Social Security Number detected in output",
                suggested_fix="Remove or mask SSN field"
            ))

        # 16-digit card number, optionally separated by dashes or whitespace
        if re.search(r'\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}', details_str):
            violations.append(PolicyViolation(
                policy_name="PII Protection",
                violation_type=PolicyViolationType.PII_EXPOSURE,
                severity="critical",
                description="Credit card number detected in output",
                suggested_fix="Remove or tokenize credit card number"
            ))

        amount = output.details.get("amount", 0)
        if amount > self.constitution.max_transaction_amount:
            violations.append(PolicyViolation(
                policy_name="Budget Limits",
                violation_type=PolicyViolationType.BUDGET_EXCEEDED,
                severity="high",
                description=f"Amount ${amount:,.2f} exceeds limit of ${self.constitution.max_transaction_amount:,.2f}",
                suggested_fix=f"Reduce amount to ${self.constitution.max_transaction_amount:,.2f} or request approval"
            ))

        if amount > self.constitution.require_approval_above:
            if "justification" not in details_str.lower():
                violations.append(PolicyViolation(
                    policy_name="Justification Required",
                    violation_type=PolicyViolationType.MISSING_JUSTIFICATION,
                    severity="medium",
                    description=f"Transaction of ${amount:,.2f} requires justification",
                    suggested_fix="Add justification field explaining the transaction"
                ))

        return violations

    def _build_auditor_prompt(self, output: WorkerOutput, violations: List[PolicyViolation]) -> str:
        """Build prompt for auditor agent"""
        return f"""You are an AUDITOR AGENT validating financial operations against a Constitution.

Constitution Policies:
{json.dumps([p.dict() for p in self.constitution.policies], indent=2)}

Worker Output to Audit:
{output.raw_response}

Already Detected Violations:
{json.dumps([v.dict() for v in violations], indent=2)}

Perform additional analysis and return JSON with:
- approved (boolean)
- risk_score (0-100)
- feedback (string)
- Any additional concerns

Return ONLY valid JSON."""

    def _display_audit_result(self, result: AuditResult):
        """Display audit results in a readable format"""
        print(f"\n📊 AUDIT RESULTS:")
        print(f"Status: {'✅ APPROVED' if result.approved else '❌ REJECTED'}")
        print(f"Risk Score: {result.risk_score:.1f}/100")
        print(f"Violations Found: {len(result.violations)}")

        if result.violations:
            print(f"\n⚠️  POLICY VIOLATIONS:")
            for i, v in enumerate(result.violations, 1):
                print(f"\n  {i}. {v.policy_name} [{v.severity.upper()}]")
                print(f"     Type: {v.violation_type.value}")
                print(f"     Issue: {v.description}")
                if v.suggested_fix:
                    print(f"     Fix: {v.suggested_fix}")

        print(f"\n💬 Feedback: {result.feedback}")
        print(f"Revision Needed: {'Yes' if result.revision_needed else 'No'}")

We implement the core dual-agent logic by separating execution and governance responsibilities between a Worker Agent and an Auditor Agent. We allow the worker to focus purely on fulfilling financial requests, while we enforce constitutional rules through deterministic checks and AI-assisted auditing. By combining structured prompts, rule-based validation, and clear audit feedback, we create a self-reflective control loop that prioritizes safety, accountability, and compliance.
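The auditor's suggested fixes for PII findings are to remove, mask, or tokenize the offending values, but the tutorial does not implement that step. The sketch below shows one way masking could work, assuming flat dictionaries of worker output; mask_pii and redact are hypothetical helpers, and the patterns are illustrative rather than a complete PII detector.

```python
import re
from typing import Any, Dict

# Word boundaries keep these from matching inside longer digit runs.
SSN_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
CARD_RE = re.compile(r"\b(?:\d{4}[-\s]?){3}(\d{4})\b")


def mask_pii(value: str) -> str:
    """Mask SSNs entirely and keep only the last four card digits."""
    value = SSN_RE.sub("***-**-****", value)
    return CARD_RE.sub(lambda m: "****-****-****-" + m.group(1), value)


def redact(details: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of details with every string value masked."""
    return {
        key: mask_pii(val) if isinstance(val, str) else val
        for key, val in details.items()
    }


leaky = {
    "amount": 7500.0,
    "ssn": "123-45-6789",
    "credit_card": "4532-1234-5678-9010",
}
print(redact(leaky))
# {'amount': 7500.0, 'ssn': '***-**-****', 'credit_card': '****-****-****-9010'}
```

Masking values rather than dropping keys keeps the output schema stable for downstream consumers, at the cost of leaving field names such as ssn visible.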

class GovernanceSystem:
    """Orchestrates the dual-agent governance workflow"""

    def __init__(self, constitution: Constitution):
        self.client = MockAIClient()
        self.worker = WorkerAgent(self.client)
        self.auditor = AuditorAgent(self.client, constitution)
        self.constitution = constitution
        self.max_revision_attempts = 3

    def process_with_governance(self, request: FinancialRequest) -> Dict[str, Any]:
        """Main workflow: Worker processes, Auditor validates, loop if needed"""
        print(f"\n{'#'*60}")
        print(f"# GOVERNANCE SYSTEM: New Request")
        print(f"{'#'*60}")

        attempt = 0
        while attempt < self.max_revision_attempts:
            attempt += 1
            print(f"\n🔄 Attempt {attempt}/{self.max_revision_attempts}")

            worker_output = self.worker.process_request(request)

            audit_result = self.auditor.audit(worker_output)

            if audit_result.approved:
                print(f"\n{'='*60}")
                print(f"✅ FINAL RESULT: APPROVED")
                print(f"{'='*60}")
                return {
                    "status": "approved",
                    "output": worker_output.dict(),
                    "audit": audit_result.dict(),
                    "attempts": attempt
                }

            # Critical violations end the loop immediately, with no retry
            critical_violations = [v for v in audit_result.violations if v.severity == "critical"]
            if critical_violations:
                print(f"\n{'='*60}")
                print(f"🛑 FINAL RESULT: REJECTED (Critical Violations)")
                print(f"{'='*60}")
                return {
                    "status": "rejected",
                    "reason": "critical_violations",
                    "audit": audit_result.dict(),
                    "attempts": attempt
                }

            if attempt >= self.max_revision_attempts:
                print(f"\n{'='*60}")
                print(f"🛑 FINAL RESULT: REJECTED (Max Attempts)")
                print(f"{'='*60}")
                return {
                    "status": "rejected",
                    "reason": "max_attempts_exceeded",
                    "audit": audit_result.dict(),
                    "attempts": attempt
                }

        return {"status": "error", "message": "Unexpected exit from loop"}

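We orchestrate the complete governance workflow by coordinating the worker and auditor agents within a controlled revision loop. We evaluate each attempt against constitutional rules and immediately halt execution when critical violations are detected. Non-critical failures are retried up to three times, although in this version the worker receives no audit feedback between attempts and simply reprocesses the same request.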
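The tutorial's closing "Next Steps" list names actual revision logic in the worker as future work. A minimal sketch of what a feedback-carrying loop could look like follows, using plain callables in place of the agent classes. The govern function and the toy worker and auditor are hypothetical, and unlike the tutorial's loop this version does not stop early on critical violations.

```python
from typing import Any, Callable, Dict, List, Optional

Output = Dict[str, Any]


def govern(
    worker: Callable[[Optional[List[str]]], Output],
    auditor: Callable[[Output], List[str]],
    max_attempts: int = 3,
) -> Dict[str, Any]:
    """Run the worker, audit the result, and retry with feedback."""
    feedback: Optional[List[str]] = None
    for attempt in range(1, max_attempts + 1):
        output = worker(feedback)
        violations = auditor(output)
        if not violations:
            return {"status": "approved", "output": output, "attempts": attempt}
        feedback = violations  # hand the findings to the next attempt
    return {"status": "rejected", "violations": feedback, "attempts": max_attempts}


def toy_worker(feedback: Optional[List[str]]) -> Output:
    # Leaks an SSN until the auditor's feedback tells it not to.
    output = {"amount": 2500.0, "ssn": "123-45-6789"}
    if feedback and "remove ssn" in feedback:
        output.pop("ssn")
    return output


def toy_auditor(output: Output) -> List[str]:
    return ["remove ssn"] if "ssn" in output else []


result = govern(toy_worker, toy_auditor)
print(result["status"], result["attempts"])  # approved 2
```

Passing the violations into the next worker call is what turns a retry loop into a revision loop: the second attempt differs from the first only because it has seen the audit findings.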

def run_examples():
    """Run demonstration examples"""

    print("="*80)
    print(" DUAL-AGENT GOVERNANCE SYSTEM WITH CONSTITUTIONAL AI")
    print(" Tutorial: Self-Reflective Financial Operations Agents")
    print("="*80)

    system = GovernanceSystem(FINANCIAL_CONSTITUTION)

    print("\n\n" + "="*80)
    print("EXAMPLE 1: Safe Transaction ($2,500)")
    print("="*80)

    request1 = FinancialRequest(
        action="payment",
        amount=2500.00,
        recipient="Vendor Corp",
        description="Monthly software license payment",
        justification="Regular recurring payment for essential services"
    )

    result1 = system.process_with_governance(request1)

    print("\n\n" + "="*80)
    print("EXAMPLE 2: High-Value Transaction with PII Leak ($7,500)")
    print("="*80)

    request2 = FinancialRequest(
        action="transfer",
        amount=7500.00,
        recipient="Executive",
        description="Bonus payment to executive",
        justification="Q4 performance bonus"
    )

    result2 = system.process_with_governance(request2)

    print("\n\n" + "="*80)
    print("EXAMPLE 3: Budget-Exceeding Transaction ($15,000)")
    print("="*80)

    request3 = FinancialRequest(
        action="transfer",
        amount=15000.00,
        recipient="Supplier",
        description="Large equipment purchase",
        justification="New manufacturing equipment for production line"
    )

    result3 = system.process_with_governance(request3)

    print("\n\n" + "="*80)
    print(" SUMMARY OF RESULTS")
    print("="*80)
    print(f"\nExample 1: {result1['status'].upper()}")
    print(f"Example 2: {result2['status'].upper()} - {result2.get('reason', 'N/A')}")
    print(f"Example 3: {result3['status'].upper()} - {result3.get('reason', 'N/A')}")

    print(f"\n\nTotal API Calls: {system.client.call_count}")
    print(f"Worker Processed: {len(system.worker.processed_requests)} requests")
    print(f"Auditor Performed: {len(system.auditor.audit_history)} audits")

    print("\n\n" + "="*80)
    print(" ACTIVE CONSTITUTION")
    print("="*80)
    for policy in FINANCIAL_CONSTITUTION.policies:
        print(f"\n📜 {policy.name} [{policy.severity.upper()}]")
        print(f"   {policy.description}")

We demonstrate the system end-to-end by running realistic financial scenarios that exercise both safe and unsafe behaviors. We show how the governance loop responds to a compliant transaction, a PII leak, and a budget violation while producing transparent audit outcomes. Because the mock worker leaks PII for any amount above $5,000, Examples 2 and 3 both trip the critical PII rule, and Example 3 is additionally flagged for exceeding the budget limit.

if __name__ == "__main__":
    run_examples()

    print("\n\n" + "="*80)
    print(" 🎓 TUTORIAL COMPLETE!")
    print("="*80)
    print("\nKey Concepts Demonstrated:")
    print("✓ Constitutional AI - Rule-based governance")
    print("✓ Dual-Agent System - Worker + Auditor pattern")
    print("✓ Policy Violation Detection - PII, Budget, Authorization")
    print("✓ Iterative Revision Loop - Self-correction mechanism")
    print("✓ Risk Scoring - Quantitative safety assessment")
    print("\nNext Steps:")
    print("• Replace MockAIClient with real Anthropic API")
    print("• Implement actual revision logic in Worker Agent")
    print("• Add more sophisticated pattern detection")
    print("• Integrate with real financial systems")
    print("• Build logging and monitoring dashboard")
    print("="*80)

We conclude the tutorial by executing all examples and clearly surfacing the core concepts demonstrated by the system. We recap how constitutional rules, dual-agent governance, violation detection, and risk scoring work together in practice.
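The first listed next step is replacing MockAIClient with the real Anthropic API. Both agents call a flat messages_create method, whereas the Anthropic Python SDK exposes client.messages.create, so a thin adapter is one way to bridge the two. The sketch below assumes that approach; AnthropicAdapter is a hypothetical class, and it takes the client as a constructor argument so the block itself does not require the SDK.

```python
from typing import Any, Dict, List


class AnthropicAdapter:
    """Hypothetical wrapper exposing the tutorial's messages_create() method.

    `client` is expected to be an anthropic.Anthropic instance, whose
    messages.create(model=..., max_tokens=..., messages=...) call returns
    a response carrying a list of content blocks.
    """

    def __init__(self, client: Any):
        self.client = client
        self.call_count = 0  # mirrors MockAIClient for the summary output

    def messages_create(self, model: str, max_tokens: int,
                        messages: List[Dict[str, str]]) -> Any:
        self.call_count += 1
        return self.client.messages.create(
            model=model, max_tokens=max_tokens, messages=messages
        )


# With the real SDK (the client reads ANTHROPIC_API_KEY from the environment):
#   from anthropic import Anthropic
#   adapter = AnthropicAdapter(Anthropic())
```

GovernanceSystem currently constructs its own MockAIClient, so using the adapter would also require passing the client in as a parameter. A real model may not return bare JSON, in which case the agents' existing JSONDecodeError fallbacks would apply.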

In conclusion, we demonstrated how to operationalize Constitutional AI beyond theory and embed it into real-world financial decision-making pipelines. We illustrated how we detect and respond to PII leakage, budget overruns, and missing justifications while quantifying risk and enforcing hard governance boundaries. By orchestrating iterative review loops between worker and auditor agents, we demonstrated a practical blueprint for building trustworthy, compliant, and scalable AI-driven financial systems where safety and accountability are first-class design goals rather than afterthoughts.

