Skip to content

Commit

Permalink
chore(bors): merge pull request #913
Browse files Browse the repository at this point in the history
913: refactor(csi-driver): use tokio process command for findmnt and add path prefix r=Abhinandan-Purkait a=Abhinandan-Purkait

- Use tokio async process for findmnt
- Use the kubelet path as prefix in exhaustive mount path search during cleanup

Co-authored-by: Abhinandan Purkait <[email protected]>
  • Loading branch information
mayastor-bors and Abhinandan-Purkait committed Jan 10, 2025
2 parents f6e0654 + 264e3f3 commit c536151
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 23 deletions.
2 changes: 1 addition & 1 deletion control-plane/csi-driver/src/bin/node/block_vol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
//target exists and is a special file

// Idempotency, if we have done this already just return success.
match findmnt::get_devicepath(target_path) {
match findmnt::get_devicepath(target_path).await {
Ok(findmnt_dev) => {
if let Some(fm_devpath) = findmnt_dev {
if fm_devpath == device_path {
Expand Down
1 change: 1 addition & 0 deletions control-plane/csi-driver/src/bin/node/filesystem_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl FileSystemOps for Ext4Fs {
let dev_path = match dev_path {
Some(path) => path,
None => get_devicepath(mount_path)
.await
.map_err(|error| {
format!("failed to get dev path for mountpoint {mount_path}: {error}")
})?
Expand Down
41 changes: 29 additions & 12 deletions control-plane/csi-driver/src/bin/node/findmnt.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{error::DeviceError, filesystem_ops::FileSystem};
use csi_driver::filesystem::FileSystem as Fs;
use utils::csi_plugin_name;

use serde_json::Value;
use std::{collections::HashMap, process::Command, str::FromStr, string::String, vec::Vec};
use std::{collections::HashMap, str::FromStr, string::String, vec::Vec};
use tracing::{error, warn};
use utils::csi_plugin_name;

// Keys of interest we expect to find in the JSON output generated
// by findmnt.
Expand Down Expand Up @@ -145,8 +145,12 @@ const FIND_MNT_ARGS: [&str; 3] = ["-J", "-o", "SOURCE,TARGET,FSTYPE"];

/// Execute the Linux utility findmnt, collect the json output,
/// invoke the filter function and return the filtered results.
fn findmnt(params: Filter) -> Result<Vec<HashMap<String, String>>, DeviceError> {
let output = Command::new(FIND_MNT).args(FIND_MNT_ARGS).output()?;
async fn findmnt(params: Filter<'_>) -> Result<Vec<HashMap<String, String>>, DeviceError> {
let output = tokio::process::Command::new(FIND_MNT)
.args(FIND_MNT_ARGS)
.output()
.await?;

if output.status.success() {
let json_str = String::from_utf8(output.stdout)?;
let json: Value = serde_json::from_str(&json_str)?;
Expand All @@ -161,12 +165,12 @@ fn findmnt(params: Filter) -> Result<Vec<HashMap<String, String>>, DeviceError>
/// Use the Linux utility findmnt to find the name of the device mounted at a
/// directory or block special file, if any.
/// mount_path is the path a device is mounted on.
pub(crate) fn get_devicepath(mount_path: &str) -> Result<Option<String>, DeviceError> {
pub(crate) async fn get_devicepath(mount_path: &str) -> Result<Option<String>, DeviceError> {
let tgt_filter = Filter {
key: TARGET_KEY,
value: mount_path,
};
let sources = findmnt(tgt_filter)?;
let sources = findmnt(tgt_filter).await?;
{
match sources.len() {
0 => Ok(None),
Expand Down Expand Up @@ -194,12 +198,12 @@ pub(crate) fn get_devicepath(mount_path: &str) -> Result<Option<String>, DeviceE
/// Use the Linux utility findmnt to find the mount paths for a block device,
/// if any.
/// device_path is the path to the device for example "/dev/sda1"
pub(crate) fn get_mountpaths(device_path: &str) -> Result<Vec<DeviceMount>, DeviceError> {
pub(crate) async fn get_mountpaths(device_path: &str) -> Result<Vec<DeviceMount>, DeviceError> {
let dev_filter = Filter {
key: SOURCE_KEY,
value: device_path,
};
match findmnt(dev_filter) {
match findmnt(dev_filter).await {
Ok(results) => {
let mut mountpaths: Vec<DeviceMount> = Vec::new();
for entry in results {
Expand Down Expand Up @@ -249,8 +253,18 @@ struct FilterCsiMounts<'a> {
}

/// Finds CSI mount points using `findmnt` and filters based on the given criteria.
fn find_csi_mount(filter: FilterCsiMounts) -> Result<Vec<HashMap<String, String>>, DeviceError> {
let output = Command::new(FIND_MNT).args(FIND_MNT_ARGS).output()?;
async fn find_csi_mount(
filter: FilterCsiMounts<'_>,
path_prefix: Option<&str>,
) -> Result<Vec<HashMap<String, String>>, DeviceError> {
let mut command = tokio::process::Command::new(FIND_MNT);
command.args(FIND_MNT_ARGS);

if let Some(path_prefix) = path_prefix {
command.args(["-R", path_prefix]);
}

let output = command.output().await?;

if !output.status.success() {
return Err(DeviceError::new(String::from_utf8(output.stderr)?.as_str()));
Expand Down Expand Up @@ -321,14 +335,17 @@ fn read_vol_data_json(path: &str) -> Result<serde_json::Map<String, Value>, Devi
}

/// Retrieves mount paths for a given CSI volume ID by parsing the metadata file.
pub(crate) async fn get_csi_mountpaths(volume_id: &str) -> Result<Vec<DeviceMount>, DeviceError> {
pub(crate) async fn get_csi_mountpaths(
volume_id: &str,
path_prefix: Option<&str>,
) -> Result<Vec<DeviceMount>, DeviceError> {
let filter = FilterCsiMounts {
driver: &csi_plugin_name(),
volume_id,
file_name: METADATA_FILE,
device_pattern: DEVICE_PATTERN,
};
match find_csi_mount(filter) {
match find_csi_mount(filter, path_prefix).await {
Ok(results) => {
let mut mountpaths: Vec<DeviceMount> = Vec::new();
for entry in results {
Expand Down
8 changes: 4 additions & 4 deletions control-plane/csi-driver/src/bin/node/fsfreeze/bin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ pub(crate) async fn fsfreeze(volume_id: &str, command: FsFreezeOpt) -> Result<()
// Use findmnt to work out if volume is mounted as a raw
// block, i.e. we get some matches, and return the
// BlockDeviceMount error.
let mountpaths = findmnt::get_mountpaths(&device_path).map_err(|error| {
FsfreezeError::InternalFailure {
let mountpaths = findmnt::get_mountpaths(&device_path)
.await
.map_err(|error| FsfreezeError::InternalFailure {
source: error,
volume_id: volume_id.to_string(),
}
})?;
})?;
if !mountpaths.is_empty() {
return Err(FsfreezeError::BlockDeviceMount {
volume_id: volume_id.to_string(),
Expand Down
10 changes: 9 additions & 1 deletion control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ pub(super) async fn main() -> anyhow::Result<()> {
even though volume target is rdma capable."
)
)
.arg(
Arg::new("kubelet-path")
.long("kubelet-path")
.default_value("/var/lib/kubelet")
.help("Kubelet path on the host system")
)
.subcommand(
clap::Command::new("fs-freeze")
.arg(
Expand Down Expand Up @@ -346,12 +352,14 @@ pub(super) async fn main() -> anyhow::Result<()> {
// Parse instance and grpc endpoints from the command line arguments and validate.
let grpc_sock_addr = validate_endpoints(&matches, registration_enabled)?;

let kubelet_path = matches.get_one::<String>("kubelet-path").unwrap();

// Start the CSI server, node plugin grpc server and registration loop if registration is
// enabled.
*crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?;
let (csi, grpc, registration) = tokio::join!(
CsiServer::run(csi_socket, &matches)?,
NodePluginGrpcServer::run(grpc_sock_addr),
NodePluginGrpcServer::run(grpc_sock_addr, kubelet_path.to_owned()),
run_registration_loop(
node_name.clone(),
grpc_sock_addr.to_string(),
Expand Down
16 changes: 11 additions & 5 deletions control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use tracing::{debug, error, info};
use uuid::Uuid;

#[derive(Debug, Default)]
pub(crate) struct NodePluginSvc {}
pub(crate) struct NodePluginSvc {
kubelet_path: String,
}

#[tonic::async_trait]
impl NodePlugin for NodePluginSvc {
Expand Down Expand Up @@ -105,7 +107,7 @@ impl NodePlugin for NodePluginSvc {
})?;

if let Ok(Some(device)) = Device::lookup(&uuid).await {
let mountpaths = findmnt::get_mountpaths(&device.devname())?;
let mountpaths = findmnt::get_mountpaths(&device.devname()).await?;
debug!(
"Device: {} found, with mount paths: {}, issuing unmount",
device.devname(),
Expand All @@ -117,7 +119,8 @@ impl NodePlugin for NodePluginSvc {
);
lazy_unmount_mountpaths(&mountpaths).await?;
} else {
let mountpaths = findmnt::get_csi_mountpaths(&volume_id).await?;
let mountpaths =
findmnt::get_csi_mountpaths(&volume_id, Some(&self.kubelet_path)).await?;
debug!(
"Device was not found, detected mount paths: {}, issuing unmount",
mountpaths
Expand Down Expand Up @@ -149,13 +152,16 @@ pub(crate) struct NodePluginGrpcServer {}

impl NodePluginGrpcServer {
/// Run `Self` as a tonic server.
pub(crate) async fn run(endpoint: std::net::SocketAddr) -> anyhow::Result<()> {
pub(crate) async fn run(
endpoint: std::net::SocketAddr,
kubelet_path: String,
) -> anyhow::Result<()> {
info!(
"node plugin gRPC server configured at address {:?}",
endpoint
);
Server::builder()
.add_service(NodePluginServer::new(NodePluginSvc {}))
.add_service(NodePluginServer::new(NodePluginSvc { kubelet_path }))
.serve_with_shutdown(endpoint, Shutdown::wait())
.await
.map_err(|error| {
Expand Down
1 change: 1 addition & 0 deletions control-plane/csi-driver/src/bin/node/nodeplugin_svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(crate) async fn find_mount(
) -> Result<Option<TypeOfMount>, tonic::Status> {
let device_path = device.devname();
let mountpaths = findmnt::get_mountpaths(&device_path)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;
debug!(
volume.uuid = volume_id,
Expand Down

0 comments on commit c536151

Please sign in to comment.