From 52a980fa0a200d5710e6256eaea3234556cecce5 Mon Sep 17 00:00:00 2001
From: Mostafa Abdelraouf
Date: Sun, 26 Mar 2023 14:52:55 -0500
Subject: [PATCH] wip

---
 src/client.rs | 707 ++++++++++++++++++++++++++++----------------------
 1 file changed, 396 insertions(+), 311 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index bb70a656..c716a6af 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,10 +1,15 @@
 use crate::errors::Error;
 use crate::pool::BanReason;
+use bb8::PooledConnection;
 /// Handle clients by pretending to be a PostgreSQL server.
 use bytes::{Buf, BufMut, BytesMut};
+use hyper::server::conn;
 use log::{debug, error, info, trace, warn};
+use crate::pool::ServerPool;
+use crate::config::Role;
 use std::collections::HashMap;
+use std::ops::Add;
 use std::time::Instant;
 use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
 use tokio::net::TcpStream;
@@ -31,6 +36,19 @@ enum ClientConnectionType {
     CancelQuery,
 }
 
+struct RetryBuffer {
+    buffer: BytesMut,
+    retry_count: u32,
+}
+
+pub enum ClientFlowControl {
+    Retry,
+    PerformNextCommand,
+    ReleaseConnection,
+    Disconnect
+}
+
+
 /// The client state. One of these is created per client.
 pub struct Client {
     /// The reads are buffered (8K by default).
@@ -92,6 +110,8 @@ pub struct Client {
     /// Used to notify clients about an impending shutdown
     shutdown: Receiver<()>,
+
+    retry_buffer: Option<RetryBuffer>,
 }
 
 /// Client entrypoint.
@@ -558,6 +578,7 @@ where
             application_name: application_name.to_string(),
             shutdown,
             connected_to_server: false,
+            retry_buffer: None,
         })
     }
 
@@ -592,155 +613,184 @@ where
             application_name: String::from("undefined"),
             shutdown,
             connected_to_server: false,
+            retry_buffer: None,
         })
     }
 
-    /// Handle a connected and authenticated client.
-    pub async fn handle(&mut self) -> Result<(), Error> {
-        // The client wants to cancel a query it has issued previously.
-        if self.cancel_mode {
-            trace!("Sending CancelRequest");
-            let (process_id, secret_key, address, port) = {
-                let guard = self.client_server_map.lock();
+    async fn cancel_query(&mut self) -> Result<(), Error> {
+        trace!("Sending CancelRequest");
-                match guard.get(&(self.process_id, self.secret_key)) {
-                    // Drop the mutex as soon as possible.
-                    // We found the server the client is using for its query
-                    // that it wants to cancel.
-                    Some((process_id, secret_key, address, port)) => {
-                        (*process_id, *secret_key, address.clone(), *port)
-                    }
+        let (process_id, secret_key, address, port) = {
+            let guard = self.client_server_map.lock();
-                    // The client doesn't know / got the wrong server,
-                    // we're closing the connection for security reasons.
-                    None => return Ok(()),
+            match guard.get(&(self.process_id, self.secret_key)) {
+                // Drop the mutex as soon as possible.
+                // We found the server the client is using for its query
+                // that it wants to cancel.
+                Some((process_id, secret_key, address, port)) => {
+                    (*process_id, *secret_key, address.clone(), *port)
                 }
-            };
+                // The client doesn't know / got the wrong server,
+                // we're closing the connection for security reasons.
+                None => return Ok(()),
+            }
+        };
-            // Opens a new separate connection to the server, sends the backend_id
-            // and secret_key and then closes it for security reasons. No other interactions
-            // take place.
-            return Server::cancel(&address, port, process_id, secret_key).await;
-        }
+        // Opens a new separate connection to the server, sends the backend_id
+        // and secret_key and then closes it for security reasons. No other interactions
+        // take place.
+        return Server::cancel(&address, port, process_id, secret_key).await;
+    }
-        // The query router determines where the query is going to go,
-        // e.g. primary, replica, which shard.
-        let mut query_router = QueryRouter::new();
-        self.stats.client_register(
-            self.process_id,
-            self.pool_name.clone(),
-            self.username.clone(),
-            self.application_name.clone(),
-        );
-        // Our custom protocol loop.
-        // We expect the client to either start a transaction with regular queries
-        // or issue commands for our sharding and server selection protocol.
-        loop {
-            trace!(
-                "Client idle, waiting for message, transaction mode: {}",
-                self.transaction_mode
-            );
+    async fn checkout_connection(&mut self, pool: &ConnectionPool, query_router: &mut QueryRouter) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
+        // Grab a server from the pool.
+        let mut connection = match pool
+            .get(query_router.shard(), query_router.role(), self.process_id)
+            .await
+        {
+            Ok(conn) => {
+                debug!("Got connection from pool");
+                conn
+            }
+            Err(err) => {
+                self.buffer.clear();
+                error_response(&mut self.write, "could not get connection from the pool")
+                    .await?;
-            // Read a complete message from the client, which normally would be
-            // either a `Q` (query) or `P` (prepare, extended protocol).
-            // We can parse it here before grabbing a server from the pool,
-            // in case the client is sending some custom protocol messages, e.g.
-            // SET SHARDING KEY TO 'bigint';
+                error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
+                    self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
+                return Err(err);
+            }
+        };
-            let message = tokio::select! {
-                _ = self.shutdown.recv() => {
-                    if !self.admin {
-                        error_response_terminal(
-                            &mut self.write,
-                            "terminating connection due to administrator command"
-                        ).await?;
-                        return Ok(())
-                    }
+        let server = &mut connection.0;
+        let address = connection.1.clone();
-                    // Admin clients ignore shutdown.
-                    else {
-                        read_message(&mut self.read).await?
-                    }
-                },
-                message_result = read_message(&mut self.read) => message_result?
-            };
+        // Server is assigned to the client in case the client wants to
+        // cancel a query later.
+        server.claim(self.process_id, self.secret_key);
+        self.connected_to_server = true;
-            match message[0] as char {
-                // Buffer extended protocol messages even if we do not have
-                // a server connection yet. Hopefully, when we get the S message
-                // we'll be able to allocate a connection.
Also, clients do not expect - // the server to respond to these messages so even if we were not able to - // allocate a connection, we wouldn't be able to send back an error message - // to the client so we buffer them and defer the decision to error out or not - // to when we get the S message - 'D' | 'E' => { - self.buffer.put(&message[..]); - continue; - } + // Update statistics + self.stats + .client_active(self.process_id, server.server_id()); - 'Q' => { - if query_router.query_parser_enabled() { - query_router.infer(&message); - } - } + self.last_address_id = Some(address.id); + self.last_server_id = Some(server.server_id()); + + debug!( + "Client {:?} talking to server {:?}", + self.addr, + server.address() + ); - 'P' => { - self.buffer.put(&message[..]); + return Ok(connection); + } - if query_router.query_parser_enabled() { - query_router.infer(&message); - } + async fn client_proc(&mut self, query_router: &mut QueryRouter) -> Result { - continue; + trace!( + "Client idle, waiting for message, transaction mode: {}", + self.transaction_mode + ); + + // Read a complete message from the client, which normally would be + // either a `Q` (query) or `P` (prepare, extended protocol). + // We can parse it here before grabbing a server from the pool, + // in case the client is sending some custom protocol messages, e.g. + // SET SHARDING KEY TO 'bigint'; + let message = tokio::select! { + _ = self.shutdown.recv() => { + if !self.admin { + error_response_terminal( + &mut self.write, + "terminating connection due to administrator command" + ).await?; + return Ok(ClientFlowControl::Disconnect) + } else { + // Admin clients ignore shutdown. + read_message(&mut self.read).await? } + }, - 'B' => { - self.buffer.put(&message[..]); + message_result = read_message(&mut self.read) => message_result? + }; - if query_router.query_parser_enabled() { - query_router.infer_shard_from_bind(&message); - } + match message[0] as char { + // Buffer extended protocol messages even if we do not have + // a server connection yet. Hopefully, when we get the S message + // we'll be able to allocate a connection. Also, clients do not expect + // the server to respond to these messages so even if we were not able to + // allocate a connection, we wouldn't be able to send back an error message + // to the client so we buffer them and defer the decision to error out or not + // to when we get the S message + 'D' | 'E' => { + self.buffer.put(&message[..]); + return Ok(ClientFlowControl::PerformNextCommand); + } - continue; + 'Q' => { + if query_router.query_parser_enabled() { + query_router.infer(&message); } + } - 'X' => { - debug!("Client disconnecting"); - return Ok(()); + 'P' => { + self.buffer.put(&message[..]); + + if query_router.query_parser_enabled() { + query_router.infer(&message); } - _ => (), + return Ok(ClientFlowControl::PerformNextCommand); } - // Handle admin database queries. - if self.admin { - debug!("Handling admin command"); - handle_admin(&mut self.write, message, self.client_server_map.clone()).await?; - continue; - } + 'B' => { + self.buffer.put(&message[..]); - // Get a pool instance referenced by the most up-to-date - // pointer. This ensures we always read the latest config - // when starting a query. - let mut pool = self.get_pool().await?; + if query_router.query_parser_enabled() { + query_router.infer_shard_from_bind(&message); + } - // Check if the pool is paused and wait until it's resumed. - if pool.wait_paused().await { - // Refresh pool information, something might have changed. 
- pool = self.get_pool().await?; + return Ok(ClientFlowControl::PerformNextCommand); } - query_router.update_pool_settings(pool.settings.clone()); + 'X' => { + debug!("Client disconnecting"); + return Ok(ClientFlowControl::Disconnect); + } + + _ => (), + } + + // Handle admin database queries. + if self.admin { + debug!("Handling admin command"); + handle_admin(&mut self.write, message, self.client_server_map.clone()).await?; + return Ok(ClientFlowControl::PerformNextCommand); + } + + // Get a pool instance referenced by the most up-to-date + // pointer. This ensures we always read the latest config + // when starting a query. + let mut pool = self.get_pool().await?; + + // Check if the pool is paused and wait until it's resumed. + if pool.wait_paused().await { + // Refresh pool information, something might have changed. + pool = self.get_pool().await?; + } - let current_shard = query_router.shard(); + query_router.update_pool_settings(pool.settings.clone()); - // Handle all custom protocol commands, if any. - match query_router.try_execute_command(&message) { - // Normal query, not a custom command. - None => (), + let current_shard = query_router.shard(); + // Handle all custom protocol commands, if any. + match query_router.try_execute_command(&message) { + // Normal query, not a custom command. + None => (), // SET SHARD TO Some((Command::SetShard, _)) => { // Selected shard is not configured. @@ -761,97 +811,48 @@ where } else { custom_protocol_response_ok(&mut self.write, "SET SHARD").await?; } - continue; + return Ok(ClientFlowControl::PerformNextCommand); } // SET PRIMARY READS TO Some((Command::SetPrimaryReads, _)) => { custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?; - continue; + return Ok(ClientFlowControl::PerformNextCommand); } // SET SHARDING KEY TO Some((Command::SetShardingKey, _)) => { custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?; - continue; + return Ok(ClientFlowControl::PerformNextCommand); } // SET SERVER ROLE TO Some((Command::SetServerRole, _)) => { custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; - continue; + return Ok(ClientFlowControl::PerformNextCommand); } // SHOW SERVER ROLE Some((Command::ShowServerRole, value)) => { show_response(&mut self.write, "server role", &value).await?; - continue; + return Ok(ClientFlowControl::PerformNextCommand); } // SHOW SHARD Some((Command::ShowShard, value)) => { show_response(&mut self.write, "shard", &value).await?; - continue; + return Ok(ClientFlowControl::PerformNextCommand); } // SHOW PRIMARY READS Some((Command::ShowPrimaryReads, value)) => { show_response(&mut self.write, "primary reads", &value).await?; - continue; + return Ok(ClientFlowControl::PerformNextCommand); } }; debug!("Waiting for connection from pool"); - // Grab a server from the pool. 
- let connection = match pool - .get(query_router.shard(), query_router.role(), self.process_id) - .await - { - Ok(conn) => { - debug!("Got connection from pool"); - conn - } - Err(err) => { - // Client is attempting to get results from the server, - // but we were unable to grab a connection from the pool - // We'll send back an error message and clean the extended - // protocol buffer - if message[0] as char == 'S' { - error!("Got Sync message but failed to get a connection from the pool"); - self.buffer.clear(); - } - error_response(&mut self.write, "could not get connection from the pool") - .await?; - - error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}", - self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err); - continue; - } - }; - - let mut reference = connection.0; - let address = connection.1; - let server = &mut *reference; - - // Server is assigned to the client in case the client wants to - // cancel a query later. - server.claim(self.process_id, self.secret_key); - self.connected_to_server = true; - - // Update statistics - self.stats - .client_active(self.process_id, server.server_id()); - - self.last_address_id = Some(address.id); - self.last_server_id = Some(server.server_id()); - - debug!( - "Client {:?} talking to server {:?}", - self.addr, - server.address() - ); - // TODO: investigate other parameters and set them too. // Set application_name. @@ -866,189 +867,273 @@ where // If the client is in session mode, no more custom protocol // commands will be accepted. loop { - let message = match initial_message { - None => { - trace!("Waiting for message inside transaction or in session mode"); - - match read_message(&mut self.read).await { - Ok(message) => message, - Err(err) => { - // Client disconnected inside a transaction. - // Clean up the server and re-use it. - server.checkin_cleanup().await?; - - return Err(err); + match self.tx_proc(server, &pool, initial_message).await { + Ok(control_flow) => { + match control_flow { + ClientFlowControl::PerformNextCommand => { + initial_message = None; + continue; + } + control_flow_result => { + return Ok(control_flow_result); } } - } - Some(message) => { - initial_message = None; - message - } - }; + }, + Err(err) => return Err(err), + } + } - // The message will be forwarded to the server intact. We still would like to - // parse it below to figure out what to do with it. + // The server is no longer bound to us, we can't cancel it's queries anymore. 
+ debug!("Releasing server back into the pool"); + server.checkin_cleanup().await?; + self.stats.server_idle(server.server_id()); + self.connected_to_server = false; - // Safe to unwrap because we know this message has a certain length and has the code - // This reads the first byte without advancing the internal pointer and mutating the bytes - let code = *message.get(0).unwrap() as char; + self.release(); + self.stats.client_idle(self.process_id); + } - trace!("Message: {}", code); - match code { - // Query - 'Q' => { - debug!("Sending query to server"); + #[inline(always)] + pub async fn tx_proc(&mut self, server: &mut Server, pool: &ConnectionPool, initial_message: Option ) -> Result { + let message = match initial_message { + None => { + trace!("Waiting for message inside transaction or in session mode"); - self.send_and_receive_loop(code, Some(&message), server, &address, &pool) - .await?; + match read_message(&mut self.read).await { + Ok(message) => message, + Err(err) => { + // Client disconnected inside a transaction. + // Clean up the server and re-use it. + server.checkin_cleanup().await?; - if !server.in_transaction() { - // Report transaction executed statistics. - self.stats.transaction(self.process_id, server.server_id()); + return Err(err); + } + } + } + Some(message) => message + }; - // Release server back to the pool if we are in transaction mode. - // If we are in session mode, we keep the server until the client disconnects. - if self.transaction_mode { - break; + // The message will be forwarded to the server intact. We still would like to + // parse it below to figure out what to do with it. + + // Safe to unwrap because we know this message has a certain length and has the code + // This reads the first byte without advancing the internal pointer and mutating the bytes + let code = *message.get(0).unwrap() as char; + let address = server.address(); + trace!("Message: {}", code); + + match code { + // Query + 'Q' => { + debug!("Sending query to server"); + + match self.send_and_receive_loop(code, Some(&message), server, &address, &pool).await { + Ok(_) => self.retry_buffer = None, + Err(_) => { + if server.is_bad() && !server.in_transaction() && server.address().role == Role::Replica { + match self.retry_buffer { + Some(ref mut retry_buffer) => { + if retry_buffer.retry_count < 3 { + retry_buffer.retry_count += 1; + return Ok(ClientFlowControl::Retry); + } + }, + None => { + self.retry_buffer = Some(RetryBuffer { buffer: message, retry_count: 0 }); + return Ok(ClientFlowControl::Retry); + } } } - } - - // Terminate - 'X' => { - server.checkin_cleanup().await?; - self.release(); + }, + } - return Ok(()); - } + if !server.in_transaction() { + // Report transaction executed statistics. + self.stats.transaction(self.process_id, server.server_id()); - // Parse - // The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`. - 'P' => { - self.buffer.put(&message[..]); + // Release server back to the pool if we are in transaction mode. + // If we are in session mode, we keep the server until the client disconnects. + if self.transaction_mode { + return Ok(ClientFlowControl::ReleaseConnection); } + } + } - // Bind - // The placeholder's replacements are here, e.g. 'user@email.com' and 'true' - 'B' => { - self.buffer.put(&message[..]); - } + // Terminate + 'X' => { + server.checkin_cleanup().await?; + self.release(); - // Describe - // Command a client can issue to describe a previously prepared named statement. 
- 'D' => { - self.buffer.put(&message[..]); - } + return Ok(ClientFlowControl::Disconnect); + } - // Execute - // Execute a prepared statement prepared in `P` and bound in `B`. - 'E' => { - self.buffer.put(&message[..]); - } + // Parse + // The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`. + 'P' => { + self.buffer.put(&message[..]); + } - // Sync - // Frontend (client) is asking for the query result now. - 'S' => { - debug!("Sending query to server"); - - self.buffer.put(&message[..]); - - let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char; - - // Almost certainly true - if first_message_code == 'P' { - // Message layout - // P followed by 32 int followed by null-terminated statement name - // So message code should be in offset 0 of the buffer, first character - // in prepared statement name would be index 5 - let first_char_in_name = *self.buffer.get(5).unwrap_or(&0); - if first_char_in_name != 0 { - // This is a named prepared statement - // Server connection state will need to be cleared at checkin - server.mark_dirty(); - } - } + // Bind + // The placeholder's replacements are here, e.g. 'user@email.com' and 'true' + 'B' => { + self.buffer.put(&message[..]); + } - self.send_and_receive_loop(code, None, server, &address, &pool) - .await?; + // Describe + // Command a client can issue to describe a previously prepared named statement. + 'D' => { + self.buffer.put(&message[..]); + } - self.buffer.clear(); + // Execute + // Execute a prepared statement prepared in `P` and bound in `B`. + 'E' => { + self.buffer.put(&message[..]); + } - if !server.in_transaction() { - self.stats.transaction(self.process_id, server.server_id()); + // Sync + // Frontend (client) is asking for the query result now. + 'S' => { + debug!("Sending query to server"); + + self.buffer.put(&message[..]); + + let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char; + + // Almost certainly true + if first_message_code == 'P' { + // Message layout + // P followed by 32 int followed by null-terminated statement name + // So message code should be in offset 0 of the buffer, first character + // in prepared statement name would be index 5 + let first_char_in_name = *self.buffer.get(5).unwrap_or(&0); + if first_char_in_name != 0 { + // This is a named prepared statement + // Server connection state will need to be cleared at checkin + server.mark_dirty(); + } + } - // Release server back to the pool if we are in transaction mode. - // If we are in session mode, we keep the server until the client disconnects. 
- if self.transaction_mode { - break; + match self.send_and_receive_loop(code, None, server, &address, &pool).await { + Ok(_) => self.retry_buffer = None, + Err(err) => { + if server.is_bad() && !server.in_transaction() && server.address().role == Role::Replica { + match self.retry_buffer { + Some(ref mut retry_buffer) => { + if retry_buffer.retry_count < 3 { + retry_buffer.retry_count += 1; + return Ok(ClientFlowControl::Retry); + } + self.retry_buffer = None; + return Err(err); + }, + None => { + let buffer = self.buffer.clone(); + self.buffer.clear(); + self.retry_buffer = Some(RetryBuffer { buffer: message, retry_count: 0 }); + return Ok(ClientFlowControl::Retry); + } } } } + } - // CopyData - 'd' => { - self.buffer.put(&message[..]); + self.buffer.clear(); - // Want to limit buffer size - if self.buffer.len() > 8196 { - // Forward the data to the server, - self.send_server_message(server, &self.buffer, &address, &pool) - .await?; - self.buffer.clear(); - } + if !server.in_transaction() { + self.stats.transaction(self.process_id, server.server_id()); + + // Release server back to the pool if we are in transaction mode. + // If we are in session mode, we keep the server until the client disconnects. + if self.transaction_mode { + return Ok(ClientFlowControl::ReleaseConnection); } + } + } - // CopyDone or CopyFail - // Copy is done, successfully or not. - 'c' | 'f' => { - // We may already have some copy data in the buffer, add this message to buffer - self.buffer.put(&message[..]); + // CopyData + 'd' => { + self.buffer.put(&message[..]); - self.send_server_message(server, &self.buffer, &address, &pool) - .await?; + // Want to limit buffer size + if self.buffer.len() > 8196 { + // Forward the data to the server, + self.send_server_message(server, &self.buffer, &address, &pool) + .await?; + self.buffer.clear(); + } + } - // Clear the buffer - self.buffer.clear(); + // CopyDone or CopyFail + // Copy is done, successfully or not. + 'c' | 'f' => { + // We may already have some copy data in the buffer, add this message to buffer + self.buffer.put(&message[..]); - let response = self.receive_server_message(server, &address, &pool).await?; + self.send_server_message(server, &self.buffer, &address, &pool) + .await?; - match write_all_half(&mut self.write, &response).await { - Ok(_) => (), - Err(err) => { - server.mark_bad(); - return Err(err); - } - }; + // Clear the buffer + self.buffer.clear(); - if !server.in_transaction() { - self.stats.transaction(self.process_id, server.server_id()); + let response = self.receive_server_message(server, &address, &pool).await?; - // Release server back to the pool if we are in transaction mode. - // If we are in session mode, we keep the server until the client disconnects. - if self.transaction_mode { - break; - } - } + match write_all_half(&mut self.write, &response).await { + Ok(_) => (), + Err(err) => { + server.mark_bad(); + return Err(err); } + }; - // Some unexpected message. We either did not implement the protocol correctly - // or this is not a Postgres client we're talking to. - _ => { - error!("Unexpected code: {}", code); + if !server.in_transaction() { + self.stats.transaction(self.process_id, server.server_id()); + + // Release server back to the pool if we are in transaction mode. + // If we are in session mode, we keep the server until the client disconnects. + if self.transaction_mode { + return Ok(ClientFlowControl::ReleaseConnection); } } } - // The server is no longer bound to us, we can't cancel it's queries anymore. 
- debug!("Releasing server back into the pool"); - server.checkin_cleanup().await?; - self.stats.server_idle(server.server_id()); - self.connected_to_server = false; + // Some unexpected message. We either did not implement the protocol correctly + // or this is not a Postgres client we're talking to. + _ => { + return Err(Error::ProtocolSyncError("bad message code".to_string())); + } + } + return Ok(ClientFlowControl::PerformNextCommand); + } + + + + /// Handle a connected and authenticated client. + pub async fn handle(&mut self) -> Result<(), Error> { + // The client wants to cancel a query it has issued previously. + if self.cancel_mode { + return self.cancel_query().await; + } + + // The query router determines where the query is going to go, + // e.g. primary, replica, which shard. + let mut query_router = QueryRouter::new(); + self.stats.client_register( + self.process_id, + self.pool_name.clone(), + self.username.clone(), + self.application_name.clone(), + ); + + // Our custom protocol loop. + // We expect the client to either start a transaction with regular queries + // or issue commands for our sharding and server selection protocol. + + loop { + + self.client_proc(&mut query_router).await?; - self.release(); - self.stats.client_idle(self.process_id); } }