Skip to content

Message Interceptors #13760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_on
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor

PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/ct.test.spec
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
, rabbit_fifo_prop_SUITE
, rabbit_fifo_v0_SUITE
, rabbit_local_random_exchange_SUITE
, rabbit_message_interceptor_SUITE
, rabbit_msg_interceptor_SUITE
, rabbit_stream_coordinator_SUITE
, rabbit_stream_sac_coordinator_SUITE
, rabbitmq_4_0_deprecations_SUITE
Expand Down
55 changes: 41 additions & 14 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2664,27 +2664,54 @@ end}.
%%
%% Message interceptors
%%
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [

{mapping, "message_interceptors.$stage.$name.$key", "rabbit.message_interceptors", [
{datatype, {enum, [true, false]}}]}.

{translation, "rabbit.incoming_message_interceptors",
fun(Conf) ->
{translation, "rabbit.message_interceptors",
fun(Conf) ->
case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
[] ->
cuttlefish:unset();
L ->
[begin
Interceptor = list_to_atom(Interceptor0),
case lists:member(Interceptor, [set_header_timestamp,
set_header_routing_node]) of
true ->
{Interceptor, Overwrite};
false ->
cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
end
end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
lists:foldr(
fun({["message_interceptors", "incoming", "set_header_routing_node", "overwrite"], Overwrite}, Acc)
when is_boolean(Overwrite) ->
Mod = rabbit_msg_interceptor_routing_node,
Cfg = #{overwrite => Overwrite},
[{Mod, Cfg} | Acc];
({["message_interceptors", "incoming", "set_header_timestamp", "overwrite"], Overwrite}, Acc)
when is_boolean(Overwrite) ->
Mod = rabbit_msg_interceptor_timestamp,
Cfg = #{incoming => true,
overwrite => Overwrite},
case lists:keytake(Mod, 1, Acc) of
false ->
[{Mod, Cfg} | Acc];
{value, {Mod, Cfg1}, Acc1} ->
Cfg2 = maps:merge(Cfg1, Cfg),
[{Mod, Cfg2} | Acc1]
end;
({["message_interceptors", "outgoing", "timestamp", "enabled"], Enabled}, Acc) ->
case Enabled of
true ->
Mod = rabbit_msg_interceptor_timestamp,
Cfg = #{outgoing => true},
case lists:keytake(Mod, 1, Acc) of
false ->
[{Mod, Cfg} | Acc];
{value, {Mod, Cfg1}, Acc1} ->
Cfg2 = maps:merge(Cfg1, Cfg),
[{Mod, Cfg2} | Acc1]
end;
false ->
Acc
end;
(Other, _Acc) ->
cuttlefish:invalid(io_lib:format("~p is invalid", [Other]))
end, [], L)
end
end
end
}.

{mapping, "stream.replication.port_range.min", "osiris.port_range", [
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,6 @@ protocol_state(#content{properties = #'P_basic'{headers = H00,
priority = Priority0,
delivery_mode = DeliveryMode0} = B0} = C,
Anns) ->
%% Add any x- annotations as headers
H0 = case H00 of
undefined -> [];
_ ->
Expand All @@ -474,6 +473,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00,
_ ->
H0
end,
%% Add any x- annotations as headers
Headers1 = maps:fold(
fun (<<"x-", _/binary>> = Key, Val, H) when is_integer(Val) ->
[{Key, long, Val} | H];
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1655,10 +1655,12 @@ persist_static_configuration() ->
persist_static_configuration(
[classic_queue_index_v2_segment_entry_count,
classic_queue_store_v2_max_cache_size,
classic_queue_store_v2_check_crc32,
incoming_message_interceptors
classic_queue_store_v2_check_crc32
]),

Interceptors = application:get_env(?MODULE, message_interceptors, []),
ok = rabbit_msg_interceptor:add(Interceptors),

%% Disallow the following two cases:
%% 1. Negative values
%% 2. MoreCreditAfter greater than InitialCredit
Expand Down
24 changes: 16 additions & 8 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@
max_handle :: link_handle(),
max_incoming_window :: pos_integer(),
max_link_credit :: pos_integer(),
max_queue_credit :: pos_integer()
max_queue_credit :: pos_integer(),
msg_interceptor_ctx :: rabbit_msg_interceptor:context()
}).

-record(state, {
Expand Down Expand Up @@ -474,7 +475,11 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
max_handle = EffectiveHandleMax,
max_incoming_window = MaxIncomingWindow,
max_link_credit = MaxLinkCredit,
max_queue_credit = MaxQueueCredit
max_queue_credit = MaxQueueCredit,
msg_interceptor_ctx = #{protocol => ?PROTOCOL,
vhost => Vhost,
username => User#user.username,
connection_name => ConnName}
}}}.

terminate(_Reason, #state{incoming_links = IncomingLinks,
Expand Down Expand Up @@ -2159,7 +2164,8 @@ handle_deliver(ConsumerTag, AckRequired,
conn_name = ConnName,
channel_num = ChannelNum,
user = #user{username = Username},
trace_state = Trace}}) ->
trace_state = Trace,
msg_interceptor_ctx = MsgIcptCtx}}) ->
Handle = ctag_to_handle(ConsumerTag),
case OutgoingLinks0 of
#{Handle := #outgoing_link{queue_type = QType,
Expand All @@ -2174,8 +2180,9 @@ handle_deliver(ConsumerTag, AckRequired,
delivery_tag = {binary, Dtag},
message_format = ?UINT(?MESSAGE_FORMAT),
settled = SendSettled},
Mc1 = mc:convert(mc_amqp, Mc0),
Mc = mc:set_annotation(redelivered, Redelivered, Mc1),
Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx),
Mc2 = mc:convert(mc_amqp, Mc1),
Mc = mc:set_annotation(redelivered, Redelivered, Mc2),
Sections = mc:protocol_state(Mc),
validate_message_size(Sections, MaxMessageSize),
Frames = transfer_frames(Transfer, Sections, MaxFrameSize),
Expand Down Expand Up @@ -2411,7 +2418,8 @@ incoming_link_transfer(
trace_state = Trace,
conn_name = ConnName,
channel_num = ChannelNum,
max_link_credit = MaxLinkCredit}}) ->
max_link_credit = MaxLinkCredit,
msg_interceptor_ctx = MsgIcptCtx}}) ->

{PayloadBin, DeliveryId, Settled} =
case MultiTransfer of
Expand All @@ -2436,10 +2444,10 @@ incoming_link_transfer(
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
{ok, X, RoutingKeys, Mc1, PermCache} ->
Mc2 = rabbit_message_interceptor:intercept(Mc1),
check_user_id(Mc2, User),
check_user_id(Mc1, User),
TopicPermCache = check_write_permitted_on_topics(
X, User, RoutingKeys, TopicPermCache0),
Mc2 = rabbit_msg_interceptor:intercept_incoming(Mc1, MsgIcptCtx),
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Expand Down
56 changes: 34 additions & 22 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@
authz_context,
max_consumers, % taken from rabbit.consumer_max_per_channel
%% defines how ofter gc will be executed
writer_gc_threshold
writer_gc_threshold,
msg_interceptor_ctx :: rabbit_msg_interceptor:context()
}).

-record(pending_ack, {
Expand Down Expand Up @@ -492,6 +493,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
MsgIcptCtx = #{protocol => amqp091,
vhost => VHost,
username => User#user.username,
connection_name => ConnName},
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
Expand All @@ -509,8 +514,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_timeout = ConsumerTimeout,
authz_context = OptionalVariables,
max_consumers = MaxConsumers,
writer_gc_threshold = GCThreshold
},
writer_gc_threshold = GCThreshold,
msg_interceptor_ctx = MsgIcptCtx},
limiter = Limiter,
tx = none,
next_tag = 1,
Expand Down Expand Up @@ -657,13 +662,14 @@ handle_cast({deliver_reply, _K, _Del},
noreply(State);
handle_cast({deliver_reply, _K, _Msg}, State = #ch{reply_consumer = none}) ->
noreply(State);
handle_cast({deliver_reply, Key, Msg},
State = #ch{cfg = #conf{writer_pid = WriterPid},
handle_cast({deliver_reply, Key, Mc},
State = #ch{cfg = #conf{writer_pid = WriterPid,
msg_interceptor_ctx = MsgIcptCtx},
next_tag = DeliveryTag,
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)),
ExchName = mc:exchange(Msg),
[RoutingKey | _] = mc:routing_keys(Msg),
ExchName = mc:exchange(Mc),
[RoutingKey | _] = mc:routing_keys(Mc),
Content = outgoing_content(Mc, MsgIcptCtx),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.deliver'{consumer_tag = ConsumerTag,
Expand Down Expand Up @@ -813,6 +819,7 @@ get_consumer_timeout() ->
_ ->
undefined
end.

%%---------------------------------------------------------------------------

reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
Expand Down Expand Up @@ -1167,7 +1174,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
user = #user{username = Username} = User,
trace_state = TraceState,
authz_context = AuthzContext,
writer_gc_threshold = GCThreshold
writer_gc_threshold = GCThreshold,
msg_interceptor_ctx = MsgIcptCtx
},
tx = Tx,
confirm_enabled = ConfirmEnabled,
Expand Down Expand Up @@ -1206,8 +1214,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
{ok, Message0} ->
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
Message = rabbit_message_interceptor:intercept(Message0),
check_user_id_header(Message, User),
check_user_id_header(Message0, User),
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
Queues = rabbit_amqqueue:lookup_many(QNames),
Expand Down Expand Up @@ -2592,15 +2600,15 @@ handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) ->
end, State, Msgs).

handle_deliver0(ConsumerTag, AckRequired,
{QName, QPid, _MsgId, Redelivered, MsgCont0} = Msg,
{QName, QPid, _MsgId, Redelivered, Mc} = Msg,
State = #ch{cfg = #conf{writer_pid = WriterPid,
writer_gc_threshold = GCThreshold},
writer_gc_threshold = GCThreshold,
msg_interceptor_ctx = MsgIcptCtx},
next_tag = DeliveryTag,
queue_states = Qs}) ->
Exchange = mc:exchange(MsgCont0),
[RoutingKey | _] = mc:routing_keys(MsgCont0),
MsgCont = mc:convert(mc_amqpl, MsgCont0),
Content = mc:protocol_state(MsgCont),
Exchange = mc:exchange(Mc),
[RoutingKey | _] = mc:routing_keys(Mc),
Content = outgoing_content(Mc, MsgIcptCtx),
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
redelivered = Redelivered,
Expand All @@ -2621,12 +2629,11 @@ handle_deliver0(ConsumerTag, AckRequired,
record_sent(deliver, QueueType, ConsumerTag, AckRequired, Msg, State).

handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0},
Msg0 = {_QName, _QPid, _MsgId, Redelivered, Mc},
QueueType, State) ->
Exchange = mc:exchange(MsgCont0),
[RoutingKey | _] = mc:routing_keys(MsgCont0),
MsgCont = mc:convert(mc_amqpl, MsgCont0),
Content = mc:protocol_state(MsgCont),
Exchange = mc:exchange(Mc),
[RoutingKey | _] = mc:routing_keys(Mc),
Content = outgoing_content(Mc, State#ch.cfg#conf.msg_interceptor_ctx),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
Expand All @@ -2637,6 +2644,11 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
Content),
{noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg0, State)}.

outgoing_content(Mc, MsgIcptCtx) ->
Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc, MsgIcptCtx),
Mc2 = mc:convert(mc_amqpl, Mc1),
mc:protocol_state(Mc2).

init_tick_timer(State = #ch{tick_timer = undefined}) ->
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
State#ch{tick_timer = erlang:send_after(Interval, self(), tick)};
Expand Down
50 changes: 0 additions & 50 deletions deps/rabbit/src/rabbit_message_interceptor.erl

This file was deleted.

Loading
Loading