Skip to content

Commit 12bf6ec

Browse files
committed
Improve message interceptors
1. Force the config for timestamp and routing node message interceptors to be configured with the overwrite boolean() to avoid defining multiple default values throughout the code. 2. Add type specs 3. Extend existing test case for new MQTT client ID interceptor 4. routing node and timestamp should only set the annotation for incoming_message_interceptors group 5. Fix `rabbitmq.conf`. Prior to this commit there were several issue: a.) Setting the right configuration was too user unfriendly, e.g. the user has to set ``` message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key = x-opt-mqtt-client-id ``` just to enable the MQTT message interceptor. b.) The code that parses was too difficult to understand c.) MQTT plugin was setting the env for app rabbit, which is an anti-pattern d.) disabling a plugin (e.g. MQTT), left its message interceptors still in place This is now all fixed, the user sets the rabbitmq.conf as follows: ``` message_interceptors.incoming.set_header_timestamp.overwrite = true message_interceptors.incoming.set_header_routing_node.overwrite = false mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true ``` Note that the first two lines use the same format as for RabbitMQ 4.0 for backwards compatiblity. The last line (MQTT) follows a similar pattern.
1 parent 85ec98c commit 12bf6ec

14 files changed

+221
-254
lines changed

deps/rabbit/priv/schema/rabbit.schema

+19-96
Original file line numberDiff line numberDiff line change
@@ -2667,103 +2667,26 @@ end}.
26672667
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
26682668
{datatype, {enum, [true, false]}}]}.
26692669

2670-
% Pseudo-key to include the interceptor in the list of interceptors.
2671-
% - If any other configuration is provided for the interceptor this
2672-
% configuration is not required.
2673-
% - If no other configuration is provided, this one is required so that the
2674-
% interceptor gets invoked.
2675-
{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [
2676-
{datatype, {enum, [true]}}]}.
2677-
2678-
{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [
2679-
{datatype, {enum, [true]}}]}.
2680-
2681-
{mapping,
2682-
"message_interceptors.incoming.set_header_timestamp.overwrite",
2683-
"rabbit.incoming_message_interceptors",
2684-
[{datatype, {enum, [true, false]}}]}.
2685-
{mapping,
2686-
"message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite",
2687-
"rabbit.incoming_message_interceptors",
2688-
[{datatype, {enum, [true, false]}}]}.
2689-
2690-
{mapping,
2691-
"message_interceptors.incoming.set_header_routing_node.overwrite",
2692-
"rabbit.incoming_message_interceptors",
2693-
[{datatype, {enum, [true, false]}}]}.
2694-
{mapping,
2695-
"message_interceptors.incoming.rabbit_message_interceptor_timestamp.overwrite",
2696-
"rabbit.incoming_message_interceptors",
2697-
[{datatype, {enum, [true, false]}}]}.
2698-
26992670
{translation, "rabbit.incoming_message_interceptors",
2700-
fun(Conf) ->
2701-
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
2702-
[] ->
2703-
cuttlefish:unset();
2704-
L ->
2705-
InterceptorsConfig = [
2706-
{Module0, Config, Value}
2707-
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L
2708-
],
2709-
{Result, Order0} = lists:foldl(
2710-
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2711-
Interceptor = list_to_atom(Interceptor0),
2712-
Key = list_to_atom(Key0),
2713-
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2714-
% This Interceptor -> Module alias exists for
2715-
% compatibility reasons
2716-
Module = case Interceptor of
2717-
set_header_timestamp ->
2718-
rabbit_message_interceptor_timestamp;
2719-
set_header_routing_node ->
2720-
rabbit_message_interceptor_routing_node;
2721-
_ ->
2722-
Interceptor
2723-
end,
2724-
NewAcc = maps:update_with(Module,
2725-
MapPutFun,
2726-
#{Key => Value},
2727-
Acc),
2728-
{NewAcc, [Module| Order]}
2729-
end,
2730-
{#{}, []},
2731-
InterceptorsConfig
2732-
),
2733-
Order = lists:uniq(Order0),
2734-
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2735-
end
2736-
end
2737-
}.
2738-
2739-
{translation, "rabbit.outgoing_message_interceptors",
2740-
fun(Conf) ->
2741-
case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of
2742-
[] ->
2743-
cuttlefish:unset();
2744-
L ->
2745-
InterceptorsConfig = [
2746-
{Module0, Config, Value}
2747-
|| {["message_interceptors", "outgoing", Module0, Config], Value} <- L
2748-
],
2749-
{Result, Order0} = lists:foldl(
2750-
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2751-
Module = list_to_atom(Interceptor0),
2752-
Key = list_to_atom(Key0),
2753-
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2754-
NewAcc = maps:update_with(Module,
2755-
MapPutFun,
2756-
#{Key => Value},
2757-
Acc),
2758-
{NewAcc, [Module| Order]}
2759-
end,
2760-
{#{}, []},
2761-
InterceptorsConfig
2762-
),
2763-
Order = lists:uniq(Order0),
2764-
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2765-
end
2766-
end
2671+
fun(Conf) ->
2672+
case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
2673+
[] ->
2674+
cuttlefish:unset();
2675+
L ->
2676+
[begin
2677+
Interceptor = list_to_atom(Interceptor0),
2678+
Mod = case Interceptor of
2679+
set_header_timestamp ->
2680+
rabbit_message_interceptor_timestamp;
2681+
set_header_routing_node ->
2682+
rabbit_message_interceptor_routing_node;
2683+
_ ->
2684+
cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
2685+
end,
2686+
{Mod, #{overwrite => Overwrite}}
2687+
end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
2688+
end
2689+
end
27672690
}.
27682691

27692692
{mapping, "stream.replication.port_range.min", "osiris.port_range", [

deps/rabbit/src/rabbit.erl

+4-3
Original file line numberDiff line numberDiff line change
@@ -1655,11 +1655,12 @@ persist_static_configuration() ->
16551655
persist_static_configuration(
16561656
[classic_queue_index_v2_segment_entry_count,
16571657
classic_queue_store_v2_max_cache_size,
1658-
classic_queue_store_v2_check_crc32,
1659-
incoming_message_interceptors,
1660-
outgoing_message_interceptors
1658+
classic_queue_store_v2_check_crc32
16611659
]),
16621660

1661+
Interceptors = application:get_env(?MODULE, incoming_message_interceptors, []),
1662+
ok = rabbit_message_interceptor:add(Interceptors, incoming_message_interceptors),
1663+
16631664
%% Disallow the following two cases:
16641665
%% 1. Negative values
16651666
%% 2. MoreCreditAfter greater than InitialCredit

deps/rabbit/src/rabbit_amqp_session.erl

+6-6
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@
284284
max_incoming_window :: pos_integer(),
285285
max_link_credit :: pos_integer(),
286286
max_queue_credit :: pos_integer(),
287-
msg_interceptor_ctx :: map()
287+
msg_interceptor_ctx :: rabbit_message_interceptor:context()
288288
}).
289289

290290
-record(state, {
@@ -477,9 +477,9 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
477477
max_link_credit = MaxLinkCredit,
478478
max_queue_credit = MaxQueueCredit,
479479
msg_interceptor_ctx = #{protocol => ?PROTOCOL,
480-
username => User#user.username,
481480
vhost => Vhost,
482-
conn_name => ConnName}
481+
username => User#user.username,
482+
connection_name => ConnName}
483483
}}}.
484484

485485
terminate(_Reason, #state{incoming_links = IncomingLinks,
@@ -2442,12 +2442,12 @@ incoming_link_transfer(
24422442
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24432443
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24442444
{ok, X, RoutingKeys, Mc1, PermCache} ->
2445+
check_user_id(Mc1, User),
2446+
TopicPermCache = check_write_permitted_on_topics(
2447+
X, User, RoutingKeys, TopicPermCache0),
24452448
Mc2 = rabbit_message_interceptor:intercept(Mc1,
24462449
MsgInterceptorCtx,
24472450
incoming_message_interceptors),
2448-
check_user_id(Mc2, User),
2449-
TopicPermCache = check_write_permitted_on_topics(
2450-
X, User, RoutingKeys, TopicPermCache0),
24512451
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
24522452
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
24532453
Opts = #{correlation => {HandleInt, DeliveryId}},

deps/rabbit/src/rabbit_channel.erl

+7-7
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
max_consumers, % taken from rabbit.consumer_max_per_channel
112112
%% defines how ofter gc will be executed
113113
writer_gc_threshold,
114-
msg_interceptor_ctx
114+
msg_interceptor_ctx :: rabbit_message_interceptor:context()
115115
}).
116116

117117
-record(pending_ack, {
@@ -493,6 +493,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
493493
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
494494
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
495495
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
496+
MsgInterceptorCtx = #{protocol => amqp091,
497+
vhost => VHost,
498+
username => User#user.username,
499+
connection_name => ConnName},
496500
State = #ch{cfg = #conf{state = starting,
497501
protocol = Protocol,
498502
channel = Channel,
@@ -511,11 +515,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
511515
authz_context = OptionalVariables,
512516
max_consumers = MaxConsumers,
513517
writer_gc_threshold = GCThreshold,
514-
msg_interceptor_ctx = #{protocol => amqp091,
515-
username => User#user.username,
516-
vhost => VHost,
517-
conn_name => ConnName}
518-
},
518+
msg_interceptor_ctx = MsgInterceptorCtx},
519519
limiter = Limiter,
520520
tx = none,
521521
next_tag = 1,
@@ -1213,10 +1213,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12131213
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
12141214
{ok, Message0} ->
12151215
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
1216+
check_user_id_header(Message0, User),
12161217
Message = rabbit_message_interceptor:intercept(Message0,
12171218
MsgInterceptorCtx,
12181219
incoming_message_interceptors),
1219-
check_user_id_header(Message, User),
12201220
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
12211221
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
12221222
Queues = rabbit_amqqueue:lookup_many(QNames),
+53-37
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,65 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
17
-module(rabbit_message_interceptor).
28

9+
%% client API
310
-export([intercept/3,
4-
set_msg_annotation/4]).
11+
add/2,
12+
remove/2]).
13+
%% helpers for behaviour implementations
14+
-export([set_annotation/4]).
515

16+
%% same protocol names as output by Prometheus endpoint
617
-type protocol() :: amqp091 | amqp10 | mqtt310 | mqtt311 | mqtt50.
18+
-type context() :: #{protocol := protocol(),
19+
vhost := rabbit_types:vhost(),
20+
username := rabbit_types:username(),
21+
connection_name := binary(),
22+
atom() => term()}.
23+
-type group() :: incoming_message_interceptors |
24+
outgoing_message_interceptors.
25+
-type config() :: #{atom() => term()}.
26+
-type interceptor() :: {module(), config()}.
27+
28+
29+
-export_type([context/0]).
30+
31+
-callback intercept(mc:state(), context(), group(), config()) ->
32+
mc:state().
733

8-
-type msg_interceptor_ctx() :: #{protocol := protocol(),
9-
vhost := binary(),
10-
username := binary(),
11-
conn_name => binary(),
12-
atom() => term()}.
13-
14-
-callback intercept(Msg, MsgInterceptorCtx, Group, Config) -> Msg when
15-
Msg :: mc:state(),
16-
MsgInterceptorCtx :: msg_interceptor_ctx(),
17-
Group :: incoming_message_interceptors | outgoing_message_interceptors,
18-
Config :: #{atom() := term()}.
19-
20-
-spec intercept(Msg, MsgInterceptorCtx, Group) -> Msg when
21-
Msg :: mc:state(),
22-
MsgInterceptorCtx :: map(),
23-
Group :: incoming_message_interceptors | outgoing_message_interceptors.
24-
intercept(Msg, MsgInterceptorCtx, Group) ->
25-
Interceptors = persistent_term:get(Group, []),
26-
lists:foldl(fun({Module, Config}, Msg0) ->
27-
try
28-
Module:intercept(Msg0,
29-
MsgInterceptorCtx,
30-
Group,
31-
Config)
32-
catch
33-
error:undef ->
34-
Msg0
35-
end
36-
end, Msg , Interceptors).
37-
38-
-spec set_msg_annotation(mc:state(),
39-
mc:ann_key(),
40-
mc:ann_value(),
41-
boolean()
42-
) -> mc:state().
43-
set_msg_annotation(Msg, Key, Value, Overwrite) ->
34+
-spec intercept(mc:state(), context(), group()) ->
35+
mc:state().
36+
intercept(Msg, Ctx, Group) ->
37+
Interceptors = persistent_term:get(Group),
38+
lists:foldl(fun({Mod, Config}, Msg0) ->
39+
Mod:intercept(Msg0, Ctx, Group, Config)
40+
end, Msg, Interceptors).
41+
42+
-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) ->
43+
mc:state().
44+
set_annotation(Msg, Key, Value, Overwrite) ->
4445
case {mc:x_header(Key, Msg), Overwrite} of
4546
{Val, false} when Val =/= undefined ->
4647
Msg;
4748
_ ->
4849
mc:set_annotation(Key, Value, Msg)
4950
end.
51+
52+
-spec add([interceptor()], group()) -> ok.
53+
add(Interceptors, Group) ->
54+
%% validation
55+
lists:foreach(fun({Mod, #{}}) ->
56+
case erlang:function_exported(Mod, intercept, 4) of
57+
true -> ok;
58+
false -> error(Mod)
59+
end
60+
end, Interceptors),
61+
persistent_term:put(Group, persistent_term:get(Group, []) ++ Interceptors).
62+
63+
-spec remove([interceptor()], group()) -> ok.
64+
remove(Interceptors, Group) ->
65+
persistent_term:put(Group, persistent_term:get(Group, []) -- Interceptors).
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
17
-module(rabbit_message_interceptor_routing_node).
28
-behaviour(rabbit_message_interceptor).
39

410
-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
511

612
-export([intercept/4]).
713

8-
intercept(Msg, _MsgInterceptorCtx, _Group, Config) ->
14+
intercept(Msg, _Ctx, incoming_message_interceptors, Config) ->
915
Node = atom_to_binary(node()),
10-
Overwrite = maps:get(overwrite, Config, false),
11-
rabbit_message_interceptor:set_msg_annotation(Msg,
12-
?HEADER_ROUTING_NODE,
13-
Node,
14-
Overwrite).
16+
Overwrite = maps:get(overwrite, Config),
17+
rabbit_message_interceptor:set_annotation(Msg,
18+
?HEADER_ROUTING_NODE,
19+
Node,
20+
Overwrite);
21+
intercept(Msg, _Ctx, _Group, _Config) ->
22+
Msg.
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
17
-module(rabbit_message_interceptor_timestamp).
28
-behaviour(rabbit_message_interceptor).
39

@@ -7,20 +13,21 @@
713

814
-export([intercept/4]).
915

10-
intercept(Msg0, _MsgInterceptorCtx, _Group, Config) ->
16+
intercept(Msg0, _Ctx, incoming_message_interceptors, Config) ->
1117
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
12-
Overwrite = maps:get(overwrite, Config, false),
13-
Msg = rabbit_message_interceptor:set_msg_annotation(
14-
Msg0,
15-
?HEADER_TIMESTAMP,
16-
Ts,
17-
Overwrite),
18-
set_msg_timestamp(Msg, Ts, Overwrite).
18+
Overwrite = maps:get(overwrite, Config),
19+
Msg = rabbit_message_interceptor:set_annotation(Msg0,
20+
?HEADER_TIMESTAMP,
21+
Ts,
22+
Overwrite),
23+
set_timestamp(Msg, Ts, Overwrite);
24+
intercept(Msg, _MsgInterceptorCtx, _Group, _Config) ->
25+
Msg.
1926

20-
set_msg_timestamp(Msg, Timestamp, Overwrite) ->
27+
set_timestamp(Msg, Ts, Overwrite) ->
2128
case {mc:timestamp(Msg), Overwrite} of
22-
{Ts, false} when is_integer(Ts) ->
29+
{ExistingTs, false} when is_integer(ExistingTs) ->
2330
Msg;
2431
_ ->
25-
mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg)
32+
mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg)
2633
end.

0 commit comments

Comments
 (0)