-
Notifications
You must be signed in to change notification settings - Fork 3.9k
For 4.1.x, by @aaron-seo: introduce a command that would force QQs to take a checkpoint and truncate its segments #13548
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bda3754
df916d5
7e56416
3b25ba7
3e57efd
cff685a
1a9f5ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,8 @@ | |
force_vhost_queues_shrink_member_to_current_member/1, | ||
force_all_queues_shrink_member_to_current_member/0]). | ||
|
||
-export([force_checkpoint/2, force_checkpoint_on_queue/1]). | ||
|
||
%% for backwards compatibility | ||
-export([file_handle_leader_reservation/1, | ||
file_handle_other_reservation/0, | ||
|
@@ -141,6 +143,7 @@ | |
-define(RPC_TIMEOUT, 1000). | ||
-define(START_CLUSTER_TIMEOUT, 5000). | ||
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT | ||
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000). | ||
-define(TICK_INTERVAL, 5000). %% the ra server tick time | ||
-define(DELETE_TIMEOUT, 5000). | ||
-define(MEMBER_CHANGE_TIMEOUT, 20_000). | ||
|
@@ -2089,6 +2092,38 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis | |
rabbit_log:warning("Shrinking finished"), | ||
ok. | ||
|
||
force_checkpoint_on_queue(QName) -> | ||
Node = node(), | ||
QNameFmt = rabbit_misc:rs(QName), | ||
case rabbit_db_queue:get_durable(QName) of | ||
{ok, Q} when ?amqqueue_is_classic(Q) -> | ||
{error, classic_queue_not_supported}; | ||
{ok, Q} when ?amqqueue_is_quorum(Q) -> | ||
{RaName, _} = amqqueue:get_pid(Q), | ||
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]), | ||
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT); | ||
{ok, _Q} -> | ||
{error, not_quorum_queue}; | ||
{error, _} = E -> | ||
E | ||
end. | ||
|
||
force_checkpoint(VhostSpec, QueueSpec) -> | ||
[begin | ||
QName = amqqueue:get_name(Q), | ||
case force_checkpoint_on_queue(QName) of | ||
ok -> | ||
{QName, {ok}}; | ||
{error, Err} -> | ||
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w", | ||
[rabbit_misc:rs(QName), Err]), | ||
{QName, {error, Err}} | ||
end | ||
end | ||
|| Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will include queues that do not have any members on the current node but the |
||
is_match(amqqueue:get_vhost(Q), VhostSpec) | ||
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]. | ||
|
||
is_minority(All, Up) -> | ||
MinQuorum = length(All) div 2 + 1, | ||
length(Up) < MinQuorum. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
## This Source Code Form is subject to the terms of the Mozilla Public | ||
## License, v. 2.0. If a copy of the MPL was not distributed with this | ||
## file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
## | ||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. | ||
|
||
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do | ||
alias RabbitMQ.CLI.Core.{DocGuide} | ||
|
||
@behaviour RabbitMQ.CLI.CommandBehaviour | ||
|
||
defp default_opts, | ||
do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false} | ||
|
||
def switches(), | ||
do: [ | ||
vhost_pattern: :string, | ||
queue_pattern: :string, | ||
errors_only: :boolean | ||
] | ||
|
||
def merge_defaults(args, opts) do | ||
{args, Map.merge(default_opts(), opts)} | ||
end | ||
|
||
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning | ||
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments | ||
|
||
def run([], %{ | ||
node: node_name, | ||
vhost_pattern: vhost_pat, | ||
queue_pattern: queue_pat, | ||
errors_only: errors_only | ||
}) do | ||
args = [vhost_pat, queue_pat] | ||
|
||
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do | ||
{:badrpc, _} = error -> | ||
error | ||
|
||
results when errors_only -> | ||
for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results, | ||
do: [ | ||
{:vhost, vhost}, | ||
{:name, name}, | ||
{:result, res} | ||
] | ||
|
||
results -> | ||
for {{:resource, vhost, _kind, name}, res} <- results, | ||
do: [ | ||
{:vhost, vhost}, | ||
{:name, name}, | ||
{:result, res} | ||
] | ||
end | ||
end | ||
|
||
use RabbitMQ.CLI.DefaultOutput | ||
|
||
def formatter(), do: RabbitMQ.CLI.Formatters.Table | ||
|
||
def usage, | ||
do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>]" | ||
|
||
def usage_additional do | ||
[ | ||
["--queue-pattern <pattern>", "regular expression to match queue names"], | ||
["--vhost-pattern <pattern>", "regular expression to match virtual host names"], | ||
["--errors-only", "only list queues which reported an error"] | ||
] | ||
end | ||
|
||
def usage_doc_guides() do | ||
[ | ||
DocGuide.quorum_queues() | ||
] | ||
end | ||
|
||
def help_section, do: :replication | ||
|
||
def description, | ||
do: "Forces checkpoints for all matching quorum queues" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no it will only force a checkpoint on the local member of the matching queues, if there is a local member. |
||
|
||
def banner([], _) do | ||
"Forcing checkpoint for all matching quorum queues..." | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
## This Source Code Form is subject to the terms of the Mozilla Public | ||
## License, v. 2.0. If a copy of the MPL was not distributed with this | ||
## file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
## | ||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. | ||
|
||
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do | ||
use ExUnit.Case, async: false | ||
import TestHelper | ||
|
||
@command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand | ||
|
||
setup_all do | ||
RabbitMQ.CLI.Core.Distribution.start() | ||
|
||
:ok | ||
end | ||
|
||
setup context do | ||
{:ok, | ||
opts: %{ | ||
node: get_rabbit_hostname(), | ||
timeout: context[:test_timeout] || 30000, | ||
vhost_pattern: ".*", | ||
queue_pattern: ".*", | ||
errors_only: false | ||
}} | ||
end | ||
|
||
test "merge_defaults: defaults to reporting complete results" do | ||
assert @command.merge_defaults([], %{}) == | ||
{[], | ||
%{ | ||
vhost_pattern: ".*", | ||
queue_pattern: ".*", | ||
errors_only: false | ||
}} | ||
end | ||
|
||
test "validate: accepts no positional arguments" do | ||
assert @command.validate([], %{}) == :ok | ||
end | ||
|
||
test "validate: any positional arguments fail validation" do | ||
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args} | ||
|
||
assert @command.validate(["quorum-queue-a", "two"], %{}) == | ||
{:validation_failure, :too_many_args} | ||
|
||
assert @command.validate(["quorum-queue-a", "two", "three"], %{}) == | ||
{:validation_failure, :too_many_args} | ||
end | ||
|
||
@tag test_timeout: 3000 | ||
test "run: targeting an unreachable node throws a badrpc", context do | ||
assert match?( | ||
{:badrpc, _}, | ||
@command.run( | ||
[], | ||
Map.merge(context[:opts], %{node: :jake@thedog}) | ||
) | ||
) | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no need to do an rpc here as this function always addresses the quorum queue member on the node it runs on. is this the intent? to force a checkpoint on a single member if there is one locally (there are no checks to ensure that).