Skip to main content

Overview

This guide covers best practices for implementing robust, secure, and scalable webhook handlers for the HopNow API.

Security

Always Verify Signatures

Never trust webhook data without signature verification:
import hmac
import hashlib

def verify_webhook_signature(payload, signature, secret):
    """Verify that ``signature`` is the HMAC-SHA256 hex digest of ``payload``.

    Args:
        payload: Request body exactly as received, as ``bytes`` or ``str``
            (a ``str`` is encoded as UTF-8 before hashing).
        signature: Hex digest taken from the signature header. May be
            ``None`` or empty when the header is missing.
        secret: Shared webhook secret, as ``str`` or ``bytes``.

    Raises:
        SecurityError: If the signature is missing or does not match.
    """
    # Accept both the raw bytes body and an already-decoded string.
    body = payload.encode() if isinstance(payload, str) else payload
    key = secret.encode() if isinstance(secret, str) else secret

    expected_signature = hmac.new(key, body, hashlib.sha256).hexdigest()

    # Reject a missing header explicitly: compare_digest raises TypeError on
    # None rather than returning False.
    if not signature:
        raise SecurityError("Invalid webhook signature")

    # Compare as bytes so a non-ASCII header value is rejected, not a crash.
    provided = signature.encode() if isinstance(signature, str) else signature

    # compare_digest runs in constant time, so the comparison does not leak
    # how many leading characters of the digest were correct.
    if not hmac.compare_digest(provided, expected_signature.encode()):
        raise SecurityError("Invalid webhook signature")

@app.post("/webhooks")
async def handle_webhook(request):
    """Authenticate an incoming webhook, then decode and dispatch it.

    The signature check runs against the unparsed request body, and the
    body is only decoded as JSON once that check has passed.
    """
    # Unparsed body, exactly as it arrived
    raw_body = await request.body()

    # Signature supplied by the sender
    header_signature = request.headers.get("X-Webhook-Signature")

    # Raises before any of the payload is interpreted
    verify_webhook_signature(raw_body, header_signature, WEBHOOK_SECRET)

    # Verified: decode and hand off
    process_event(json.loads(raw_body))

Use HTTPS Only

Always use HTTPS endpoints for webhooks:
# Good - HTTPS endpoint
url = "https://api.example.com/webhooks"

# Bad - plain HTTP endpoint; will be rejected
url = "http://api.example.com/webhooks"  # ❌ Insecure

Rotate Secrets Regularly

Implement secret rotation for webhook endpoints:
def rotate_webhook_secret(endpoint_id="we_123"):
    """Rotate the signing secret of one webhook endpoint (e.g. quarterly).

    Args:
        endpoint_id: Identifier of the webhook endpoint whose secret is
            replaced. Defaults to the example endpoint ``"we_123"`` so
            existing zero-argument calls keep working.
    """
    # Generate new secret
    new_secret = generate_secure_secret()

    # Update endpoint with new secret
    update_webhook_endpoint(endpoint_id=endpoint_id, secret=new_secret)

    # Update your local configuration
    # NOTE(review): between the call above and this one the endpoint and the
    # local config hold different secrets - confirm how deliveries signed in
    # that window should be verified.
    update_local_config("WEBHOOK_SECRET", new_secret)

Reliability

Return 200 Quickly

Respond within 5 seconds to avoid timeouts:
@app.post("/webhooks")
async def handle_webhook(request):
    """Verify the webhook, queue it for background processing, and acknowledge.

    Returns:
        ``{"status": "received"}`` as soon as the event has been queued.
    """
    # Verify signature against the raw body and the signature header
    payload = await request.body()
    signature = request.headers.get("X-Webhook-Signature")
    verify_webhook_signature(payload, signature, WEBHOOK_SECRET)

    # Queue for async processing
    event = await request.json()
    background_tasks.add_task(process_webhook, event)

    # Return 200 immediately
    return {"status": "received"}

async def process_webhook(event):
    """Process a webhook event after the HTTP response has been sent.

    Failures are logged with their traceback and deliberately not re-raised.

    Args:
        event: Parsed webhook event passed on by the request handler.
    """
    try:
        # Your business logic here
        await update_order_status(event)
        await send_email_notification(event)
    except Exception as exc:
        # logger.exception records the traceback along with the message.
        logger.exception("Webhook processing failed: %s", exc)
        # Don't raise - already responded to webhook

Implement Idempotency

Handle duplicate events gracefully:
class WebhookEventTracker:
    """Remember which webhook event IDs have already been handled."""

    def __init__(self):
        # Event IDs recorded so far by this instance.
        self.processed_events = set()

    def mark_processed(self, event_id):
        """Record ``event_id`` as handled, in memory and in the database."""
        self.processed_events.add(event_id)
        # Persist as well so the record is durable.
        db.save_processed_event(event_id)

    def is_processed(self, event_id):
        """Return True when ``event_id`` is in the in-memory set."""
        already_seen = event_id in self.processed_events
        return already_seen


# Module-level tracker used by the webhook processor.
tracker = WebhookEventTracker()

async def process_webhook(event):
    """Handle an event once; later deliveries of the same ID are skipped.

    Args:
        event: Parsed webhook event; its ``"id"`` is the deduplication key.
    """
    event_id = event["id"]

    if not tracker.is_processed(event_id):
        # First delivery: do the work, then record the ID.
        handle_event(event)
        tracker.mark_processed(event_id)
        return

    # Duplicate delivery: nothing to do.
    logger.info(f"Event {event_id} already processed, skipping")

Handle Out-of-Order Events

Events may arrive out of sequence:
def handle_payout_event(event):
    """Apply a payout event unless the stored payout is already newer.

    Args:
        event: Parsed webhook event carrying ``"created"`` and a ``"data"``
            mapping with the payout ``"id"`` and ``"status"``.
    """
    payout = event["data"]
    payout_id = payout["id"]
    event_time = event["created"]
    new_status = payout["status"]

    # Look up what we currently have stored for this payout.
    stored = db.get_payout(payout_id)

    # A stored record with a later timestamp wins over this event.
    if stored:
        if stored["updated"] > event_time:
            logger.info(f"Ignoring outdated event for {payout_id}")
            return

    # Record the event's status together with the event's own timestamp.
    changes = {
        "status": new_status,
        "updated": event_time
    }
    db.update_payout(payout_id, changes)

Scalability

Use Message Queues

For high volume, use message queues:
from celery import Celery

# Keep the Celery instance under its own name: binding it to `app` would
# shadow the web application that `@app.post` below is registered on.
celery_app = Celery('webhooks')


@app.post("/webhooks")
async def handle_webhook(request):
    """Verify the webhook, enqueue it on Celery, and acknowledge.

    Returns:
        ``{"status": "queued"}`` once the task has been submitted.
    """
    # Verify signature against the raw body and the signature header
    payload = await request.body()
    signature = request.headers.get("X-Webhook-Signature")
    verify_webhook_signature(payload, signature, WEBHOOK_SECRET)

    # Enqueue for processing
    event = await request.json()
    process_webhook_task.delay(event)

    return {"status": "queued"}


# autoretry_for is what makes retry_backoff take effect: Celery re-queues the
# task on TransientError with exponential backoff, up to max_retries times.
@celery_app.task(
    autoretry_for=(TransientError,),
    retry_backoff=True,
    max_retries=3,
)
def process_webhook_task(event):
    """Process one webhook event; transient failures are retried by Celery.

    Args:
        event: Parsed webhook event queued by the request handler.
    """
    handle_event(event)

Rate Limiting

Protect your endpoints from excessive webhooks:
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

# NOTE(review): fastapi-limiter's documented usage attaches RateLimiter as a
# route dependency (dependencies=[Depends(RateLimiter(...))]) rather than
# stacking it as a decorator - confirm this form works with the installed
# version before copying it.
@app.post("/webhooks")
@RateLimiter(times=100, seconds=60)  # 100 requests per minute
async def handle_webhook(request):
    """Placeholder webhook endpoint for the rate-limiting example."""
    # Your webhook handler
    pass

Monitoring

Log Everything

Use comprehensive logging to make debugging easier:
import logging

logger = logging.getLogger(__name__)


async def process_webhook(event):
    """Process a webhook event, logging start, success, and failure.

    Structured fields are attached through ``extra=`` so they reach the log
    record; a dict passed positionally is treated as %-format arguments and
    dropped when the message has no placeholders.

    Args:
        event: Parsed webhook event with ``"id"`` and ``"type"`` keys.

    Raises:
        Exception: Whatever ``handle_event`` raised, after it is logged.
    """
    event_id = event["id"]
    event_type = event["type"]

    logger.info(f"Processing webhook: {event_id} ({event_type})")

    try:
        result = await handle_event(event)
        logger.info(
            f"Webhook processed successfully: {event_id}",
            extra={
                "event_type": event_type,
                "processing_time_ms": result["duration"],
            },
        )
    except Exception as e:
        # logger.exception attaches the active traceback to the record, so no
        # manual traceback formatting is needed.
        logger.exception(
            f"Webhook processing failed: {event_id}",
            extra={
                "event_type": event_type,
                "error": str(e),
            },
        )
        raise

Track Metrics

Monitor webhook performance:
from prometheus_client import Counter, Histogram

# Counters are labelled by event type; the histogram is unlabelled.
webhook_received = Counter(
    'webhooks_received_total',
    'Total webhooks received',
    ['event_type'],
)
webhook_processed = Counter(
    'webhooks_processed_total',
    'Successfully processed',
    ['event_type'],
)
webhook_failed = Counter(
    'webhooks_failed_total',
    'Failed webhooks',
    ['event_type'],
)
webhook_duration = Histogram(
    'webhook_processing_seconds',
    'Webhook processing time',
)

async def process_webhook(event):
    """Process a webhook event while recording count and duration metrics.

    Args:
        event: Parsed webhook event; its ``"type"`` labels the counters.

    Raises:
        Exception: Whatever ``handle_event`` raised, after the failure
            counter has been incremented.
    """
    kind = event["type"]

    # Count every event, whether or not it later succeeds.
    webhook_received.labels(event_type=kind).inc()

    # The timer wraps both the success and the failure path.
    with webhook_duration.time():
        try:
            await handle_event(event)
            webhook_processed.labels(event_type=kind).inc()
        except Exception:
            webhook_failed.labels(event_type=kind).inc()
            raise

Alert on Failures

Set up alerts for webhook issues:
def check_webhook_health(threshold=0.05, window_minutes=15):
    """Send an alert when the recent webhook error rate exceeds a threshold.

    Args:
        threshold: Error rate, as a fraction, above which an alert is sent.
            Defaults to ``0.05`` (5%).
        window_minutes: Size of the look-back window in minutes. Defaults
            to ``15``.
    """
    error_rate = get_webhook_error_rate(last_minutes=window_minutes)

    # Strictly greater than: a rate exactly at the threshold does not alert.
    if error_rate > threshold:
        send_alert(
            "High webhook error rate",
            f"Error rate: {error_rate:.2%} in last {window_minutes} minutes"
        )

Testing

Local Testing with ngrok

Test webhooks locally:
# Start ngrok tunnel (shell command - run it in a terminal, not in Python)
ngrok http 8000

# Use ngrok URL in webhook endpoint (Python)
create_webhook_endpoint(
    customer_id="cus_test_123",
    url="https://abc123.ngrok.io/webhooks",
    events=["payout.created", "payout.completed"]
)

Mock Webhook Events

Create a test harness for webhook events:
def simulate_webhook_event(event_type, data, url="http://localhost:8000/webhooks"):
    """Build a signed test event and POST it to a webhook handler.

    The body is serialised once and those exact bytes are both signed and
    sent. Letting the HTTP client re-serialise the event could produce
    different bytes from the ones that were signed.

    Args:
        event_type: Value for the event's ``"type"`` field.
        data: Value for the event's ``"data"`` field.
        url: Endpoint to post to. Defaults to the local development handler.

    Returns:
        The response object returned by ``requests.post``.
    """
    event = {
        "id": f"evt_test_{uuid.uuid4()}",
        "type": event_type,
        "created": datetime.now(UTC).isoformat(),
        "data": data
    }

    # Serialise once; sign and send the same bytes
    payload = json.dumps(event).encode()
    signature = hmac.new(
        TEST_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    # Call webhook handler with the pre-serialised body
    response = requests.post(
        url,
        data=payload,
        headers={
            "Content-Type": "application/json",
            "X-Webhook-Signature": signature,
        },
    )

    return response

# Usage: post a simulated "payout.completed" event for a test payout
simulate_webhook_event("payout.completed", {
    "id": "pout_test_123",
    "status": "completed"
})

Error Handling

Graceful Degradation

Handle webhook failures without breaking your app:
async def process_webhook(event):
    """Process a webhook event, choosing a fallback per failure type.

    Database errors and unexpected errors are queued for retry; validation
    errors are logged and alerted on but not retried.

    Args:
        event: Parsed webhook event to process.
    """
    try:
        # Primary processing
        await handle_event(event)
    except DatabaseError as e:
        # Queue for retry
        logger.error(f"Database error, queuing for retry: {e}")
        retry_queue.enqueue(event)
    except ValidationError as e:
        # Log and alert, but don't retry
        logger.error(f"Invalid webhook data: {e}")
        send_alert("Invalid webhook received", event)
    except Exception as e:
        # Unknown error - keep the traceback, since the cause is not known
        logger.exception("Unexpected error: %s", e)
        # NOTE(review): no retry limit is visible here - confirm the queue
        # caps attempts for events that always fail.
        retry_queue.enqueue(event)

Common Pitfalls

❌ Blocking Operations

Don’t perform slow operations synchronously:
# Bad
@app.post("/webhooks")
async def handle_webhook(request):
    """Anti-pattern: calls send_email inline before returning the response."""
    event = await request.json()
    send_email(event)  # ❌ Blocks webhook response
    return {"status": "ok"}

# Good
@app.post("/webhooks")
async def handle_webhook(request):
    """Hand send_email to background_tasks, then return the response."""
    event = await request.json()
    background_tasks.add_task(send_email, event)  # ✅ Async
    return {"status": "ok"}

❌ No Idempotency

Don’t process the same event more than once:
# Bad - no duplicate checking
def handle_payout_completed(event):
    """Anti-pattern: calls mark_order_paid on every delivery of the event."""
    payout_id = event["data"]["id"]
    mark_order_paid(payout_id)  # ❌ May mark twice

# Good - idempotent
def handle_payout_completed(event):
    """Mark the order paid only if this event ID has not been processed."""
    event_id = event["id"]
    if not is_processed(event_id):
        # First delivery: apply the change, then record the event ID.
        mark_order_paid(event["data"]["id"])
        mark_processed(event_id)

Summary Checklist

  • ✅ Verify all webhook signatures
  • ✅ Use HTTPS endpoints only
  • ✅ Return 200 within 5 seconds
  • ✅ Implement idempotency checking
  • ✅ Handle out-of-order events
  • ✅ Process webhooks asynchronously
  • ✅ Log all webhook activity
  • ✅ Monitor error rates and latency
  • ✅ Test with ngrok and mock events
  • ✅ Implement graceful error handling