-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Add force checkpoint functions for quorum queues and command line tool #13175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,8 @@ | |
force_vhost_queues_shrink_member_to_current_member/1, | ||
force_all_queues_shrink_member_to_current_member/0]). | ||
|
||
-export([force_checkpoint/2, force_checkpoint_on_queue/1]). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put each export on a new line. |
||
|
||
%% for backwards compatibility | ||
-export([file_handle_leader_reservation/1, | ||
file_handle_other_reservation/0, | ||
|
@@ -138,6 +140,7 @@ | |
-define(RPC_TIMEOUT, 1000). | ||
-define(START_CLUSTER_TIMEOUT, 5000). | ||
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT | ||
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000). | ||
-define(TICK_INTERVAL, 5000). %% the ra server tick time | ||
-define(DELETE_TIMEOUT, 5000). | ||
-define(MEMBER_CHANGE_TIMEOUT, 20_000). | ||
|
@@ -2084,6 +2087,38 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis | |
rabbit_log:warning("Shrinking finished"), | ||
ok. | ||
|
||
force_checkpoint_on_queue(QName) -> | ||
Node = node(), | ||
QNameFmt = rabbit_misc:rs(QName), | ||
case rabbit_db_queue:get_durable(QName) of | ||
{ok, Q} when ?amqqueue_is_classic(Q) -> | ||
{error, classic_queue_not_supported}; | ||
{ok, Q} when ?amqqueue_is_quorum(Q) -> | ||
{RaName, _} = amqqueue:get_pid(Q), | ||
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]), | ||
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line is too long, keep to ~80 chars max There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is also no need to use RPC for this. |
||
{ok, _Q} -> | ||
{error, not_quorum_queue}; | ||
{error, _} = E -> | ||
E | ||
end. | ||
|
||
force_checkpoint(VhostSpec, QueueSpec) -> | ||
[begin | ||
QName = amqqueue:get_name(Q), | ||
case force_checkpoint_on_queue(QName) of | ||
ok -> | ||
{QName, {ok}}; | ||
{error, Err} -> | ||
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w", | ||
[rabbit_misc:rs(QName), Err]), | ||
{QName, {error, Err}} | ||
end | ||
end | ||
|| Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE), | ||
is_match(amqqueue:get_vhost(Q), VhostSpec) | ||
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]. | ||
|
||
is_minority(All, Up) -> | ||
MinQuorum = length(All) div 2 + 1, | ||
length(Up) < MinQuorum. | ||
|
@@ -2144,4 +2179,3 @@ file_handle_other_reservation() -> | |
|
||
file_handle_release_reservation() -> | ||
ok. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
-include_lib("eunit/include/eunit.hrl"). | ||
-include_lib("amqp_client/include/amqp_client.hrl"). | ||
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). | ||
-include_lib("rabbit/src/rabbit_fifo.hrl"). | ||
|
||
-import(queue_utils, [wait_for_messages_ready/3, | ||
wait_for_messages_pending_ack/3, | ||
|
@@ -97,6 +98,8 @@ groups() -> | |
force_shrink_member_to_current_member, | ||
force_all_queues_shrink_member_to_current_member, | ||
force_vhost_queues_shrink_member_to_current_member, | ||
force_checkpoint_on_queue, | ||
force_checkpoint, | ||
policy_repair, | ||
gh_12635, | ||
replica_states | ||
|
@@ -1308,6 +1311,63 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> | |
?assertEqual(3, length(Nodes0)) | ||
end || Q <- QQs, VHost <- VHosts]. | ||
|
||
force_checkpoint_on_queue(Config) -> | ||
[Server0, _Server1, _Server2] = | ||
rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), | ||
QQ = ?config(queue_name, Config), | ||
RaName = ra_name(QQ), | ||
QName = rabbit_misc:r(<<"/">>, queue, QQ), | ||
|
||
?assertEqual({'queue.declare_ok', QQ, 0, 0}, | ||
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), | ||
|
||
rabbit_ct_client_helpers:publish(Ch, QQ, 3), | ||
wait_for_messages_ready([Server0], RaName, 3), | ||
|
||
% Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet. | ||
rabbit_ct_helpers:await_condition( | ||
fun() -> | ||
{ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not realise that It would be better if you wanted to query this to add a clause to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking into adding a clause to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @aaron-seo is this still something you are working on or should the core team take over? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Apologies for the delay, please feel free to take over as needed; been pulled into other things currently. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually there is no need to modify Ra. just match against |
||
#aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux, | ||
Index =:= 0 | ||
end), | ||
|
||
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, | ||
force_checkpoint_on_queue, [QName]), | ||
|
||
% Wait for initial checkpoint and make sure it's not 0 | ||
rabbit_ct_helpers:await_condition( | ||
fun() -> | ||
{ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), | ||
#aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux, | ||
Index =/= 0 | ||
end). | ||
|
||
force_checkpoint(Config) -> | ||
[Server0, _Server1, _Server2] = | ||
rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), | ||
QQ = ?config(queue_name, Config), | ||
QQName = rabbit_misc:r(<<"/">>, queue, QQ), | ||
CQ = <<"force_checkpoint_cq">>, | ||
RaName = ra_name(QQ), | ||
|
||
?assertEqual({'queue.declare_ok', QQ, 0, 0}, | ||
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), | ||
|
||
?assertEqual({'queue.declare_ok', CQ, 0, 0}, | ||
declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])), | ||
|
||
rabbit_ct_client_helpers:publish(Ch, QQ, 3), | ||
wait_for_messages_ready([Server0], RaName, 3), | ||
|
||
ForceCheckpointRes = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, | ||
force_checkpoint, [<<".*">>, <<".*">>]), | ||
ExpectedRes = [{QQName, {ok}}], | ||
|
||
% Result should only have quorum queue | ||
?assertEqual(ExpectedRes, ForceCheckpointRes). | ||
|
||
% Tests that, if the process of a QQ is dead in the moment of declaring a policy | ||
% that affects such queue, when the process is made available again, the policy | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
## This Source Code Form is subject to the terms of the Mozilla Public | ||
## License, v. 2.0. If a copy of the MPL was not distributed with this | ||
## file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
## | ||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. | ||
|
||
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do | ||
alias RabbitMQ.CLI.Core.{DocGuide} | ||
|
||
@behaviour RabbitMQ.CLI.CommandBehaviour | ||
|
||
defp default_opts, | ||
do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false} | ||
|
||
def switches(), | ||
do: [ | ||
vhost_pattern: :string, | ||
queue_pattern: :string, | ||
errors_only: :boolean | ||
] | ||
|
||
def merge_defaults(args, opts) do | ||
{args, Map.merge(default_opts(), opts)} | ||
end | ||
|
||
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning | ||
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments | ||
|
||
def run([], %{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The returned value can easily be formatted as JSON but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For tabular-looking data (so, lists of maps), yes, you can at least try |
||
node: node_name, | ||
vhost_pattern: vhost_pat, | ||
queue_pattern: queue_pat, | ||
errors_only: errors_only | ||
}) do | ||
args = [vhost_pat, queue_pat] | ||
|
||
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do | ||
{:badrpc, _} = error -> | ||
error | ||
|
||
results when errors_only -> | ||
for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results, | ||
do: [ | ||
{:vhost, vhost}, | ||
{:name, name}, | ||
{:result, res} | ||
] | ||
|
||
results -> | ||
for {{:resource, vhost, _kind, name}, res} <- results, | ||
do: [ | ||
{:vhost, vhost}, | ||
{:name, name}, | ||
{:result, res} | ||
] | ||
end | ||
end | ||
|
||
use RabbitMQ.CLI.DefaultOutput | ||
|
||
def formatter(), do: RabbitMQ.CLI.Formatters.Table | ||
|
||
def usage, | ||
do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>]" | ||
|
||
def usage_additional do | ||
[ | ||
["--queue-pattern <pattern>", "regular expression to match queue names"], | ||
["--vhost-pattern <pattern>", "regular expression to match virtual host names"], | ||
["--errors-only", "only list queues which reported an error"] | ||
] | ||
end | ||
|
||
def usage_doc_guides() do | ||
[ | ||
DocGuide.quorum_queues() | ||
] | ||
end | ||
|
||
def help_section, do: :replication | ||
|
||
def description, | ||
do: "Forces checkpoints for all matching quorum queues" | ||
|
||
def banner([], _) do | ||
"Forcing checkpoint for all matching quorum queues..." | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
## This Source Code Form is subject to the terms of the Mozilla Public | ||
## License, v. 2.0. If a copy of the MPL was not distributed with this | ||
## file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
## | ||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. | ||
|
||
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do | ||
use ExUnit.Case, async: false | ||
import TestHelper | ||
|
||
@command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand | ||
|
||
setup_all do | ||
RabbitMQ.CLI.Core.Distribution.start() | ||
|
||
:ok | ||
end | ||
|
||
setup context do | ||
{:ok, | ||
opts: %{ | ||
node: get_rabbit_hostname(), | ||
timeout: context[:test_timeout] || 30000, | ||
vhost_pattern: ".*", | ||
queue_pattern: ".*", | ||
errors_only: false | ||
}} | ||
end | ||
|
||
test "merge_defaults: defaults to reporting complete results" do | ||
assert @command.merge_defaults([], %{}) == | ||
{[], | ||
%{ | ||
vhost_pattern: ".*", | ||
queue_pattern: ".*", | ||
errors_only: false | ||
}} | ||
end | ||
|
||
test "validate: accepts no positional arguments" do | ||
assert @command.validate([], %{}) == :ok | ||
end | ||
|
||
test "validate: any positional arguments fail validation" do | ||
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args} | ||
|
||
assert @command.validate(["quorum-queue-a", "two"], %{}) == | ||
{:validation_failure, :too_many_args} | ||
|
||
assert @command.validate(["quorum-queue-a", "two", "three"], %{}) == | ||
{:validation_failure, :too_many_args} | ||
end | ||
|
||
@tag test_timeout: 3000 | ||
test "run: targeting an unreachable node throws a badrpc", context do | ||
assert match?( | ||
{:badrpc, _}, | ||
@command.run( | ||
[], | ||
Map.merge(context[:opts], %{node: :jake@thedog}) | ||
) | ||
) | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we implement the aux overview query as suggested below there shoudl be no need to move these records.