diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 69dc09b97c19..d5e6a0364c66 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -77,6 +77,8 @@
          force_vhost_queues_shrink_member_to_current_member/1,
          force_all_queues_shrink_member_to_current_member/0]).
 
+-export([force_checkpoint/2, force_checkpoint_on_queue/1]).
+
 %% for backwards compatibility
 -export([file_handle_leader_reservation/1,
          file_handle_other_reservation/0,
@@ -141,6 +143,7 @@
 -define(RPC_TIMEOUT, 1000).
 -define(START_CLUSTER_TIMEOUT, 5000).
 -define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
+-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
 -define(TICK_INTERVAL, 5000). %% the ra server tick time
 -define(DELETE_TIMEOUT, 5000).
 -define(MEMBER_CHANGE_TIMEOUT, 20_000).
@@ -2089,6 +2092,38 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
     rabbit_log:warning("Shrinking finished"),
     ok.
 
+%% Sends the `force_checkpoint' aux command to the Ra server of queue QName
+%% that is registered on the local node.
+%%
+%% Returns:
+%%   * whatever rpc:call/5 yields for ra:cast_aux_command/2, which includes
+%%     {badrpc, Reason} when the RPC itself fails or times out;
+%%   * {error, classic_queue_not_supported} for a classic queue;
+%%   * {error, not_quorum_queue} for any other non-quorum queue type;
+%%   * the {error, _} tuple of rabbit_db_queue:get_durable/1, unchanged.
+%%
+%% NOTE(review): the command is sent with a cast, so a successful return
+%% presumably means "command sent", not "checkpoint taken" -- confirm in ra.
+force_checkpoint_on_queue(QName) ->
+    QNameFmt = rabbit_misc:rs(QName),
+    case rabbit_db_queue:get_durable(QName) of
+        {error, _} = LookupError ->
+            LookupError;
+        {ok, Q} when ?amqqueue_is_classic(Q) ->
+            {error, classic_queue_not_supported};
+        {ok, Q} when ?amqqueue_is_quorum(Q) ->
+            %% Only the registered name is taken from the queue's pid tuple;
+            %% its second element is ignored and the local node is used.
+            {RaName, _} = amqqueue:get_pid(Q),
+            LocalNode = node(),
+            rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
+            rpc:call(LocalNode, ra, cast_aux_command,
+                     [{RaName, LocalNode}, force_checkpoint],
+                     ?FORCE_CHECKPOINT_RPC_TIMEOUT);
+        {ok, _OtherType} ->
+            {error, not_quorum_queue}
+    end.
+
+%% Forces a checkpoint on every durable quorum queue whose virtual host and
+%% queue name both satisfy is_match/2 against VhostSpec and QueueSpec.
+%%
+%% Returns a list with one entry per matching queue:
+%%   {QName, {ok}}            - the command was sent;
+%%   {QName, {error, Reason}} - the command could not be sent. Reason is the
+%%                              error reported by force_checkpoint_on_queue/1
+%%                              or a {badrpc, _} tuple when the RPC failed.
+%%
+%% A failure on one queue is logged and recorded in the result; it never
+%% aborts processing of the remaining queues.
+force_checkpoint(VhostSpec, QueueSpec) ->
+    [begin
+         QName = amqqueue:get_name(Q),
+         case force_checkpoint_on_queue(QName) of
+             ok ->
+                 {QName, {ok}};
+             {error, Err} ->
+                 force_checkpoint_failed(QName, Err);
+             {badrpc, _} = Err ->
+                 %% force_checkpoint_on_queue/1 ends in rpc:call/5, which
+                 %% reports RPC failures (e.g. a timeout) as {badrpc, Reason}.
+                 %% Without this clause a single failure raises case_clause
+                 %% and discards the results for every other queue.
+                 force_checkpoint_failed(QName, Err)
+         end
+     end
+     || Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
+        is_match(amqqueue:get_vhost(Q), VhostSpec)
+            andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].
+
+%% Logs a failed checkpoint attempt and builds the per-queue error entry
+%% returned by force_checkpoint/2.
+force_checkpoint_failed(QName, Err) ->
+    rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
+                       [rabbit_misc:rs(QName), Err]),
+    {QName, {error, Err}}.
+
 is_minority(All, Up) ->
     MinQuorum = length(All) div 2 + 1,
     length(Up) < MinQuorum.
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 6a3167bdcc51..534555365ddb 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -10,6 +10,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("amqp_client/include/amqp_client.hrl").
 -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
+-include_lib("rabbit/src/rabbit_fifo.hrl").
 
 -import(queue_utils, [wait_for_messages_ready/3,
                       wait_for_messages_pending_ack/3,
@@ -97,6 +98,8 @@ groups() ->
                                             force_shrink_member_to_current_member,
                                             force_all_queues_shrink_member_to_current_member,
                                             force_vhost_queues_shrink_member_to_current_member,
+                                            force_checkpoint_on_queue,
+                                            force_checkpoint,
                                             policy_repair,
                                             gh_12635,
                                             replica_states
@@ -1309,6 +1312,70 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
          ?assertEqual(3, length(Nodes0))
      end || Q <- QQs, VHost <- VHosts].
 
+%% Declares a quorum queue, publishes 20_000 messages to it, checks that the
+%% Ra member on the first node reports no checkpoint yet, then calls
+%% rabbit_quorum_queue:force_checkpoint_on_queue/1 on node 0 and waits until
+%% the member reports a checkpoint index covering all published messages.
+force_checkpoint_on_queue(Config) ->
+    [Server0, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+    QQ = ?config(queue_name, Config),
+    RaName = ra_name(QQ),
+    QName = rabbit_misc:r(<<"/">>, queue, QQ),
+
+    %% Fetches the Ra member overview of the queue from the first node.
+    MemberOverview =
+        fun() ->
+                {ok, Overview, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
+                Overview
+        end,
+
+    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+    MsgCount = 20_000,
+    rabbit_ct_client_helpers:publish(Ch, QQ, MsgCount),
+    wait_for_messages_ready([Server0], RaName, MsgCount),
+
+    %% No checkpoint must have been taken so far.
+    rabbit_ct_helpers:await_condition(
+      fun() ->
+              #{log := #{latest_checkpoint_index := Idx}} = MemberOverview(),
+              Idx =:= undefined
+      end),
+
+    State0 = MemberOverview(),
+    ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),
+
+    %% Sleep for longer than ?CHECK_MIN_INTERVAL_MS before forcing the
+    %% checkpoint (presumably the minimum interval between checkpoints
+    %% defined in rabbit_fifo.hrl -- confirm).
+    timer:sleep(?CHECK_MIN_INTERVAL_MS + 1000),
+    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
+                                 force_checkpoint_on_queue, [QName]),
+
+    %% Wait until a checkpoint exists whose index is at least the number of
+    %% published messages. `undefined' is handled explicitly because an atom
+    %% compares greater than any integer in Erlang term order.
+    rabbit_ct_helpers:await_condition(
+      fun() ->
+              State = MemberOverview(),
+              ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
+              #{log := #{latest_checkpoint_index := Idx}} = State,
+              case Idx of
+                  undefined -> false;
+                  _ -> Idx >= MsgCount
+              end
+      end).
+
+%% Declares one quorum queue and one classic queue, then calls
+%% rabbit_quorum_queue:force_checkpoint/2 with catch-all patterns and checks
+%% that the result contains an `{ok}' entry for the quorum queue only.
+force_checkpoint(Config) ->
+    [Server0, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+    CQ = <<"force_checkpoint_cq">>,
+    QQ = ?config(queue_name, Config),
+    QQResource = rabbit_misc:r(<<"/">>, queue, QQ),
+    QQRaName = ra_name(QQ),
+
+    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+    ?assertEqual({'queue.declare_ok', CQ, 0, 0},
+                 declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
+
+    MsgCount = 3,
+    rabbit_ct_client_helpers:publish(Ch, QQ, MsgCount),
+    wait_for_messages_ready([Server0], QQRaName, MsgCount),
+
+    ForceCheckpointRes = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
+                                                      force_checkpoint, [<<".*">>, <<".*">>]),
+    %% The classic queue must not show up in the result.
+    ExpectedRes = [{QQResource, {ok}}],
+    ?assertEqual(ExpectedRes, ForceCheckpointRes).
 
 % Tests that, if the process of a QQ is dead in the moment of declaring a policy
 % that affects such queue, when the process is made available again, the policy
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex
new file mode 100644
index 000000000000..bdc587fc83bb
--- /dev/null
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex
@@ -0,0 +1,88 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
+
+# CLI command that asks the target node to force a checkpoint on every quorum
+# queue whose virtual host and name match the given regular expressions.
+defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
+  alias RabbitMQ.CLI.Core.{DocGuide}
+
+  @behaviour RabbitMQ.CLI.CommandBehaviour
+
+  # By default every virtual host and every queue matches, and both
+  # successful and failed results are listed.
+  defp default_opts,
+    do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}
+
+  def switches(),
+    do: [
+      vhost_pattern: :string,
+      queue_pattern: :string,
+      errors_only: :boolean
+    ]
+
+  def merge_defaults(args, opts) do
+    {args, Map.merge(default_opts(), opts)}
+  end
+
+  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
+  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
+
+  # Calls rabbit_quorum_queue:force_checkpoint/2 on the target node and turns
+  # each {resource, result} pair into a keyword row for the table formatter.
+  # Returns the {:badrpc, _} tuple unchanged when the node cannot be reached.
+  def run([], %{
+        node: node_name,
+        vhost_pattern: vhost_pat,
+        queue_pattern: queue_pat,
+        errors_only: errors_only
+      }) do
+    args = [vhost_pat, queue_pat]
+
+    case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
+      {:badrpc, _} = error ->
+        error
+
+      results when errors_only ->
+        # Failures are reported as {resource, {:error, reason}}. A `for`
+        # generator silently skips elements that do not match its pattern, so
+        # the pattern must be the 2-tuple {:error, _}; a 3-tuple pattern
+        # would drop every failure and always produce an empty listing.
+        for {{:resource, vhost, _kind, name}, {:error, _} = res} <- results,
+            do: [
+              {:vhost, vhost},
+              {:name, name},
+              {:result, res}
+            ]
+
+      results ->
+        for {{:resource, vhost, _kind, name}, res} <- results,
+            do: [
+              {:vhost, vhost},
+              {:name, name},
+              {:result, res}
+            ]
+    end
+  end
+
+  use RabbitMQ.CLI.DefaultOutput
+
+  def formatter(), do: RabbitMQ.CLI.Formatters.Table
+
+  def usage,
+    do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--errors-only]"
+
+  def usage_additional do
+    [
+      ["--queue-pattern <pattern>", "regular expression to match queue names"],
+      ["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
+      ["--errors-only", "only list queues which reported an error"]
+    ]
+  end
+
+  def usage_doc_guides() do
+    [
+      DocGuide.quorum_queues()
+    ]
+  end
+
+  def help_section, do: :replication
+
+  def description,
+    do: "Forces checkpoints for all matching quorum queues"
+
+  def banner([], _) do
+    "Forcing checkpoint for all matching quorum queues..."
+  end
+end
diff --git a/deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs b/deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs
new file mode 100644
index 000000000000..67c2ac38552e
--- /dev/null
+++ b/deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs
@@ -0,0 +1,64 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
+
+defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
+  use ExUnit.Case, async: false
+  import TestHelper
+
+  @command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand
+
+  setup_all do
+    RabbitMQ.CLI.Core.Distribution.start()
+
+    :ok
+  end
+
+  setup context do
+    {:ok,
+     opts: %{
+       node: get_rabbit_hostname(),
+       timeout: context[:test_timeout] || 30000,
+       vhost_pattern: ".*",
+       queue_pattern: ".*",
+       errors_only: false
+     }}
+  end
+
+  test "merge_defaults: defaults to reporting complete results" do
+    assert @command.merge_defaults([], %{}) ==
+             {[],
+              %{
+                vhost_pattern: ".*",
+                queue_pattern: ".*",
+                errors_only: false
+              }}
+  end
+
+  test "validate: accepts no positional arguments" do
+    assert @command.validate([], %{}) == :ok
+  end
+
+  test "validate: any positional arguments fail validation" do
+    assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args}
+
+    assert @command.validate(["quorum-queue-a", "two"], %{}) ==
+             {:validation_failure, :too_many_args}
+
+    assert @command.validate(["quorum-queue-a", "two", "three"], %{}) ==
+             {:validation_failure, :too_many_args}
+  end
+
+  @tag test_timeout: 3000
+  test "run: targeting an unreachable node throws a badrpc", context do
+    assert match?(
+             {:badrpc, _},
+             @command.run(
+               [],
+               Map.merge(context[:opts], %{node: :jake@thedog})
+             )
+           )
+  end
+end