Skip to content

Commit 85ec98c

Browse files
LoisSotoLopezgomoripeti
authored andcommitted
Add incoming message interceptors
This commit enables users to provide custom message interceptor modules, i.e. modules to process incoming and outgoing messages. The `rabbit_message_interceptor` behaviour defines a `intercept/4` callback, for those modules to implement. Co-authored-by: Péter Gömöri <[email protected]>
1 parent e90b47f commit 85ec98c

15 files changed

+304
-75
lines changed

deps/rabbit/priv/schema/rabbit.schema

+94-15
Original file line numberDiff line numberDiff line change
@@ -2667,23 +2667,102 @@ end}.
26672667
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
26682668
{datatype, {enum, [true, false]}}]}.
26692669

2670+
% Pseudo-key to include the interceptor in the list of interceptors.
2671+
% - If any other configuration is provided for the interceptor this
2672+
% configuration is not required.
2673+
% - If no other configuration is provided, this one is required so that the
2674+
% interceptor gets invoked.
2675+
{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [
2676+
{datatype, {enum, [true]}}]}.
2677+
2678+
{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [
2679+
{datatype, {enum, [true]}}]}.
2680+
2681+
{mapping,
2682+
"message_interceptors.incoming.set_header_timestamp.overwrite",
2683+
"rabbit.incoming_message_interceptors",
2684+
[{datatype, {enum, [true, false]}}]}.
2685+
{mapping,
2686+
"message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite",
2687+
"rabbit.incoming_message_interceptors",
2688+
[{datatype, {enum, [true, false]}}]}.
2689+
2690+
{mapping,
2691+
"message_interceptors.incoming.set_header_routing_node.overwrite",
2692+
"rabbit.incoming_message_interceptors",
2693+
[{datatype, {enum, [true, false]}}]}.
2694+
{mapping,
2695+
"message_interceptors.incoming.rabbit_message_interceptor_timestamp.overwrite",
2696+
"rabbit.incoming_message_interceptors",
2697+
[{datatype, {enum, [true, false]}}]}.
2698+
26702699
{translation, "rabbit.incoming_message_interceptors",
26712700
fun(Conf) ->
2672-
case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
2673-
[] ->
2674-
cuttlefish:unset();
2675-
L ->
2676-
[begin
2677-
Interceptor = list_to_atom(Interceptor0),
2678-
case lists:member(Interceptor, [set_header_timestamp,
2679-
set_header_routing_node]) of
2680-
true ->
2681-
{Interceptor, Overwrite};
2682-
false ->
2683-
cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
2684-
end
2685-
end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
2686-
end
2701+
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
2702+
[] ->
2703+
cuttlefish:unset();
2704+
L ->
2705+
InterceptorsConfig = [
2706+
{Module0, Config, Value}
2707+
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L
2708+
],
2709+
{Result, Order0} = lists:foldl(
2710+
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2711+
Interceptor = list_to_atom(Interceptor0),
2712+
Key = list_to_atom(Key0),
2713+
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2714+
% This Interceptor -> Module alias exists for
2715+
% compatibility reasons
2716+
Module = case Interceptor of
2717+
set_header_timestamp ->
2718+
rabbit_message_interceptor_timestamp;
2719+
set_header_routing_node ->
2720+
rabbit_message_interceptor_routing_node;
2721+
_ ->
2722+
Interceptor
2723+
end,
2724+
NewAcc = maps:update_with(Module,
2725+
MapPutFun,
2726+
#{Key => Value},
2727+
Acc),
2728+
{NewAcc, [Module| Order]}
2729+
end,
2730+
{#{}, []},
2731+
InterceptorsConfig
2732+
),
2733+
Order = lists:uniq(Order0),
2734+
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2735+
end
2736+
end
2737+
}.
2738+
2739+
{translation, "rabbit.outgoing_message_interceptors",
2740+
fun(Conf) ->
2741+
case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of
2742+
[] ->
2743+
cuttlefish:unset();
2744+
L ->
2745+
InterceptorsConfig = [
2746+
{Module0, Config, Value}
2747+
|| {["message_interceptors", "outgoing", Module0, Config], Value} <- L
2748+
],
2749+
{Result, Order0} = lists:foldl(
2750+
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2751+
Module = list_to_atom(Interceptor0),
2752+
Key = list_to_atom(Key0),
2753+
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2754+
NewAcc = maps:update_with(Module,
2755+
MapPutFun,
2756+
#{Key => Value},
2757+
Acc),
2758+
{NewAcc, [Module| Order]}
2759+
end,
2760+
{#{}, []},
2761+
InterceptorsConfig
2762+
),
2763+
Order = lists:uniq(Order0),
2764+
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2765+
end
26872766
end
26882767
}.
26892768

deps/rabbit/src/rabbit.erl

+2-1
Original file line numberDiff line numberDiff line change
@@ -1656,7 +1656,8 @@ persist_static_configuration() ->
16561656
[classic_queue_index_v2_segment_entry_count,
16571657
classic_queue_store_v2_max_cache_size,
16581658
classic_queue_store_v2_check_crc32,
1659-
incoming_message_interceptors
1659+
incoming_message_interceptors,
1660+
outgoing_message_interceptors
16601661
]),
16611662

16621663
%% Disallow the following two cases:

deps/rabbit/src/rabbit_amqp_session.erl

+12-4
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,8 @@
283283
max_handle :: link_handle(),
284284
max_incoming_window :: pos_integer(),
285285
max_link_credit :: pos_integer(),
286-
max_queue_credit :: pos_integer()
286+
max_queue_credit :: pos_integer(),
287+
msg_interceptor_ctx :: map()
287288
}).
288289

289290
-record(state, {
@@ -474,7 +475,11 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
474475
max_handle = EffectiveHandleMax,
475476
max_incoming_window = MaxIncomingWindow,
476477
max_link_credit = MaxLinkCredit,
477-
max_queue_credit = MaxQueueCredit
478+
max_queue_credit = MaxQueueCredit,
479+
msg_interceptor_ctx = #{protocol => ?PROTOCOL,
480+
username => User#user.username,
481+
vhost => Vhost,
482+
conn_name => ConnName}
478483
}}}.
479484

480485
terminate(_Reason, #state{incoming_links = IncomingLinks,
@@ -2411,7 +2416,8 @@ incoming_link_transfer(
24112416
trace_state = Trace,
24122417
conn_name = ConnName,
24132418
channel_num = ChannelNum,
2414-
max_link_credit = MaxLinkCredit}}) ->
2419+
max_link_credit = MaxLinkCredit,
2420+
msg_interceptor_ctx = MsgInterceptorCtx}}) ->
24152421

24162422
{PayloadBin, DeliveryId, Settled} =
24172423
case MultiTransfer of
@@ -2436,7 +2442,9 @@ incoming_link_transfer(
24362442
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24372443
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24382444
{ok, X, RoutingKeys, Mc1, PermCache} ->
2439-
Mc2 = rabbit_message_interceptor:intercept(Mc1),
2445+
Mc2 = rabbit_message_interceptor:intercept(Mc1,
2446+
MsgInterceptorCtx,
2447+
incoming_message_interceptors),
24402448
check_user_id(Mc2, User),
24412449
TopicPermCache = check_write_permitted_on_topics(
24422450
X, User, RoutingKeys, TopicPermCache0),

deps/rabbit/src/rabbit_channel.erl

+13-4
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@
110110
authz_context,
111111
max_consumers, % taken from rabbit.consumer_max_per_channel
112112
%% defines how ofter gc will be executed
113-
writer_gc_threshold
113+
writer_gc_threshold,
114+
msg_interceptor_ctx
114115
}).
115116

116117
-record(pending_ack, {
@@ -509,7 +510,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
509510
consumer_timeout = ConsumerTimeout,
510511
authz_context = OptionalVariables,
511512
max_consumers = MaxConsumers,
512-
writer_gc_threshold = GCThreshold
513+
writer_gc_threshold = GCThreshold,
514+
msg_interceptor_ctx = #{protocol => amqp091,
515+
username => User#user.username,
516+
vhost => VHost,
517+
conn_name => ConnName}
513518
},
514519
limiter = Limiter,
515520
tx = none,
@@ -813,6 +818,7 @@ get_consumer_timeout() ->
813818
_ ->
814819
undefined
815820
end.
821+
816822
%%---------------------------------------------------------------------------
817823

818824
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -1167,7 +1173,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11671173
user = #user{username = Username} = User,
11681174
trace_state = TraceState,
11691175
authz_context = AuthzContext,
1170-
writer_gc_threshold = GCThreshold
1176+
writer_gc_threshold = GCThreshold,
1177+
msg_interceptor_ctx = MsgInterceptorCtx
11711178
},
11721179
tx = Tx,
11731180
confirm_enabled = ConfirmEnabled,
@@ -1206,7 +1213,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12061213
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
12071214
{ok, Message0} ->
12081215
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
1209-
Message = rabbit_message_interceptor:intercept(Message0),
1216+
Message = rabbit_message_interceptor:intercept(Message0,
1217+
MsgInterceptorCtx,
1218+
incoming_message_interceptors),
12101219
check_user_id_header(Message, User),
12111220
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
12121221
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
+37-38
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,49 @@
1-
%% This Source Code Form is subject to the terms of the Mozilla Public
2-
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3-
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4-
%%
5-
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6-
7-
%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp
8-
%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can
9-
%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that
10-
%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages.
111
-module(rabbit_message_interceptor).
12-
-include("mc.hrl").
132

14-
-export([intercept/1]).
3+
-export([intercept/3,
4+
set_msg_annotation/4]).
5+
6+
-type protocol() :: amqp091 | amqp10 | mqtt310 | mqtt311 | mqtt50.
157

16-
-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
17-
-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
8+
-type msg_interceptor_ctx() :: #{protocol := protocol(),
9+
vhost := binary(),
10+
username := binary(),
11+
conn_name => binary(),
12+
atom() => term()}.
1813

19-
-spec intercept(mc:state()) -> mc:state().
20-
intercept(Msg) ->
21-
Interceptors = persistent_term:get(incoming_message_interceptors, []),
22-
lists:foldl(fun({InterceptorName, Overwrite}, M) ->
23-
intercept(M, InterceptorName, Overwrite)
24-
end, Msg, Interceptors).
14+
-callback intercept(Msg, MsgInterceptorCtx, Group, Config) -> Msg when
15+
Msg :: mc:state(),
16+
MsgInterceptorCtx :: msg_interceptor_ctx(),
17+
Group :: incoming_message_interceptors | outgoing_message_interceptors,
18+
Config :: #{atom() := term()}.
2519

26-
intercept(Msg, set_header_routing_node, Overwrite) ->
27-
Node = atom_to_binary(node()),
28-
set_annotation(Msg, ?HEADER_ROUTING_NODE, Node, Overwrite);
29-
intercept(Msg0, set_header_timestamp, Overwrite) ->
30-
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
31-
Msg = set_annotation(Msg0, ?HEADER_TIMESTAMP, Ts, Overwrite),
32-
set_timestamp(Msg, Ts, Overwrite).
20+
-spec intercept(Msg, MsgInterceptorCtx, Group) -> Msg when
21+
Msg :: mc:state(),
22+
MsgInterceptorCtx :: map(),
23+
Group :: incoming_message_interceptors | outgoing_message_interceptors.
24+
intercept(Msg, MsgInterceptorCtx, Group) ->
25+
Interceptors = persistent_term:get(Group, []),
26+
lists:foldl(fun({Module, Config}, Msg0) ->
27+
try
28+
Module:intercept(Msg0,
29+
MsgInterceptorCtx,
30+
Group,
31+
Config)
32+
catch
33+
error:undef ->
34+
Msg0
35+
end
36+
end, Msg , Interceptors).
3337

34-
-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) -> mc:state().
35-
set_annotation(Msg, Key, Value, Overwrite) ->
38+
-spec set_msg_annotation(mc:state(),
39+
mc:ann_key(),
40+
mc:ann_value(),
41+
boolean()
42+
) -> mc:state().
43+
set_msg_annotation(Msg, Key, Value, Overwrite) ->
3644
case {mc:x_header(Key, Msg), Overwrite} of
3745
{Val, false} when Val =/= undefined ->
3846
Msg;
3947
_ ->
4048
mc:set_annotation(Key, Value, Msg)
4149
end.
42-
43-
-spec set_timestamp(mc:state(), pos_integer(), boolean()) -> mc:state().
44-
set_timestamp(Msg, Timestamp, Overwrite) ->
45-
case {mc:timestamp(Msg), Overwrite} of
46-
{Ts, false} when is_integer(Ts) ->
47-
Msg;
48-
_ ->
49-
mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg)
50-
end.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-module(rabbit_message_interceptor_routing_node).
2+
-behaviour(rabbit_message_interceptor).
3+
4+
-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
5+
6+
-export([intercept/4]).
7+
8+
intercept(Msg, _MsgInterceptorCtx, _Group, Config) ->
9+
Node = atom_to_binary(node()),
10+
Overwrite = maps:get(overwrite, Config, false),
11+
rabbit_message_interceptor:set_msg_annotation(Msg,
12+
?HEADER_ROUTING_NODE,
13+
Node,
14+
Overwrite).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-module(rabbit_message_interceptor_timestamp).
2+
-behaviour(rabbit_message_interceptor).
3+
4+
-include("mc.hrl").
5+
6+
-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
7+
8+
-export([intercept/4]).
9+
10+
intercept(Msg0, _MsgInterceptorCtx, _Group, Config) ->
11+
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
12+
Overwrite = maps:get(overwrite, Config, false),
13+
Msg = rabbit_message_interceptor:set_msg_annotation(
14+
Msg0,
15+
?HEADER_TIMESTAMP,
16+
Ts,
17+
Overwrite),
18+
set_msg_timestamp(Msg, Ts, Overwrite).
19+
20+
set_msg_timestamp(Msg, Timestamp, Overwrite) ->
21+
case {mc:timestamp(Msg), Overwrite} of
22+
{Ts, false} when is_integer(Ts) ->
23+
Msg;
24+
_ ->
25+
mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg)
26+
end.

deps/rabbit/test/amqp_client_SUITE.erl

+5-2
Original file line numberDiff line numberDiff line change
@@ -4380,8 +4380,11 @@ available_messages(QType, Config) ->
43804380

43814381
incoming_message_interceptors(Config) ->
43824382
Key = ?FUNCTION_NAME,
4383-
ok = rpc(Config, persistent_term, put, [Key, [{set_header_routing_node, false},
4384-
{set_header_timestamp, false}]]),
4383+
ok = rpc(Config,
4384+
persistent_term,
4385+
put,
4386+
[Key, [{rabbit_message_interceptor_routing_node, #{overwrite => false}},
4387+
{rabbit_message_interceptor_timestamp, #{overwrite => false}}]]),
43854388
Stream = <<"my stream">>,
43864389
QQName = <<"my quorum queue">>,
43874390
{_, Session, LinkPair} = Init = init(Config),

0 commit comments

Comments
 (0)