Skip to content

Commit 69c2855

Browse files
committed
Intercept outgoing just before conversion
Intercept outgoing message just before conversion to target protocol as this will give most flexibility to 3rd party plugins.
1 parent d302b85 commit 69c2855

File tree

3 files changed

+8
-8
lines changed

3 files changed

+8
-8
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -2180,9 +2180,9 @@ handle_deliver(ConsumerTag, AckRequired,
21802180
delivery_tag = {binary, Dtag},
21812181
message_format = ?UINT(?MESSAGE_FORMAT),
21822182
settled = SendSettled},
2183-
Mc1 = mc:convert(mc_amqp, Mc0),
2184-
Mc2 = mc:set_annotation(redelivered, Redelivered, Mc1),
2185-
Mc = rabbit_msg_interceptor:intercept_outgoing(Mc2, MsgIcptCtx),
2183+
Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx),
2184+
Mc2 = mc:convert(mc_amqp, Mc1),
2185+
Mc = mc:set_annotation(redelivered, Redelivered, Mc2),
21862186
Sections = mc:protocol_state(Mc),
21872187
validate_message_size(Sections, MaxMessageSize),
21882188
Frames = transfer_frames(Transfer, Sections, MaxFrameSize),

deps/rabbit/src/rabbit_channel.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -2645,8 +2645,8 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
26452645
{noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg0, State)}.
26462646

26472647
outgoing_content(Mc, MsgIcptCtx) ->
2648-
Mc1 = mc:convert(mc_amqpl, Mc),
2649-
Mc2 = rabbit_msg_interceptor:intercept_outgoing(Mc1, MsgIcptCtx),
2648+
Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc, MsgIcptCtx),
2649+
Mc2 = mc:convert(mc_amqpl, Mc1),
26502650
mc:protocol_state(Mc2).
26512651

26522652
init_tick_timer(State = #ch{tick_timer = undefined}) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -2073,15 +2073,15 @@ deliver_to_client(Msgs, Ack, State) ->
20732073
deliver_one_to_client(Msg, Ack, S)
20742074
end, State, Msgs).
20752075

2076-
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery,
2076+
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc0} = Delivery,
20772077
AckRequired,
20782078
#state{cfg = #cfg{msg_interceptor_ctx = MsgIcptCtx}} = State0) ->
20792079
SubscriberQoS = case AckRequired of
20802080
true -> ?QOS_1;
20812081
false -> ?QOS_0
20822082
end,
2083-
McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()),
2084-
McMqtt = rabbit_msg_interceptor:intercept_outgoing(McMqtt0, MsgIcptCtx),
2083+
Mc = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx),
2084+
McMqtt = mc:convert(mc_mqtt, Mc, mc_env()),
20852085
MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt),
20862086
QoS = effective_qos(PublisherQos, SubscriberQoS),
20872087
{SettleOp, State1} = maybe_publish_to_client(MqttMsg, Delivery, QoS, State0),

0 commit comments

Comments
 (0)