Skip to content

Commit eb5ec2a

Browse files
committed
Support outgoing message interceptors
1 parent fe0401a commit eb5ec2a

22 files changed

+306
-209
lines changed

deps/rabbit/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_on
273273
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
274274
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
275275

276-
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
276+
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
277277
PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int
278278
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
279279
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue

deps/rabbit/ct.test.spec

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
, rabbit_fifo_prop_SUITE
116116
, rabbit_fifo_v0_SUITE
117117
, rabbit_local_random_exchange_SUITE
118-
, rabbit_message_interceptor_SUITE
118+
, rabbit_msg_interceptor_SUITE
119119
, rabbit_stream_coordinator_SUITE
120120
, rabbit_stream_sac_coordinator_SUITE
121121
, rabbitmq_4_0_deprecations_SUITE

deps/rabbit/priv/schema/rabbit.schema

+39-14
Original file line numberDiff line numberDiff line change
@@ -2664,27 +2664,52 @@ end}.
26642664
%%
26652665
%% Message interceptors
26662666
%%
2667-
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
2667+
2668+
{mapping, "message_interceptors.$group.$name.$key", "rabbit.message_interceptors", [
26682669
{datatype, {enum, [true, false]}}]}.
26692670

2670-
{translation, "rabbit.incoming_message_interceptors",
2671+
{translation, "rabbit.message_interceptors",
26712672
fun(Conf) ->
26722673
case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
26732674
[] ->
26742675
cuttlefish:unset();
26752676
L ->
2676-
[begin
2677-
Interceptor = list_to_atom(Interceptor0),
2678-
Mod = case Interceptor of
2679-
set_header_timestamp ->
2680-
rabbit_message_interceptor_timestamp;
2681-
set_header_routing_node ->
2682-
rabbit_message_interceptor_routing_node;
2683-
_ ->
2684-
cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
2685-
end,
2686-
{Mod, #{overwrite => Overwrite}}
2687-
end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
2677+
lists:foldr(
2678+
fun({["message_interceptors", "incoming", "set_header_routing_node", "overwrite"], Overwrite}, Acc)
2679+
when is_boolean(Overwrite) ->
2680+
Mod = rabbit_msg_interceptor_routing_node,
2681+
Cfg = #{overwrite => Overwrite},
2682+
[{Mod, Cfg} | Acc];
2683+
({["message_interceptors", "incoming", "set_header_timestamp", "overwrite"], Overwrite}, Acc)
2684+
when is_boolean(Overwrite) ->
2685+
Mod = rabbit_msg_interceptor_timestamp,
2686+
Cfg = #{incoming => true,
2687+
overwrite => Overwrite},
2688+
case lists:keytake(Mod, 1, Acc) of
2689+
false ->
2690+
[{Mod, Cfg} | Acc];
2691+
{value, {Mod, Cfg1}, Acc1} ->
2692+
Cfg2 = maps:merge(Cfg1, Cfg),
2693+
[{Mod, Cfg2} | Acc1]
2694+
end;
2695+
({["message_interceptors", "outgoing", "timestamp", "enabled"], Enabled}, Acc) ->
2696+
case Enabled of
2697+
true ->
2698+
Mod = rabbit_msg_interceptor_timestamp,
2699+
Cfg = #{outgoing => true},
2700+
case lists:keytake(Mod, 1, Acc) of
2701+
false ->
2702+
[{Mod, Cfg} | Acc];
2703+
{value, {Mod, Cfg1}, Acc1} ->
2704+
Cfg2 = maps:merge(Cfg1, Cfg),
2705+
[{Mod, Cfg2} | Acc1]
2706+
end;
2707+
false ->
2708+
Acc
2709+
end;
2710+
(Other, _Acc) ->
2711+
cuttlefish:invalid(io_lib:format("~p is invalid", [Other]))
2712+
end, [], L)
26882713
end
26892714
end
26902715
}.

deps/rabbit/src/mc_amqpl.erl

+1-1
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,6 @@ protocol_state(#content{properties = #'P_basic'{headers = H00,
462462
priority = Priority0,
463463
delivery_mode = DeliveryMode0} = B0} = C,
464464
Anns) ->
465-
%% Add any x- annotations as headers
466465
H0 = case H00 of
467466
undefined -> [];
468467
_ ->
@@ -474,6 +473,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00,
474473
_ ->
475474
H0
476475
end,
476+
%% Add any x- annotations as headers
477477
Headers1 = maps:fold(
478478
fun (<<"x-", _/binary>> = Key, Val, H) when is_integer(Val) ->
479479
[{Key, long, Val} | H];

deps/rabbit/src/rabbit.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -1658,8 +1658,8 @@ persist_static_configuration() ->
16581658
classic_queue_store_v2_check_crc32
16591659
]),
16601660

1661-
Interceptors = application:get_env(?MODULE, incoming_message_interceptors, []),
1662-
ok = rabbit_message_interceptor:add(Interceptors, incoming_message_interceptors),
1661+
Interceptors = application:get_env(?MODULE, message_interceptors, []),
1662+
ok = rabbit_msg_interceptor:add(Interceptors),
16631663

16641664
%% Disallow the following two cases:
16651665
%% 1. Negative values

deps/rabbit/src/rabbit_amqp_session.erl

+7-7
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@
284284
max_incoming_window :: pos_integer(),
285285
max_link_credit :: pos_integer(),
286286
max_queue_credit :: pos_integer(),
287-
msg_interceptor_ctx :: rabbit_message_interceptor:context()
287+
msg_interceptor_ctx :: rabbit_msg_interceptor:context()
288288
}).
289289

290290
-record(state, {
@@ -2164,7 +2164,8 @@ handle_deliver(ConsumerTag, AckRequired,
21642164
conn_name = ConnName,
21652165
channel_num = ChannelNum,
21662166
user = #user{username = Username},
2167-
trace_state = Trace}}) ->
2167+
trace_state = Trace,
2168+
msg_interceptor_ctx = MsgIcptCtx}}) ->
21682169
Handle = ctag_to_handle(ConsumerTag),
21692170
case OutgoingLinks0 of
21702171
#{Handle := #outgoing_link{queue_type = QType,
@@ -2180,7 +2181,8 @@ handle_deliver(ConsumerTag, AckRequired,
21802181
message_format = ?UINT(?MESSAGE_FORMAT),
21812182
settled = SendSettled},
21822183
Mc1 = mc:convert(mc_amqp, Mc0),
2183-
Mc = mc:set_annotation(redelivered, Redelivered, Mc1),
2184+
Mc2 = mc:set_annotation(redelivered, Redelivered, Mc1),
2185+
Mc = rabbit_msg_interceptor:intercept_outgoing(Mc2, MsgIcptCtx),
21842186
Sections = mc:protocol_state(Mc),
21852187
validate_message_size(Sections, MaxMessageSize),
21862188
Frames = transfer_frames(Transfer, Sections, MaxFrameSize),
@@ -2417,7 +2419,7 @@ incoming_link_transfer(
24172419
conn_name = ConnName,
24182420
channel_num = ChannelNum,
24192421
max_link_credit = MaxLinkCredit,
2420-
msg_interceptor_ctx = MsgInterceptorCtx}}) ->
2422+
msg_interceptor_ctx = MsgIcptCtx}}) ->
24212423

24222424
{PayloadBin, DeliveryId, Settled} =
24232425
case MultiTransfer of
@@ -2445,9 +2447,7 @@ incoming_link_transfer(
24452447
check_user_id(Mc1, User),
24462448
TopicPermCache = check_write_permitted_on_topics(
24472449
X, User, RoutingKeys, TopicPermCache0),
2448-
Mc2 = rabbit_message_interceptor:intercept(Mc1,
2449-
MsgInterceptorCtx,
2450-
incoming_message_interceptors),
2450+
Mc2 = rabbit_msg_interceptor:intercept_incoming(Mc1, MsgIcptCtx),
24512451
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
24522452
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
24532453
Opts = #{correlation => {HandleInt, DeliveryId}},

deps/rabbit/src/rabbit_channel.erl

+24-21
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
max_consumers, % taken from rabbit.consumer_max_per_channel
112112
%% defines how ofter gc will be executed
113113
writer_gc_threshold,
114-
msg_interceptor_ctx :: rabbit_message_interceptor:context()
114+
msg_interceptor_ctx :: rabbit_msg_interceptor:context()
115115
}).
116116

117117
-record(pending_ack, {
@@ -662,13 +662,14 @@ handle_cast({deliver_reply, _K, _Del},
662662
noreply(State);
663663
handle_cast({deliver_reply, _K, _Msg}, State = #ch{reply_consumer = none}) ->
664664
noreply(State);
665-
handle_cast({deliver_reply, Key, Msg},
666-
State = #ch{cfg = #conf{writer_pid = WriterPid},
665+
handle_cast({deliver_reply, Key, Mc},
666+
State = #ch{cfg = #conf{writer_pid = WriterPid,
667+
msg_interceptor_ctx = MsgIcptCtx},
667668
next_tag = DeliveryTag,
668669
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
669-
Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)),
670-
ExchName = mc:exchange(Msg),
671-
[RoutingKey | _] = mc:routing_keys(Msg),
670+
ExchName = mc:exchange(Mc),
671+
[RoutingKey | _] = mc:routing_keys(Mc),
672+
Content = outgoing_content(Mc, MsgIcptCtx),
672673
ok = rabbit_writer:send_command(
673674
WriterPid,
674675
#'basic.deliver'{consumer_tag = ConsumerTag,
@@ -1174,7 +1175,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11741175
trace_state = TraceState,
11751176
authz_context = AuthzContext,
11761177
writer_gc_threshold = GCThreshold,
1177-
msg_interceptor_ctx = MsgInterceptorCtx
1178+
msg_interceptor_ctx = MsgIcptCtx
11781179
},
11791180
tx = Tx,
11801181
confirm_enabled = ConfirmEnabled,
@@ -1214,9 +1215,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12141215
{ok, Message0} ->
12151216
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
12161217
check_user_id_header(Message0, User),
1217-
Message = rabbit_message_interceptor:intercept(Message0,
1218-
MsgInterceptorCtx,
1219-
incoming_message_interceptors),
1218+
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
12201219
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
12211220
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
12221221
Queues = rabbit_amqqueue:lookup_many(QNames),
@@ -2601,15 +2600,15 @@ handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) ->
26012600
end, State, Msgs).
26022601

26032602
handle_deliver0(ConsumerTag, AckRequired,
2604-
{QName, QPid, _MsgId, Redelivered, MsgCont0} = Msg,
2603+
{QName, QPid, _MsgId, Redelivered, Mc} = Msg,
26052604
State = #ch{cfg = #conf{writer_pid = WriterPid,
2606-
writer_gc_threshold = GCThreshold},
2605+
writer_gc_threshold = GCThreshold,
2606+
msg_interceptor_ctx = MsgIcptCtx},
26072607
next_tag = DeliveryTag,
26082608
queue_states = Qs}) ->
2609-
Exchange = mc:exchange(MsgCont0),
2610-
[RoutingKey | _] = mc:routing_keys(MsgCont0),
2611-
MsgCont = mc:convert(mc_amqpl, MsgCont0),
2612-
Content = mc:protocol_state(MsgCont),
2609+
Exchange = mc:exchange(Mc),
2610+
[RoutingKey | _] = mc:routing_keys(Mc),
2611+
Content = outgoing_content(Mc, MsgIcptCtx),
26132612
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
26142613
delivery_tag = DeliveryTag,
26152614
redelivered = Redelivered,
@@ -2630,12 +2629,11 @@ handle_deliver0(ConsumerTag, AckRequired,
26302629
record_sent(deliver, QueueType, ConsumerTag, AckRequired, Msg, State).
26312630

26322631
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
2633-
Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0},
2632+
Msg0 = {_QName, _QPid, _MsgId, Redelivered, Mc},
26342633
QueueType, State) ->
2635-
Exchange = mc:exchange(MsgCont0),
2636-
[RoutingKey | _] = mc:routing_keys(MsgCont0),
2637-
MsgCont = mc:convert(mc_amqpl, MsgCont0),
2638-
Content = mc:protocol_state(MsgCont),
2634+
Exchange = mc:exchange(Mc),
2635+
[RoutingKey | _] = mc:routing_keys(Mc),
2636+
Content = outgoing_content(Mc, State#ch.cfg#conf.msg_interceptor_ctx),
26392637
ok = rabbit_writer:send_command(
26402638
WriterPid,
26412639
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -2646,6 +2644,11 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
26462644
Content),
26472645
{noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg0, State)}.
26482646

2647+
outgoing_content(Mc, MsgIcptCtx) ->
2648+
Mc1 = mc:convert(mc_amqpl, Mc),
2649+
Mc2 = rabbit_msg_interceptor:intercept_outgoing(Mc1, MsgIcptCtx),
2650+
mc:protocol_state(Mc2).
2651+
26492652
init_tick_timer(State = #ch{tick_timer = undefined}) ->
26502653
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
26512654
State#ch{tick_timer = erlang:send_after(Interval, self(), tick)};

deps/rabbit/src/rabbit_message_interceptor_timestamp.erl

-33
This file was deleted.

deps/rabbit/src/rabbit_message_interceptor.erl renamed to deps/rabbit/src/rabbit_msg_interceptor.erl

+36-23
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
%%
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66

7-
-module(rabbit_message_interceptor).
7+
-module(rabbit_msg_interceptor).
88

99
%% client API
10-
-export([intercept/3,
11-
add/2,
12-
remove/2]).
10+
-export([intercept_incoming/2,
11+
intercept_outgoing/2,
12+
add/1,
13+
remove/1]).
1314
%% helpers for behaviour implementations
1415
-export([set_annotation/4]).
1516

@@ -20,46 +21,58 @@
2021
username := rabbit_types:username(),
2122
connection_name := binary(),
2223
atom() => term()}.
23-
-type group() :: incoming_message_interceptors |
24-
outgoing_message_interceptors.
2524
-type config() :: #{atom() => term()}.
2625
-type interceptor() :: {module(), config()}.
26+
-type interceptors() :: [interceptor()].
27+
-type stage() :: incoming | outgoing.
2728

29+
-define(KEY, message_interceptors).
2830

2931
-export_type([context/0]).
3032

31-
-callback intercept(mc:state(), context(), group(), config()) ->
33+
-callback intercept(mc:state(), context(), stage(), config()) ->
3234
mc:state().
3335

34-
-spec intercept(mc:state(), context(), group()) ->
36+
-spec intercept_incoming(mc:state(), context()) ->
3537
mc:state().
36-
intercept(Msg, Ctx, Group) ->
37-
Interceptors = persistent_term:get(Group, []),
38+
intercept_incoming(Msg, Ctx) ->
39+
intercept(Msg, Ctx, incoming).
40+
41+
-spec intercept_outgoing(mc:state(), context()) ->
42+
mc:state().
43+
intercept_outgoing(Msg, Ctx) ->
44+
intercept(Msg, Ctx, outgoing).
45+
46+
intercept(Msg, Ctx, Stage) ->
47+
Interceptors = persistent_term:get(?KEY),
3848
lists:foldl(fun({Mod, Config}, Msg0) ->
39-
Mod:intercept(Msg0, Ctx, Group, Config)
49+
Mod:intercept(Msg0, Ctx, Stage, Config)
4050
end, Msg, Interceptors).
4151

42-
-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) ->
52+
-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(),
53+
Overwrite :: boolean()) ->
4354
mc:state().
44-
set_annotation(Msg, Key, Value, Overwrite) ->
45-
case {mc:x_header(Key, Msg), Overwrite} of
46-
{Val, false} when Val =/= undefined ->
47-
Msg;
55+
set_annotation(Msg, Key, Value, true) ->
56+
mc:set_annotation(Key, Value, Msg);
57+
set_annotation(Msg, Key, Value, false) ->
58+
case mc:x_header(Key, Msg) of
59+
undefined ->
60+
mc:set_annotation(Key, Value, Msg);
4861
_ ->
49-
mc:set_annotation(Key, Value, Msg)
62+
Msg
5063
end.
5164

52-
-spec add([interceptor()], group()) -> ok.
53-
add(Interceptors, Group) ->
65+
-spec add(interceptors()) -> ok.
66+
add(Interceptors) ->
5467
%% validation
5568
lists:foreach(fun({Mod, #{}}) ->
5669
case erlang:function_exported(Mod, intercept, 4) of
5770
true -> ok;
5871
false -> error(Mod)
5972
end
6073
end, Interceptors),
61-
persistent_term:put(Group, persistent_term:get(Group, []) ++ Interceptors).
74+
persistent_term:put(?KEY, persistent_term:get(?KEY, []) ++ Interceptors).
6275

63-
-spec remove([interceptor()], group()) -> ok.
64-
remove(Interceptors, Group) ->
65-
persistent_term:put(Group, persistent_term:get(Group, []) -- Interceptors).
76+
-spec remove(interceptors()) -> ok.
77+
remove(Interceptors) ->
78+
persistent_term:put(?KEY, persistent_term:get(?KEY, []) -- Interceptors).

0 commit comments

Comments
 (0)