Skip to content

Commit 5463a7e

Browse files
committed
fix: on new keys, disconnect user socket
1 parent 37995f6 commit 5463a7e

File tree

8 files changed

+316
-236
lines changed

8 files changed

+316
-236
lines changed

Diff for: lib/realtime/api.ex

+3-2
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ defmodule Realtime.Api do
124124
data: %{external_id: external_id}
125125
})
126126
when is_map_key(changes, :jwt_jwks) or is_map_key(changes, :jwt_secret) do
127-
Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, :disconnect)
127+
RealtimeWeb.Endpoint.broadcast("user_socket:" <> external_id, "disconnect", %{})
128128
end
129129

130130
defp maybe_trigger_disconnect(_), do: nil
@@ -198,7 +198,8 @@ defmodule Realtime.Api do
198198
{value, settings} = Map.pop(extension.settings, from)
199199
new_settings = Map.put(settings, to, value)
200200

201-
Ecto.Changeset.cast(extension, %{settings: new_settings}, [:settings])
201+
extension
202+
|> Ecto.Changeset.cast(%{settings: new_settings}, [:settings])
202203
|> Repo.update!()
203204
end
204205
end

Diff for: lib/realtime_web/channels/realtime_channel.ex

+4-9
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ defmodule RealtimeWeb.RealtimeChannel do
6969
Realtime.UsersCounter.add(transport_pid, tenant_id)
7070
RealtimeWeb.Endpoint.subscribe(tenant_topic)
7171
Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)
72+
Process.monitor(transport_pid)
7273

7374
pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic)
7475

@@ -196,6 +197,7 @@ defmodule RealtimeWeb.RealtimeChannel do
196197
end
197198
end
198199

200+
@impl true
199201
def handle_info(
200202
_any,
201203
%{
@@ -211,27 +213,22 @@ defmodule RealtimeWeb.RealtimeChannel do
211213
shutdown_response(socket, message)
212214
end
213215

214-
@impl true
215-
216216
def handle_info(:sync_presence = msg, socket) do
217217
PresenceHandler.track(msg, socket)
218218
end
219219

220-
@impl true
221220
def handle_info(%{event: "postgres_cdc_rls_down"}, socket) do
222221
pg_sub_ref = postgres_subscribe()
223222

224223
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
225224
end
226225

227-
@impl true
228226
def handle_info(%{event: "postgres_cdc_down"}, socket) do
229227
pg_sub_ref = postgres_subscribe()
230228

231229
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
232230
end
233231

234-
@impl true
235232
def handle_info(
236233
%{event: type, payload: payload} = msg,
237234
%{assigns: %{policies: policies}} = socket
@@ -262,7 +259,6 @@ defmodule RealtimeWeb.RealtimeChannel do
262259
{:noreply, socket}
263260
end
264261

265-
@impl true
266262
def handle_info(:postgres_subscribe, %{assigns: %{channel_name: channel_name}} = socket) do
267263
%{
268264
assigns: %{
@@ -309,7 +305,6 @@ defmodule RealtimeWeb.RealtimeChannel do
309305
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
310306
end
311307

312-
@impl true
313308
def handle_info(:confirm_token, %{assigns: %{pg_change_params: pg_change_params}} = socket) do
314309
case confirm_token(socket) do
315310
{:ok, claims, confirm_token_ref, _, _} ->
@@ -332,10 +327,10 @@ defmodule RealtimeWeb.RealtimeChannel do
332327
end
333328
end
334329

335-
def handle_info(:disconnect, %{assigns: %{channel_name: channel_name}} = socket) do
330+
def handle_info(%{event: "phx_leave"}, %{assigns: %{channel_name: channel_name}} = socket) do
336331
Logger.info("Received operational call to disconnect channel")
337332
push_system_message("system", socket, "ok", "Server requested disconnect", channel_name)
338-
{:stop, :shutdown, socket}
333+
{:stop, {:shutdown, :left}, socket}
339334
end
340335

341336
def handle_info(msg, socket) do

Diff for: lib/realtime_web/channels/user_socket.ex

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ defmodule RealtimeWeb.UserSocket do
5959
jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
6060
{:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks),
6161
{:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do
62+
RealtimeWeb.Endpoint.subscribe(subscribers_id(external_id))
63+
6264
assigns = %RealtimeChannel.Assigns{
6365
claims: claims,
6466
jwt_secret: jwt_secret,

Diff for: mix.lock

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
5454
"open_api_spex": {:hex, :open_api_spex, "3.21.2", "6a704f3777761feeb5657340250d6d7332c545755116ca98f33d4b875777e1e5", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}, {:ymlr, "~> 2.0 or ~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :ymlr, repo: "hexpm", optional: true]}], "hexpm", "f42ae6ed668b895ebba3e02773cfb4b41050df26f803f2ef634c72a7687dc387"},
5555
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
56-
"phoenix": {:hex, :phoenix, "1.7.18", "5310c21443514be44ed93c422e15870aef254cf1b3619e4f91538e7529d2b2e4", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "1797fcc82108442a66f2c77a643a62980f342bfeb63d6c9a515ab8294870004e"},
56+
"phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"},
5757
"phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"},
5858
"phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"},
5959
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"},

Diff for: test/integration/integration.ex

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
defmodule Integration do
2+
import Generators
3+
4+
alias Realtime.Database
5+
alias Realtime.Integration.WebsocketClient
6+
alias Phoenix.Socket.V1
7+
alias Realtime.Database
8+
alias Realtime.Integration.WebsocketClient
9+
10+
@serializer V1.JSONSerializer
11+
@secret "secure_jwt_secret"
12+
@external_id "dev_tenant"
13+
defp uri(port), do: "ws://#{@external_id}.localhost:#{port}/socket/websocket"
14+
def token_valid(role, claims \\ %{}), do: generate_token(Map.put(claims, :role, role))
15+
def token_no_role, do: generate_token()
16+
17+
def generate_token(claims \\ %{}) do
18+
claims =
19+
Map.merge(
20+
%{
21+
ref: "localhost",
22+
iat: System.system_time(:second),
23+
exp: System.system_time(:second) + 604_800
24+
},
25+
claims
26+
)
27+
28+
{:ok, generate_jwt_token(@secret, claims)}
29+
end
30+
31+
def get_connection(port, role \\ "anon", claims \\ %{}, params \\ %{vsn: "1.0.0", log_level: :warning}) do
32+
params = Enum.reduce(params, "", fn {k, v}, acc -> "#{acc}&#{k}=#{v}" end)
33+
uri = "#{uri(port)}?#{params}"
34+
35+
with {:ok, token} <- token_valid(role, claims),
36+
{:ok, socket} <-
37+
WebsocketClient.connect(self(), uri, @serializer, [{"x-api-key", token}]) do
38+
{socket, token}
39+
end
40+
end
41+
42+
def rls_context(%{tenant: tenant} = context) do
43+
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
44+
45+
clean_table(db_conn, "realtime", "messages")
46+
topic = Map.get(context, :topic, random_string())
47+
message = message_fixture(tenant, %{topic: topic})
48+
49+
if policies = context[:policies] do
50+
create_rls_policies(db_conn, policies, message)
51+
end
52+
53+
Map.put(context, :topic, message.topic)
54+
end
55+
56+
def change_tenant_configuration(limit, value) do
57+
@external_id
58+
|> Realtime.Tenants.get_tenant_by_external_id()
59+
|> Realtime.Api.Tenant.changeset(%{limit => value})
60+
|> Realtime.Repo.update!()
61+
62+
Realtime.Tenants.Cache.invalidate_tenant_cache(@external_id)
63+
end
64+
end

0 commit comments

Comments
 (0)