Generate a weekly Salesforce pipeline report using an agent skill

Complexity: Low · Cost: Usage-based

Prerequisites
  • Claude Code or another agent that supports the Agent Skills standard
  • Salesforce instance URL stored as SALESFORCE_INSTANCE_URL environment variable
  • Salesforce access token stored as SALESFORCE_ACCESS_TOKEN environment variable
  • Slack Bot Token stored as SLACK_BOT_TOKEN environment variable
  • Slack channel ID stored as SLACK_CHANNEL_ID environment variable
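
A quick preflight check can confirm that all four variables are set before the skill runs. The sketch below is illustrative only: `missing_env` is a hypothetical helper, not part of the script in Step 2, and it only checks that each variable is non-empty, not that a token is still valid.

```python
import os

# Names taken from the prerequisites list above.
REQUIRED_VARS = [
    "SALESFORCE_INSTANCE_URL",
    "SALESFORCE_ACCESS_TOKEN",
    "SLACK_BOT_TOKEN",
    "SLACK_CHANNEL_ID",
]


def missing_env(names, env=None):
    """Return the names that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env(REQUIRED_VARS)
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Reporting the specific missing names makes a misconfigured shell easier to fix than a single generic error.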

Overview

This approach creates an agent skill that connects to Salesforce, runs SOQL queries to pull pipeline data, calculates key metrics, and posts a formatted report to Slack. Unlike the schedule-based approaches, it runs on demand, which is useful for ad-hoc pipeline reviews or when leadership asks "where do we stand?"

Step 1: Create the skill

Create .claude/skills/sf-pipeline-report/SKILL.md:

---
name: sf-pipeline-report
description: Generate a Salesforce pipeline report and post it to Slack
disable-model-invocation: true
allowed-tools: Bash(python *)
---
 
Pull pipeline data from Salesforce, calculate metrics, and post a formatted report to Slack.
 
Run: `python $SKILL_DIR/scripts/pipeline_report.py`

Step 2: Write the script

Create .claude/skills/sf-pipeline-report/scripts/pipeline_report.py:

#!/usr/bin/env python3
# Requires the third-party packages requests and slack_sdk
import os
import sys
from datetime import datetime

import requests
 
INSTANCE_URL = os.environ.get("SALESFORCE_INSTANCE_URL")
ACCESS_TOKEN = os.environ.get("SALESFORCE_ACCESS_TOKEN")
SLACK_TOKEN = os.environ.get("SLACK_BOT_TOKEN")
SLACK_CHANNEL = os.environ.get("SLACK_CHANNEL_ID")
 
if not all([INSTANCE_URL, ACCESS_TOKEN, SLACK_TOKEN, SLACK_CHANNEL]):
    print("ERROR: Set SALESFORCE_INSTANCE_URL, SALESFORCE_ACCESS_TOKEN, SLACK_BOT_TOKEN, SLACK_CHANNEL_ID")
    sys.exit(1)
 
SF_HEADERS = {
    "Authorization": f"Bearer {ACCESS_TOKEN}",
    "Content-Type": "application/json",
}
 
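def sf_query(soql):
    """Run a SOQL query via the REST API and return the records from the response."""
    resp = requests.get(
        f"{INSTANCE_URL}/services/data/v59.0/query",
        headers=SF_HEADERS,
        params={"q": soql},
        timeout=30,  # fail instead of hanging if Salesforce is unreachable
    )
    resp.raise_for_status()  # raises on HTTP errors such as an expired token (401)
    return resp.json().get("records", [])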
 
# Pipeline by stage
pipeline = sf_query(
    "SELECT StageName, COUNT(Id) deal_count, SUM(Amount) total_amount "
    "FROM Opportunity WHERE IsClosed = false AND Amount != null "
    "GROUP BY StageName ORDER BY SUM(Amount) DESC"
)
 
# Closed this week
closed = sf_query(
    "SELECT COUNT(Id) closed_count, SUM(Amount) closed_amount "
    "FROM Opportunity WHERE CloseDate = THIS_WEEK AND IsWon = true"
)
 
# New this week
new_deals = sf_query(
    "SELECT COUNT(Id) new_count, SUM(Amount) new_amount "
    "FROM Opportunity WHERE CreatedDate = THIS_WEEK"
)
 
# Calculate metrics
total_pipeline = sum(s.get("total_amount", 0) or 0 for s in pipeline)
total_deals = sum(s.get("deal_count", 0) for s in pipeline)
avg_deal = total_pipeline / total_deals if total_deals > 0 else 0
 
# SUM() comes back as null when no rows match, so fall back to 0
closed_count = closed[0].get("closed_count", 0) if closed else 0
closed_amount = (closed[0].get("closed_amount") or 0) if closed else 0
new_count = new_deals[0].get("new_count", 0) if new_deals else 0
new_amount = (new_deals[0].get("new_amount") or 0) if new_deals else 0
 
stage_lines = []
for s in pipeline:
    name = s["StageName"]
    count = s["deal_count"]
    amount = s.get("total_amount", 0) or 0
    stage_lines.append(f"• *{name}*: {count} deals — ${amount:,.0f}")
 
stage_breakdown = "\n".join(stage_lines)
 
# Post to Slack
from slack_sdk import WebClient
slack = WebClient(token=SLACK_TOKEN)
 
today = datetime.now().strftime("%B %d, %Y")
 
slack.chat_postMessage(
    channel=SLACK_CHANNEL,
    text=f"Weekly Pipeline Report — {today}",
    blocks=[
        {"type": "header", "text": {"type": "plain_text", "text": f"📊 Pipeline Report — {today}"}},
        {"type": "section", "fields": [
            {"type": "mrkdwn", "text": f"*Total Pipeline*\n${total_pipeline:,.0f}"},
            {"type": "mrkdwn", "text": f"*Open Deals*\n{total_deals}"},
            {"type": "mrkdwn", "text": f"*Avg Deal Size*\n${avg_deal:,.0f}"},
            {"type": "mrkdwn", "text": f"*Closed This Week*\n{closed_count} — ${closed_amount:,.0f}"},
            {"type": "mrkdwn", "text": f"*New This Week*\n{new_count} — ${new_amount:,.0f}"},
        ]},
        {"type": "section", "text": {"type": "mrkdwn", "text": f"*By Stage*\n{stage_breakdown}"}},
    ]
)
 
print("Pipeline report posted to Slack:")
print(f"  Total pipeline: ${total_pipeline:,.0f} across {total_deals} deals")
print(f"  Closed this week: {closed_count} deals (${closed_amount:,.0f})")
print(f"  New this week: {new_count} deals (${new_amount:,.0f})")
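
The metric calculations can be checked offline, without Salesforce or Slack credentials, by factoring them into a pure function. The following is a minimal sketch, not part of the script above: `summarize_pipeline` and `SAMPLE_ROWS` are hypothetical names, and the sample rows are made-up data shaped like the result of the "Pipeline by stage" query.

```python
def summarize_pipeline(stage_rows):
    """Compute totals from rows shaped like the GROUP BY StageName result."""
    # Treat a null (None) aggregate as 0, as the script above does.
    total_amount = sum(row.get("total_amount") or 0 for row in stage_rows)
    total_deals = sum(row.get("deal_count") or 0 for row in stage_rows)
    avg_deal = total_amount / total_deals if total_deals else 0
    stage_lines = [
        f"• *{row['StageName']}*: {row.get('deal_count') or 0} deals, "
        f"${(row.get('total_amount') or 0):,.0f}"
        for row in stage_rows
    ]
    return {
        "total_amount": total_amount,
        "total_deals": total_deals,
        "avg_deal": avg_deal,
        "stage_lines": stage_lines,
    }


# Made-up sample data for illustration only.
SAMPLE_ROWS = [
    {"StageName": "Negotiation", "deal_count": 3, "total_amount": 150000.0},
    {"StageName": "Prospecting", "deal_count": 5, "total_amount": 50000.0},
    {"StageName": "Qualification", "deal_count": 2, "total_amount": None},
]

if __name__ == "__main__":
    summary = summarize_pipeline(SAMPLE_ROWS)
    print(f"Total: ${summary['total_amount']:,.0f} across {summary['total_deals']} deals")
    print("\n".join(summary["stage_lines"]))
```

Keeping the arithmetic separate from the network calls means a change to the report format can be verified before anything is posted to a live channel.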

Step 3: Run it

/sf-pipeline-report

Step 4: Schedule (optional)

For weekly delivery, schedule via Cowork or cron:

# crontab — run every Monday at 8 AM
0 8 * * 1 cd /path/to/project && python .claude/skills/sf-pipeline-report/scripts/pipeline_report.py
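
Cron starts jobs with a minimal environment, so variables exported in an interactive shell are usually not visible to the scheduled run. One option, sketched below, is to read them from a file at the top of the script. The `KEY=VALUE` file format and the `load_env_file` helper are assumptions for illustration; an existing package such as python-dotenv covers the same need.

```python
import os


def load_env_file(path, env=None):
    """Load KEY=VALUE lines from path into env, keeping values that are already set."""
    env = os.environ if env is None else env
    with open(path, encoding="utf-8") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            # Skip blank lines, comments, and anything that is not KEY=VALUE.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # Strip optional surrounding quotes; existing variables take priority.
            env.setdefault(key.strip(), value.strip().strip('"').strip("'"))
    return env


# Hypothetical usage at the top of pipeline_report.py, before the os.environ.get calls:
# load_env_file("/path/to/project/.env")
```

Because the file holds an access token and a bot token, it should be readable only by the account that runs the job and kept out of version control.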

When to use this approach

  • You want pipeline reports on demand, for example right before a sales meeting
  • You prefer a script you can modify and version control
  • You want to combine pipeline data with other data sources in a single report
  • You don't want to maintain Salesforce Flows or Connected Apps for n8n
