Skip to content

Commit 6b1a27c

Browse files
committed
Add unit tests to stream SAC coordinator
Node disconnection/reconnection.
1 parent e73cc86 commit 6b1a27c

File tree

3 files changed

+412
-176
lines changed

3 files changed

+412
-176
lines changed

Diff for: deps/rabbit/src/rabbit_stream_coordinator.erl

+3-2
Original file line numberDiff line numberDiff line change
@@ -720,10 +720,11 @@ apply(#{machine_version := MachineVersion} = Meta,
720720
{Ss#{Id => S}, E}
721721
end, {Streams0, Effects0}, Streams0),
722722

723-
{Sac1, Effects2} = case MachineVersion > 5 of
723+
{Sac1, Effects2} = case MachineVersion > 4 of
724724
true ->
725725
SacMod = sac_module(Meta),
726-
SacMod:handle_node_reconnected(Sac0, Effects1);
726+
SacMod:handle_node_reconnected(Node,
727+
Sac0, Effects1);
727728
false ->
728729
{Sac0, Effects1}
729730
end,

Diff for: deps/rabbit/src/rabbit_stream_sac_coordinator.erl

+32-21
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
ensure_monitors/4,
4343
handle_connection_down/2,
4444
handle_connection_node_disconnected/2,
45-
handle_node_reconnected/2,
45+
handle_node_reconnected/3,
4646
forget_connection/2,
4747
consumer_groups/3,
4848
group_consumers/5,
@@ -298,17 +298,7 @@ apply(#command_purge_nodes{nodes = Nodes}, State0) ->
298298
{State1, ok, Eff}.
299299

300300
purge_node(Node, #?MODULE{groups = Groups0} = State0) ->
301-
PidsGroups =
302-
maps:fold(fun(K, #group{consumers = Consumers}, Acc) ->
303-
lists:foldl(fun(#consumer{pid = Pid}, AccIn)
304-
when node(Pid) =:= Node ->
305-
PG0 = maps:get(Pid, AccIn, #{}),
306-
PG1 = PG0#{K => true},
307-
AccIn#{Pid => PG1};
308-
(_, AccIn) ->
309-
AccIn
310-
end, Acc, Consumers)
311-
end, #{}, Groups0),
301+
PidsGroups = compute_node_pid_group_dependencies(Node, Groups0),
312302
maps:fold(fun(Pid, Groups, {S0, Eff0}) ->
313303
{S1, Eff1} = handle_connection_down0(Pid, S0, Groups),
314304
{S1, Eff1 ++ Eff0}
@@ -578,8 +568,15 @@ ensure_monitors(#command_connection_reconnected{pid = Pid},
578568
{State#?MODULE{pids_groups = AllPidsGroups},
579569
Monitors#{Pid => sac},
580570
[{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]};
571+
ensure_monitors(#command_purge_nodes{},
572+
#?MODULE{groups = Groups} = State,
573+
Monitors,
574+
Effects) ->
575+
AllPidsGroups = compute_pid_group_dependencies(Groups),
576+
{State#?MODULE{pids_groups = AllPidsGroups},
577+
Monitors,
578+
Effects};
581579
ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) ->
582-
%% TODO sac: ensure the pid-group mapping after purge_nodes?
583580
{State0, Monitors, Effects}.
584581

585582
-spec handle_connection_down(connection_pid(), state()) ->
@@ -619,21 +616,21 @@ handle_connection_node_disconnected(ConnPid,
619616
#{connection_pid => ConnPid}}, T}]}
620617
end.
621618

622-
-spec handle_node_reconnected(state(), ra_machine:effects()) ->
619+
-spec handle_node_reconnected(node(), state(), ra_machine:effects()) ->
623620
{state(), ra_machine:effects()}.
624-
handle_node_reconnected(#?MODULE{pids_groups = PidsGroups0,
621+
handle_node_reconnected(Node,
622+
#?MODULE{pids_groups = PidsGroups0,
625623
groups = Groups0} = State0,
626624
Effects0) ->
627-
AllPidsGroups = compute_pid_group_dependencies(Groups0),
628-
NotMonitored = maps:keys(AllPidsGroups) -- maps:keys(PidsGroups0),
625+
NodePidsGroups = compute_node_pid_group_dependencies(Node, Groups0),
626+
PidsGroups1 = maps:merge(PidsGroups0, NodePidsGroups),
629627
Effects1 =
630628
lists:foldr(fun(P, Acc) ->
631629
[notify_connection_effect(P),
632-
{monitor, process, P},
633-
{monitor, node, node(P)} | Acc]
634-
end, Effects0, NotMonitored),
630+
{monitor, process, P} | Acc]
631+
end, Effects0, maps:keys(NodePidsGroups)),
635632

636-
{State0#?MODULE{pids_groups = AllPidsGroups}, Effects1}.
633+
{State0#?MODULE{pids_groups = PidsGroups1}, Effects1}.
637634

638635
-spec forget_connection(connection_pid(), state()) ->
639636
{state(), ra_machine:effects()}.
@@ -1117,3 +1114,17 @@ compute_pid_group_dependencies(Groups) ->
11171114
AccIn#{Pid => PG1}
11181115
end, Acc, Cs)
11191116
end, #{}, Groups).
1117+
1118+
-spec compute_node_pid_group_dependencies(node(), groups()) -> pids_groups().
1119+
compute_node_pid_group_dependencies(Node, Groups) ->
1120+
maps:fold(fun(K, #group{consumers = Consumers}, Acc) ->
1121+
lists:foldl(fun(#consumer{pid = Pid}, AccIn)
1122+
when node(Pid) =:= Node ->
1123+
PG0 = maps:get(Pid, AccIn, #{}),
1124+
PG1 = PG0#{K => true},
1125+
AccIn#{Pid => PG1};
1126+
(_, AccIn) ->
1127+
AccIn
1128+
end, Acc, Consumers)
1129+
end, #{}, Groups).
1130+

0 commit comments

Comments
 (0)