Fix TaskRevokedError: Celery task 'orders.tasks.process_payment' raised unexpected: kombu.exceptions.OperationalError: [Errno 111] Connection refused in Django
This error means a Celery task could not connect to the message broker (Redis or RabbitMQ), or the task itself raised an unhandled exception. Fix it by verifying the broker is running and accessible, configuring task retries with exponential backoff, and adding proper error handling inside the task function.
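A quick way to test the "broker is running and accessible" part is to attempt a plain TCP connection to the host and port in the broker URL. The following is a minimal sketch using only the standard library. The helper name `broker_is_reachable` and the default-port table are illustrative assumptions, not part of Celery or kombu, and a successful TCP connect only shows that something is listening, not that authentication will succeed.

```python
import socket
from urllib.parse import urlparse


def broker_is_reachable(broker_url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the broker's host and port succeeds.

    Hypothetical helper for diagnostics; it does not speak the Redis or AMQP
    protocol, it only checks that the port accepts connections.
    """
    parsed = urlparse(broker_url)
    host = parsed.hostname or "localhost"
    # Conventional default ports, used only when the URL omits one.
    default_ports = {"redis": 6379, "rediss": 6379, "amqp": 5672, "amqps": 5671}
    port = parsed.port or default_ports.get(parsed.scheme, 6379)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError (Errno 111), timeouts, and DNS failures.
        return False
```

Calling `broker_is_reachable("redis://localhost:6379/0")` from the same host or container as the worker shows whether the address the worker uses is actually reachable from there.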
Reading the Stack Trace
Here's what each line means:
- File "/app/orders/tasks.py", line 15, in process_payment: The Celery task calls the payment gateway, which is an external HTTP service that may be temporarily unavailable.
- File "/app/payments/gateway.py", line 42, in charge: The payment API request failed because the external service is unreachable or timed out.
- celery.exceptions.Retry: Retry in 60s: ConnectionError('HTTPSConnectionPool...'): Celery is retrying the task after a fixed 60 seconds, with no exponential backoff. Unless max_retries is set explicitly, Celery's default limit of 3 retries applies.
Common Causes
1. No retry logic in task
The task has no retry mechanism, so transient failures from external services cause permanent task failure.
@shared_task
def process_payment(order_id):
    order = Order.objects.get(id=order_id)
    result = payment_gateway.charge(order.total, order.payment_token)
    order.status = 'paid'
    order.save()  # Never reached if charge fails
2. Broker connection not configured for production
The Celery broker URL points to localhost in production where the broker runs on a separate host.
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0' # Wrong in production
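One common way to avoid a hard-coded localhost is to read the broker URL from the environment and keep localhost only as a development fallback. This is a sketch: the `get_broker_url` helper is a hypothetical name introduced for illustration, and the environment variable name `CELERY_BROKER_URL` is assumed to match the one your deployment sets.

```python
import os


def get_broker_url(environ=os.environ) -> str:
    """Return the broker URL from the environment, falling back to local Redis.

    Hypothetical helper; the fallback is only suitable for local development.
    """
    return environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")


# settings.py
CELERY_BROKER_URL = get_broker_url()
```

In production the variable would then point at the real broker host, so the same settings file works in every environment.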
3. Task not idempotent
The task is not idempotent, so retrying it causes duplicate charges or side effects.
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):
    order = Order.objects.get(id=order_id)
    payment_gateway.charge(order.total, order.payment_token)  # Charged again on retry!
    order.status = 'paid'
    order.save()
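The duplicate-charge problem can be reproduced without Celery at all. The sketch below uses a fake gateway that counts charges and a plain dict in place of the Order model; every name in it (`FakeGateway`, `process_without_guard`, `process_with_guard`, `run_twice`) is hypothetical and exists only for illustration.

```python
class FakeGateway:
    """Stand-in for a payment gateway that counts how often it charges."""

    def __init__(self):
        self.charges = 0

    def charge(self, amount, token):
        self.charges += 1
        return {"transaction_id": f"txn_{self.charges}"}


def process_without_guard(order, gateway):
    # Charges unconditionally, so every re-delivery charges again.
    gateway.charge(order["total"], order["payment_token"])
    order["status"] = "paid"


def process_with_guard(order, gateway):
    # Idempotency guard: skip the charge if the order is already paid.
    if order["status"] == "paid":
        return
    gateway.charge(order["total"], order["payment_token"])
    order["status"] = "paid"


def run_twice(process):
    """Run the same task body twice, as a retry or re-delivery would."""
    order = {"total": "99.99", "payment_token": "tok_test", "status": "pending"}
    gateway = FakeGateway()
    process(order, gateway)
    process(order, gateway)
    return gateway.charges


print(run_twice(process_without_guard))  # 2
print(run_twice(process_with_guard))     # 1
```

A status check is the simplest guard, but it does not cover a crash between the charge and the save. Many payment APIs accept an idempotency key for that case.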
The Fix
Add retry logic with exponential backoff for transient network errors. Check if the order is already paid to make the task idempotent. Use acks_late so the task is re-delivered if the worker crashes mid-execution. Log all failures for monitoring.
# Before: no retries, no idempotency check, no error handling
@shared_task
def process_payment(order_id):
    order = Order.objects.get(id=order_id)
    result = payment_gateway.charge(order.total, order.payment_token)
    order.status = 'paid'
    order.save()

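# After: retries with backoff, idempotency check, failure logging
import logging

from celery import shared_task
from requests.exceptions import ConnectionError, Timeout

from orders.models import Order

# payment_gateway is assumed to be imported in this module, as in the original task.

logger = logging.getLogger(__name__)


@shared_task(
    bind=True,
    max_retries=5,
    default_retry_delay=60,
    acks_late=True,
)
def process_payment(self, order_id):
    order = Order.objects.get(id=order_id)

    # Idempotency guard: a retried or re-delivered task must not charge twice.
    if order.status == 'paid':
        logger.info(f'Order {order_id} already paid, skipping.')
        return

    try:
        result = payment_gateway.charge(order.total, order.payment_token)
        order.status = 'paid'
        order.payment_id = result['transaction_id']
        order.save()
    except (ConnectionError, Timeout) as exc:
        # Transient network error: retry with exponential backoff.
        # The retry_backoff option only affects autoretry_for, not manual
        # self.retry() calls, so the delay is computed here:
        # 60s, 120s, 240s, 480s, then capped at 600s.
        countdown = min(60 * 2 ** self.request.retries, 600)
        logger.warning(
            f'Payment failed for order {order_id}, retrying in {countdown}s: {exc}'
        )
        raise self.retry(exc=exc, countdown=countdown)
    except Exception as exc:
        # Anything else is treated as permanent: record the failure and re-raise.
        logger.error(f'Payment permanently failed for order {order_id}: {exc}')
        order.status = 'failed'
        order.save()
        raise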
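To see what exponential backoff means for this task, the delay schedule can be computed directly. The sketch below doubles a 60 second base delay on each retry and caps it at 600 seconds, matching the 60 and 600 values used in the task options. The function name `backoff_delay` is a hypothetical helper, and this is a plain doubling formula for illustration rather than a copy of Celery's internal implementation (which can also add random jitter).

```python
def backoff_delay(retries: int, base: int = 60, cap: int = 600) -> int:
    """Return the delay in seconds before the next attempt.

    `retries` is the number of retries already made (0 for the first retry).
    """
    return min(base * 2 ** retries, cap)


# Delays for the five retries allowed by max_retries=5.
schedule = [backoff_delay(n) for n in range(5)]
print(schedule)  # [60, 120, 240, 480, 600]
```

With these values a payment is retried for roughly 25 minutes in total before the task gives up, which is worth checking against how long your payment gateway outages typically last.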
Testing the Fix
import pytest
from unittest.mock import patch, MagicMock
from django.test import TestCase

from orders.tasks import process_payment
from orders.models import Order


class TestProcessPaymentTask(TestCase):
    def setUp(self):
        self.order = Order.objects.create(
            total=99.99,
            payment_token='tok_test',
            status='pending',
        )

    @patch('orders.tasks.payment_gateway')
    def test_successful_payment(self, mock_gateway):
        mock_gateway.charge.return_value = {'transaction_id': 'txn_123'}
        process_payment(self.order.id)
        self.order.refresh_from_db()
        assert self.order.status == 'paid'
        assert self.order.payment_id == 'txn_123'

    @patch('orders.tasks.payment_gateway')
    def test_connection_error_retries(self, mock_gateway):
        from requests.exceptions import ConnectionError
        mock_gateway.charge.side_effect = ConnectionError('Connection refused')
        with pytest.raises(ConnectionError):
            process_payment(self.order.id)

    @patch('orders.tasks.payment_gateway')
    def test_already_paid_order_skipped(self, mock_gateway):
        self.order.status = 'paid'
        self.order.save()
        process_payment(self.order.id)
        mock_gateway.charge.assert_not_called()

    @patch('orders.tasks.payment_gateway')
    def test_permanent_failure_marks_failed(self, mock_gateway):
        mock_gateway.charge.side_effect = ValueError('Invalid token')
        with pytest.raises(ValueError):
            process_payment(self.order.id)
        self.order.refresh_from_db()
        assert self.order.status == 'failed'
Run your tests:
pytest
Pushing Through CI/CD
git checkout -b fix/celery-task-retry-logic
git add orders/tasks.py
git commit -m "fix: add retry logic and idempotency to payment task"
git push origin fix/celery-task-retry-logic
Your CI config should look something like this:
name: CI
on:
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7
        ports:
          - 6379:6379
      postgres:
        image: postgres:15
        env:
          POSTGRES_DB: test_db
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - run: pip install -r requirements.txt
      - run: pytest --tb=short -q
        env:
          CELERY_BROKER_URL: redis://localhost:6379/0
The Full Manual Process: 18 Steps
Here's every step you just went through to fix this one bug:
- Notice the error alert or see it in your monitoring tool
- Open the error dashboard and read the stack trace
- Identify the file and line number from the stack trace
- Open your IDE and navigate to the file
- Read the surrounding code to understand context
- Reproduce the error locally
- Identify the root cause
- Write the fix
- Run the test suite locally
- Fix any failing tests
- Write new tests covering the edge case
- Run the full test suite again
- Create a new git branch
- Commit and push your changes
- Open a pull request
- Wait for code review
- Merge and deploy to production
- Monitor production to confirm the error is resolved
Total time: 30-60 minutes. For one bug.
Or Let bugstack Fix It in Under 2 minutes
Every step above? bugstack does it automatically.
Step 1: Install the SDK
pip install bugstack
Step 2: Initialize
import os

import bugstack

bugstack.init(api_key=os.environ["BUGSTACK_API_KEY"])
Step 3: There is no step 3.
bugstack handles everything from here:
- Captures the stack trace and request context
- Pulls the relevant source files from your GitHub repo
- Analyzes the error and understands the code context
- Generates a minimal, verified fix
- Runs your existing test suite
- Pushes through your CI/CD pipeline
- Deploys to production (or opens a PR for review)
Time from error to fix deployed: Under 2 minutes.
Human involvement: zero.
Deploying the Fix (Manual Path)
- Run the full test suite locally including mocked Celery task tests.
- Open a pull request with the retry and idempotency changes.
- Wait for CI checks to pass on the PR.
- Have a teammate review and approve the PR.
- Merge to main and monitor task success rates in staging.
Frequently Asked Questions
BugStack runs the fix through your existing test suite, generates tests for success, retry, and permanent failure scenarios, and validates that task idempotency is maintained before marking it safe to deploy.
BugStack never pushes directly to production. Every fix goes through a pull request with full CI checks, so your team can review it before merging.
Use Flower for real-time monitoring, or configure a Celery result backend to store task results in the database. Set up alerts on task failure rates in your monitoring tool.
Redis is simpler to set up and works well for most use cases. RabbitMQ is more robust for message guarantees and complex routing. Use Redis unless you need advanced messaging features.