Skip to content

Commit ecdf04d

Browse files
Merge pull request #12273 from rabbitmq/md/bp/v4.0.x/khepri-prs
2 parents 067b038 + 2e7f149 commit ecdf04d

11 files changed

+217
-97
lines changed

deps/rabbit/src/rabbit_db.erl

+5-5
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ init() ->
6767
end,
6868

6969
Ret = case rabbit_khepri:is_enabled() of
70-
true -> init_using_khepri();
71-
false -> init_using_mnesia()
70+
true -> init_using_khepri(IsVirgin);
71+
false -> init_using_mnesia(IsVirgin)
7272
end,
7373
case Ret of
7474
ok ->
@@ -91,19 +91,19 @@ pre_init(IsVirgin) ->
9191
OtherMembers = rabbit_nodes:nodes_excl_me(Members),
9292
rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin).
9393

94-
init_using_mnesia() ->
94+
init_using_mnesia(_IsVirgin) ->
9595
?LOG_DEBUG(
9696
"DB: initialize Mnesia",
9797
#{domain => ?RMQLOG_DOMAIN_DB}),
9898
ok = rabbit_mnesia:init(),
9999
?assertEqual(rabbit:data_dir(), mnesia_dir()),
100100
rabbit_sup:start_child(mnesia_sync).
101101

102-
init_using_khepri() ->
102+
init_using_khepri(IsVirgin) ->
103103
?LOG_DEBUG(
104104
"DB: initialize Khepri",
105105
#{domain => ?RMQLOG_DOMAIN_DB}),
106-
rabbit_khepri:init().
106+
rabbit_khepri:init(IsVirgin).
107107

108108
init_finished() ->
109109
%% Used during initialisation by rabbit_logger_exchange_h.erl

deps/rabbit/src/rabbit_db_binding.erl

+12-18
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
-define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route).
5454
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
5555
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
56-
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_bindings).
56+
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
5757
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
5858

5959
%% -------------------------------------------------------------------
@@ -835,9 +835,8 @@ delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
835835
_Kind = ?KHEPRI_WILDCARD_STAR,
836836
_DstName = ?KHEPRI_WILDCARD_STAR,
837837
_RoutingKey = #if_has_data{}),
838-
{ok, Bindings} = khepri_tx:get_many(Path),
839-
ok = khepri_tx:delete_many(Path),
840-
maps:fold(fun(_P, Set, Acc) ->
838+
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
839+
maps:fold(fun(_P, #{data := Set}, Acc) ->
841840
sets:to_list(Set) ++ Acc
842841
end, [], Bindings).
843842

@@ -881,25 +880,20 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
881880
OnlyDurable :: boolean(),
882881
Deletions :: rabbit_binding:deletions().
883882

884-
delete_for_destination_in_khepri(DstName, OnlyDurable) ->
885-
BindingsMap = match_destination_in_khepri(DstName),
886-
maps:foreach(fun(K, _V) -> khepri_tx:delete(K) end, BindingsMap),
887-
Bindings = maps:fold(fun(_, Set, Acc) ->
883+
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
884+
Pattern = khepri_route_path(
885+
VHost,
886+
_SrcName = ?KHEPRI_WILDCARD_STAR,
887+
Kind,
888+
Name,
889+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
890+
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
891+
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
888892
sets:to_list(Set) ++ Acc
889893
end, [], BindingsMap),
890894
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
891895
lists:keysort(#binding.source, Bindings), OnlyDurable).
892896

893-
match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}) ->
894-
Path = khepri_route_path(
895-
VHost,
896-
_SrcName = ?KHEPRI_WILDCARD_STAR,
897-
Kind,
898-
Name,
899-
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
900-
{ok, Map} = khepri_tx:get_many(Path),
901-
Map.
902-
903897
%% -------------------------------------------------------------------
904898
%% delete_transient_for_destination_in_mnesia().
905899
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_exchange.erl

+64
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
peek_serial/1,
2727
next_serial/1,
2828
delete/2,
29+
delete_all/1,
2930
delete_serial/1,
3031
recover/1,
3132
match/1,
@@ -657,6 +658,69 @@ delete_in_khepri(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour
657658
ok = khepri_tx:delete(khepri_exchange_path(XName)),
658659
rabbit_db_binding:delete_all_for_exchange_in_khepri(X, OnlyDurable, RemoveBindingsForSource).
659660

661+
%% -------------------------------------------------------------------
662+
%% delete_all().
663+
%% -------------------------------------------------------------------
664+
665+
-spec delete_all(VHostName) -> Ret when
666+
VHostName :: vhost:name(),
667+
Deletions :: rabbit_binding:deletions(),
668+
Ret :: {ok, Deletions}.
669+
%% @doc Deletes all exchanges for a given vhost.
670+
%%
671+
%% @returns an `{ok, Deletions}' tuple containing the {@link
672+
%% rabbit_binding:deletions()} caused by deleting the exchanges under the given
673+
%% vhost.
674+
%%
675+
%% @private
676+
677+
delete_all(VHostName) ->
678+
rabbit_khepri:handle_fallback(
679+
#{mnesia => fun() -> delete_all_in_mnesia(VHostName) end,
680+
khepri => fun() -> delete_all_in_khepri(VHostName) end
681+
}).
682+
683+
delete_all_in_mnesia(VHostName) ->
684+
rabbit_mnesia:execute_mnesia_transaction(
685+
fun() ->
686+
delete_all_in_mnesia_tx(VHostName)
687+
end).
688+
689+
delete_all_in_mnesia_tx(VHostName) ->
690+
Match = #exchange{name = rabbit_misc:r(VHostName, exchange), _ = '_'},
691+
Xs = mnesia:match_object(?MNESIA_TABLE, Match, write),
692+
Deletions =
693+
lists:foldl(
694+
fun(X, Acc) ->
695+
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
696+
unconditional_delete_in_mnesia( X, false),
697+
XDeletions1 = rabbit_binding:add_deletion(
698+
XName, {X, deleted, Bindings}, XDeletions),
699+
rabbit_binding:combine_deletions(Acc, XDeletions1)
700+
end, rabbit_binding:new_deletions(), Xs),
701+
{ok, Deletions}.
702+
703+
delete_all_in_khepri(VHostName) ->
704+
rabbit_khepri:transaction(
705+
fun() ->
706+
delete_all_in_khepri_tx(VHostName)
707+
end, rw, #{timeout => infinity}).
708+
709+
delete_all_in_khepri_tx(VHostName) ->
710+
Pattern = khepri_exchange_path(VHostName, ?KHEPRI_WILDCARD_STAR),
711+
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
712+
Deletions =
713+
maps:fold(
714+
fun(_Path, #{data := X}, Deletions) ->
715+
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
716+
rabbit_db_binding:delete_all_for_exchange_in_khepri(
717+
X, false, true),
718+
Deletions1 = rabbit_binding:add_deletion(
719+
XName, {X, deleted, Bindings}, XDeletions),
720+
rabbit_binding:combine_deletions(Deletions, Deletions1)
721+
end, rabbit_binding:new_deletions(), NodeProps),
722+
{ok, Deletions}.
723+
660724
%% -------------------------------------------------------------------
661725
%% delete_serial().
662726
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_rtparams.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
]).
2424

2525
-define(MNESIA_TABLE, rabbit_runtime_parameters).
26-
-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparams).
27-
-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparams).
26+
-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparam).
27+
-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparam).
2828
-define(any(Value), case Value of
2929
'_' -> ?KHEPRI_WILDCARD_STAR;
3030
_ -> Value

deps/rabbit/src/rabbit_db_user.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@
7575
-define(MNESIA_TABLE, rabbit_user).
7676
-define(PERM_MNESIA_TABLE, rabbit_user_permission).
7777
-define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission).
78-
-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_users).
79-
-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permissions).
78+
-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_user).
79+
-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permission).
8080

8181
%% -------------------------------------------------------------------
8282
%% create().

deps/rabbit/src/rabbit_exchange.erl

+12-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
update_scratch/3, update_decorators/2, immutable/1,
1515
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
1616
route/2, route/3, delete/3, validate_binding/2, count/0,
17-
ensure_deleted/3]).
17+
ensure_deleted/3, delete_all/2]).
1818
-export([list_names/0]).
1919
-export([serialise_events/1]).
2020
-export([serial/1, peek_serial/1]).
@@ -484,6 +484,17 @@ delete(XName, IfUnused, Username) ->
484484
XName#resource.name, Username)
485485
end.
486486

487+
-spec delete_all(VHostName, ActingUser) -> Ret when
488+
VHostName :: vhost:name(),
489+
ActingUser :: rabbit_types:username(),
490+
Ret :: ok.
491+
492+
delete_all(VHostName, ActingUser) ->
493+
{ok, Deletions} = rabbit_db_exchange:delete_all(VHostName),
494+
Deletions1 = rabbit_binding:process_deletions(Deletions),
495+
rabbit_binding:notify_deletions(Deletions1, ActingUser),
496+
ok.
497+
487498
process_deletions({error, _} = E) ->
488499
E;
489500
process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->

0 commit comments

Comments
 (0)