Background
The product component JobCenter uses Celery to implement an asynchronous task center, running both job-center-worker (celery worker) and job-center-scheduler (celery beat) processes, with MongoDB serving as the backend that stores messages and related state (Celery has officially stated that it no longer maintains MongoDB support). MongoDB is deployed as a ReplicaSet for high availability.
Recently, we encountered the "no free channel ids" problem in Celery/Kombu. After troubleshooting, we found the issue had already been fixed upstream in a PR. Weighing the workload and maintainability of cherry-picking, we ultimately upgraded the celery and kombu components in the product from 3.x to 4.x.
During recent reliability tests, a test colleague found that running ifdown on the storage network of a MongoDB node causes JobCenter to hang.
Investigation
Celery
First, we tried to reproduce the problem: running ifdown on the Primary node's storage network reproduced it, while running ifdown on a Secondary node's storage network did not. Stopping the MongoDB service instead of running ifdown did not reproduce it on either the Primary or a Secondary, so we presumed the problem was related to MongoDB connection handling.
Observing the logs around recovery: while the storage network is down, nothing is logged at all. Once the storage network recovers, Celery logs an exception on its connection to the Broker (MongoDB) and tries to reconnect.
[2022-05-07 10:13:01,362: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/usr/lib/python2.7/site-packages/celery/worker/loops.py", line 121, in synloop
    connection.drain_events(timeout=2.0)
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
    get(self._deliver, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/kombu/utils/scheduling.py", line 56, in get
    return self.fun(resource, callback, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 1001, in _drain_channel
    return channel.drain_events(callback=callback, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 745, in drain_events
    return self._poll(self.cycle, callback, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 402, in _poll
    return cycle.get(callback)
  File "/usr/lib/python2.7/site-packages/kombu/utils/scheduling.py", line 56, in get
    return self.fun(resource, callback, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 405, in _get_and_deliver
    message = self._get(queue)
  File "/usr/lib/python2.7/site-packages/kombu/transport/mongodb.py", line 141, in _get
    remove=True,
  File "/usr/lib64/python2.7/site-packages/pymongo/collection.py", line 2315, in find_and_modify
    allowable_errors=[_NO_OBJ_ERROR])
  File "/usr/lib64/python2.7/site-packages/pymongo/collection.py", line 205, in _command
    read_concern=read_concern)
  File "/usr/lib64/python2.7/site-packages/pymongo/pool.py", line 218, in command
    self._raise_connection_failure(error)
  File "/usr/lib64/python2.7/site-packages/pymongo/pool.py", line 346, in _raise_connection_failure
    raise error
AutoReconnect: connection closed
[2022-05-07 10:13:01,363: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
The corresponding Celery code is in worker/consumer/consumer.py; Blueprint is Celery's startup entry point. Exception handling wraps the blueprint.start(self) stage, and any exception matching self.connection_errors triggers a reconnect.
CONNECTION_RETRY = """\
consumer: Connection to broker lost. \
Trying to re-establish the connection...\
"""
...

def start(self):
    blueprint = self.blueprint
    while blueprint.state != CLOSE:
        maybe_shutdown()
        if self.restart_count:
            try:
                self._restart_state.step()
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1
        try:
            blueprint.start(self)
        except self.connection_errors as exc:
            # If we're not retrying connections, no need to catch
            # connection errors
            if not self.app.conf.broker_connection_retry:
                raise
            if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                raise  # Too many open files
            maybe_shutdown()
            if blueprint.state != CLOSE:
                if self.connection:
                    self.on_connection_error_after_connected(exc)
                else:
                    self.on_connection_error_before_connected(exc)
                self.on_close()
                blueprint.restart(self)

def on_connection_error_before_connected(self, exc):
    error(CONNECTION_ERROR, self.conninfo.as_uri(), exc,
          'Trying to reconnect...')

def on_connection_error_after_connected(self, exc):
    warn(CONNECTION_RETRY, exc_info=True)
    try:
        self.connection.collect()
    except Exception:
        pass
self.connection_errors is actually defined by the Transport in Kombu; see kombu/transport/mongodb.py. In the current version it includes pymongo.errors.ConnectionFailure, and the common pymongo network exceptions AutoReconnect and NetworkTimeout both inherit from ConnectionFailure.
class Transport(virtual.Transport):
    Channel = Channel

    can_parse_url = True
    polling_interval = 1
    default_port = DEFAULT_PORT
    connection_errors = (
        virtual.Transport.connection_errors + (errors.ConnectionFailure, )
    )
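The `except self.connection_errors` clause in the consumer works because a Python `except` over a tuple of exception types also catches subclasses, so `AutoReconnect` and `NetworkTimeout` are caught via their base class `ConnectionFailure`. A minimal sketch of this mechanism, using stand-in classes that mirror pymongo's hierarchy (not the real pymongo types):

```python
# Stand-in exception hierarchy mirroring pymongo's
# ConnectionFailure -> AutoReconnect -> NetworkTimeout chain.
class ConnectionFailure(Exception):
    pass

class AutoReconnect(ConnectionFailure):
    pass

class NetworkTimeout(AutoReconnect):
    pass

# Analogous to Transport.connection_errors in the Kombu code above.
connection_errors = (OSError, ConnectionFailure)

def drain():
    # Simulate the broker read failing after the network comes back.
    raise AutoReconnect("connection closed")

def consume():
    try:
        drain()
    except connection_errors as exc:
        # Subclasses of ConnectionFailure are caught here too.
        return f"reconnect after {type(exc).__name__}"

print(consume())  # → reconnect after AutoReconnect
```

This is exactly why Celery only has to list `ConnectionFailure` to react to every pymongo network error, provided the error is actually raised.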
So far, Celery correctly handles the exceptions Kombu raises; but when the storage network fails, Kombu raises no exception at all, so the investigation shifts from Celery to Kombu.
Kombu
Look at how the MongoDB Transport establishes connections. The code path is client -> _create_client -> _open -> _parse_uri: _open performs the actual connection setup, and the parameters it uses come from _parse_uri, which eventually calls pymongo.uri_parser.parse_uri.
def _open(self, scheme='mongodb://'):
    hostname, dbname, options = self._parse_uri(scheme=scheme)
    conf = self._prepare_client_options(options)
    conf['host'] = hostname
    env = _detect_environment()
    if env == 'gevent':
        from gevent import monkey
        monkey.patch_all()
    elif env == 'eventlet':
        from eventlet import monkey_patch
        monkey_patch()
    mongoconn = MongoClient(**conf)
    database = mongoconn[dbname]
    version_str = mongoconn.server_info()['version']
    version = tuple(map(int, version_str.split('.')))
    if version < (1, 3):
        raise VersionMismatch(E_SERVER_VERSION.format(version_str))
    elif self.ttl and version < (2, 2):
        raise VersionMismatch(E_NO_TTL_INDEXES.format(version_str))
    return database

def _parse_uri(self, scheme='mongodb://'):
    # See mongodb uri documentation:
    # http://docs.mongodb.org/manual/reference/connection-string/
    client = self.connection.client
    hostname = client.hostname
    if not hostname.startswith(scheme):
        hostname = scheme + hostname
    if not hostname[len(scheme):]:
        hostname += self.default_hostname
    if client.userid and '@' not in hostname:
        head, tail = hostname.split('://')
        credentials = client.userid
        if client.password:
            credentials += ':' + client.password
        hostname = head + '://' + credentials + '@' + tail
    port = client.port if client.port else self.default_port
    parsed = uri_parser.parse_uri(hostname, port)
    dbname = parsed['database'] or client.virtual_host
    if dbname in ('/', None):
        dbname = self.default_database
    options = {
        'auto_start_request': True,
        'ssl': self.ssl,
        'connectTimeoutMS': (int(self.connect_timeout * 1000)
                             if self.connect_timeout else None),
    }
    options.update(parsed['options'])
    return hostname, dbname, options
Suppose our connection string is mongodb://192.168.1.1:27017,192.168.1.2:27017/yiran. Then pymongo.uri_parser.parse_uri produces: {'username': None, 'nodelist': [('192.168.1.1', 27017), ('192.168.1.2', 27017)], 'database': 'yiran', 'collection': None, 'password': None, 'options': {}}.
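To make the multi-host breakdown above concrete, here is a deliberately simplified toy parser. It is illustrative only: the real pymongo.uri_parser.parse_uri also handles credentials, options, escaping, and validation.

```python
def parse_mongo_uri(uri, scheme='mongodb://'):
    """Toy parser for mongodb:// URIs: extracts nodelist and database.

    Illustrative sketch only -- the real pymongo.uri_parser.parse_uri
    additionally handles credentials, ?options, and validation.
    """
    rest = uri[len(scheme):]
    hosts, _, database = rest.partition('/')
    nodelist = []
    for host in hosts.split(','):
        name, _, port = host.partition(':')
        nodelist.append((name, int(port) if port else 27017))
    return {'nodelist': nodelist, 'database': database or None}

print(parse_mongo_uri('mongodb://192.168.1.1:27017,192.168.1.2:27017/yiran'))
# → {'nodelist': [('192.168.1.1', 27017), ('192.168.1.2', 27017)], 'database': 'yiran'}
```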
In our environment, no options are specified in the MongoDB URI, so the options returned by pymongo.uri_parser.parse_uri are empty, and the options used for the final connection are only those defined inside _parse_uri. Since the class variable connect_timeout is defined as None on Channel, Kombu does not set socketTimeoutMS on the final MongoDB connection. With socketTimeoutMS unset, pymongo defaults to no socket timeout and waits forever, so when the network fails the visible symptom is a hang.
class Channel(virtual.Channel):
    """MongoDB Channel."""

    supports_fanout = True

    # Mutable container. Shared by all class instances
    _fanout_queues = {}

    # Options
    ssl = False
    ttl = False
    connect_timeout = None
    capped_queue_size = 100000
    calc_queue_size = True
Celery and Kombu parameter passing
Now that we have seen that the connection parameters are not what we expected, why did the 3.x version have no problem? Switching to the 3.x branch, the overall logic is similar, but the handling of options contains one extra line: options.update(client.transport_options). Here client is assigned at the top of the function as self.connection.client, and self.connection is the connection passed into the Transport constructor.
def _parse_uri(self, scheme='mongodb://'):
    # See mongodb uri documentation:
    # http://docs.mongodb.org/manual/reference/connection-string/
    client = self.connection.client
    hostname = client.hostname
    if not hostname.startswith(scheme):
        hostname = scheme + hostname
    if not hostname[len(scheme):]:
        hostname += DEFAULT_HOST
    if client.userid and '@' not in hostname:
        head, tail = hostname.split('://')
        credentials = client.userid
        if client.password:
            credentials += ':' + client.password
        hostname = head + '://' + credentials + '@' + tail
    port = client.port if client.port is not None else DEFAULT_PORT
    parsed = uri_parser.parse_uri(hostname, port)
    dbname = parsed['database'] or client.virtual_host
    if dbname in ('/', None):
        dbname = 'kombu_default'
    options = {
        'auto_start_request': True,
        'ssl': client.ssl,
        'connectTimeoutMS': (int(client.connect_timeout * 1000)
                             if client.connect_timeout else None),
    }
    options.update(client.transport_options)
    options.update(parsed['options'])
    return hostname, dbname, options
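The merge order in the 3.x code above matters: defaults first, then transport_options, then options parsed from the URI, with later updates winning on conflicts. A small sketch of that precedence (the URI option value here is a hypothetical example, not from our configuration):

```python
# Sketch of the 3.x merge order: defaults, then transport_options,
# then options parsed from the URI -- later dict.update() calls win.
defaults = {'ssl': False, 'connectTimeoutMS': None}
transport_options = {'socketTimeoutMS': 5000, 'connectTimeoutMS': 5000}
uri_options = {'connectTimeoutMS': 10000}  # hypothetical ?connectTimeoutMS=10000 in URI

options = dict(defaults)
options.update(transport_options)  # the line removed in Kombu 4.x
options.update(uri_options)

print(options)
# → {'ssl': False, 'connectTimeoutMS': 10000, 'socketTimeoutMS': 5000}
```

With the transport_options update removed in 4.x, socketTimeoutMS never enters this dict at all unless it appears in the URI itself.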
connection corresponds to Kombu's Connection, which hides the Transport from the outside. Celery establishes the connection during its initialization phase, along the call path celery/app/base.py:Celery._connection -> celery/app/amqp.py:AMQP.Connection -> kombu/connection.py:Connection. The transport_options passed along are those configured in the Celery app declaration; the configurable parameters are documented at https://docs.celeryq.dev/en/stable/userguide/configuration.html.
In our scenario, the following parameters are declared:
BROKER_TRANSPORT_OPTIONS = {
    "connect": False,
    "maxPoolSize": 5 if "worker" in process_cmdline else 2,
    "socketTimeoutMS": 5000,
    "connectTimeoutMS": 5000,
    "serverSelectionTimeoutMS": 5000,
    "w": 0,
}
With Kombu 3.x's MongoDB Transport, connections were ultimately created with these parameters, so the problem described here did not occur.
Celery change background
@rmihael reported the issue "Celery events are not removed from MongoDB broker #1047": with Celery Flower (the Celery monitoring component) in use, events in the messages collection were never removed, consuming a large amount of MongoDB storage. The discussion settled on using MongoDB TTL indexes to resolve this.
During the Kombu 4.x development cycle, @daevaorn submitted "MongoDB TTL support and refactorings #537" to support MongoDB TTL. Besides TTL support, the PR contains a number of TTL-unrelated commits and refactoring:
- Complete unit tests suit for MongoDB transport
- Optional TTL support for MongoDB transport. AMQP TTL headers: x-messa…
- Rearrange methods at MongoDB channel class
- Another MongoDB transport clean up and refactor. Use of transport opt…
- Opt-out for queue size calculation
- Use natural sort for more FIFO semantic
- Fix docstrings
"Optional TTL support for MongoDB transport." is the most critical change. Ignoring the TTL changes and focusing on how MongoDB connections are established: the Channel class gains new class variables describing the current configuration; in _parse_uri, ssl and connectTimeoutMS are now read from self instead of client; and options.update(client.transport_options) has been removed. That removal is the key to this problem.
 class Channel(virtual.Channel):
     _client = None
     supports_fanout = True
+
+    # Mutable containers. Shared by all class instances
     _fanout_queues = {}
+
+    # Options
+    connect_timeout = None
+    ssl = False
+    capped_queue_size = 100000
+    ttl = False
+
+    from_transport_options = (
+        virtual.Channel.from_transport_options
+        + ('connect_timeout', 'ssl', 'ttl', 'capped_queue_size'))
+
     def __init__(self, *vargs, **kwargs):
         super(Channel, self).__init__(*vargs, **kwargs)
         ...
 ...
     def _parse_uri(self, scheme='mongodb://'):
         ...
         options = {
             'auto_start_request': True,
-            'ssl': client.ssl,
-            'connectTimeoutMS': (int(client.connect_timeout * 1000)
-                                 if client.connect_timeout else None),
+            'ssl': self.ssl,
+            'connectTimeoutMS': (int(self.connect_timeout * 1000)
+                                 if self.connect_timeout else None),
         }
-        options.update(client.transport_options)
         options.update(parsed['options'])
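The from_transport_options tuple introduced in this diff is the 4.x replacement mechanism: keys listed there are copied from the connection's transport_options onto the channel as attributes, and any other key is silently ignored. A simplified sketch of that behavior (not Kombu's actual code; FakeClient stands in for kombu's Connection):

```python
class FakeClient:
    """Stand-in for kombu's Connection carrying transport_options."""
    transport_options = {'connect_timeout': 5, 'socketTimeoutMS': 5000}

class Channel:
    # Only whitelisted keys are copied onto the channel; anything else
    # in transport_options (like socketTimeoutMS) is ignored, which is
    # why the option stopped reaching MongoClient after the 4.x change.
    from_transport_options = ('connect_timeout', 'ssl', 'ttl')
    connect_timeout = None
    ssl = False
    ttl = False

    def __init__(self, client):
        for key in self.from_transport_options:
            if key in client.transport_options:
                setattr(self, key, client.transport_options[key])

ch = Channel(FakeClient())
print(ch.connect_timeout)              # → 5
print(hasattr(ch, 'socketTimeoutMS'))  # → False (silently dropped)
```

Under this scheme, socketTimeoutMS from BROKER_TRANSPORT_OPTIONS is neither whitelisted nor merged into the pymongo options, so the connection ends up with no socket timeout.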
Summary
Celery/Kombu code management feels somewhat opaque, and comparing behavior across multiple branches is difficult. Full regression testing is necessary when upgrading major versions of critical components.