Skip to content

For 4.1.x, by @aaron-seo: introduce a command that would force QQs to take a checkpoint and truncate its segments #13548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
35 changes: 35 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
force_vhost_queues_shrink_member_to_current_member/1,
force_all_queues_shrink_member_to_current_member/0]).

-export([force_checkpoint/2, force_checkpoint_on_queue/1]).

%% for backwards compatibility
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0,
Expand Down Expand Up @@ -141,6 +143,7 @@
-define(RPC_TIMEOUT, 1000).
-define(START_CLUSTER_TIMEOUT, 5000).
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
-define(TICK_INTERVAL, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
Expand Down Expand Up @@ -2089,6 +2092,38 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
rabbit_log:warning("Shrinking finished"),
ok.

%% Sends a `force_checkpoint' aux command to the Ra member of the quorum
%% queue named QName that is addressed as {RaName, node()}.
%%
%% The queue record is looked up with rabbit_db_queue:get_durable/1:
%%   * classic queue      -> {error, classic_queue_not_supported}
%%   * quorum queue       -> the result of the rpc:call/5 below
%%   * any other type     -> {error, not_quorum_queue}
%%   * lookup error       -> returned unchanged
%%
%% NOTE(review): Node is always node(), so only the member on the node this
%% function runs on is addressed, and nothing here checks that such a member
%% exists. The rpc:call/5 targets the local node; presumably it yields `ok'
%% on success, but it can also yield {badrpc, _} -- confirm how callers
%% handle that.
force_checkpoint_on_queue(QName) ->
Node = node(),
QNameFmt = rabbit_misc:rs(QName),
case rabbit_db_queue:get_durable(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
%% Only the registered name part of the queue pid is used; the node part
%% is replaced by the local node.
{RaName, _} = amqqueue:get_pid(Q),
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no need to do an rpc here as this function always addresses the quorum queue member on the node it runs on. is this the intent? to force a checkpoint on a single member if there is one locally (there are no checks to ensure that).

{ok, _Q} ->
{error, not_quorum_queue};
{error, _} = E ->
E
end.

%% Calls force_checkpoint_on_queue/1 for every queue returned by
%% rabbit_db_queue:get_all_durable_by_type(?MODULE) whose virtual host
%% matches VhostSpec and whose resource name matches QueueSpec (both tested
%% with is_match/2).
%%
%% Returns one entry per matching queue:
%%   {QName, {ok}}           when force_checkpoint_on_queue/1 returned `ok'
%%   {QName, {error, Err}}   when it returned {error, Err}; a warning is
%%                           logged for each such queue
%%
%% NOTE(review): the `case' below has clauses for `ok' and {error, Err} only.
%% Any other return value from force_checkpoint_on_queue/1 (for example a
%% {badrpc, _} from its rpc:call/5) would raise case_clause -- confirm
%% whether that is intended.
force_checkpoint(VhostSpec, QueueSpec) ->
[begin
QName = amqqueue:get_name(Q),
case force_checkpoint_on_queue(QName) of
ok ->
{QName, {ok}};
{error, Err} ->
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
[rabbit_misc:rs(QName), Err]),
{QName, {error, Err}}
end
end
|| Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will include queues that do not have any members on the current node but the force_checkpoint_on_queue will only ask any local member to force a checkpoint.

is_match(amqqueue:get_vhost(Q), VhostSpec)
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].

%% Returns true when the members in Up are too few to form a majority of All,
%% i.e. when Up holds at most half (rounded down) as many elements as All.
is_minority(All, Up) ->
    HalfOfAll = length(All) div 2,
    length(Up) =< HalfOfAll.
Expand Down
67 changes: 67 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbit/src/rabbit_fifo.hrl").

-import(queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
Expand Down Expand Up @@ -97,6 +98,8 @@ groups() ->
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
force_checkpoint_on_queue,
force_checkpoint,
policy_repair,
gh_12635,
replica_states
Expand Down Expand Up @@ -1309,6 +1312,70 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

%% Checks that rabbit_quorum_queue:force_checkpoint_on_queue/1 makes the Ra
%% member on the first node record a checkpoint.
%%
%% Steps: declare a quorum queue, publish N messages, confirm that no
%% checkpoint has been taken yet, force one, then wait until the member
%% overview reports a latest checkpoint index of at least N.
force_checkpoint_on_queue(Config) ->
    [Server0, _Server1, _Server2] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
    QQ = ?config(queue_name, Config),
    RaName = ra_name(QQ),
    QName = rabbit_misc:r(<<"/">>, queue, QQ),

    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

    N = 20_000,
    rabbit_ct_client_helpers:publish(Ch, QQ, N),
    wait_for_messages_ready([Server0], RaName, N),

    %% Fetches the Ra member overview of the queue member on Server0.
    MemberOverview =
        fun() ->
                {ok, Overview, _} = rpc:call(Server0, ra, member_overview,
                                             [{RaName, Server0}]),
                Overview
        end,

    %% The state before any checkpoints
    rabbit_ct_helpers:await_condition(
      fun() ->
              #{log := #{latest_checkpoint_index := LCI}} = MemberOverview(),
              LCI =:= undefined
      end),

    State0 = MemberOverview(),
    ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),

    %% wait for longer than ?CHECK_MIN_INTERVAL_MS ms
    timer:sleep(?CHECK_MIN_INTERVAL_MS + 1000),

    %% The command must be accepted; without this assertion an error return
    %% would only surface as a timeout of the await below.
    ?assertEqual(ok,
                 rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                              force_checkpoint_on_queue, [QName])),

    %% Wait for the forced checkpoint: its index must be defined and be at
    %% least N.
    rabbit_ct_helpers:await_condition(
      fun() ->
              State = MemberOverview(),
              ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
              #{log := #{latest_checkpoint_index := LCI}} = State,
              (LCI =/= undefined) andalso (LCI >= N)
      end).

%% Checks that rabbit_quorum_queue:force_checkpoint/2, called with match-all
%% patterns, reports a result for the quorum queue only: a classic queue
%% declared alongside it must not appear in the returned list.
force_checkpoint(Config) ->
    [FirstNode, _, _] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Chan = rabbit_ct_client_helpers:open_channel(Config, FirstNode),
    QuorumQ = ?config(queue_name, Config),
    QuorumResource = rabbit_misc:r(<<"/">>, queue, QuorumQ),
    ClassicQ = <<"force_checkpoint_cq">>,
    QuorumRaName = ra_name(QuorumQ),

    QuorumArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
    ?assertEqual({'queue.declare_ok', QuorumQ, 0, 0},
                 declare(Chan, QuorumQ, QuorumArgs)),

    ClassicArgs = [{<<"x-queue-type">>, longstr, <<"classic">>}],
    ?assertEqual({'queue.declare_ok', ClassicQ, 0, 0},
                 declare(Chan, ClassicQ, ClassicArgs)),

    MsgCount = 3,
    rabbit_ct_client_helpers:publish(Chan, QuorumQ, MsgCount),
    wait_for_messages_ready([FirstNode], QuorumRaName, MsgCount),

    Actual = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                          force_checkpoint,
                                          [<<".*">>, <<".*">>]),

    %% Only the quorum queue may be listed.
    ?assertEqual([{QuorumResource, {ok}}], Actual).

% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
alias RabbitMQ.CLI.Core.{DocGuide}

@behaviour RabbitMQ.CLI.CommandBehaviour

# Option values used for any switch the caller did not supply: match every
# virtual host and queue name, and report all results rather than errors only.
defp default_opts do
  %{
    vhost_pattern: ".*",
    queue_pattern: ".*",
    errors_only: false
  }
end

# Command-line switches accepted by this command and the type of each.
def switches do
  [vhost_pattern: :string, queue_pattern: :string, errors_only: :boolean]
end

# Fills in default option values; options supplied by the caller win over
# the defaults. Positional arguments are passed through untouched.
def merge_defaults(args, opts) do
  merged_opts = default_opts() |> Map.merge(opts)
  {args, merged_opts}
end

use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

# Calls :rabbit_quorum_queue.force_checkpoint/2 on the target node with the
# virtual host and queue name patterns, and turns each per-queue result into
# a row of :vhost, :name and :result.
#
# Returns the `{:badrpc, _}` tuple unchanged when the RPC fails. With
# `errors_only: true`, only rows whose result is `{:error, reason}` are kept.
def run([], %{
      node: node_name,
      vhost_pattern: vhost_pat,
      queue_pattern: queue_pat,
      errors_only: errors_only
    }) do
  args = [vhost_pat, queue_pat]

  case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
    {:badrpc, _} = error ->
      error

    results when errors_only ->
      # force_checkpoint/2 reports failures as two-element {:error, reason}
      # tuples. Rows that do not match the generator pattern are skipped by
      # `for`, so the pattern must have exactly that shape or every error is
      # silently dropped.
      for {{:resource, vhost, _kind, name}, {:error, _} = res} <- results,
          do: [
            {:vhost, vhost},
            {:name, name},
            {:result, res}
          ]

    results ->
      for {{:resource, vhost, _kind, name}, res} <- results,
          do: [
            {:vhost, vhost},
            {:name, name},
            {:result, res}
          ]
  end
end

use RabbitMQ.CLI.DefaultOutput

# Results are rendered with the table formatter.
def formatter do
  RabbitMQ.CLI.Formatters.Table
end

# One-line usage string. Lists every switch declared in switches/0,
# including --errors-only.
def usage,
  do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--errors-only]"

# Help text for each switch, as [switch, explanation] pairs.
def usage_additional,
  do: [
    ["--queue-pattern <pattern>", "regular expression to match queue names"],
    ["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
    ["--errors-only", "only list queues which reported an error"]
  ]

# Documentation guides referenced from this command's help output.
def usage_doc_guides, do: [DocGuide.quorum_queues()]

# Help section under which this command is listed.
def help_section do
  :replication
end

# Short description shown in the command list. The server-side function only
# addresses the queue member on the node it runs on, so the text says so
# rather than claiming that all members take a checkpoint.
def description,
  do: "Forces a checkpoint on the target node's member of each matching quorum queue"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it will only force a checkpoint on the local member of each matching queue, if there is a local member.


# Progress banner printed before the command runs. Worded to reflect that
# only the members hosted on the target node are asked to checkpoint.
def banner([], _) do
  "Forcing checkpoints for the target node's members of all matching quorum queues..."
end
end
64 changes: 64 additions & 0 deletions deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
  # Tests for the force_checkpoint CLI command: default option merging,
  # positional-argument validation, and behaviour against an unreachable node.
  use ExUnit.Case, async: false
  import TestHelper

  @command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand

  setup_all do
    RabbitMQ.CLI.Core.Distribution.start()

    :ok
  end

  # Builds the option map handed to every test. The timeout can be
  # overridden per test with the :test_timeout tag.
  setup context do
    opts = %{
      node: get_rabbit_hostname(),
      timeout: context[:test_timeout] || 30000,
      vhost_pattern: ".*",
      queue_pattern: ".*",
      errors_only: false
    }

    {:ok, opts: opts}
  end

  test "merge_defaults: defaults to reporting complete results" do
    expected_opts = %{
      vhost_pattern: ".*",
      queue_pattern: ".*",
      errors_only: false
    }

    assert @command.merge_defaults([], %{}) == {[], expected_opts}
  end

  test "validate: accepts no positional arguments" do
    assert @command.validate([], %{}) == :ok
  end

  test "validate: any positional arguments fail validation" do
    too_many = {:validation_failure, :too_many_args}

    for positional <- [
          ["quorum-queue-a"],
          ["quorum-queue-a", "two"],
          ["quorum-queue-a", "two", "three"]
        ] do
      assert @command.validate(positional, %{}) == too_many
    end
  end

  @tag test_timeout: 3000
  test "run: targeting an unreachable node throws a badrpc", context do
    unreachable_opts = Map.put(context[:opts], :node, :jake@thedog)

    assert {:badrpc, _} = @command.run([], unreachable_opts)
  end
end
Loading