Skip to content

Commit

Permalink
Add support for auth_hash authentication to mirrored servers
Browse files Browse the repository at this point in the history
The change itself is fairly simple, just piping the auth_hash
object  to the mirror server pool.

The test was created by putting aping existing tests in mirror_spec.rb
and auth_query_spec.rb.

I also updated the auth of the mirror instance to md5, since the
connection failed silently when scram-sha-256 was used.
  • Loading branch information
joshcurtis committed Dec 31, 2024
1 parent b37d105 commit 7d1cab2
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/mirrors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct MirroredClient {
database: String,
bytes_rx: Receiver<Bytes>,
disconnect_rx: Receiver<()>,
auth_hash: Arc<RwLock<Option<String>>>,
}

impl MirroredClient {
Expand All @@ -39,7 +40,7 @@ impl MirroredClient {
self.user.clone(),
self.database.as_str(),
ClientServerMap::default(),
Arc::new(RwLock::new(None)),
self.auth_hash.clone(),
None,
true,
false,
Expand Down Expand Up @@ -125,6 +126,7 @@ impl MirroringManager {
user: User,
database: String,
addresses: Vec<Address>,
auth_hash: Arc<RwLock<Option<String>>>,
) -> MirroringManager {
let mut byte_senders: Vec<Sender<Bytes>> = vec![];
let mut exit_senders: Vec<Sender<()>> = vec![];
Expand All @@ -140,6 +142,7 @@ impl MirroringManager {
address: addr,
bytes_rx,
disconnect_rx: exit_rx,
auth_hash: auth_hash.clone(),
};
exit_senders.push(exit_tx);
byte_senders.push(bytes_tx);
Expand Down
1 change: 1 addition & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ impl Server {
user.clone(),
database.to_owned(),
address.mirrors.clone(),
auth_hash,
)),
},
cleanup_connections,
Expand Down
2 changes: 1 addition & 1 deletion tests/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
pg4:
image: postgres:14
Expand Down
3 changes: 3 additions & 0 deletions tests/ruby/helpers/auth_query_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ def self.set_up_auth_query_for_user(user:, password:, instance_ports: [ 5432, 10
instance_ports.each do |port|
connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}")
connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction
connection.exec("REVOKE EXECUTE ON FUNCTION pg_stat_statements_reset FROM #{user}") rescue PG::UndefinedObject
connection.exec("DROP ROLE #{user}") rescue PG::UndefinedObject
connection.exec("CREATE ROLE #{user} ENCRYPTED PASSWORD '#{password}' LOGIN;")
connection.exec("GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO #{user}")
connection.exec(self.create_query_auth_function(user))
connection.close
end
Expand All @@ -158,6 +160,7 @@ def self.tear_down_auth_query_for_user(user:, password:, instance_ports: [ 5432,
instance_ports.each do |port|
connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}")
connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction
connection.exec("REVOKE EXECUTE ON FUNCTION pg_stat_statements_reset FROM #{user}")
connection.exec("DROP ROLE #{user}")
connection.close
end
Expand Down
71 changes: 71 additions & 0 deletions tests/ruby/mirrors_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true
require 'uri'
require_relative 'spec_helper'
require_relative 'helpers/auth_query_helper'

describe "Query Mirroing" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
Expand Down Expand Up @@ -89,3 +90,73 @@
end
end
end

describe "Query Mirroring with Auth Query" do
let(:pg_user) { { 'username' => 'sharding_user', 'password' => 'sharding_user' } }
let(:config_user) { {'username' => 'md5_auth_user'} }
let(:auth_query_user) { { 'username' => 'md5_auth_user', 'password' => 'hash' } }
let(:mirror_pg) { PgInstance.new(8432, "md5_auth_user", "hash", "shard0") }
let(:mirror_host) { "localhost" }
let(:config) {
{
'general' => {
'auth_query' => "SELECT * FROM public.user_lookup('$1');",
'auth_query_user' => auth_query_user['username'],
'auth_query_password' => auth_query_user['password']
},
}
}
let(:processes) { Helpers::AuthQuery.single_shard_auth_query(
pool_name: "sharded_db",
pg_user: pg_user,
config_user: config_user,
extra_conf: config,
wait_until_ready: false,
)}

before do
pgcat = processes.pgcat

new_configs = processes.pgcat.current_config
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
[mirror_host, mirror_pg.port.to_i, 0],
[mirror_host, mirror_pg.port.to_i, 1],
]
pgcat.update_config(new_configs)
pgcat.reload_config

Helpers::AuthQuery.set_up_auth_query_for_user(
user: auth_query_user['username'],
password: auth_query_user['password'],
instance_ports: [processes.primary.port, processes.replicas[0].port, mirror_pg.port],
)

pgcat.wait_until_ready(
pgcat.connection_string("sharded_db", auth_query_user['username'], auth_query_user['password'])
)

mirror_pg.reset
end

after do
Helpers::AuthQuery.tear_down_auth_query_for_user(
user: auth_query_user['username'],
password: auth_query_user['password'],
instance_ports: [processes.primary.port, processes.replicas[0].port, mirror_pg.port],
)
end

context "when auth_query is configured" do
it "can mirror a query" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", auth_query_user['username'], auth_query_user['password']))
runs=5
runs.times { conn.sync_exec("SELECT 1 + 2") }
conn.close

# I'd like to check verify the primary and replica received the queries too, but getting the permissions correct is annoying
# expect((processes.all_databases + processes.replicas).map(&:count_select_1_plus_2).sum).to eq(0)
expect(mirror_pg.count_select_1_plus_2).to eq(runs)
end
end
end

0 comments on commit 7d1cab2

Please sign in to comment.