Skip to content

Commit

Permalink
feat: Database Broadcast enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Jan 20, 2025
1 parent d6a988b commit e01dd4c
Show file tree
Hide file tree
Showing 11 changed files with 22 additions and 46 deletions.
3 changes: 0 additions & 3 deletions lib/realtime/api/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ defmodule Realtime.Api.Tenant do
field(:suspend, :boolean, default: false)
field(:events_per_second_rolling, :float, virtual: true)
field(:events_per_second_now, :integer, virtual: true)
field(:notify_private_alpha, :boolean, default: false)
field(:private_only, :boolean, default: false)

has_many(:extensions, Realtime.Api.Extensions,
Expand Down Expand Up @@ -74,7 +73,6 @@ defmodule Realtime.Api.Tenant do
:max_channels_per_client,
:max_joins_per_second,
:suspend,
:notify_private_alpha,
:private_only
])
|> validate_required([
Expand Down Expand Up @@ -113,7 +111,6 @@ defmodule Realtime.Api.Tenant do
:max_channels_per_client,
:max_joins_per_second,
:suspend,
:notify_private_alpha,
:private_only
])
end
Expand Down
1 change: 0 additions & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ defmodule Realtime.Tenants do
:max_channels_per_client
:max_joins_per_second
:suspend
:notify_private_alpha
:private_only
"""
@spec update_management(String.t(), map()) :: Tenant.t() | nil
Expand Down
12 changes: 2 additions & 10 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ defmodule Realtime.Tenants.Connect do
def handle_continue(:start_listen_and_replication, state) do
%{tenant: tenant} = state

with {:ok, broadcast_changes_pid} <- start_replication(tenant, self()),
{:ok, listen_pid} <- start_listen(tenant, self()) do
with {:ok, broadcast_changes_pid} <- ReplicationConnection.start(tenant, self()),
{:ok, listen_pid} <- Listen.start(tenant, self()) do
{:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid},
{:continue, :setup_connected_user_events}}
else
Expand Down Expand Up @@ -336,12 +336,4 @@ defmodule Realtime.Tenants.Connect do

defp tenant_suspended?(%Tenant{suspend: true}), do: {:error, :tenant_suspended}
defp tenant_suspended?(_), do: :ok

defp start_replication(%{notify_private_alpha: false}, _), do: {:ok, nil}

defp start_replication(tenant, connect_pid),
do: ReplicationConnection.start(tenant, connect_pid)

defp start_listen(%{notify_private_alpha: false}, _), do: {:ok, nil}
defp start_listen(tenant, connect_pid), do: Listen.start(tenant, connect_pid)
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.84",
version: "2.34.0",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
3 changes: 1 addition & 2 deletions priv/repo/seeds.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ Repo.transaction(fn ->
"ssl_enforced" => false
}
}
],
"notify_private_alpha" => true
]
})
|> Repo.insert!()
end)
Expand Down
3 changes: 1 addition & 2 deletions priv/repo/seeds_after_migration.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ end
}
],
"external_id" => tenant_name,
"jwt_secret" => "secure_jwt_secret",
"notify_private_alpha" => true
"jwt_secret" => "secure_jwt_secret"
}
|> Api.create_tenant()

Expand Down
11 changes: 3 additions & 8 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,7 @@ defmodule Realtime.Integration.RtChannelTest do
@tag policies: [
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true
]
test "broadcast insert event changes on insert in table with trigger", %{
topic: topic,
db_conn: db_conn,
Expand Down Expand Up @@ -652,7 +651,6 @@ defmodule Realtime.Integration.RtChannelTest do
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true,
requires_data: true
test "broadcast update event changes on update in table with trigger", %{
topic: topic,
Expand Down Expand Up @@ -703,8 +701,7 @@ defmodule Realtime.Integration.RtChannelTest do
@tag policies: [
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true
]
test "broadcast delete event changes on delete in table with trigger", %{
topic: topic,
db_conn: db_conn,
Expand Down Expand Up @@ -746,8 +743,7 @@ defmodule Realtime.Integration.RtChannelTest do
@tag policies: [
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true
]
test "broadcast event when function 'send' is called with private topic", %{
topic: topic,
db_conn: db_conn
Expand Down Expand Up @@ -784,7 +780,6 @@ defmodule Realtime.Integration.RtChannelTest do
500
end

@tag notify_private_alpha: true
test "broadcast event when function 'send' is called with public topic", %{
topic: topic,
db_conn: db_conn
Expand Down
1 change: 1 addition & 0 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ defmodule Realtime.DatabaseTest do
)

Database.replication_slot_teardown(conn, name)
Process.sleep(1000)
assert %{rows: []} = Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", [])
end

Expand Down
27 changes: 11 additions & 16 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ defmodule Realtime.Tenants.ConnectTest do
alias Realtime.Tenants.ReplicationConnection
alias Realtime.UsersCounter

describe "lookup_or_start_connection/1" do
setup do
Cleanup.ensure_no_replication_slot()
tenant = tenant_fixture()
%{tenant: tenant}
end
setup do
Cleanup.ensure_no_replication_slot()
tenant = tenant_fixture()
%{tenant: tenant}
end

describe "lookup_or_start_connection/1" do
test "if tenant exists and connected, returns the db connection", %{tenant: tenant} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Sandbox.allow(Repo, self(), db_conn)
Expand Down Expand Up @@ -76,7 +76,7 @@ defmodule Realtime.Tenants.ConnectTest do

Sandbox.allow(Repo, self(), db_conn)
# Not enough time has passed, connection still alive
:timer.sleep(500)
:timer.sleep(400)
assert {_, %{conn: _}} = :syn.lookup(Connect, tenant_id)

# Enough time has passed, connection stopped
Expand Down Expand Up @@ -189,8 +189,7 @@ defmodule Realtime.Tenants.ConnectTest do
end
end

test "starts broadcast handler and does not fail on existing connection" do
tenant = tenant_fixture(%{notify_private_alpha: true})
test "starts broadcast handler and does not fail on existing connection", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)

assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Expand All @@ -205,8 +204,7 @@ defmodule Realtime.Tenants.ConnectTest do
assert replication_connection_before == replication_connection_after
end

test "failed broadcast handler and listen recover from failure" do
tenant = tenant_fixture(%{notify_private_alpha: true})
test "failed broadcast handler and listen recover from failure", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(3000)
Expand All @@ -229,8 +227,7 @@ defmodule Realtime.Tenants.ConnectTest do
assert Process.alive?(listen_pid)
end

test "on database disconnect, connection is killed to all components" do
tenant = tenant_fixture(%{notify_private_alpha: true})
test "on database disconnect, connection is killed to all components", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
old_pid = Connect.whereis(tenant.external_id)
Expand All @@ -257,9 +254,7 @@ defmodule Realtime.Tenants.ConnectTest do
end

describe "shutdown/1" do
test "shutdowns all associated connections" do
tenant = tenant_fixture(%{notify_private_alpha: true})

test "shutdowns all associated connections", %{tenant: tenant} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(1000)

Expand Down
2 changes: 1 addition & 1 deletion test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,6 @@ defmodule RealtimeWeb.TenantControllerTest do
end

defp create_tenant(_) do
%{tenant: tenant_fixture(%{notify_private_alpha: true})}
%{tenant: tenant_fixture()}
end
end
3 changes: 1 addition & 2 deletions test/support/generators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ defmodule Generators do
],
"postgres_cdc_default" => "postgres_cdc_rls",
"jwt_secret" => "new secret",
"jwt_jwks" => nil,
"notify_private_alpha" => false
"jwt_jwks" => nil
}

override = override |> Enum.map(fn {k, v} -> {"#{k}", v} end) |> Map.new()
Expand Down

0 comments on commit e01dd4c

Please sign in to comment.