Skip to content

Commit d0ccd4f

Browse files
Merge pull request #10205 from rabbitmq/mergify/bp/v3.12.x/pr-10203
Overwrite rabbit_mqtt_qos0_queue record from crashed node (backport #10203)
2 parents e953289 + d73f3cc commit d0ccd4f

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

+15-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ is_stateful() ->
6767
false.
6868

6969
-spec declare(amqqueue:amqqueue(), node()) ->
70-
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
71-
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
70+
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()}.
7271
declare(Q0, _Node) ->
7372
%% The queue gets persisted such that routing to this
7473
%% queue (via the topic exchange) works as usual.
@@ -85,6 +84,20 @@ declare(Q0, _Node) ->
8584
{arguments, amqqueue:get_arguments(Q0)},
8685
{user_who_performed_action, ActingUser}]),
8786
{new, Q};
87+
{absent, OldQ, nodedown} ->
88+
%% This case body can be deleted once Mnesia is unsupported.
89+
OldPid = amqqueue:get_pid(OldQ),
90+
OldNode = node(OldPid),
91+
rabbit_log_queue:debug(
92+
"Overwriting record of ~s of type ~s on node ~s since "
93+
"formerly hosting node ~s seems to be down (former pid ~p)",
94+
[rabbit_misc:rs(amqqueue:get_name(Q0)), ?MODULE, node(), OldNode, OldPid]),
95+
case rabbit_amqqueue:internal_declare(Q0, true) of
96+
{created, Q} ->
97+
{new, Q};
98+
Other ->
99+
Other
100+
end;
88101
Other ->
89102
Other
90103
end.

deps/rabbitmq_mqtt/test/shared_SUITE.erl

+35
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ subgroups() ->
109109
flow_quorum_queue,
110110
flow_stream,
111111
rabbit_mqtt_qos0_queue,
112+
rabbit_mqtt_qos0_queue_kill_node,
112113
cli_list_queues,
113114
maintenance,
114115
delete_create_queue,
@@ -1019,6 +1020,40 @@ rabbit_mqtt_qos0_queue(Config) ->
10191020
ok = emqtt:disconnect(Sub),
10201021
ok = emqtt:disconnect(Pub).
10211022

1023+
rabbit_mqtt_qos0_queue_kill_node(Config) ->
1024+
Topic = atom_to_binary(?FUNCTION_NAME),
1025+
Pub = connect(<<"publisher">>, Config, 2, []),
1026+
1027+
SubscriberId = <<"subscriber">>,
1028+
Sub0 = connect(SubscriberId, Config, 0, []),
1029+
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic, qos0),
1030+
ok = emqtt:publish(Pub, Topic, <<"m0">>, qos0),
1031+
ok = expect_publishes(Sub0, Topic, [<<"m0">>]),
1032+
1033+
process_flag(trap_exit, true),
1034+
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
1035+
%% Wait a bit to ensure that Mnesia deletes the queue record on nodes 1 and 2 from Mnesia
1036+
%% table rabbit_queue (but the queue record is still present in rabbit_durable_queue).
1037+
timer:sleep(500),
1038+
Sub1 = connect(SubscriberId, Config, 1, []),
1039+
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic, qos0),
1040+
ok = emqtt:publish(Pub, Topic, <<"m1">>, qos0),
1041+
ok = expect_publishes(Sub1, Topic, [<<"m1">>]),
1042+
1043+
%% Start node 0 to have a majority for Khepri.
1044+
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
1045+
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
1046+
%% This time, do not wait. Mnesia will contain the queue record in rabbit_durable_queue,
1047+
%% but this time Mnesia may or may not contain the queue record in rabbit_queue.
1048+
Sub2 = connect(SubscriberId, Config, 2, []),
1049+
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic, qos0),
1050+
ok = emqtt:publish(Pub, Topic, <<"m2">>, qos0),
1051+
ok = expect_publishes(Sub2, Topic, [<<"m2">>]),
1052+
1053+
ok = emqtt:disconnect(Sub2),
1054+
ok = emqtt:disconnect(Pub),
1055+
ok = rabbit_ct_broker_helpers:start_node(Config, 1).
1056+
10221057
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
10231058
management_plugin_connection(Config) ->
10241059
KeepaliveSecs = 99,

0 commit comments

Comments
 (0)