Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Database Broadcast enabled #1271

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lib/realtime/api/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ defmodule Realtime.Api.Tenant do
field(:suspend, :boolean, default: false)
field(:events_per_second_rolling, :float, virtual: true)
field(:events_per_second_now, :integer, virtual: true)
field(:notify_private_alpha, :boolean, default: false)
field(:private_only, :boolean, default: false)

has_many(:extensions, Realtime.Api.Extensions,
Expand Down Expand Up @@ -74,7 +73,6 @@ defmodule Realtime.Api.Tenant do
:max_channels_per_client,
:max_joins_per_second,
:suspend,
:notify_private_alpha,
:private_only
])
|> validate_required([
Expand Down Expand Up @@ -113,7 +111,6 @@ defmodule Realtime.Api.Tenant do
:max_channels_per_client,
:max_joins_per_second,
:suspend,
:notify_private_alpha,
:private_only
])
end
Expand Down
1 change: 0 additions & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ defmodule Realtime.Tenants do
:max_channels_per_client
:max_joins_per_second
:suspend
:notify_private_alpha
:private_only
"""
@spec update_management(String.t(), map()) :: Tenant.t() | nil
Expand Down
3 changes: 1 addition & 2 deletions lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
send_message_and_count(tenant, sub_topic, event, payload, true)
end)

tenant_db_conn =
Connect.lookup_or_start_connection(tenant.external_id)
tenant_db_conn = Connect.lookup_or_start_connection(tenant.external_id)

# Handle events for private channel
events
Expand Down
18 changes: 5 additions & 13 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule Realtime.Tenants.Connect do
call_external_node(tenant_id, opts)

{:error, :tenant_database_connection_initializing} ->
:timer.sleep(100)
Process.sleep(100)
call_external_node(tenant_id, opts)

{:error, :initializing} ->
Expand Down Expand Up @@ -132,8 +132,8 @@ defmodule Realtime.Tenants.Connect do
@spec shutdown(binary()) :: :ok | nil
def shutdown(tenant_id) do
case whereis(tenant_id) do
nil -> nil
pid -> GenServer.stop(pid)
pid when is_pid(pid) -> GenServer.stop(pid)
_ -> :ok
end
end

Expand Down Expand Up @@ -194,8 +194,8 @@ defmodule Realtime.Tenants.Connect do
def handle_continue(:start_listen_and_replication, state) do
%{tenant: tenant} = state

with {:ok, broadcast_changes_pid} <- start_replication(tenant, self()),
{:ok, listen_pid} <- start_listen(tenant, self()) do
with {:ok, broadcast_changes_pid} <- ReplicationConnection.start(tenant, self()),
{:ok, listen_pid} <- Listen.start(tenant, self()) do
{:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid},
{:continue, :setup_connected_user_events}}
else
Expand Down Expand Up @@ -336,12 +336,4 @@ defmodule Realtime.Tenants.Connect do

defp tenant_suspended?(%Tenant{suspend: true}), do: {:error, :tenant_suspended}
defp tenant_suspended?(_), do: :ok

defp start_replication(%{notify_private_alpha: false}, _), do: {:ok, nil}

defp start_replication(tenant, connect_pid),
do: ReplicationConnection.start(tenant, connect_pid)

defp start_listen(%{notify_private_alpha: false}, _), do: {:ok, nil}
defp start_listen(tenant, connect_pid), do: Listen.start(tenant, connect_pid)
end
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.84",
version: "2.34.0",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down Expand Up @@ -62,7 +62,6 @@ defmodule Realtime.MixProject do
{:libcluster, "~> 3.3"},
{:uuid, "~> 1.1"},
{:prom_ex, "~> 1.8"},
{:mock, "~> 0.3.7", only: :test},
{:joken, "~> 2.5.0"},
{:ex_json_schema, "~> 0.7"},
{:recon, "~> 2.5"},
Expand All @@ -74,6 +73,7 @@ defmodule Realtime.MixProject do
{:open_api_spex, "~> 3.16"},
{:corsica, "~> 2.0"},
{:observer_cli, "~> 1.7"},
{:mock, "~> 0.3", only: :test},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:mint_web_socket, "~> 1.0", only: :test},
{:dialyxir, "~> 1.4", only: :dev, runtime: false},
Expand Down
3 changes: 1 addition & 2 deletions priv/repo/seeds.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ Repo.transaction(fn ->
"ssl_enforced" => false
}
}
],
"notify_private_alpha" => true
]
})
|> Repo.insert!()
end)
Expand Down
3 changes: 1 addition & 2 deletions priv/repo/seeds_after_migration.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ end
}
],
"external_id" => tenant_name,
"jwt_secret" => "secure_jwt_secret",
"notify_private_alpha" => true
"jwt_secret" => "secure_jwt_secret"
}
|> Api.create_tenant()

Expand Down
40 changes: 17 additions & 23 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,7 @@
@tag policies: [
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true
]
test "broadcast insert event changes on insert in table with trigger", %{
topic: topic,
db_conn: db_conn,
Expand All @@ -624,7 +623,7 @@

assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
assert_receive %Message{event: "presence_state"}, 500
:timer.sleep(500)
Process.sleep(500)
value = random_string()
Postgrex.query!(db_conn, "INSERT INTO #{table_name} (details) VALUES ($1)", [value])

Expand Down Expand Up @@ -652,7 +651,6 @@
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true,
requires_data: true
test "broadcast update event changes on update in table with trigger", %{
topic: topic,
Expand All @@ -668,7 +666,7 @@

assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
assert_receive %Message{event: "presence_state"}, 500
:timer.sleep(500)
Process.sleep(500)
new_value = random_string()

Postgrex.query!(db_conn, "INSERT INTO #{table_name} (details) VALUES ($1)", [value])
Expand All @@ -678,7 +676,7 @@
value
])

:timer.sleep(500)
Process.sleep(500)
old_record = %{"details" => value, "id" => 1}
record = %{"details" => new_value, "id" => 1}

Expand All @@ -703,8 +701,7 @@
@tag policies: [
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true
]
test "broadcast delete event changes on delete in table with trigger", %{
topic: topic,
db_conn: db_conn,
Expand All @@ -717,7 +714,7 @@

assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
assert_receive %Message{event: "presence_state"}, 500
:timer.sleep(500)
Process.sleep(500)
value = random_string()

Postgrex.query!(db_conn, "INSERT INTO #{table_name} (details) VALUES ($1)", [value])
Expand Down Expand Up @@ -746,8 +743,7 @@
@tag policies: [
:authenticated_read_broadcast_and_presence,
:authenticated_write_broadcast_and_presence
],
notify_private_alpha: true
]
test "broadcast event when function 'send' is called with private topic", %{
topic: topic,
db_conn: db_conn
Expand All @@ -760,7 +756,7 @@

assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
assert_receive %Message{event: "presence_state"}, 500
:timer.sleep(500)
Process.sleep(500)
value = random_string()
event = random_string()

Expand All @@ -784,7 +780,6 @@
500
end

@tag notify_private_alpha: true
test "broadcast event when function 'send' is called with public topic", %{
topic: topic,
db_conn: db_conn
Expand All @@ -797,7 +792,7 @@

assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
assert_receive %Message{event: "presence_state"}, 500
:timer.sleep(500)
Process.sleep(500)
value = random_string()
event = random_string()

Expand Down Expand Up @@ -831,7 +826,7 @@
topic: topic
} do
Realtime.Tenants.update_management(@external_id, %{private_only: true})
:timer.sleep(100)
Process.sleep(100)
{socket, _} = get_connection("authenticated")
config = %{broadcast: %{self: true}, private: false}
topic = "realtime:#{topic}"
Expand All @@ -847,7 +842,7 @@
500

Realtime.Tenants.update_management(@external_id, %{private_only: false})
:timer.sleep(100)
Process.sleep(100)
end

@tag policies: [
Expand All @@ -858,15 +853,15 @@
topic: topic
} do
Realtime.Tenants.update_management(@external_id, %{private_only: true})
:timer.sleep(100)
Process.sleep(100)
{socket, _} = get_connection("authenticated")
config = %{broadcast: %{self: true}, private: true}
topic = "realtime:#{topic}"
WebsocketClient.join(socket, topic, %{config: config})

assert_receive %Phoenix.Socket.Message{event: "phx_reply"}, 500
Realtime.Tenants.update_management(@external_id, %{private_only: false})
:timer.sleep(100)
Process.sleep(100)
end
end

Expand Down Expand Up @@ -989,8 +984,7 @@
end

def rls_context(%{tenant: tenant} = context) do
{:ok, db_conn} =
Database.connect(tenant, "realtime_test")
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)

clean_table(db_conn, "realtime", "messages")
topic = Map.get(context, :topic, random_string())
Expand All @@ -1004,10 +998,10 @@
end

def setup_trigger(%{tenant: tenant, topic: topic} = context) do
Realtime.Tenants.Connect.shutdown(@external_id)

Check warning on line 1001 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.
:timer.sleep(500)
Process.sleep(500)

{:ok, db_conn} = Realtime.Tenants.Connect.connect(@external_id)

Check warning on line 1004 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

random_name = String.downcase("test_#{random_string()}")
query = "CREATE TABLE #{random_name} (id serial primary key, details text)"
Expand Down Expand Up @@ -1041,12 +1035,12 @@
Postgrex.query!(db_conn, query, [])

on_exit(fn ->
{:ok, db_conn} = Database.connect(tenant, "realtime_test")
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
query = "DROP TABLE #{random_name} CASCADE"
Postgrex.query!(db_conn, query, [])
Realtime.Tenants.Connect.shutdown(db_conn)

Check warning on line 1041 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

:timer.sleep(500)
Process.sleep(500)
end)

context
Expand Down
1 change: 1 addition & 0 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ defmodule Realtime.DatabaseTest do
)

Database.replication_slot_teardown(conn, name)
Process.sleep(1000)
assert %{rows: []} = Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", [])
end

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Realtime.MessagesTest do
tenant = tenant_fixture()
Migrations.run_migrations(tenant)

{:ok, conn} = Database.connect(tenant, "realtime_test")
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
clean_table(conn, "realtime", "messages")
date_start = Date.utc_today() |> Date.add(-10)
date_end = Date.utc_today()
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/rate_counter/rate_counter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ defmodule Realtime.RateCounterTest do
assert {:ok, %RateCounter{}} = RateCounter.get(term)
{_, _, counter_pid} = GenCounter.find_counter(term)
Process.exit(counter_pid, :shutdown)
:timer.sleep(10)
Process.sleep(10)
refute Process.alive?(pid)
end
end
10 changes: 5 additions & 5 deletions test/realtime/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Realtime.RepoTest do

setup do
tenant = tenant_fixture()
{:ok, db_conn} = Database.connect(tenant, "realtime_test")
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)

Migrations.run_migrations(tenant)

Expand Down Expand Up @@ -57,7 +57,7 @@ defmodule Realtime.RepoTest do
end

true = Process.exit(parent_pid, :kill)
:timer.sleep(1500)
Process.sleep(1500)
assert Process.alive?(repo_pid) == false
end

Expand Down Expand Up @@ -98,7 +98,7 @@ defmodule Realtime.RepoTest do
assert_receive :query_success, 2000
assert_receive :query_success, 2000

:timer.sleep(100)
Process.sleep(100)
assert Process.alive?(repo_pid_1) == false
assert Process.alive?(repo_pid_2) == false
assert Process.alive?(pid_1) == false
Expand All @@ -112,7 +112,7 @@ defmodule Realtime.RepoTest do
spawn(fn ->
Repo.with_dynamic_repo(db_config(), fn repo ->
send(test_pid, repo)
:timer.sleep(100)
Process.sleep(100)
raise "💣"
end)
end)
Expand All @@ -126,7 +126,7 @@ defmodule Realtime.RepoTest do
end

assert Process.alive?(repo_pid) == true
:timer.sleep(300)
Process.sleep(300)
assert Process.alive?(repo_pid) == false
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/rpc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Realtime.RpcTest do

defmodule TestRpc do
def test_raise, do: raise("test")
def test_timeout, do: :timer.sleep(1000)
def test_timeout, do: Process.sleep(1000)
def test_success, do: {:ok, "success"}
end

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/authorization_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
tenant = tenant_fixture()
Migrations.run_migrations(tenant)

{:ok, db_conn} = Database.connect(tenant, "realtime_test")
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)

clean_table(db_conn, "realtime", "messages")
topic = random_string()
Expand All @@ -181,7 +181,7 @@
role: claims.role
})

Realtime.Tenants.Migrations.create_partitions(db_conn)

Check warning on line 184 in test/realtime/tenants/authorization_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

on_exit(fn -> Process.exit(db_conn, :normal) end)

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/cache_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Realtime.Tenants.CacheSupervisorTest do
{:suspend_tenant, external_id}
)

:timer.sleep(500)
Process.sleep(500)
assert %Tenant{suspend: true} = Cache.get_tenant_by_external_id(external_id)
end
end
Loading
Loading