Overview
Webhook security is crucial for protecting your application from malicious requests. This guide covers signature verification, endpoint security, and best practices for handling webhook events safely. Always verify webhook signatures before processing events. Unverified webhooks can be exploited by attackers to trigger unauthorized actions in your system.
Signature Verification
How Webhook Signatures Work
HopNow signs every webhook payload using HMAC-SHA256 with your webhook secret. The signature is included in the X-Webhook-Signature header:
Copy
X-Webhook-Signature: sha256=a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6a7b8c9d0e1f2
Signature Verification Implementation
Copy
import hmac
import hashlib
def verify_webhook_signature(payload, signature, webhook_secret):
    """
    Verify the HMAC-SHA256 signature of a webhook payload.

    Args:
        payload (str | bytes): Raw request body, exactly as received.
        signature (str | None): Value of the X-Webhook-Signature header,
            with or without the 'sha256=' prefix. None (header absent)
            is treated as an invalid signature.
        webhook_secret (str | bytes): Your webhook secret.

    Returns:
        bool: True if signature is valid; False for a missing, empty,
        malformed or mismatching signature.
    """
    # A missing header arrives as None; report a failed verification
    # instead of raising AttributeError on .startswith().
    if not signature or not isinstance(signature, str):
        return False
    # Remove 'sha256=' prefix if present
    if signature.startswith('sha256='):
        signature = signature[7:]
    # Accept both text and already-encoded inputs
    key = webhook_secret.encode('utf-8') if isinstance(webhook_secret, str) else webhook_secret
    message = payload.encode('utf-8') if isinstance(payload, str) else payload
    # Create expected signature
    expected_signature = hmac.new(key, message, hashlib.sha256).hexdigest()
    # Use constant-time comparison to prevent timing attacks. Compare as
    # bytes: compare_digest raises TypeError for non-ASCII str arguments,
    # which an attacker-controlled header could otherwise trigger.
    return hmac.compare_digest(
        signature.encode('utf-8'),
        expected_signature.encode('ascii'),
    )
# Flask example
from flask import Flask, request, abort
import json
# Flask application object that the webhook route below is registered on.
app = Flask(__name__)
# Shared secret passed to verify_webhook_signature().
# NOTE(review): the value is a placeholder; presumably it should be loaded
# from configuration or a secret store rather than hard-coded — confirm.
WEBHOOK_SECRET = 'your_webhook_secret_here'
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Receive a webhook, verify its signature and dispatch the event.

    Returns:
        tuple: ('', 200) on success, ('', 400) when the body is not a JSON
        object, ('', 500) when event processing raises. Aborts with 401
        when the signature header is missing or invalid.
    """
    # Get raw payload and signature
    payload = request.get_data(as_text=True)
    signature = request.headers.get('X-Webhook-Signature')
    # Verify signature. headers.get() yields None when the header is
    # absent, so reject that case explicitly before verifying.
    if not signature or not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        abort(401)  # Unauthorized
    # Parse the webhook: a malformed body is the sender's error (400),
    # not a server failure (500).
    try:
        event = json.loads(payload)
    except json.JSONDecodeError as e:
        print(f"Webhook JSON parsing failed: {e}")
        return '', 400
    if not isinstance(event, dict):
        # Valid JSON, but not an object that event handlers can read
        return '', 400
    # Process the webhook
    try:
        process_webhook_event(event)
    except Exception as e:
        print(f"Webhook processing error: {e}")
        return '', 500
    return '', 200
def process_webhook_event(event):
    """Dispatch a verified webhook event to the handler for its type.

    Events whose type has no handler are ignored.
    """
    kind = event.get('type')
    if kind == 'account.created':
        handle_account_created(event['data'])
        return
    if kind == 'payin.completed':
        handle_payin_completed(event['data'])
        return
    # Add other event handlers...
Endpoint Security
1. HTTPS Requirements
All webhook URLs must use HTTPS. HTTP URLs will be rejected during endpoint creation.
- Prevents man-in-the-middle attacks
- Ensures payload confidentiality
- Maintains signature integrity during transmission
2. IP Allowlisting
Restrict webhook requests to HopNow’s IP ranges:
# Nginx example
# Only clients in the allowed ranges may reach /webhook; all others are denied.
location /webhook {
# NOTE(review): 203.0.113.0/24 and 198.51.100.0/24 look like RFC 5737
# documentation ranges, i.e. placeholders — confirm the real ranges before use.
allow 203.0.113.0/24; # HopNow IP range 1
allow 198.51.100.0/24; # HopNow IP range 2
deny all;
# Permitted requests are forwarded to the upstream named "your-app".
proxy_pass http://your-app;
}
Copy
# Apache example
# Restrict access to /webhook to the listed source ranges.
<Location "/webhook">
# NOTE(review): both ranges look like RFC 5737 documentation ranges,
# i.e. placeholders — confirm the real ranges before use.
Require ip 203.0.113.0/24
Require ip 198.51.100.0/24
</Location>
3. Rate Limiting
Implement rate limiting to prevent abuse:
from functools import wraps
from time import time
import redis
# Redis connection (localhost:6379, database 0) that holds the per-IP
# request counters used by the rate limiter below.
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def rate_limit(max_requests=100, window_seconds=3600):
    """Rate limiting decorator.

    Allows at most ``max_requests`` calls per client IP within a window of
    ``window_seconds`` seconds; further calls are aborted with HTTP 429.

    Args:
        max_requests (int): Requests allowed per window.
        window_seconds (int): Window length in seconds.

    Returns:
        callable: A decorator for a Flask view function.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Use IP address as key
            key = f"webhook_rate_limit:{request.remote_addr}"
            # Increment first: INCR is atomic, so concurrent requests
            # cannot both read the same count and slip past the limit
            # (the previous GET-then-INCR sequence could).
            pipe = redis_client.pipeline()
            pipe.incr(key)
            pipe.ttl(key)
            current, ttl = pipe.execute()
            # A negative TTL means the counter has no expiry yet (first
            # request in the window, or a key left without one). Setting
            # it here guarantees the counter always expires, so a client
            # can never be locked out permanently.
            if ttl < 0:
                redis_client.expire(key, window_seconds)
            if current > max_requests:
                abort(429)  # Too Many Requests
            return f(*args, **kwargs)
        return decorated_function
    return decorator
@app.route('/webhook', methods=['POST'])
@rate_limit(max_requests=50, window_seconds=3600)
def handle_webhook():
    """Webhook endpoint limited to 50 requests per hour per client IP."""
    # Webhook handling logic...
    return None
Security Best Practices
1. Idempotency Handling
Prevent duplicate processing of webhook events:
import redis
# Redis connection created with the client's default arguments; it stores
# the "processed_webhook:<event id>" markers used for de-duplication.
redis_client = redis.Redis()
def is_duplicate_event(event_id):
    """Check if event has already been processed.

    Args:
        event_id (str | None): Identifier of the webhook event.

    Returns:
        bool: True when a processed-marker exists for ``event_id``.
        An event without an id is never reported as a duplicate, because
        all id-less events would otherwise share one marker key.
    """
    if not event_id:
        return False
    key = f"processed_webhook:{event_id}"
    # EXISTS returns a count; normalise it to a real boolean
    return bool(redis_client.exists(key))
def mark_event_processed(event_id, ttl_seconds=86400):
    """Mark event as processed (marker expires after 24 hours by default).

    Args:
        event_id (str | None): Identifier of the webhook event. Nothing is
            stored when it is missing, so that one id-less event cannot
            mark every later id-less event as already processed.
        ttl_seconds (int): Lifetime of the marker in seconds.
    """
    if not event_id:
        return
    key = f"processed_webhook:{event_id}"
    redis_client.setex(key, ttl_seconds, 1)
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Process a webhook once, acknowledging repeats without reprocessing."""
    # Verify signature first...
    event = json.loads(payload)
    event_id = event.get('id')
    # A repeated delivery is acknowledged as a success, with no reprocessing
    if is_duplicate_event(event_id):
        return '', 200
    try:
        process_webhook_event(event)
        # Reached only when processing succeeded, so a failed event is
        # never marked as processed
        mark_event_processed(event_id)
    except Exception as exc:
        print(f"Webhook processing failed: {exc}")
        return '', 500
    return '', 200
2. Input Validation
Always validate webhook payloads:
from jsonschema import validate, ValidationError
# Define expected webhook schema
# JSON Schema that every webhook event must satisfy.
WEBHOOK_SCHEMA = {
"type": "object",
# All four top-level fields are mandatory
"required": ["id", "type", "data", "created"],
"properties": {
# Event ids are "evt_" followed by one or more alphanumeric characters
"id": {"type": "string", "pattern": "^evt_[a-zA-Z0-9]+$"},
# Closed list of accepted event types; any other type fails validation
"type": {"type": "string", "enum": [
"account.created", "account.updated", "account.deactivated",
"payin.created", "payin.completed", "payin.failed",
"payout.created", "payout.completed", "payout.failed"
]},
# Event-specific body; its contents are not constrained here
"data": {"type": "object"},
# NOTE(review): "format" keywords are presumably only enforced when the
# validator is given a format checker — confirm how validate() is called.
"created": {"type": "string", "format": "date-time"}
}
}
def validate_webhook_payload(payload):
    """Validate webhook payload against schema.

    Args:
        payload: The parsed JSON event.

    Returns:
        bool: True when ``payload`` satisfies WEBHOOK_SCHEMA, else False.
    """
    # jsonschema is already a dependency of this code; the local import
    # keeps this function self-contained.
    from jsonschema import FormatChecker
    try:
        # Without a format checker, "format" keywords such as the
        # "date-time" on the "created" field are not enforced at all.
        # NOTE(review): date-time checking may additionally need an optional
        # jsonschema extra to be installed — confirm in the deployment.
        validate(
            instance=payload,
            schema=WEBHOOK_SCHEMA,
            format_checker=FormatChecker(),
        )
        return True
    except ValidationError as e:
        print(f"Webhook validation error: {e}")
        return False
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Reject structurally invalid events before processing them."""
    # Verify signature...
    event = json.loads(payload)
    # Only events that match the schema are processed
    payload_is_valid = validate_webhook_payload(event)
    if payload_is_valid:
        process_webhook_event(event)
        return '', 200
    return '', 400  # Bad Request
3. Error Handling and Logging
Implement comprehensive error handling:
import logging
from datetime import datetime
# Configure logging
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the webhook handlers below.
logger = logging.getLogger(__name__)
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Verify, de-duplicate and process a webhook with structured logging.

    Returns:
        tuple: ('', 200) when the event was processed or was a duplicate,
        ('', 401) when the signature is missing or invalid,
        ('', 400) when the body is not a JSON object,
        ('', 500) when processing fails.
    """
    # A monotonic clock measures elapsed time correctly even if the wall
    # clock is adjusted while the request is being handled.
    from time import monotonic
    start_time = monotonic()
    event_id = None
    try:
        # Get payload and signature
        payload = request.get_data(as_text=True)
        signature = request.headers.get('X-Webhook-Signature')
        # Log incoming webhook
        logger.info("Webhook received", extra={
            "ip_address": request.remote_addr,
            "signature_present": bool(signature),
            "payload_size": len(payload)
        })
        # Verify signature. A missing header (None) is rejected here so it
        # yields 401 instead of falling into the generic 500 handler.
        if not signature or not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
            logger.warning("Webhook signature verification failed", extra={
                "ip_address": request.remote_addr,
                # Log only a prefix of the supplied signature
                "signature": signature[:20] + "..." if signature else None
            })
            return '', 401
        # Parse event
        event = json.loads(payload)
        if not isinstance(event, dict):
            # Valid JSON but not an object: a client error, not a server one
            logger.error("Webhook payload is not a JSON object", extra={
                "payload_type": type(event).__name__
            })
            return '', 400
        event_id = event.get('id')
        event_type = event.get('type')
        logger.info("Processing webhook event", extra={
            "event_id": event_id,
            "event_type": event_type
        })
        # Check for duplicate
        if is_duplicate_event(event_id):
            logger.info("Duplicate webhook event ignored", extra={
                "event_id": event_id
            })
            return '', 200
        # Process event; mark it only after processing succeeded
        process_webhook_event(event)
        mark_event_processed(event_id)
        # Log success
        processing_time = monotonic() - start_time
        logger.info("Webhook processed successfully", extra={
            "event_id": event_id,
            "event_type": event_type,
            "processing_time_seconds": processing_time
        })
        return '', 200
    except json.JSONDecodeError as e:
        logger.error("Webhook JSON parsing failed", extra={
            "error": str(e),
            "payload_preview": payload[:100] if payload else None
        })
        return '', 400
    except Exception as e:
        # Top-level boundary: log with traceback and report a server error
        logger.error("Webhook processing failed", extra={
            "event_id": event_id,
            "error": str(e),
            "error_type": type(e).__name__
        }, exc_info=True)
        return '', 500
def process_webhook_event(event):
    """Dispatch an event to its handler, logging unknown types and failures.

    Any exception raised by a handler is logged and then propagated.
    """
    kind = event.get('type')
    try:
        if kind == 'account.created':
            handle_account_created(event['data'])
            return
        if kind == 'payin.completed':
            handle_payin_completed(event['data'])
            return
        # No handler is registered for this type
        logger.warning("Unknown event type", extra={
            "event_type": kind,
            "event_id": event.get('id')
        })
    except Exception as exc:
        failure_context = {
            "event_type": kind,
            "event_id": event.get('id'),
            "handler_error": str(exc)
        }
        logger.error("Event handler failed", extra=failure_context)
        raise  # Re-raise to trigger webhook retry
4. Timeout Handling
Ensure your webhook endpoint responds quickly:
import signal
from contextlib import contextmanager
@contextmanager
def timeout(seconds):
    """Context manager for enforcing timeouts.

    Raises TimeoutError inside the ``with`` body once ``seconds`` have
    elapsed. It is built on SIGALRM, so it is available only where that
    signal exists and only in the main thread (signal handlers cannot be
    installed from other threads).

    Args:
        seconds (int | float): Time limit; fractional values are accepted.

    Raises:
        TimeoutError: In the ``with`` body when the limit is exceeded.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError("Operation timed out")
    # Set up timeout, remembering the handler that was installed before so
    # it can be put back afterwards.
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    if previous_handler is None:
        # The old handler was not installed from Python and cannot be
        # re-installed as-is; fall back to the default action.
        previous_handler = signal.SIG_DFL
    try:
        # setitimer accepts fractional seconds, unlike alarm()
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        # Cancel timeout, then restore the previous handler so that this
        # context manager leaves no process-wide state behind.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous_handler)
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Process a webhook under a 25-second time limit."""
    try:
        # Enforce 25-second timeout (webhook timeout is 30s)
        with timeout(25):
            # Webhook processing logic...
            process_webhook_event(event)
    except TimeoutError:
        timed_out_context = {
            "event_id": event.get('id')
        }
        logger.error("Webhook processing timeout", extra=timed_out_context)
        return '', 500
    else:
        return '', 200
Testing Webhook Security
1. Signature Testing
Test your signature verification with known values:
def test_webhook_signature_verification():
    """Test webhook signature verification.

    Covers a valid signature (with and without the 'sha256=' prefix),
    an invalid signature, a malformed signature, a tampered payload and
    a wrong secret.
    """
    webhook_secret = "test_secret_123"
    payload = '{"id":"evt_test","type":"account.created"}'
    # Create expected signature
    expected_signature = hmac.new(
        webhook_secret.encode('utf-8'),
        payload.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()
    # Test valid signature
    assert verify_webhook_signature(
        payload,
        f"sha256={expected_signature}",
        webhook_secret
    )
    # The prefix is optional: a bare hex digest is accepted as well
    assert verify_webhook_signature(
        payload,
        expected_signature,
        webhook_secret
    )
    # Test invalid signature
    assert not verify_webhook_signature(
        payload,
        "sha256=invalid_signature",
        webhook_secret
    )
    # Test malformed signature
    assert not verify_webhook_signature(
        payload,
        "invalid_format",
        webhook_secret
    )
    # A signature is bound to the exact payload bytes
    assert not verify_webhook_signature(
        payload + " ",
        f"sha256={expected_signature}",
        webhook_secret
    )
    # A signature is bound to the secret that produced it
    assert not verify_webhook_signature(
        payload,
        f"sha256={expected_signature}",
        "another_secret"
    )
# Run the test
# The call raises AssertionError on the first failing check, so the message
# below is printed only when every assertion passed.
test_webhook_signature_verification()
print("Signature verification tests passed!")
2. Security Testing Checklist
- Signature Verification: Test with valid, invalid, and malformed signatures
- HTTPS Enforcement: Verify HTTP requests are rejected
- Rate Limiting: Test webhook rate limiting works correctly
- Idempotency: Verify duplicate events are handled properly
- Input Validation: Test with malformed and invalid payloads
- Timeout Handling: Ensure timeouts are handled gracefully
- Error Logging: Verify security events are logged properly
Monitoring and Alerting
Set up monitoring for webhook security events:
def setup_webhook_monitoring():
    """Set up webhook security monitoring.

    Creates the three webhook metrics and returns them, so that callers
    can actually record values on them (previously they were discarded
    when the function returned).

    NOTE(review): Counter and Histogram are presumably the Prometheus
    client metric classes — confirm against the file's imports.

    Returns:
        dict: The created metrics, keyed by 'failed_signature_counter',
        'processing_time_histogram' and 'rate_limit_counter'.
    """
    # Monitor failed signature verifications
    failed_signature_counter = Counter(
        'webhook_signature_failures_total',
        'Number of failed webhook signature verifications',
        ['ip_address']
    )
    # Monitor processing times
    processing_time_histogram = Histogram(
        'webhook_processing_seconds',
        'Time spent processing webhook events',
        ['event_type']
    )
    # Monitor rate limit hits
    rate_limit_counter = Counter(
        'webhook_rate_limit_hits_total',
        'Number of webhook rate limit violations',
        ['ip_address']
    )
    return {
        'failed_signature_counter': failed_signature_counter,
        'processing_time_histogram': processing_time_histogram,
        'rate_limit_counter': rate_limit_counter,
    }
# Alert on high failure rates
def check_webhook_security_metrics():
    """Raise alerts when last-hour webhook metrics cross their thresholds."""
    max_failure_rate = 0.1  # alert above a 10% signature failure rate
    max_timeouts = 5        # alert above 5 processing timeouts

    signature_failure_rate = get_signature_failure_rate(hours=1)
    if signature_failure_rate > max_failure_rate:
        failure_detail = f"Failure rate: {signature_failure_rate:.1%}"
        send_security_alert("High webhook signature failure rate", failure_detail)

    recent_timeouts = get_timeout_count(hours=1)
    if recent_timeouts > max_timeouts:
        timeout_detail = f"Timeouts in last hour: {recent_timeouts}"
        send_alert("High webhook timeout rate", timeout_detail)
For questions about webhook security or to report security issues, contact our security team.