Skip to content

Commit 067b038

Browse files
authored
Merge pull request #12268 from rabbitmq/mergify/bp/v4.0.x/pr-11225
Reorganize data in the Khepri store (backport #11225)
2 parents b73fa52 + adfcb5e commit 067b038

17 files changed

+101
-87
lines changed

deps/rabbit/app.bzl

+1
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ def all_srcs(name = "all_srcs"):
525525
"include/amqqueue.hrl",
526526
"include/amqqueue_v2.hrl",
527527
"include/internal_user.hrl",
528+
"include/khepri.hrl",
528529
"include/mc.hrl",
529530
"include/rabbit_amqp.hrl",
530531
"include/rabbit_global_counters.hrl",

deps/rabbit/include/khepri.hrl

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom”
6+
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
7+
%%
8+
9+
-define(KHEPRI_ROOT_PATH, [rabbitmq]).

deps/rabbit/src/rabbit_db_binding.erl

+10-13
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@
3838

3939
-export([
4040
khepri_route_path/1, khepri_route_path/5,
41-
khepri_route_path_to_args/1,
42-
khepri_route_exchange_path/1
41+
khepri_route_path_to_args/1
4342
]).
4443

4544
%% Recovery is only needed for transient entities. Once mnesia is removed, these
@@ -202,8 +201,6 @@ create_in_khepri(#binding{source = SrcName,
202201
MaybeSerial = rabbit_exchange:serialise_events(Src),
203202
Serial = rabbit_khepri:transaction(
204203
fun() ->
205-
ExchangePath = khepri_route_exchange_path(SrcName),
206-
ok = khepri_tx:put(ExchangePath, #{type => Src#exchange.type}),
207204
case khepri_tx:get(RoutePath) of
208205
{ok, Set} ->
209206
case sets:is_element(Binding, Set) of
@@ -1010,18 +1007,21 @@ clear_in_khepri() ->
10101007
%% --------------------------------------------------------------
10111008

10121009
khepri_route_path(
1013-
#binding{source = #resource{virtual_host = VHost, name = SrcName},
1014-
destination = #resource{kind = Kind, name = DstName},
1010+
#binding{source = #resource{virtual_host = VHost,
1011+
kind = exchange,
1012+
name = SrcName},
1013+
destination = #resource{virtual_host = VHost,
1014+
kind = Kind,
1015+
name = DstName},
10151016
key = RoutingKey}) ->
10161017
khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey).
10171018

10181019
khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey)
1019-
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
1020-
?IS_KHEPRI_PATH_CONDITION(SrcName) andalso
1021-
?IS_KHEPRI_PATH_CONDITION(Kind) andalso
1020+
when ?IS_KHEPRI_PATH_CONDITION(Kind) andalso
10221021
?IS_KHEPRI_PATH_CONDITION(DstName) andalso
10231022
?IS_KHEPRI_PATH_CONDITION(RoutingKey) ->
1024-
[?MODULE, routes, VHost, SrcName, Kind, DstName, RoutingKey].
1023+
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, SrcName),
1024+
ExchangePath ++ [bindings, Kind, DstName, RoutingKey].
10251025

10261026
khepri_route_path_to_args(Path) ->
10271027
Pattern = khepri_route_path(
@@ -1047,9 +1047,6 @@ khepri_route_path_to_args(
10471047
'$RoutingKey' := RoutingKey}) ->
10481048
{VHost, SrcName, Kind, DstName, RoutingKey}.
10491049

1050-
khepri_route_exchange_path(#resource{virtual_host = VHost, name = SrcName}) ->
1051-
[?MODULE, routes, VHost, SrcName].
1052-
10531050
%% --------------------------------------------------------------
10541051
%% Internal
10551052
%% --------------------------------------------------------------

deps/rabbit/src/rabbit_db_binding_m2k_converter.erl

+1-7
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) ->
4545
%% @private
4646

4747
copy_to_khepri(rabbit_route = Table,
48-
#route{binding = #binding{source = XName} = Binding},
48+
#route{binding = #binding{} = Binding},
4949
State) ->
5050
?LOG_DEBUG(
5151
"Mnesia->Khepri data copy: [~0p] key: ~0p",
@@ -55,18 +55,12 @@ copy_to_khepri(rabbit_route = Table,
5555
rabbit_db_m2k_converter:with_correlation_id(
5656
fun(CorrId) ->
5757
Extra = #{async => CorrId},
58-
XPath = rabbit_db_binding:khepri_route_exchange_path(XName),
5958
?LOG_DEBUG(
6059
"Mnesia->Khepri data copy: [~0p] path: ~0p corr: ~0p",
6160
[Table, Path, CorrId],
6261
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
6362
rabbit_khepri:transaction(
6463
fun() ->
65-
%% Store the exchange's type in the exchange name
66-
%% branch of the tree.
67-
[#exchange{type = XType}] =
68-
rabbit_db_exchange:get_in_khepri_tx(XName),
69-
ok = khepri_tx:put(XPath, #{type => XType}),
7064
%% Add the binding to the set at the binding's
7165
%% path.
7266
Set = case khepri_tx:get(Path) of

deps/rabbit/src/rabbit_db_exchange.erl

+8-10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
-include_lib("khepri/include/khepri.hrl").
1111
-include_lib("rabbit_common/include/rabbit.hrl").
1212

13+
-include("include/khepri.hrl").
14+
1315
-export([
1416
get_all/0,
1517
get_all/1,
@@ -894,15 +896,11 @@ maybe_auto_delete_in_khepri(XName, OnlyDurable) ->
894896
khepri_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
895897
khepri_exchange_path(VHost, Name).
896898

897-
khepri_exchange_path(VHost, Name)
898-
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
899-
?IS_KHEPRI_PATH_CONDITION(Name) ->
900-
[?MODULE, exchanges, VHost, Name].
899+
khepri_exchange_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
900+
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [exchanges, Name].
901901

902-
khepri_exchange_serial_path(#resource{virtual_host = VHost, name = Name}) ->
903-
khepri_exchange_serial_path(VHost, Name).
902+
khepri_exchange_serial_path(#resource{} = Resource) ->
903+
khepri_exchange_path(Resource) ++ [serial].
904904

905-
khepri_exchange_serial_path(VHost, Name)
906-
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
907-
?IS_KHEPRI_PATH_CONDITION(Name) ->
908-
[?MODULE, exchange_serials, VHost, Name].
905+
khepri_exchange_serial_path(VHost, Name) ->
906+
khepri_exchange_path(VHost, Name) ++ [serial].

deps/rabbit/src/rabbit_db_maintenance.erl

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
-include_lib("khepri/include/khepri.hrl").
1111
-include_lib("rabbit_common/include/rabbit.hrl").
1212

13+
-include("include/khepri.hrl").
14+
1315
-export([
1416
table_definitions/0,
1517
set/1,
@@ -168,4 +170,4 @@ get_consistent_in_khepri(Node) ->
168170
%% -------------------------------------------------------------------
169171

170172
khepri_maintenance_path(Node) when ?IS_KHEPRI_PATH_CONDITION(Node) ->
171-
[?MODULE, maintenance, Node].
173+
?KHEPRI_ROOT_PATH ++ [node_maintenance, Node].

deps/rabbit/src/rabbit_db_msup.erl

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
-include_lib("khepri/include/khepri.hrl").
1111
-include("mirrored_supervisor.hrl").
1212

13+
-include("include/khepri.hrl").
14+
1315
-export([
1416
create_tables/0,
1517
table_definitions/0,
@@ -326,8 +328,8 @@ clear_in_khepri() ->
326328
khepri_mirrored_supervisor_path(Group, Id)
327329
when ?IS_KHEPRI_PATH_CONDITION(Group) andalso
328330
?IS_KHEPRI_PATH_CONDITION(Id) ->
329-
[?MODULE, mirrored_supervisor_childspec, Group, Id];
331+
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group, Id];
330332
khepri_mirrored_supervisor_path(Group, Id)
331333
when is_atom(Group) ->
332334
IdPath = Group:id_to_khepri_path(Id),
333-
[?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath.
335+
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group] ++ IdPath.

deps/rabbit/src/rabbit_db_queue.erl

+2-4
Original file line numberDiff line numberDiff line change
@@ -1359,7 +1359,5 @@ list_with_possible_retry_in_khepri(Fun) ->
13591359
khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
13601360
khepri_queue_path(VHost, Name).
13611361

1362-
khepri_queue_path(VHost, Name)
1363-
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
1364-
?IS_KHEPRI_PATH_CONDITION(Name) ->
1365-
[?MODULE, queues, VHost, Name].
1362+
khepri_queue_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
1363+
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [queues, Name].

deps/rabbit/src/rabbit_db_rtparams.erl

+6-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
-include_lib("khepri/include/khepri.hrl").
1111
-include_lib("rabbit_common/include/rabbit.hrl").
1212

13+
-include("include/khepri.hrl").
14+
1315
-export([set/2, set/4,
1416
get/1,
1517
get_all/0, get_all/2,
@@ -362,10 +364,10 @@ khepri_rp_path(Key) ->
362364
khepri_global_rp_path(Key).
363365

364366
khepri_global_rp_path(Key) when ?IS_KHEPRI_PATH_CONDITION(Key) ->
365-
[?MODULE, global, Key].
367+
?KHEPRI_ROOT_PATH ++ [runtime_params, Key].
366368

367369
khepri_vhost_rp_path(VHost, Component, Name)
368-
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
369-
?IS_KHEPRI_PATH_CONDITION(Component) andalso
370+
when ?IS_KHEPRI_PATH_CONDITION(Component) andalso
370371
?IS_KHEPRI_PATH_CONDITION(Name) ->
371-
[?MODULE, per_vhost, VHost, Component, Name].
372+
VHostPath = rabbit_db_vhost:khepri_vhost_path(VHost),
373+
VHostPath ++ [runtime_params, Component, Name].

deps/rabbit/src/rabbit_db_user.erl

+15-16
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
-include_lib("khepri/include/khepri.hrl").
1313
-include_lib("rabbit_common/include/rabbit.hrl").
1414

15+
-include("include/khepri.hrl").
16+
1517
-export([create/1,
1618
update/2,
1719
get/1,
@@ -489,13 +491,12 @@ set_user_permissions_in_khepri(Username, VHostName, UserPermission) ->
489491
end)), rw).
490492

491493
set_user_permissions_in_khepri_tx(Username, VHostName, UserPermission) ->
494+
%% TODO: Check user presence in a transaction.
492495
Path = khepri_user_permission_path(
493-
#if_all{conditions =
494-
[Username,
495-
#if_node_exists{exists = true}]},
496+
Username,
496497
VHostName),
497498
Extra = #{keep_while =>
498-
#{rabbit_db_vhost:khepri_vhost_path(VHostName) =>
499+
#{rabbit_db_user:khepri_user_path(Username) =>
499500
#if_node_exists{exists = true}}},
500501
Ret = khepri_tx:put(
501502
Path, UserPermission, Extra),
@@ -877,14 +878,13 @@ set_topic_permissions_in_khepri(Username, VHostName, TopicPermission) ->
877878
set_topic_permissions_in_khepri_tx(Username, VHostName, TopicPermission) ->
878879
#topic_permission{topic_permission_key =
879880
#topic_permission_key{exchange = ExchangeName}} = TopicPermission,
881+
%% TODO: Check user presence in a transaction.
880882
Path = khepri_topic_permission_path(
881-
#if_all{conditions =
882-
[Username,
883-
#if_node_exists{exists = true}]},
883+
Username,
884884
VHostName,
885885
ExchangeName),
886886
Extra = #{keep_while =>
887-
#{rabbit_db_vhost:khepri_vhost_path(VHostName) =>
887+
#{rabbit_db_user:khepri_user_path(Username) =>
888888
#if_node_exists{exists = true}}},
889889
Ret = khepri_tx:put(Path, TopicPermission, Extra),
890890
case Ret of
@@ -1094,15 +1094,14 @@ clear_in_khepri() ->
10941094

10951095
khepri_user_path(Username)
10961096
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
1097-
[?MODULE, users, Username].
1097+
?KHEPRI_ROOT_PATH ++ [users, Username].
10981098

10991099
khepri_user_permission_path(Username, VHostName)
1100-
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
1101-
?IS_KHEPRI_PATH_CONDITION(VHostName) ->
1102-
[?MODULE, users, Username, user_permissions, VHostName].
1100+
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
1101+
(rabbit_db_vhost:khepri_vhost_path(VHostName) ++
1102+
[user_permissions, Username]).
11031103

11041104
khepri_topic_permission_path(Username, VHostName, Exchange)
1105-
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
1106-
?IS_KHEPRI_PATH_CONDITION(VHostName) andalso
1107-
?IS_KHEPRI_PATH_CONDITION(Exchange) ->
1108-
[?MODULE, users, Username, topic_permissions, VHostName, Exchange].
1105+
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
1106+
(rabbit_db_exchange:khepri_exchange_path(VHostName, Exchange) ++
1107+
[user_permissions, Username]).

deps/rabbit/src/rabbit_db_user_m2k_converter.erl

+4-8
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,12 @@ copy_to_khepri(
7373
[Table, Username, VHost],
7474
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
7575
Path = rabbit_db_user:khepri_user_permission_path(
76-
#if_all{conditions =
77-
[Username,
78-
#if_node_exists{exists = true}]},
76+
Username,
7977
VHost),
8078
rabbit_db_m2k_converter:with_correlation_id(
8179
fun(CorrId) ->
8280
Extra = #{keep_while =>
83-
#{rabbit_db_vhost:khepri_vhost_path(VHost) =>
81+
#{rabbit_db_user:khepri_user_path(Username) =>
8482
#if_node_exists{exists = true}},
8583
async => CorrId},
8684
?LOG_DEBUG(
@@ -103,15 +101,13 @@ copy_to_khepri(
103101
[Table, Username, VHost],
104102
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
105103
Path = rabbit_db_user:khepri_topic_permission_path(
106-
#if_all{conditions =
107-
[Username,
108-
#if_node_exists{exists = true}]},
104+
Username,
109105
VHost,
110106
Exchange),
111107
rabbit_db_m2k_converter:with_correlation_id(
112108
fun(CorrId) ->
113109
Extra = #{keep_while =>
114-
#{rabbit_db_vhost:khepri_vhost_path(VHost) =>
110+
#{rabbit_db_user:khepri_user_path(Username) =>
115111
#if_node_exists{exists = true}},
116112
async => CorrId},
117113
?LOG_DEBUG(

deps/rabbit/src/rabbit_db_vhost.erl

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
-include_lib("rabbit_common/include/logging.hrl").
1212
-include_lib("khepri/include/khepri.hrl").
1313

14+
-include("include/khepri.hrl").
1415
-include("vhost.hrl").
1516

1617
-export([create_or_get/3,
@@ -532,4 +533,4 @@ clear_in_khepri() ->
532533
%% --------------------------------------------------------------
533534

534535
khepri_vhost_path(VHost) when ?IS_KHEPRI_PATH_CONDITION(VHost) ->
535-
[?MODULE, VHost].
536+
?KHEPRI_ROOT_PATH ++ [vhosts, VHost].

deps/rabbit/src/rabbit_khepri.erl

+19-5
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@
9494
-include_lib("rabbit_common/include/logging.hrl").
9595
-include_lib("rabbit_common/include/rabbit.hrl").
9696

97+
-include("include/khepri.hrl").
98+
9799
-export([setup/0,
98100
setup/1,
99101
init/0,
@@ -145,6 +147,7 @@
145147

146148
dir/0,
147149
info/0,
150+
root_path/0,
148151

149152
handle_async_ret/1,
150153

@@ -895,6 +898,15 @@ cluster_status_from_khepri() ->
895898
{error, khepri_not_running}
896899
end.
897900

901+
-spec root_path() -> RootPath when
902+
RootPath :: khepri_path:path().
903+
%% @doc Returns the path where RabbitMQ stores every metadata.
904+
%%
905+
%% This path must be prepended to all paths used by RabbitMQ subsystems.
906+
907+
root_path() ->
908+
?KHEPRI_ROOT_PATH.
909+
898910
%% -------------------------------------------------------------------
899911
%% "Proxy" functions to Khepri API.
900912
%% -------------------------------------------------------------------
@@ -1213,10 +1225,11 @@ register_rabbit_index_route_projection() ->
12131225
Options = #{type => bag, keypos => #index_route.source_key},
12141226
Projection = khepri_projection:new(
12151227
rabbit_khepri_index_route, ProjectionFun, Options),
1216-
DirectOrFanout = #if_data_matches{pattern = #{type => '$1'},
1217-
conditions = [{'andalso',
1218-
{'=/=', '$1', headers},
1219-
{'=/=', '$1', topic}}]},
1228+
DirectOrFanout = #if_data_matches{
1229+
pattern = #exchange{type = '$1', _ = '_'},
1230+
conditions = [{'andalso',
1231+
{'=/=', '$1', headers},
1232+
{'=/=', '$1', topic}}]},
12201233
PathPattern = rabbit_db_binding:khepri_route_path(
12211234
_VHost = ?KHEPRI_WILDCARD_STAR,
12221235
_Exchange = DirectOrFanout,
@@ -1319,7 +1332,8 @@ register_rabbit_topic_graph_projection() ->
13191332
Projection = khepri_projection:new(Name, ProjectionFun, Options),
13201333
PathPattern = rabbit_db_binding:khepri_route_path(
13211334
_VHost = ?KHEPRI_WILDCARD_STAR,
1322-
_Exchange = #if_data_matches{pattern = #{type => topic}},
1335+
_Exchange = #if_data_matches{
1336+
pattern = #exchange{type = topic, _ = '_'}},
13231337
_Kind = ?KHEPRI_WILDCARD_STAR,
13241338
_DstName = ?KHEPRI_WILDCARD_STAR,
13251339
_RoutingKey = ?KHEPRI_WILDCARD_STAR),

deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl

+3-4
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ khepri_consistent_hash_path(#exchange{name = Name}) ->
224224
khepri_consistent_hash_path(#resource{virtual_host = VHost, name = Name}) ->
225225
khepri_consistent_hash_path(VHost, Name).
226226

227-
khepri_consistent_hash_path(VHost, Name)
228-
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
229-
?IS_KHEPRI_PATH_CONDITION(Name) ->
230-
[?MODULE, exchange_type_consistent_hash_ring_state, VHost, Name].
227+
khepri_consistent_hash_path(VHost, Name) ->
228+
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, Name),
229+
ExchangePath ++ [consistent_hash_ring_state].

deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl

+3-4
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,6 @@ remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).
235235
khepri_jms_topic_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
236236
khepri_jms_topic_exchange_path(VHost, Name).
237237

238-
khepri_jms_topic_exchange_path(VHost, Name)
239-
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
240-
?IS_KHEPRI_PATH_CONDITION(Name) ->
241-
[?MODULE, jms_topic_exchange, VHost, Name].
238+
khepri_jms_topic_exchange_path(VHost, Name) ->
239+
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, Name),
240+
ExchangePath ++ [jms_topic].

0 commit comments

Comments
 (0)