Tier 1 Implementation Plan - Gmail Pub/Sub + Event Router + Multi-Agent

Started: 2026-02-11 06:05 UTC
Approved by: Quan 🙌
Estimated time: 8 hours total


Phase 1: Gmail Pub/Sub Setup (2 hours)

Prerequisites Check

Hooks enabled: Already configured in openclaw.json

{
  "enabled": true,
  "token": "6978cffd551a84e06df6d76e13cd56a630223468ab40b428",
  "path": "/hooks"
}

Gmail OAuth tokens: Already obtained for 3 accounts

Tailscale: Should be available on VPS

Step 1.1: Run OpenClaw Gmail Setup Wizard (30 min)

Run this from an SSH session on the VPS (not from inside the agent):

# SSH into VPS
ssh node@144.202.121.97

# Run the Gmail Pub/Sub setup wizard
openclaw webhooks gmail setup

The wizard will:

  1. Check if gcloud, gogcli, tailscale are installed (installs if missing on macOS via brew)
  2. Guide you through Google Cloud Pub/Sub setup
  3. Create topics for each Gmail account
  4. Configure Gmail watch
  5. Set up Tailscale Funnel for webhook endpoint
  6. Write config to openclaw.json

Wizard will prompt for:

Expected result: Config updated, Gmail pushing to Pub/Sub, gogcli daemon running

Step 1.2: Verify Webhook Endpoint (5 min)

After wizard completes, verify:

# Check if gogcli daemon is running (pgrep avoids matching the grep process itself)
pgrep -af "gog gmail watch serve"

# Check OpenClaw config for Gmail hooks
grep -A 20 '"gmail"' ~/.openclaw/openclaw.json

# Send test webhook
curl -X POST http://127.0.0.1:18789/hooks/wake \
  -H 'Authorization: Bearer 6978cffd551a84e06df6d76e13cd56a630223468ab40b428' \
  -H 'Content-Type: application/json' \
  -d '{"text":"Gmail Pub/Sub test","mode":"now"}'

Expected: I should receive the test message via Telegram
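
The same check can be scripted. Below is a minimal sketch using only the Python standard library; the URL, headers, and payload are copied from the curl command above, while the function names are illustrative and not part of OpenClaw.

```python
import json
import urllib.request

WAKE_URL = "http://127.0.0.1:18789/hooks/wake"  # same endpoint as the curl test


def build_wake_request(token, text, url=WAKE_URL):
    """Build the POST request that the curl command sends."""
    body = json.dumps({"text": text, "mode": "now"}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


def send_wake(token, text, timeout=10):
    """Send the request and return the HTTP status code."""
    request = build_wake_request(token, text)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return resp.status
```

Calling send_wake(token, "Gmail Pub/Sub test") on the VPS should behave like the curl command. Reading the token from an environment variable rather than pasting it inline keeps it out of shell history.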

Step 1.3: Build Intelligent Classifier (60 min)

I'll build this:

Create tools/gmail-intelligent-triage.py:

#!/usr/bin/env python3
"""
Intelligent Gmail triage using Haiku for batch classification.
Reads memory/email-patterns.md for learned patterns.
Classifies emails as: Critical, Informational, Noise
"""

import json
import sys

def classify_emails(emails):
    """
    emails: list of {id, from, to, subject, snippet, date, unread}
    returns: list of {id, class, reason} (same keys the prompt asks Haiku to output)
    """
    # Load learned patterns
    with open('memory/email-patterns.md', 'r') as f:
        patterns = f.read()
    
    # Build prompt for Haiku
    prompt = f"""You are an intelligent email classifier for Quan's inbox.

Read these learned patterns:
{patterns}

Classify these {len(emails)} emails as:
- **Critical**: Real person asking Quan for action, professional services, time-sensitive business
- **Informational**: Support tickets, quote forms, opportunities (not urgent)
- **Noise**: Marketing, newsletters, automated notifications

Emails to classify:
"""
    
    for i, email in enumerate(emails):
        prompt += f"\nEmail {i+1}:\n"
        prompt += f"From: {email['from']}\n"
        prompt += f"Subject: {email['subject']}\n"
        prompt += f"Snippet: {email['snippet'][:200]}\n"
        prompt += f"Date: {email['date']}\n"
        prompt += f"Unread: {email['unread']}\n"
    
    prompt += "\nOutput JSON: [{\"id\": \"...\", \"class\": \"Critical|Informational|Noise\", \"reason\": \"...\"}]"
    
    # Call Haiku via OpenClaw sessions_spawn
    # (This will be implemented using the actual classification logic)
    
    return []

if __name__ == '__main__':
    emails_json = sys.stdin.read()
    emails = json.loads(emails_json)
    results = classify_emails(emails)
    print(json.dumps(results, indent=2))
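
Once the Haiku call is wired in, its reply still has to be parsed defensively. The sketch below shows one way to do that, assuming the reply is the JSON array requested in the prompt. The function name and the fallback to Informational for anything unparseable are assumptions, chosen so that a malformed reply surfaces in the digest instead of vanishing.

```python
import json

VALID_CLASSES = {"Critical", "Informational", "Noise"}


def parse_classifications(raw_reply, expected_ids):
    """Parse the model's JSON reply and keep only well-formed entries.

    Emails the model skipped, or labelled with an unknown class, fall back
    to 'Informational' (an assumed policy, not part of the original plan).
    """
    try:
        items = json.loads(raw_reply)
    except json.JSONDecodeError:
        items = []
    if not isinstance(items, list):
        items = []

    by_id = {}
    for item in items:
        if not isinstance(item, dict):
            continue
        email_id = item.get("id")
        if email_id in expected_ids and item.get("class") in VALID_CLASSES:
            by_id[email_id] = {
                "id": email_id,
                "class": item["class"],
                "reason": str(item.get("reason", "")),
            }

    # Fill in anything missing, preserving the input order.
    return [
        by_id.get(i, {"id": i, "class": "Informational", "reason": "unparsed reply"})
        for i in expected_ids
    ]
```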

Also create tools/gmail-pubsub-handler.py:

#!/usr/bin/env python3
"""
Gmail Pub/Sub event handler - called by OpenClaw webhook.
Fetches email details, classifies in batch, alerts if Critical.
"""

import json
import sys
import subprocess

def handle_gmail_event(event):
    """
    event: {source: 'gmail', account: 'quan@ztag.com', messageId: '...'}
    """
    account = event['account']
    message_id = event['messageId']
    
    # Fetch email details via Gmail API
    # (use existing gmail-fetch.py logic)
    
    # Check if already alerted (the file does not exist on first run)
    try:
        with open('working/ops/alerted-email-ids.json', 'r') as f:
            alerted = json.load(f)
    except FileNotFoundError:
        alerted = []
    
    if message_id in alerted:
        return  # Already processed
    
    # Classify using intelligent triage
    # classification = call gmail-intelligent-triage.py
    
    # If Critical, send alert via Telegram
    # If Informational, add to digest queue
    # If Noise, log only
    
    pass

if __name__ == '__main__':
    event_json = sys.stdin.read()
    event = json.loads(event_json)
    handle_gmail_event(event)
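
The duplicate check depends on the alerted-ID file staying readable. Here is a minimal sketch of a small store for it, assuming the file holds a JSON list of message IDs as in the handler above. The helper names are hypothetical, and the temp-file-plus-rename write is a suggested hardening, not something the plan specifies.

```python
import json
import os
import tempfile


def load_alerted(path):
    """Return the set of already-alerted message IDs (empty if no file yet)."""
    try:
        with open(path, "r") as f:
            return set(json.load(f))
    except FileNotFoundError:
        return set()


def mark_alerted(path, message_id):
    """Record message_id; return False if it was already recorded."""
    alerted = load_alerted(path)
    if message_id in alerted:
        return False
    alerted.add(message_id)

    # Write to a temp file in the same directory, then rename, so a crash
    # mid-write cannot leave a truncated JSON file behind.
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(sorted(alerted), f)
    os.replace(tmp_path, path)
    return True
```

This protects against partial writes only. Two handlers running at the same moment could still race on the read-modify-write, so a lock would be needed if webhooks are processed concurrently.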

Step 1.4: Deploy Shadow Mode (15 min)

Config update for shadow mode (I'll prepare, you apply):

Add to openclaw.json:

{
  "hooks": {
    "enabled": true,
    "token": "6978cffd551a84e06df6d76e13cd56a630223468ab40b428",
    "path": "/hooks",
    "presets": ["gmail"],
    "gmail": {
      "shadowMode": true,
      "accounts": ["quan@ztag.com", "quan@gantom.com", "quan777@gmail.com"],
      "model": "anthropic/claude-haiku-4-5",
      "logPath": "working/ops/gmail-shadow-log.json"
    },
    "mappings": [
      {
        "match": { "path": "gmail" },
        "action": "agent",
        "wakeMode": "now",
        "name": "Gmail",
        "sessionKey": "hook:gmail:{{messages[0].id}}",
        "messageTemplate": "Gmail Pub/Sub event - classifying...",
        "model": "anthropic/claude-haiku-4-5",
        "deliver": false,
        "transform": {
          "module": "gmail-pubsub-handler"
        }
      }
    ]
  }
}

Shadow mode = logs decisions, doesn't send alerts (yet)
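
In handler terms, shadow mode could look like the sketch below: every decision is logged, and the alert callback fires only when the flag is off. The function name, the log entry fields, and the one-JSON-object-per-line layout are all assumptions for illustration; how OpenClaw itself interprets shadowMode is not confirmed here.

```python
import json
from datetime import datetime, timezone


def dispatch(decision, shadow_mode, log_path, send_alert):
    """Log every decision; call send_alert only when shadow mode is off.

    decision: {'id': ..., 'class': ..., 'reason': ...}
    send_alert: callable taking the decision (e.g. a Telegram sender)
    Returns True if an alert was actually sent.
    """
    would_alert = decision.get("class") == "Critical"
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "decision": decision,
        "would_alert": would_alert,
        "shadow": shadow_mode,
    }
    # One JSON object per line keeps appends cheap and the file greppable.
    with open(log_path, "a") as f:
        f.write(json.dumps(entry) + "\n")

    if would_alert and not shadow_mode:
        send_alert(decision)
        return True
    return False
```

Because the log records would_alert even in shadow mode, the week of shadow data shows exactly which alerts would have been sent.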

Step 1.5: Test & Tune (10 min)

Send yourself a test email to quan@ztag.com, verify:

  1. Pub/Sub pushes notification within 1-3 seconds
  2. OpenClaw receives webhook
  3. Haiku classifies email
  4. Decision logged to working/ops/gmail-shadow-log.json
  5. NO alert sent (shadow mode)

Review the logs daily for 1 week and tune the classifier based on misclassifications.
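
The daily review can be made measurable with a small tally. The sketch below assumes each reviewed log entry carries the predicted label under 'class' plus a hand-added 'expected' label; both the 'expected' field and the function name are hypothetical.

```python
from collections import Counter


def summarize_review(entries):
    """Tally misclassifications from hand-reviewed log entries.

    entries: list of {'class': predicted label, 'expected': reviewer's label}
    Entries without an 'expected' label are ignored.
    Returns (accuracy, Counter of (predicted, expected) pairs that disagree).
    """
    reviewed = [e for e in entries if e.get("expected")]
    if not reviewed:
        return 0.0, Counter()
    errors = Counter(
        (e["class"], e["expected"]) for e in reviewed if e["class"] != e["expected"]
    )
    accuracy = 1 - sum(errors.values()) / len(reviewed)
    return accuracy, errors
```

A pair such as ('Noise', 'Critical') in the error counter is the costly kind of miss, so it is the first thing worth adding to memory/email-patterns.md.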


Phase 2: Event Router Architecture (4 hours)

Step 2.1: Build Core Router (90 min)

Create tools/event-router.py:

#!/usr/bin/env python3
"""
Universal event router - dispatches events to plugins.
Auto-discovers plugins in tools/event-plugins/
"""

import importlib
import os
import json
import sys

class EventRouter:
    def __init__(self):
        self.plugins = self.load_plugins()
        self.config = self.load_config()
    
    def load_plugins(self):
        """Auto-discover all plugins in event-plugins/"""
        plugins = []
        plugin_dir = 'tools/event-plugins'
        
        if not os.path.exists(plugin_dir):
            os.makedirs(plugin_dir)
            return plugins
        
        for file in os.listdir(plugin_dir):
            if file.endswith('.py') and not file.startswith('_'):
                module_name = file[:-3]
                try:
                    module = importlib.import_module(f'event-plugins.{module_name}')
                    class_name = f'{module_name.title().replace("-", "")}EventHandler'
                    plugin_class = getattr(module, class_name)
                    plugins.append(plugin_class())
                    print(f"✓ Loaded plugin: {module_name}", file=sys.stderr)
                except Exception as e:
                    print(f"✗ Failed to load {module_name}: {e}", file=sys.stderr)
        
        return plugins
    
    def load_config(self):
        """Load event-config.json"""
        try:
            with open('tools/event-config.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {"plugins": {}, "router": {"default_model": "anthropic/claude-haiku-4-5"}}
    
    def route(self, event):
        """Route event to appropriate plugin"""
        for plugin in self.plugins:
            if plugin.can_handle(event):
                plugin_name = plugin.__class__.__name__
                config = self.config.get('plugins', {}).get(plugin_name, {})
                
                if not config.get('enabled', True):
                    print(f"Plugin {plugin_name} disabled, skipping", file=sys.stderr)
                    return None
                
                return plugin.handle(event, config)
        
        print(f"No handler found for event: {event.get('source', 'unknown')}", file=sys.stderr)
        return None

if __name__ == '__main__':
    event_json = sys.stdin.read()
    event = json.loads(event_json)
    
    router = EventRouter()
    result = router.route(event)
    
    if result:
        print(json.dumps(result, indent=2))
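
To see the dispatch rule in isolation, here is a self-contained sketch of the same loop with an in-memory plugin. EchoEventHandler is a hypothetical plugin that exists only for this demonstration, and route() mirrors EventRouter.route without the file-based discovery.

```python
class EchoEventHandler:
    """Hypothetical plugin used only for this demonstration."""

    def can_handle(self, event):
        return event.get("source") == "echo"

    def handle(self, event, config):
        return {"action": "silent", "message": event.get("data"), "priority": "noise"}


def route(event, plugins, config):
    """Return the first matching, enabled plugin's result, else None."""
    for plugin in plugins:
        if not plugin.can_handle(event):
            continue
        # Config is looked up by class name, as in EventRouter.route.
        name = plugin.__class__.__name__
        plugin_config = config.get("plugins", {}).get(name, {})
        if not plugin_config.get("enabled", True):
            return None
        return plugin.handle(event, plugin_config)
    return None
```

Note that per-plugin config is keyed by class name (for example GmailEventHandler), not by file name, so a renamed class silently falls back to the defaults.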

Step 2.2: Create Plugin Template (30 min)

Create tools/event-plugins/_template.py:

#!/usr/bin/env python3
"""
Event plugin template - copy this to create new plugins.
Rename class to match your plugin (e.g., GmailEventHandler, CalendarEventHandler)
"""

class TemplateEventHandler:
    """
    Standard interface for event plugins.
    All plugins must implement: can_handle() and handle()
    """
    
    def can_handle(self, event):
        """
        Return True if this plugin handles this event type.
        
        Args:
            event: dict with at least {'source': 'gmail|calendar|drive|...'}
        
        Returns:
            bool: True if this plugin should handle the event
        """
        return event.get('source') == 'template'
    
    def handle(self, event, config):
        """
        Process the event and return action to take.
        
        Args:
            event: dict with event data
            config: dict with plugin-specific config from event-config.json
        
        Returns:
            dict: {
                'action': 'alert|digest|silent',
                'message': 'Alert text for Telegram' (if action=alert),
                'priority': 'critical|informational|noise'
            }
        """
        # Example processing logic
        source = event.get('source')
        data = event.get('data', {})
        
        # Your classification logic here
        # ...
        
        return {
            'action': 'alert',
            'message': f"Event from {source}: {data}",
            'priority': 'informational'
        }
    
    def get_config_schema(self):
        """
        Return config schema for this plugin.
        Used for documentation and validation.
        """
        return {
            'enabled': True,
            'cost_estimate_monthly': 0.00,
            'description': 'Template plugin for event handling'
        }
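
As an illustration of filling in the template, here is a sketch of what a calendar plugin might look like. The event fields 'start' and 'title' are assumed names, and the alert rule (fire when the meeting starts within meeting_prep_minutes of the event timestamp) is one possible reading of that setting, not a confirmed design.

```python
from datetime import datetime, timedelta


def _parse(ts):
    """Parse ISO 8601, accepting a trailing 'Z' on older Python versions."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


class CalendarEventHandler:
    """Hypothetical plugin: alert shortly before a meeting starts."""

    def can_handle(self, event):
        return event.get("source") == "calendar"

    def handle(self, event, config):
        data = event.get("data", {})
        start = _parse(data["start"])      # assumed field name
        now = _parse(event["timestamp"])
        lead = timedelta(minutes=config.get("meeting_prep_minutes", 15))

        if timedelta(0) <= start - now <= lead:
            return {
                "action": "alert",
                "message": f"Meeting soon: {data.get('title', '(untitled)')}",
                "priority": "informational",
            }
        return {"action": "silent", "priority": "noise"}

    def get_config_schema(self):
        return {"enabled": False, "meeting_prep_minutes": 15}
```

Saved as calendar.py, the class name matches what the router derives from the file name (Calendar + EventHandler).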

Step 2.3: Create Gmail Plugin (60 min)

Create tools/event-plugins/gmail.py:

#!/usr/bin/env python3
"""
Gmail event plugin - intelligent email triage
"""

import json
import subprocess

class GmailEventHandler:
    def can_handle(self, event):
        return event.get('source') == 'gmail'
    
    def handle(self, event, config):
        """Classify email and determine action"""
        account = event.get('account')
        messages = event.get('messages', [])
        
        if not messages:
            return {'action': 'silent'}
        
        # Load alerted IDs to prevent duplicates
        try:
            with open('working/ops/alerted-email-ids.json', 'r') as f:
                alerted = json.load(f)
        except FileNotFoundError:
            alerted = []
        
        # Filter out already-alerted
        new_messages = [m for m in messages if m['id'] not in alerted]
        
        if not new_messages:
            return {'action': 'silent'}
        
        # Classify using intelligent triage (call Haiku via subprocess)
        classifications = self.classify_emails(new_messages, config)
        
        # Separate by priority
        critical = [c for c in classifications if c['class'] == 'Critical']
        informational = [c for c in classifications if c['class'] == 'Informational']
        by_id = {m['id']: m for m in new_messages}
        
        if informational:
            # Add to digest queue (sent by hourly cron). Done before the
            # critical branch so a mixed batch does not drop these messages.
            try:
                with open('working/ops/digest-queue.json', 'r') as f:
                    queue = json.load(f)
            except FileNotFoundError:
                queue = []
            
            for c in informational:
                msg = by_id[c['id']]
                queue.append({
                    'source': 'gmail',
                    'account': account,
                    'from': msg['from'],
                    'subject': msg['subject'],
                    'date': msg['date']
                })
            
            with open('working/ops/digest-queue.json', 'w') as f:
                json.dump(queue, f)
        
        if critical:
            # Alert immediately
            messages_text = []
            for c in critical:
                msg = by_id[c['id']]
                messages_text.append(f"{msg['from']} - {msg['subject'][:50]}")
                alerted.append(c['id'])
            
            # Save alerted IDs
            with open('working/ops/alerted-email-ids.json', 'w') as f:
                json.dump(alerted, f)
            
            return {
                'action': 'alert',
                'message': f"📧 {len(critical)} critical: {'; '.join(messages_text)}",
                'priority': 'critical'
            }
        
        # Informational (already queued) or noise: no immediate alert
        return {'action': 'silent'}
    
    def classify_emails(self, emails, config):
        """Call gmail-intelligent-triage.py via Haiku"""
        # This would spawn a Haiku sub-agent for classification
        # For now, placeholder
        return []
    
    def get_config_schema(self):
        return {
            'enabled': True,
            'accounts': ['quan@ztag.com', 'quan@gantom.com', 'quan777@gmail.com'],
            'cost_estimate_monthly': 2.00,
            'batch_size': 20,
            'model': 'anthropic/claude-haiku-4-5'
        }
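
One way the classify_emails placeholder could be filled in is to pipe the batch to the triage script over stdin and stdout. This is a sketch under that assumption; run_triage is a hypothetical helper, and the plan's stated alternative (spawning a Haiku sub-agent through OpenClaw) may replace it entirely.

```python
import json
import subprocess
import sys


def run_triage(emails, command, timeout=120):
    """Pipe emails as JSON to a triage command and parse its JSON output.

    command: argv list, e.g. [sys.executable, 'tools/gmail-intelligent-triage.py']
    Returns [] on any failure so the caller treats the batch as unclassified.
    """
    try:
        proc = subprocess.run(
            command,
            input=json.dumps(emails),
            capture_output=True,
            text=True,
            timeout=timeout,
            check=True,
        )
        result = json.loads(proc.stdout)
    except (subprocess.SubprocessError, OSError, json.JSONDecodeError) as exc:
        print(f"triage failed: {exc}", file=sys.stderr)
        return []
    return result if isinstance(result, list) else []
```

Returning an empty list on failure means a broken classifier produces no alerts at all, so a failure counter or a fallback alert is worth adding before leaving shadow mode.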

Step 2.4: Configuration File (15 min)

Create tools/event-config.json:

{
  "plugins": {
    "GmailEventHandler": {
      "enabled": true,
      "accounts": ["quan@ztag.com", "quan@gantom.com", "quan777@gmail.com"],
      "cost_estimate_monthly": 2.00,
      "batch_size": 20,
      "model": "anthropic/claude-haiku-4-5"
    },
    "CalendarEventHandler": {
      "enabled": false,
      "calendars": ["primary", "ztag", "gantom"],
      "meeting_prep_minutes": 15,
      "cost_estimate_monthly": 0.10,
      "model": "anthropic/claude-haiku-4-5"
    },
    "DriveEventHandler": {
      "enabled": false,
      "watch_folders": ["/meeting-notes", "/contracts"],
      "cost_estimate_monthly": 0.05,
      "model": "anthropic/claude-haiku-4-5"
    }
  },
  "router": {
    "default_model": "anthropic/claude-haiku-4-5",
    "flood_threshold": 30,
    "batch_window_seconds": 300
  }
}
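
The config leaves two things implicit: what happens when a plugin omits "model", and how the per-plugin cost estimates add up. The sketch below shows one possible interpretation, where a missing model falls back to router.default_model and only enabled plugins count toward the monthly estimate. Both helper names are hypothetical.

```python
def effective_plugin_config(config, plugin_name):
    """Merge a plugin's settings with router-level defaults."""
    router = config.get("router", {})
    plugin = dict(config.get("plugins", {}).get(plugin_name, {}))
    plugin.setdefault("enabled", True)
    # Assumed fallback: use the router's default model when none is set.
    plugin.setdefault("model", router.get("default_model"))
    return plugin


def monthly_cost_estimate(config):
    """Sum cost_estimate_monthly over enabled plugins only."""
    return sum(
        p.get("cost_estimate_monthly", 0.0)
        for p in config.get("plugins", {}).values()
        if p.get("enabled", True)
    )
```

With the file above, only GmailEventHandler is enabled, so the estimate comes out at its $2.00 per month.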

Step 2.5: Documentation (45 min)

Create tools/event-plugins/README.md:

# Event Plugin System

## Overview
Universal event router that dispatches events to plugin handlers.
Auto-discovers plugins, configurable via JSON, extensible for any event source.

## Adding a New Event Source

1. Copy `_template.py` to `{source}.py` (e.g., `calendar.py`)
2. Implement `can_handle()` and `handle()` methods
3. Add config to `event-config.json`
4. Router auto-discovers plugin on restart

## Plugin Interface

Every plugin must implement:
- `can_handle(event) → bool`
- `handle(event, config) → dict`
- `get_config_schema() → dict`

## Event Format

```json
{
  "source": "gmail|calendar|drive|zoho|custom",
  "timestamp": "2026-02-11T06:00:00Z",
  "data": { ... source-specific data ... }
}
```

## Testing Your Plugin

    python3 test-plugin.py --plugin gmail --event test-data/gmail-event.json

## Cost Tracking

Router logs costs per plugin to `cost-tracking/event-costs-{date}.json`.

Minnie-Review generates weekly ROI report automatically.


Phase 3: Multi-Agent Routing (2 hours)

Step 3.1: Configure Agents (60 min)

You need to update openclaw.json with multi-agent config:

{
  "agents": {
    "list": [
      {
        "id": "main",
        "workspace": "~/.openclaw/workspace",
        "tools": {
          "allow": ["*"]
        }
      },
      {
        "id": "research",
        "workspace": "~/.openclaw/workspace",
        "model": "anthropic/claude-haiku-4-5",
        "tools": {
          "allow": [
            "web_search",
            "web_fetch",
            "memory_search",
            "memory_get",
            "read"
          ],
          "deny": [
            "exec",
            "write",
            "edit",
            "browser",
            "cron",
            "gateway",
            "message"
          ]
        }
      },
      {
        "id": "accounting",
        "workspace": "~/.openclaw/workspace-charlie",
        "model": "anthropic/claude-haiku-4-5",
        "tools": {
          "allow": [
            "read",
            "write",
            "memory_search",
            "memory_get"
          ],
          "deny": [
            "exec",
            "browser",
            "cron",
            "gateway",
            "message"
          ]
        }
      }
    ]
  }
}
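
To reason about what each agent can do, it helps to spell out how allow and deny combine. The sketch below assumes that deny always wins and that "*" matches every tool. These semantics are an assumption for illustration and should be checked against OpenClaw's actual behavior before relying on them.

```python
def tool_allowed(tools, name):
    """Return True if `name` passes the allow list and is not denied.

    Assumed semantics (not confirmed against OpenClaw): deny always wins,
    and "*" in either list matches every tool.
    """
    allow = tools.get("allow", [])
    deny = tools.get("deny", [])
    if name in deny or "*" in deny:
        return False
    return name in allow or "*" in allow


# The research agent's tool policy, copied from the config above.
RESEARCH_TOOLS = {
    "allow": ["web_search", "web_fetch", "memory_search", "memory_get", "read"],
    "deny": ["exec", "write", "edit", "browser", "cron", "gateway", "message"],
}
```

Under these assumptions the research agent can call web_search but not exec, and a tool in neither list (for example a newly added one) is refused because it is not explicitly allowed.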

Step 3.2: Create Charlie Workspace (30 min)

# SSH to VPS
mkdir -p ~/.openclaw/workspace-charlie

# Create Charlie-specific files
cat > ~/.openclaw/workspace-charlie/AGENTS.md << 'EOF'
# Charlie Workspace

You are Minnie-Accounting, helping Charlie with Zoho Books (ZTAG) and Xero (Gantom) pre-reconciliation.

## Your Role
- Pre-reconcile bank transactions
- Categorize expenses
- Match invoices to payments
- Prepare monthly close reports
- Email summaries to charlie@ztag.com

## Trust Ladder
- Phase 1: Read-only, generate reports
- Phase 2: Suggest categorizations (Charlie approves)
- Phase 3: Auto-categorize with confidence scores
- Phase 4: Full reconciliation with exception handling

You never send as Charlie. All output is draft/report format for her review.
EOF

cat > ~/.openclaw/workspace-charlie/MEMORY.md << 'EOF'
# Charlie Accounting Memory

## Scope
- ZTAG: Zoho Books
- Gantom: Xero

## Common Categorizations
(Will be populated as we learn)

## Recurring Transactions
(Will be populated as we learn)

## Exception Patterns
(Will be populated as we learn)
EOF

Step 3.3: Test Agent Routing (30 min)

Test research agent:

# Via Telegram, send:
# @research What are the latest CA education grants?

Expected: Cheaper Haiku-based research, no ability to execute commands

Test accounting agent:

# Via webhook or cron:
# Route to accounting agent, generate sample report

Expected: Charlie workspace, restricted tools, Haiku model


Success Criteria

Phase 1 Complete:

Phase 2 Complete:

Phase 3 Complete:


Cost Tracking

Before Tier 1: $13.50/day ($405/month)

After Tier 1:

ROI verification:

Aggregate: 500-1000x ROI


Next Steps After Tier 1

Once shadow mode completes (1 week):

  1. Enable live alerts (turn off shadowMode)
  2. Build Calendar plugin (meeting prep automation)
  3. Build Drive plugin (auto-process meeting notes)
  4. Add browser automation skills
  5. Build Canvas dashboards

Status: Ready to begin Phase 1
Blocking: Need you to SSH to VPS and run openclaw webhooks gmail setup

Let me know when you're ready and I'll guide you through each step! 🚀