Waterfall enrich HubSpot contacts across Apollo, Clearbit, and PDL using an agent skill
Medium complexity
Cost: Usage-based
Prerequisites
- Claude Code, Cursor, or another AI coding agent that supports skills
- HubSpot private app token stored as HUBSPOT_TOKEN (scopes: crm.objects.contacts.read, crm.objects.contacts.write)
- Apollo API key stored as APOLLO_API_KEY
- Clearbit API key stored as CLEARBIT_API_KEY
- People Data Labs API key stored as PDL_API_KEY
Overview
Waterfall enrichment queries several data providers in sequence and calls the next one only for the fields the previous ones left empty. It is one of the most powerful patterns in RevOps, but the branching logic is complex. An agent skill wraps the entire waterfall into a single command: run /waterfall-enrich and it cascades through Apollo, Clearbit, and PDL automatically, writing results to HubSpot with source attribution.
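To make the cascade concrete before the full script, here is a minimal sketch of the merge logic with three stub providers. The provider names, the sample data, and the shortened field list are hypothetical and no network calls are made; the real script in Step 3 follows the same shape.

```python
# Toy waterfall: stub providers return hypothetical data, no network calls.
REQUIRED_FIELDS = ["jobtitle", "company", "phone"]

def provider_a(email):
    return {"jobtitle": "VP Sales", "company": None, "phone": None}

def provider_b(email):
    return {"jobtitle": "Head of Sales", "company": "Acme", "phone": None}

def provider_c(email):
    return {"phone": "+15550100"}

def waterfall(email, providers):
    """Merge provider results in order; earlier providers win on conflicts."""
    merged, sources = {}, []
    for name, fn in providers:
        # Stop as soon as every required field has a value.
        if all(merged.get(f) for f in REQUIRED_FIELDS):
            break
        filled = False
        for field, value in fn(email).items():
            if value and not merged.get(field):
                merged[field] = value
                filled = True
        if filled:
            sources.append(name)
    return merged, sources

merged, sources = waterfall(
    "jane@example.com",
    [("a", provider_a), ("b", provider_b), ("c", provider_c)],
)
print(merged)
print("+".join(sources))
```

Provider b's job title is ignored because provider a already filled that field, which is what keeps source attribution meaningful.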
Step 1: Create the skill directory
mkdir -p .claude/skills/waterfall-enrich/scripts
Step 2: Write the SKILL.md file
Create .claude/skills/waterfall-enrich/SKILL.md:
---
name: waterfall-enrich
description: Waterfall enriches HubSpot contacts missing key fields. Tries Apollo first, then Clearbit for gaps, then People Data Labs as a final fallback. Writes enriched data and source attribution back to HubSpot.
disable-model-invocation: true
allowed-tools: Bash(python *)
---
Run waterfall enrichment on HubSpot contacts missing job title:
1. Run: `python $SKILL_DIR/scripts/waterfall.py`
2. Review the per-contact enrichment results
3. Check source attribution to see which providers filled which contacts
Step 3: Write the waterfall script
Create .claude/skills/waterfall-enrich/scripts/waterfall.py:
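#!/usr/bin/env python3
"""
Waterfall Enrichment: HubSpot → Apollo → Clearbit → PDL → HubSpot
Tries each provider in sequence. Only calls the next provider for missing fields.
"""
import os
import subprocess
import sys
import time
try:
    import requests
except ImportError:
    # Install into the interpreter that is running this script, not whichever pip is on PATH.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "requests"])
    import requests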
# --- Config ---
HUBSPOT_TOKEN = os.environ.get("HUBSPOT_TOKEN")
APOLLO_API_KEY = os.environ.get("APOLLO_API_KEY")
CLEARBIT_API_KEY = os.environ.get("CLEARBIT_API_KEY")
PDL_API_KEY = os.environ.get("PDL_API_KEY")
# Only HubSpot and Apollo are mandatory; Clearbit and PDL are skipped when their keys are absent.
required = {"HUBSPOT_TOKEN": HUBSPOT_TOKEN, "APOLLO_API_KEY": APOLLO_API_KEY}
missing = [k for k, v in required.items() if not v]
if missing:
    print(f"ERROR: Missing required env vars: {', '.join(missing)}")
    sys.exit(1)
HS_HEADERS = {"Authorization": f"Bearer {HUBSPOT_TOKEN}", "Content-Type": "application/json"}
# Assumption: linkedin_url and enrichment_source already exist as contact properties in your portal.
REQUIRED_FIELDS = ["jobtitle", "company", "phone", "linkedin_url", "industry"]
# --- Provider functions ---
def enrich_apollo(email):
    """Match a person in Apollo by email; returns {} when there is no match."""
    resp = requests.post(
        "https://api.apollo.io/api/v1/people/match",
        headers={"x-api-key": APOLLO_API_KEY, "Content-Type": "application/json"},
        json={"email": email},
        timeout=30,
    )
    resp.raise_for_status()
    p = resp.json().get("person")
    if not p:
        return {}
    org = p.get("organization") or {}  # guard against a null organization
    return {
        "jobtitle": p.get("title"),
        "company": org.get("name"),
        "phone": (p.get("phone_numbers") or [{}])[0].get("sanitized_number"),
        "linkedin_url": p.get("linkedin_url"),
        "industry": org.get("industry"),
    }
def enrich_clearbit(email):
    """Look up a person in Clearbit; skipped when no API key is configured."""
    if not CLEARBIT_API_KEY:
        return {}
    resp = requests.get(
        "https://person.clearbit.com/v2/people/find",
        params={"email": email},  # let requests URL-encode characters such as "+"
        headers={"Authorization": f"Bearer {CLEARBIT_API_KEY}"},
        timeout=30,
    )
    if resp.status_code == 404:
        return {}
    resp.raise_for_status()
    d = resp.json()
    employment = d.get("employment") or {}  # guard against null sub-objects
    handle = (d.get("linkedin") or {}).get("handle")
    return {
        "jobtitle": employment.get("title"),
        "company": employment.get("name"),
        "linkedin_url": f"https://linkedin.com/in/{handle}" if handle else None,
    }
def enrich_pdl(email):
    """Look up a person in People Data Labs; skipped when no API key is configured."""
    if not PDL_API_KEY:
        return {}
    resp = requests.post(
        "https://api.peopledatalabs.com/v5/person/enrich",
        headers={"x-api-key": PDL_API_KEY, "Content-Type": "application/json"},
        json={"email": email},
        timeout=30,
    )
    if resp.status_code == 404:
        return {}
    resp.raise_for_status()
    body = resp.json()  # parse once; fall back to the top level if there is no "data" key
    d = body.get("data") or body
    phones = d.get("phone_numbers") or []
    return {
        "jobtitle": d.get("job_title"),
        "company": d.get("job_company_name"),
        "phone": phones[0] if phones else None,
        "linkedin_url": d.get("linkedin_url"),
        "industry": d.get("industry"),
    }
PROVIDERS = [("apollo", enrich_apollo), ("clearbit", enrich_clearbit), ("pdl", enrich_pdl)]
def waterfall(email):
    """Call providers in order until every required field is filled."""
    merged = {}
    sources = []
    for name, fn in PROVIDERS:
        # Stop early once nothing is missing, so later providers are not billed.
        if all(merged.get(f) for f in REQUIRED_FIELDS):
            break
        try:
            result = fn(email)
            filled = False
            for k, v in result.items():
                # Earlier providers win: only fill fields that are still empty.
                if v and not merged.get(k):
                    merged[k] = v
                    filled = True
            if filled:
                sources.append(name)
        except Exception as e:
            print(f"  WARN: {name} failed: {e}")
        time.sleep(0.5)  # simple pacing between provider calls
    merged["enrichment_source"] = "+".join(sources) if sources else "none"
    return merged
# --- Main ---
print("Searching for unenriched contacts...")
contacts = []
after = None  # paging cursor; omitted on the first request
while True:
    body = {
        "filterGroups": [{"filters": [{
            "propertyName": "jobtitle",
            "operator": "NOT_HAS_PROPERTY"
        }]}],
        "properties": ["email", "jobtitle", "company"],
        "limit": 100,
    }
    if after is not None:
        body["after"] = after
    resp = requests.post(
        "https://api.hubapi.com/crm/v3/objects/contacts/search",
        headers=HS_HEADERS,
        json=body,
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    contacts.extend(data["results"])
    after = data.get("paging", {}).get("next", {}).get("after")
    if not after:
        break
print(f"Found {len(contacts)} contacts to enrich\n")
enriched = 0
for contact in contacts:
    email = contact["properties"].get("email")
    if not email:
        continue
    # Skip free-mail addresses; they rarely resolve to a company profile.
    domain = email.split("@")[-1].lower()
    if domain in ("gmail.com", "yahoo.com", "hotmail.com", "outlook.com"):
        continue
    print(f"  {email}...")
    fields = waterfall(email)
    properties = {k: v for k, v in fields.items() if v}
    filled = [k for k in properties if k != "enrichment_source"]
    # Write back only when a provider returned data, so the final count is accurate.
    if filled:
        requests.patch(
            f"https://api.hubapi.com/crm/v3/objects/contacts/{contact['id']}",
            headers=HS_HEADERS,
            json={"properties": properties},
            timeout=30,
        ).raise_for_status()
        enriched += 1
    print(f"    -> {fields['enrichment_source']} | {filled}")
print(f"\nDone. Enriched {enriched}/{len(contacts)} contacts.")
Step 4: Run the skill
# Via Claude Code
/waterfall-enrich
# Or directly
python .claude/skills/waterfall-enrich/scripts/waterfall.py
Step 5: Schedule it (optional)
# .github/workflows/waterfall-enrich.yml
name: Waterfall Enrichment
on:
  schedule:
    - cron: '0 */6 * * *' # Every 6 hours
  workflow_dispatch: {}
jobs:
  enrich:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install requests
      - run: python .claude/skills/waterfall-enrich/scripts/waterfall.py
        env:
          HUBSPOT_TOKEN: ${{ secrets.HUBSPOT_TOKEN }}
          APOLLO_API_KEY: ${{ secrets.APOLLO_API_KEY }}
          CLEARBIT_API_KEY: ${{ secrets.CLEARBIT_API_KEY }}
          PDL_API_KEY: ${{ secrets.PDL_API_KEY }}
Cost
- Apollo: 1 credit/enrichment — called for every contact. ($49/mo Basic = 900 credits)
- Clearbit: Volume-based starting at $99/mo — called for ~30% of contacts.
- People Data Labs: $0.03-0.10/enrichment — called for ~10-15% of contacts.
- The waterfall saves money: For 100 contacts, you might use 100 Apollo + 30 Clearbit + 10 PDL credits instead of 100 credits at each provider.
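The arithmetic behind that comparison can be sketched as follows. The fall-through rates (30% reaching Clearbit, 10% reaching PDL) are the rough figures quoted above, not measured values, and the function name is hypothetical; substitute your own rates once you have run a batch.

```python
def waterfall_calls(contacts, clearbit_rate=0.30, pdl_rate=0.10):
    """Estimate provider calls for a waterfall vs. calling every provider."""
    waterfall = {
        "apollo": contacts,                           # first provider sees every contact
        "clearbit": round(contacts * clearbit_rate),  # only contacts with gaps after Apollo
        "pdl": round(contacts * pdl_rate),            # only contacts still incomplete
    }
    flat = {name: contacts for name in waterfall}     # every provider, every contact
    return waterfall, flat

wf, flat = waterfall_calls(100)
print(wf)
print(sum(wf.values()), "calls with the waterfall")
print(sum(flat.values()), "calls when every provider is queried")
```

For 100 contacts this gives 140 provider calls instead of 300.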
Optional providers
The script handles missing Clearbit or PDL keys gracefully — it skips those providers. You can start with Apollo-only and add providers later as needed.
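As a quick way to see which providers a given environment would actually use, a small helper along these lines can be run before a batch. The helper name and the key table are illustrative and not part of the script above; it only mirrors the script's rule that a provider without a key is skipped.

```python
import os

# Hypothetical helper: provider name paired with the env var that enables it.
PROVIDER_KEYS = [
    ("apollo", "APOLLO_API_KEY"),
    ("clearbit", "CLEARBIT_API_KEY"),
    ("pdl", "PDL_API_KEY"),
]

def active_providers(env=None):
    """Return provider names, in waterfall order, whose API key is set."""
    env = os.environ if env is None else env
    return [name for name, key in PROVIDER_KEYS if env.get(key)]

# Apollo-only setup: Clearbit and PDL are skipped.
print(active_providers({"APOLLO_API_KEY": "example-key"}))
# Adding a PDL key later extends the waterfall without code changes.
print(active_providers({"APOLLO_API_KEY": "example-key", "PDL_API_KEY": "example-key"}))
```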
When to use this approach
- You want to test the waterfall pattern before committing to a platform
- You want full control over provider order and field-merging logic
- You need to run enrichment ad-hoc ("enrich the contacts we imported today")
- You want enrichment logic version-controlled alongside your code
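For the ad-hoc case ("enrich the contacts we imported today"), one option is to narrow the HubSpot search with a creation-date filter. The sketch below only builds the request body; it assumes the standard createdate contact property and that the search API accepts an epoch-millisecond value with the GTE operator, so verify both against your portal before relying on it. The function name is hypothetical.

```python
from datetime import datetime, timezone

def created_since_body(since):
    """Build a contact search body: missing job title AND created at or after `since`."""
    since_ms = int(since.timestamp() * 1000)  # epoch milliseconds
    return {
        # Filters inside one filter group are combined with AND.
        "filterGroups": [{"filters": [
            {"propertyName": "jobtitle", "operator": "NOT_HAS_PROPERTY"},
            {"propertyName": "createdate", "operator": "GTE", "value": str(since_ms)},
        ]}],
        "properties": ["email", "jobtitle", "company"],
        "limit": 100,
    }

# Midnight UTC today; adjust the timezone to match how your team defines "today".
midnight_utc = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
body = created_since_body(midnight_utc)
print(body["filterGroups"][0]["filters"][1])
```

Swapping this body into the search request in the script leaves the rest of the waterfall unchanged.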
When to move to a dedicated tool
- You need real-time enrichment on contact creation (not batch)
- Multiple team members need to modify provider settings without touching code
- You want visual monitoring of which providers are being called and their success rates