Skip to content

Add force checkpoint functions for quorum queues and command line tool #13175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -925,26 +925,6 @@ which_module(3) -> rabbit_fifo_v3;
which_module(4) -> ?MODULE;
which_module(5) -> ?MODULE.

%% Record name of the current aux state layout: the `-record(?AUX, ...)'
%% definition further down expands to `-record(aux_v3, ...)'.
-define(AUX, aux_v3).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we implement the aux overview query as suggested below there should be no need to move these records.


%% Checkpoint description; this is the type of the `last_checkpoint' field
%% of the ?AUX record below.
%% NOTE(review): the field meanings are only inferred from their names and
%% types (`index' is a ra:index(), `timestamp' a milliseconds() value) -
%% confirm against the code that builds this record.
-record(checkpoint, {index :: ra:index(),
timestamp :: milliseconds(),
smallest_index :: undefined | ra:index(),
messages_total :: non_neg_integer(),
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
unused_1 = ?NIL}).
%% Type of the `gc' field of both aux records below; holds one ra index,
%% `last_raft_idx', which defaults to 0.
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
%% NOTE(review): presumably the older aux state layout, kept next to ?AUX
%% so previously written state can still be matched - confirm.
-record(aux, {name :: atom(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{}}).
%% Current aux state layout (record name given by the ?AUX macro). Compared
%% with `aux' it adds `last_decorators_state', `tick_pid', `cache' and
%% `last_checkpoint'.
-record(?AUX, {name :: atom(),
last_decorators_state :: term(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{},
tick_pid :: undefined | pid(),
cache = #{} :: map(),
last_checkpoint :: #checkpoint{}}).

init_aux(Name) when is_atom(Name) ->
%% TODO: catch specific exception throw if table already exists
ok = ra_machine_ets:create_table(rabbit_fifo_usage,
Expand Down
20 changes: 20 additions & 0 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,23 @@
msg_ttl => non_neg_integer(),
created => non_neg_integer()
}.

%% Record name of the current aux state layout: `-record(?AUX, ...)' below
%% expands to `-record(aux_v3, ...)'.
-define(AUX, aux_v3).

%% Checkpoint description; this is the type of the `last_checkpoint' field
%% of the ?AUX record below.
%% NOTE(review): the field meanings are only inferred from their names and
%% types (`index' is a ra:index(), `timestamp' a milliseconds() value) -
%% confirm against the code that builds this record.
-record(checkpoint, {index :: ra:index(),
timestamp :: milliseconds(),
smallest_index :: undefined | ra:index(),
messages_total :: non_neg_integer(),
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
unused_1 = ?NIL}).
%% Type of the `gc' field of both aux records below; holds one ra index,
%% `last_raft_idx', which defaults to 0.
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
%% NOTE(review): presumably the older aux state layout, kept next to ?AUX
%% so previously written state can still be matched - confirm.
-record(aux, {name :: atom(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{}}).
%% Current aux state layout (record name given by the ?AUX macro). Compared
%% with `aux' it adds `last_decorators_state', `tick_pid', `cache' and
%% `last_checkpoint'.
-record(?AUX, {name :: atom(),
last_decorators_state :: term(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{},
tick_pid :: undefined | pid(),
cache = #{} :: map(),
last_checkpoint :: #checkpoint{}}).
36 changes: 35 additions & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
force_vhost_queues_shrink_member_to_current_member/1,
force_all_queues_shrink_member_to_current_member/0]).

%% forcing checkpoints: for all matching queues, or for a single queue
-export([force_checkpoint/2,
         force_checkpoint_on_queue/1]).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put each export on a new line.


%% for backwards compatibility
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0,
Expand Down Expand Up @@ -138,6 +140,7 @@
-define(RPC_TIMEOUT, 1000).
-define(START_CLUSTER_TIMEOUT, 5000).
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
-define(TICK_INTERVAL, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
Expand Down Expand Up @@ -2084,6 +2087,38 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
rabbit_log:warning("Shrinking finished"),
ok.

%% @doc Asks the member of quorum queue `QName' that runs on this node to
%% take a checkpoint.
%%
%% Returns `ok' once the `force_checkpoint' aux command has been cast to the
%% Ra server `{RaName, node()}'. Returns
%% `{error, classic_queue_not_supported}' for a classic queue,
%% `{error, not_quorum_queue}' for any other non-quorum queue, and passes on
%% whatever `{error, _}' rabbit_db_queue:get_durable/1 returned.
force_checkpoint_on_queue(QName) ->
    QNameFmt = rabbit_misc:rs(QName),
    case rabbit_db_queue:get_durable(QName) of
        {ok, Q} when ?amqqueue_is_classic(Q) ->
            {error, classic_queue_not_supported};
        {ok, Q} when ?amqqueue_is_quorum(Q) ->
            {RaName, _} = amqqueue:get_pid(Q),
            rabbit_log:debug("Sending command to force ~ts to take a checkpoint",
                             [QNameFmt]),
            %% The target is always the local member, so the command is cast
            %% directly. Going through rpc:call/5 to our own node added
            %% nothing and could hand `{badrpc, _}' to callers that only
            %% match `ok' and `{error, _}'.
            ra:cast_aux_command({RaName, node()}, force_checkpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line is too long, keep to ~80 chars max

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also no need to use RPC for this.
Also this command will fail during mixed versions upgrades but perhaps this does not matter too much.

        {ok, _Q} ->
            {error, not_quorum_queue};
        {error, _} = E ->
            E
    end.

%% @doc Forces a checkpoint on every durable queue of this module's type
%% whose virtual host matches `VhostSpec' and whose name matches `QueueSpec'.
%%
%% Returns one `{QName, {ok}}' or `{QName, {error, Reason}}' entry per
%% matching queue; failures are also logged as warnings.
force_checkpoint(VhostSpec, QueueSpec) ->
    IsSelected =
        fun(Queue) ->
                is_match(amqqueue:get_vhost(Queue), VhostSpec)
                    andalso is_match(get_resource_name(amqqueue:get_name(Queue)),
                                     QueueSpec)
        end,
    Checkpoint =
        fun(QName) ->
                case force_checkpoint_on_queue(QName) of
                    ok ->
                        {QName, {ok}};
                    {error, Reason} ->
                        rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
                                           [rabbit_misc:rs(QName), Reason]),
                        {QName, {error, Reason}}
                end
        end,
    [Checkpoint(amqqueue:get_name(Queue))
     || Queue <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
        IsSelected(Queue)].

%% Returns `true' when `Up' has fewer elements than a strict majority of
%% `All' (that is, at most half of them, rounded down), `false' otherwise.
is_minority(All, Up) ->
    length(Up) =< length(All) div 2.
Expand Down Expand Up @@ -2144,4 +2179,3 @@ file_handle_other_reservation() ->

%% No-op: takes no arguments and always returns `ok'.
%% NOTE(review): presumably one of the file_handle_* functions that are
%% kept only "for backwards compatibility" (see the export list) - confirm.
file_handle_release_reservation() ->
ok.

60 changes: 60 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbit/src/rabbit_fifo.hrl").

-import(queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
Expand Down Expand Up @@ -97,6 +98,8 @@ groups() ->
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
force_checkpoint_on_queue,
force_checkpoint,
policy_repair,
gh_12635,
replica_states
Expand Down Expand Up @@ -1308,6 +1311,63 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

%% Declares a quorum queue, publishes three messages to it, checks that the
%% last checkpoint index held in the Ra member's aux state is still 0, then
%% calls rabbit_quorum_queue:force_checkpoint_on_queue/1 on node 0 and waits
%% until that index is no longer 0.
force_checkpoint_on_queue(Config) ->
[Server0, _Server1, _Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
RaName = ra_name(QQ),
QName = rabbit_misc:r(<<"/">>, queue, QQ),

?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

rabbit_ct_client_helpers:publish(Ch, QQ, 3),
wait_for_messages_ready([Server0], RaName, 3),

% Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet.
% The aux state returned by ra:member_overview/1 is matched against the
% #aux_v3{} and #checkpoint{} records from rabbit_fifo.hrl.
rabbit_ct_helpers:await_condition(
fun() ->
{ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not realise that ra:member_overview/1 returns the aux state as it is. That is my mistake and this should be changed in Ra to provide an overview not the full state. This isn't great for future compatibility.

It would be better if you wanted to query this to add a clause to handle_aux/4 to handle an overview message that returns a overview of the aux state. Then you don't have to move the records which I think may be a potential future maintenance issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into adding a clause to handle_aux/4 as suggested

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aaron-seo is this still something you are working on or should the core team take over?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the delay, please feel free to take over as needed; been pulled into other things currently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually there is no need to modify Ra. just match against #{log := #{latest_checkopoint_index := _}} instead.

#aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux,
Index =:= 0
end),

% Force the checkpoint through the broker node with index 0.
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_checkpoint_on_queue, [QName]),

% Wait until the forced checkpoint shows up: the last checkpoint index is no longer 0
rabbit_ct_helpers:await_condition(
fun() ->
{ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
#aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux,
Index =/= 0
end).

%% Declares one quorum queue and one classic queue, publishes three messages
%% to the quorum queue, then checks that
%% rabbit_quorum_queue:force_checkpoint/2 called with catch-all patterns
%% reports exactly one result: `{ok}' for the quorum queue.
force_checkpoint(Config) ->
    [Node, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Chan = rabbit_ct_client_helpers:open_channel(Config, Node),
    QuorumQ = ?config(queue_name, Config),
    QuorumQResource = rabbit_misc:r(<<"/">>, queue, QuorumQ),
    ClassicQ = <<"force_checkpoint_cq">>,
    QuorumRaName = ra_name(QuorumQ),

    %% Both declarations must succeed on empty, consumer-less queues.
    lists:foreach(
      fun({Name, Type}) ->
              ?assertEqual({'queue.declare_ok', Name, 0, 0},
                           declare(Chan, Name,
                                   [{<<"x-queue-type">>, longstr, Type}]))
      end,
      [{QuorumQ, <<"quorum">>}, {ClassicQ, <<"classic">>}]),

    rabbit_ct_client_helpers:publish(Chan, QuorumQ, 3),
    wait_for_messages_ready([Node], QuorumRaName, 3),

    Actual = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                          force_checkpoint,
                                          [<<".*">>, <<".*">>]),

    %% The classic queue must not appear in the result.
    ?assertEqual([{QuorumQResource, {ok}}], Actual).

% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
  # `rabbitmq-queues force_checkpoint`: asks the target node to force a
  # checkpoint on every quorum queue whose virtual host and name match the
  # given patterns, and lists one result row per queue.
  alias RabbitMQ.CLI.Core.DocGuide

  @behaviour RabbitMQ.CLI.CommandBehaviour

  # Without switches every virtual host and queue name matches, and all
  # results (not only failures) are listed.
  defp default_opts,
    do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}

  def switches(),
    do: [
      vhost_pattern: :string,
      queue_pattern: :string,
      errors_only: :boolean
    ]

  def merge_defaults(args, opts) do
    {args, Map.merge(default_opts(), opts)}
  end

  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

  # Calls rabbit_quorum_queue:force_checkpoint/2 on `node_name` and turns each
  # `{resource, result}` pair into a `[vhost: _, name: _, result: _]` row.
  # A `{:badrpc, _}` from the RPC is returned unchanged.
  def run([], %{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned value can easily be formatted as JSON but formatter: "json" is not supported in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems formatter: "json" is supported (presumably via RabbitMQ.CLI.DefaultOutput
?)

aaronseo: ~/workplace/rabbitMq/rabbitmq-server/sbin $ ./rabbitmq-queues -n force_checkpoint2 force_checkpoint --formatter json
[
{"vhost":"/","name":"qq2","result":"ok"}
,{"vhost":"/","name":"qwe","result":"ok"}
,{"vhost":"/","name":"testqq","result":"ok"}
...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For tabular-looking data (so, lists of maps), yes, you can at least try DefaultOutput and see if there may be any reasons to override what it defines.

        node: node_name,
        vhost_pattern: vhost_pat,
        queue_pattern: queue_pat,
        errors_only: errors_only
      }) do
    args = [vhost_pat, queue_pat]

    case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
      {:badrpc, _} = error ->
        error

      results when errors_only ->
        # rabbit_quorum_queue:force_checkpoint/2 reports a failure as
        # {QName, {:error, reason}}. The generator pattern keeps only those
        # entries; `{:ok}` results do not match and are skipped.
        for {{:resource, vhost, _kind, name}, {:error, _} = res} <- results,
            do: [
              {:vhost, vhost},
              {:name, name},
              {:result, res}
            ]

      results ->
        for {{:resource, vhost, _kind, name}, res} <- results,
            do: [
              {:vhost, vhost},
              {:name, name},
              {:result, res}
            ]
    end
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.Table

  def usage,
    do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--errors-only]"

  def usage_additional do
    [
      ["--queue-pattern <pattern>", "regular expression to match queue names"],
      ["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
      ["--errors-only", "only list queues which reported an error"]
    ]
  end

  def usage_doc_guides() do
    [
      DocGuide.quorum_queues()
    ]
  end

  def help_section, do: :replication

  def description,
    do: "Forces checkpoints for all matching quorum queues"

  def banner([], _) do
    "Forcing checkpoint for all matching quorum queues..."
  end
end
64 changes: 64 additions & 0 deletions deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
  use ExUnit.Case, async: false
  import TestHelper

  @command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand

  # Option values the command is expected to fill in when no switches are given.
  @default_opts %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}

  setup_all do
    RabbitMQ.CLI.Core.Distribution.start()

    :ok
  end

  # Every test gets the default switches plus the target node and a timeout,
  # which a test can override with the :test_timeout tag.
  setup context do
    opts =
      Map.merge(@default_opts, %{
        node: get_rabbit_hostname(),
        timeout: context[:test_timeout] || 30000
      })

    {:ok, opts: opts}
  end

  test "merge_defaults: defaults to reporting complete results" do
    assert @command.merge_defaults([], %{}) == {[], @default_opts}
  end

  test "validate: accepts no positional arguments" do
    assert @command.validate([], %{}) == :ok
  end

  test "validate: any positional arguments fail validation" do
    rejected = [
      ["quorum-queue-a"],
      ["quorum-queue-a", "two"],
      ["quorum-queue-a", "two", "three"]
    ]

    for args <- rejected do
      assert @command.validate(args, %{}) == {:validation_failure, :too_many_args}
    end
  end

  @tag test_timeout: 3000
  test "run: targeting an unreachable node throws a badrpc", context do
    unreachable = Map.put(context[:opts], :node, :jake@thedog)

    assert match?({:badrpc, _}, @command.run([], unreachable))
  end
end
Loading