Skip to content

Commit b3d3ca7

Browse files
committed
Close #10543. Add If-Unused and If-Empty support for delete_queue for QQs
1 parent 72b2506 commit b3d3ca7

File tree

2 files changed

+46
-49
lines changed

2 files changed

+46
-49
lines changed

Diff for: deps/rabbit/docs/rabbitmqctl.8

+2-2
Original file line numberDiff line numberDiff line change
@@ -2130,9 +2130,9 @@ Evaluates an Erlang expression on the target node
21302130
.It Ar queue_name
21312131
The name of the queue to delete.
21322132
.It Ar --if-empty
2133-
Delete the queue if it is empty (has no messages ready for delivery)
2133+
Delete the queue if it is empty (has no messages ready for delivery). Ignored for Streams
21342134
.It Ar --if-unused
2135-
Delete the queue only if it has no consumers
2135+
Delete the queue only if it has no consumers. Ignored for Streams
21362136
.El
21372137
.Pp
21382138
Deletes a queue.

Diff for: deps/rabbit/src/rabbit_quorum_queue.erl

+44-47
Original file line numberDiff line numberDiff line change
@@ -713,59 +713,56 @@ restart_server({_, _} = Ref) ->
713713
-spec delete(amqqueue:amqqueue(),
714714
boolean(), boolean(),
715715
rabbit_types:username()) ->
716-
{ok, QLen :: non_neg_integer()} |
717-
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
718-
delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) ->
719-
{protocol_error, not_implemented,
720-
"cannot delete ~ts. queue.delete operations with if-unused flag set are not supported by quorum queues",
721-
[rabbit_misc:rs(amqqueue:get_name(Q))]};
722-
delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) ->
723-
{protocol_error, not_implemented,
724-
"cannot delete ~ts. queue.delete operations with if-empty flag set are not supported by quorum queues",
725-
[rabbit_misc:rs(amqqueue:get_name(Q))]};
726-
delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
716+
{ok, QLen :: non_neg_integer()}.
717+
delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
727718
{Name, _} = amqqueue:get_pid(Q),
728719
QName = amqqueue:get_name(Q),
729720
QNodes = get_nodes(Q),
730-
%% TODO Quorum queue needs to support consumer tracking for IfUnused
731-
Timeout = ?DELETE_TIMEOUT,
732-
{ok, ReadyMsgs, _} = stat(Q),
733-
Servers = [{Name, Node} || Node <- QNodes],
734-
case ra:delete_cluster(Servers, Timeout) of
735-
{ok, {_, LeaderNode} = Leader} ->
736-
MRef = erlang:monitor(process, Leader),
737-
receive
738-
{'DOWN', MRef, process, _, _} ->
721+
722+
{ok, ReadyMsgs, Consumers} = stat(Q),
723+
IsEmpty = ReadyMsgs == 0,
724+
IsUnused = Consumers == 0,
725+
if IfEmpty and not(IsEmpty) -> {error, not_empty};
726+
IfUnused and not(IsUnused) -> {error, in_use};
727+
true ->
728+
Timeout = ?DELETE_TIMEOUT,
729+
Servers = [{Name, Node} || Node <- QNodes],
730+
case ra:delete_cluster(Servers, Timeout) of
731+
{ok, {_, LeaderNode} = Leader} ->
732+
MRef = erlang:monitor(process, Leader),
733+
receive
734+
{'DOWN', MRef, process, _, _} ->
739735
ok
740-
after Timeout ->
741-
erlang:demonitor(MRef, [flush]),
742-
ok = force_delete_queue(Servers)
743-
end,
744-
notify_decorators(QName, shutdown),
745-
ok = delete_queue_data(Q, ActingUser),
746-
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
747-
?RPC_TIMEOUT),
748-
{ok, ReadyMsgs};
749-
{error, {no_more_servers_to_try, Errs}} ->
750-
case lists:all(fun({{error, noproc}, _}) -> true;
751-
(_) -> false
752-
end, Errs) of
753-
true ->
754-
%% If all ra nodes were already down, the delete
755-
%% has succeed
756-
delete_queue_data(Q, ActingUser),
736+
after Timeout ->
737+
erlang:demonitor(MRef, [flush]),
738+
ok = force_delete_queue(Servers)
739+
end,
740+
notify_decorators(QName, shutdown),
741+
ok = delete_queue_data(Q, ActingUser),
742+
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
743+
?RPC_TIMEOUT),
757744
{ok, ReadyMsgs};
758-
false ->
759-
%% attempt forced deletion of all servers
760-
rabbit_log:warning(
761-
"Could not delete quorum '~ts', not enough nodes "
762-
" online to reach a quorum: ~255p."
745+
{error, {no_more_servers_to_try, Errs}} ->
746+
case lists:all(fun({{error, noproc}, _}) -> true;
747+
(_) -> false
748+
end, Errs) of
749+
true ->
750+
%% If all ra nodes were already down, the delete
751+
%% has succeed
752+
delete_queue_data(Q, ActingUser),
753+
{ok, ReadyMsgs};
754+
false ->
755+
%% attempt forced deletion of all servers
756+
rabbit_log:warning(
757+
"Could not delete quorum '~ts', not enough nodes "
758+
" online to reach a quorum: ~255p."
763759
" Attempting force delete.",
764-
[rabbit_misc:rs(QName), Errs]),
765-
ok = force_delete_queue(Servers),
766-
notify_decorators(QName, shutdown),
767-
delete_queue_data(Q, ActingUser),
768-
{ok, ReadyMsgs}
760+
[rabbit_misc:rs(QName), Errs]),
761+
ok = force_delete_queue(Servers),
762+
notify_decorators(QName, shutdown),
763+
delete_queue_data(Q, ActingUser),
764+
{ok, ReadyMsgs}
765+
end
769766
end
770767
end.
771768

0 commit comments

Comments
 (0)