From 7d1cab2e442441d969c56a09d4a3707380ff49c2 Mon Sep 17 00:00:00 2001 From: Josh Curtis Date: Thu, 19 Dec 2024 11:01:47 -0800 Subject: [PATCH] Add support for auth_hash authentication to mirrored servers The change itself is fairly simple, just piping the auth_hash object to the mirror server pool. The test was created by putting aping existing tests in mirror_spec.rb and auth_query_spec.rb. I also updated the auth of the mirror instance to md5, since the connection failed silently when scram-sha-256 was used. --- src/mirrors.rs | 5 +- src/server.rs | 1 + tests/docker/docker-compose.yml | 2 +- tests/ruby/helpers/auth_query_helper.rb | 3 ++ tests/ruby/mirrors_spec.rb | 71 +++++++++++++++++++++++++ 5 files changed, 80 insertions(+), 2 deletions(-) diff --git a/src/mirrors.rs b/src/mirrors.rs index 73ab7320..a9371de8 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -17,6 +17,7 @@ pub struct MirroredClient { database: String, bytes_rx: Receiver, disconnect_rx: Receiver<()>, + auth_hash: Arc>>, } impl MirroredClient { @@ -39,7 +40,7 @@ impl MirroredClient { self.user.clone(), self.database.as_str(), ClientServerMap::default(), - Arc::new(RwLock::new(None)), + self.auth_hash.clone(), None, true, false, @@ -125,6 +126,7 @@ impl MirroringManager { user: User, database: String, addresses: Vec
, + auth_hash: Arc>>, ) -> MirroringManager { let mut byte_senders: Vec> = vec![]; let mut exit_senders: Vec> = vec![]; @@ -140,6 +142,7 @@ impl MirroringManager { address: addr, bytes_rx, disconnect_rx: exit_rx, + auth_hash: auth_hash.clone(), }; exit_senders.push(exit_tx); byte_senders.push(bytes_tx); diff --git a/src/server.rs b/src/server.rs index 882450ea..7a7add6a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -820,6 +820,7 @@ impl Server { user.clone(), database.to_owned(), address.mirrors.clone(), + auth_hash, )), }, cleanup_connections, diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index 0e174d83..ea3d05d4 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -24,7 +24,7 @@ services: POSTGRES_USER: postgres POSTGRES_DB: postgres POSTGRES_PASSWORD: postgres - POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256 + POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5 command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"] pg4: image: postgres:14 diff --git a/tests/ruby/helpers/auth_query_helper.rb b/tests/ruby/helpers/auth_query_helper.rb index 43d7c785..f1b30070 100644 --- a/tests/ruby/helpers/auth_query_helper.rb +++ b/tests/ruby/helpers/auth_query_helper.rb @@ -147,8 +147,10 @@ def self.set_up_auth_query_for_user(user:, password:, instance_ports: [ 5432, 10 instance_ports.each do |port| connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}") connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction + connection.exec("REVOKE EXECUTE ON FUNCTION pg_stat_statements_reset FROM #{user}") rescue PG::UndefinedObject connection.exec("DROP ROLE #{user}") rescue PG::UndefinedObject connection.exec("CREATE ROLE #{user} ENCRYPTED PASSWORD '#{password}' LOGIN;") + connection.exec("GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO #{user}") connection.exec(self.create_query_auth_function(user)) connection.close end @@ -158,6 +160,7 @@ def self.tear_down_auth_query_for_user(user:, password:, instance_ports: [ 5432, instance_ports.each do |port| connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}") connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction + connection.exec("REVOKE EXECUTE ON FUNCTION pg_stat_statements_reset FROM #{user}") connection.exec("DROP ROLE #{user}") connection.close end diff --git a/tests/ruby/mirrors_spec.rb b/tests/ruby/mirrors_spec.rb index b6a4514c..480905fb 100644 --- a/tests/ruby/mirrors_spec.rb +++ b/tests/ruby/mirrors_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'uri' require_relative 'spec_helper' +require_relative 'helpers/auth_query_helper' describe "Query Mirroing" do let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } @@ -89,3 +90,73 @@ end end end + +describe "Query Mirroring with Auth Query" do + let(:pg_user) { { 'username' => 'sharding_user', 'password' => 'sharding_user' } } + let(:config_user) { {'username' => 'md5_auth_user'} } + let(:auth_query_user) { { 'username' => 'md5_auth_user', 'password' => 'hash' } } + let(:mirror_pg) { PgInstance.new(8432, "md5_auth_user", "hash", "shard0") } + let(:mirror_host) { "localhost" } + let(:config) { + { + 'general' => { + 'auth_query' => "SELECT * FROM public.user_lookup('$1');", + 'auth_query_user' => auth_query_user['username'], + 'auth_query_password' => auth_query_user['password'] + }, + } + } + let(:processes) { Helpers::AuthQuery.single_shard_auth_query( + pool_name: "sharded_db", + pg_user: pg_user, + config_user: config_user, + extra_conf: config, + wait_until_ready: false, + )} + + before do + pgcat = processes.pgcat + + new_configs = processes.pgcat.current_config + new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [ + [mirror_host, mirror_pg.port.to_i, 0], + [mirror_host, mirror_pg.port.to_i, 1], + ] + pgcat.update_config(new_configs) + pgcat.reload_config + + Helpers::AuthQuery.set_up_auth_query_for_user( + user: auth_query_user['username'], + password: auth_query_user['password'], + instance_ports: [processes.primary.port, processes.replicas[0].port, mirror_pg.port], + ) + + pgcat.wait_until_ready( + pgcat.connection_string("sharded_db", auth_query_user['username'], auth_query_user['password']) + ) + + mirror_pg.reset + end + + after do + Helpers::AuthQuery.tear_down_auth_query_for_user( + user: auth_query_user['username'], + password: auth_query_user['password'], + instance_ports: [processes.primary.port, processes.replicas[0].port, mirror_pg.port], + ) + end + + context "when auth_query is configured" do + it "can mirror a query" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", auth_query_user['username'], auth_query_user['password'])) + runs=5 + runs.times { conn.sync_exec("SELECT 1 + 2") } + conn.close + + # I'd like to check verify the primary and replica received the queries too, but getting the permissions correct is annoying + # expect((processes.all_databases + processes.replicas).map(&:count_select_1_plus_2).sum).to eq(0) + expect(mirror_pg.count_select_1_plus_2).to eq(runs) + end + end +end +