From baf7b27bd17aa33908f05acc4366558064e93067 Mon Sep 17 00:00:00 2001 From: Malcolm Morgan Date: Wed, 11 Sep 2024 17:08:36 +0100 Subject: [PATCH] Towards #47 Replacing all functions with purrr/furrr --- DESCRIPTION | 5 --- R/atoc_export.R | 23 +++++++---- R/atoc_shapes.R | 9 ++-- R/gtfs_interpolate_times.R | 26 ++++++++---- R/stops_per_week_functions.R | 7 +++- R/transxchange.R | 80 +++++++++++++++++++++++------------- 6 files changed, 95 insertions(+), 55 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 09bb3cd..286f30d 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -24,22 +24,17 @@ Imports: collapse, data.table, dodgr, - doSNOW, dplyr, digest, - foreach, future, furrr, - future.apply, geodist, httr, iotools, stringr, sf, - parallel, lubridate, purrr (>= 1.0), - pbapply, readr (>= 2.0), RcppSimdJson, tidyr, diff --git a/R/atoc_export.R b/R/atoc_export.R index c842540..95de717 100644 --- a/R/atoc_export.R +++ b/R/atoc_export.R @@ -564,15 +564,22 @@ duplicate.stop_times_alt <- function(calendar, stop_times, ncores = 1) { } if (ncores == 1) { - stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int) + #stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int) + stop_times.dup <- purrr::map(stop_times_split, duplicate.stop_times.int, .progress = TRUE) } else { - cl <- parallel::makeCluster(ncores) - stop_times.dup <- pbapply::pblapply(stop_times_split, - duplicate.stop_times.int, - cl = cl - ) - parallel::stopCluster(cl) - rm(cl) + # cl <- parallel::makeCluster(ncores) + # stop_times.dup <- pbapply::pblapply(stop_times_split, + # duplicate.stop_times.int, + # cl = cl + # ) + # parallel::stopCluster(cl) + # rm(cl) + + future::plan(future::multisession, workers = ncores) + stop_times.dup <- furrr::future_map(.x = stop_times_split, + .f = duplicate.stop_times.int, + .progress = TRUE) + future::plan(future::sequential) } stop_times.dup <- dplyr::bind_rows(stop_times.dup) diff --git a/R/atoc_shapes.R b/R/atoc_shapes.R index
6c4a128..df06f7c 100644 --- a/R/atoc_shapes.R +++ b/R/atoc_shapes.R @@ -93,7 +93,8 @@ ATOC_shapes <- function(gtfs) { } } - dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts) + #dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts) + dp.list <- purrr::map(dp.list, path_to_sf, verts = verts, .progress = TRUE) dp.list <- unname(dp.list) pairs$geometry <- sf::st_sfc(dp.list, crs = 4326) rm(dp.list, verts) @@ -110,7 +111,8 @@ ATOC_shapes <- function(gtfs) { } message(paste0(Sys.time()," Invert routes")) - pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring) + #pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring) + pairs_opp$geometry <- purrr::map(pairs_opp$geometry, invert_linestring, .progress = TRUE) pairs_opp$geometry <- sf::st_as_sfc(pairs_opp$geometry, crs = 4326) pairs_opp <- sf::st_as_sf(pairs_opp) pairs_opp <- pairs_opp[, names(pairs)] @@ -135,7 +137,8 @@ ATOC_shapes <- function(gtfs) { message(paste0(Sys.time()," final formatting")) rm(graph, pairs) - shape_res <- pbapply::pblapply(st_split, match_lines) + #shape_res <- pbapply::pblapply(st_split, match_lines) + shape_res <- purrr::map(st_split, match_lines, .progress = TRUE) str5 <- lapply(shape_res, `[[`, 2) shapes <- lapply(shape_res, `[[`, 1) diff --git a/R/gtfs_interpolate_times.R b/R/gtfs_interpolate_times.R index 56bc9d4..06b97e1 100644 --- a/R/gtfs_interpolate_times.R +++ b/R/gtfs_interpolate_times.R @@ -36,16 +36,24 @@ gtfs_interpolate_times <- function(gtfs, ncores = 1){ stop_times <- dplyr::group_split(stop_times) if(ncores == 1){ - stop_times <- pbapply::pblapply(stop_times, stops_interpolate) + #stop_times <- pbapply::pblapply(stop_times, stops_interpolate) + stop_times <- purrr::map(stop_times, stops_interpolate, .progress = TRUE) } else { - cl <- parallel::makeCluster(ncores) - parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")}) - stop_times <- pbapply::pblapply(stop_times, - stops_interpolate, - cl = cl - ) - 
parallel::stopCluster(cl) - rm(cl) + # cl <- parallel::makeCluster(ncores) + # parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")}) + # stop_times <- pbapply::pblapply(stop_times, + # stops_interpolate, + # cl = cl + # ) + # parallel::stopCluster(cl) + # rm(cl) + + future::plan(future::multisession, workers = ncores) + stop_times <- furrr::future_map(.x = stop_times, + .f = stops_interpolate, + .progress = TRUE) + future::plan(future::sequential) + } stop_times <- data.table::rbindlist(stop_times) diff --git a/R/stops_per_week_functions.R b/R/stops_per_week_functions.R index 4a19a9a..d3b7370 100644 --- a/R/stops_per_week_functions.R +++ b/R/stops_per_week_functions.R @@ -335,7 +335,12 @@ gtfs_trips_per_zone <- function(gtfs, res <- dplyr::group_by(stop_times, zone_id) res <- dplyr::group_split(res) future::plan(future::multisession) - res <- future.apply::future_lapply(res, internal_trips_per_zone, by_mode, days_tot) + #res <- future.apply::future_lapply(res, internal_trips_per_zone, by_mode, days_tot) + res <- furrr::future_map(.x = res, + .f = internal_trips_per_zone, + by_mode = by_mode, + days_tot = days_tot, + .progress = TRUE) future::plan(future::sequential) diff --git a/R/transxchange.R b/R/transxchange.R index 1b6a3ee..9e16bc7 100644 --- a/R/transxchange.R +++ b/R/transxchange.R @@ -135,18 +135,28 @@ transxchange2gtfs <- function(path_in, } else { message(paste0(Sys.time(), " Importing TransXchange files, multicore")) - pb <- utils::txtProgressBar(max = length(files), style = 3) - progress <- function(n) utils::setTxtProgressBar(pb, n) - opts <- list(progress = progress, preschedule = FALSE) - cl <- parallel::makeCluster(ncores) - doSNOW::registerDoSNOW(cl) - boot <- foreach::foreach(i = seq_len(length(files)), .options.snow = opts) - res_all <- foreach::`%dopar%`(boot, { - UK2GTFS:::transxchange_import_try(files[i], - try_mode = try_mode) - }) - parallel::stopCluster(cl) - rm(cl, boot, opts, pb, progress) + future::plan(future::multisession, workers = ncores)
+ res_all <- furrr::future_map(.x = files, + .f = transxchange_import_try, + run_debug = TRUE, + full_import = FALSE, + try_mode = try_mode, + .progress = TRUE) + future::plan(future::sequential) + + + # pb <- utils::txtProgressBar(max = length(files), style = 3) + # progress <- function(n) utils::setTxtProgressBar(pb, n) + # opts <- list(progress = progress, preschedule = FALSE) + # cl <- parallel::makeCluster(ncores) + # doSNOW::registerDoSNOW(cl) + # boot <- foreach::foreach(i = seq_len(length(files)), .options.snow = opts) + # res_all <- foreach::`%dopar%`(boot, { + # UK2GTFS:::transxchange_import_try(files[i], + # try_mode = try_mode) + # }) + # parallel::stopCluster(cl) + # rm(cl, boot, opts, pb, progress) res_all_message <- res_all[sapply(res_all, class) == "character"] res_all <- res_all[sapply(res_all, class) == "list"] @@ -173,23 +183,35 @@ transxchange2gtfs <- function(path_in, message(" ") message(paste0(Sys.time(), " Converting to GTFS, multicore")) - pb <- utils::txtProgressBar(min = 0, max = length(res_all), style = 3) - progress <- function(n) utils::setTxtProgressBar(pb, n) - opts <- list(progress = progress, preschedule = FALSE) - cl <- parallel::makeCluster(ncores) - doSNOW::registerDoSNOW(cl) - boot <- foreach::foreach(i = seq_len(length(res_all)), .options.snow = opts) - gtfs_all <- foreach::`%dopar%`(boot, { - UK2GTFS:::transxchange_export_try(res_all[[i]], - cal = cal, - naptan = naptan_trim, - scotland = scotland, - try_mode = try_mode) - # setTxtProgressBar(pb, i) - }) - - parallel::stopCluster(cl) - rm(cl, boot, opts, pb, progress) + future::plan(future::multisession, workers = ncores) + gtfs_all <- furrr::future_map(.x = res_all, + .f = transxchange_export_try, + run_debug = TRUE, + cal = cal, + naptan = naptan, + scotland = scotland, + try_mode = try_mode, + .progress = TRUE) + future::plan(future::sequential) + + + # pb <- utils::txtProgressBar(min = 0, max = length(res_all), style = 3) + # progress <- function(n) 
utils::setTxtProgressBar(pb, n) + # opts <- list(progress = progress, preschedule = FALSE) + # cl <- parallel::makeCluster(ncores) + # doSNOW::registerDoSNOW(cl) + # boot <- foreach::foreach(i = seq_len(length(res_all)), .options.snow = opts) + # gtfs_all <- foreach::`%dopar%`(boot, { + # UK2GTFS:::transxchange_export_try(res_all[[i]], + # cal = cal, + # naptan = naptan_trim, + # scotland = scotland, + # try_mode = try_mode) + # # setTxtProgressBar(pb, i) + # }) + # + # parallel::stopCluster(cl) + # rm(cl, boot, opts, pb, progress) } unlink(file.path(tempdir(), "txc"), recursive = TRUE)