Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2011: New parameter 'progress_estimate' for sync progress notification #7124

Merged
merged 24 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d53c56d
* Add new field 'progress_estimate'
kiburtse Nov 9, 2023
6b8aaf4
* Add parsing support to protocol codec for repurposed progress estim…
kiburtse Jan 24, 2024
9df7df8
Add object-store based test for progress notifications
kiburtse Jan 31, 2024
44155a9
Remove noop progress invocation in object-store api and simplify inte…
kiburtse Jan 22, 2024
e5a654e
Pass through progress estimate from client impl to notifier and call
kiburtse Feb 1, 2024
75b50d2
Move double value extraction into parser
kiburtse Mar 1, 2024
d7a426d
Clean up download message parsed data structs
kiburtse Mar 1, 2024
1fcdfda
Simplify tracking of transient downloaded bytes
kiburtse Mar 1, 2024
ad7df00
Calculate client-side progress from the start of the upload/download
kiburtse Mar 6, 2024
0dcc103
try to workaround tsan failure once again
kiburtse Mar 7, 2024
5ef4ea5
Merge tag 'v13.27.0' into kb/sync_progress_estimate_api
kiburtse Mar 7, 2024
b7550fc
Merge branch 'master' into kb/sync_progress_estimate_api
kiburtse Mar 19, 2024
9ffee29
Support all floats in message parsing, include changesets into downlo…
kiburtse Mar 19, 2024
a078d66
Don't rely on download/upload completion for starting estimate calcul…
kiburtse Mar 19, 2024
d50d223
Refactor test with sections and cleaner checks, add explanations
kiburtse Mar 19, 2024
16d3a86
Address a few more comments
kiburtse Mar 19, 2024
dd350a4
Add temporal supporession
kiburtse Mar 19, 2024
9833ac4
Minor fix for initial bytes after client reset
kiburtse Mar 19, 2024
4207c39
Remove some leftovers and fix typos
kiburtse Mar 19, 2024
934cb42
Add comment for the effect of progress estimate value calculation and
kiburtse Mar 21, 2024
ad2acce
Address comments, improve code comments
kiburtse Mar 21, 2024
6585ddc
Merge branch 'master' into kb/sync_progress_estimate_api
kiburtse Mar 21, 2024
d1ae9fb
Merge branch 'master' into kb/sync_progress_estimate_api
kiburtse Mar 22, 2024
5c058bf
Add changelog entry
kiburtse Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bindgen/spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,7 @@ classes:
methods:
start: '(callback: AsyncCallback<(realm: Nullable<ThreadSafeReference>, error: Nullable<std::exception_ptr>) off_thread>)'
cancel: ()
register_download_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t) off_thread) -> uint64_t'
register_download_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t, progress_estimate: double) off_thread) -> uint64_t'
unregister_download_progress_notifier: '(token: uint64_t)'

SyncSession:
Expand All @@ -1357,7 +1357,7 @@ classes:
methods:
wait_for_upload_completion: '(callback: AsyncCallback<(err: Status) off_thread>)'
wait_for_download_completion: '(callback: AsyncCallback<(err: Status) off_thread>)'
register_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t) off_thread, direction: ProgressDirection, is_streaming: bool) -> uint64_t'
register_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t, progress_estimate: double) off_thread, direction: ProgressDirection, is_streaming: bool) -> uint64_t'
unregister_progress_notifier: '(token: uint64_t)'
register_connection_change_callback: '(callback: (old_state: SyncSessionConnectionState, new_state: SyncSessionConnectionState) off_thread) -> uint64_t'
unregister_connection_change_callback: '(token: uint64_t)'
Expand Down
2 changes: 1 addition & 1 deletion src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3567,7 +3567,7 @@ typedef void (*realm_sync_connection_state_changed_func_t)(realm_userdata_t user
realm_sync_connection_state_e old_state,
realm_sync_connection_state_e new_state);
typedef void (*realm_sync_progress_func_t)(realm_userdata_t userdata, uint64_t transferred_bytes,
uint64_t total_bytes);
uint64_t total_bytes, double progress_estimate);
typedef void (*realm_sync_error_handler_func_t)(realm_userdata_t userdata, realm_sync_session_t*,
const realm_sync_error_t);
typedef bool (*realm_sync_ssl_verify_func_t)(realm_userdata_t userdata, const char* server_address, short server_port,
Expand Down
12 changes: 6 additions & 6 deletions src/realm/object-store/c_api/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,9 +729,9 @@ realm_async_open_task_register_download_progress_notifier(realm_async_open_task_
realm_userdata_t userdata,
realm_free_userdata_func_t userdata_free) noexcept
{
auto cb = [notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](uint64_t transferred,
uint64_t transferrable) {
notifier(userdata.get(), transferred, transferrable);
auto cb = [notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](
uint64_t transferred, uint64_t transferrable, double progress_estimate) {
notifier(userdata.get(), transferred, transferrable, progress_estimate);
};
auto token = (*task)->register_download_progress_notifier(std::move(cb));
return new realm_async_open_task_progress_notification_token_t{(*task), token};
Expand Down Expand Up @@ -818,9 +818,9 @@ RLM_API realm_sync_session_connection_state_notification_token_t* realm_sync_ses
bool is_streaming, realm_userdata_t userdata, realm_free_userdata_func_t userdata_free) noexcept
{
std::function<realm::SyncSession::ProgressNotifierCallback> cb =
[notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](uint64_t transferred,
uint64_t transferrable) {
notifier(userdata.get(), transferred, transferrable);
[notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](
uint64_t transferred, uint64_t transferrable, double progress_estimate) {
notifier(userdata.get(), transferred, transferrable, progress_estimate);
};
auto token = (*session)->register_progress_notifier(std::move(cb), SyncSession::ProgressDirection(direction),
is_streaming);
Expand Down
5 changes: 2 additions & 3 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ void AsyncOpenTask::cancel()
}
}

uint64_t
AsyncOpenTask::register_download_progress_notifier(std::function<SyncSession::ProgressNotifierCallback>&& callback)
uint64_t AsyncOpenTask::register_download_progress_notifier(std::function<ProgressNotifierCallback>&& callback)
{
util::CheckedLockGuard lock(m_mutex);
if (m_session) {
auto token = m_session->register_progress_notifier(std::move(callback),
SyncSession::ProgressDirection::download, false);
SyncSession::ProgressDirection::download, true);
kiburtse marked this conversation as resolved.
Show resolved Hide resolved
kiburtse marked this conversation as resolved.
Show resolved Hide resolved
m_registered_callbacks.emplace_back(token);
return token;
}
Expand Down
6 changes: 4 additions & 2 deletions src/realm/object-store/sync/async_open_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
// Cancels the download and stops the session. No further functions should be called on this class.
void cancel() REQUIRES(!m_mutex);

uint64_t register_download_progress_notifier(
std::function<void(uint64_t transferred_bytes, uint64_t transferrable_bytes)>&& callback) REQUIRES(!m_mutex);
using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes,
double progress_estimate);
uint64_t register_download_progress_notifier(std::function<ProgressNotifierCallback>&& callback)
REQUIRES(!m_mutex);
void unregister_download_progress_notifier(uint64_t token) REQUIRES(!m_mutex);

private:
Expand Down
43 changes: 22 additions & 21 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -856,9 +856,11 @@ void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status erro
}

void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
uint64_t uploadable, uint64_t download_version, uint64_t snapshot_version)
uint64_t uploadable, uint64_t snapshot_version, double download_estimate,
double upload_estimate)
{
m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, download_version, snapshot_version);
m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate,
upload_estimate);
}

static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
Expand Down Expand Up @@ -991,10 +993,11 @@ void SyncSession::create_sync_session()
// Set up the wrapped progress handler callback
m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
uint_fast64_t uploaded, uint_fast64_t uploadable,
uint_fast64_t progress_version, uint_fast64_t snapshot_version) {
uint_fast64_t snapshot_version, double download_estimate,
double upload_estimate) {
if (auto self = weak_self.lock()) {
self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, progress_version,
snapshot_version);
self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version,
download_estimate, upload_estimate);
}
});

Expand Down Expand Up @@ -1235,10 +1238,9 @@ void SyncSession::initiate_access_token_refresh()
}
}

void SyncSession::add_completion_callback(util::UniqueFunction<void(Status)> callback,
_impl::SyncProgressNotifier::NotifierType direction)
void SyncSession::add_completion_callback(util::UniqueFunction<void(Status)> callback, ProgressDirection direction)
{
bool is_download = (direction == _impl::SyncProgressNotifier::NotifierType::download);
bool is_download = (direction == ProgressDirection::download);

m_completion_request_counter++;
m_completion_callbacks.emplace_hint(m_completion_callbacks.end(), m_completion_request_counter,
Expand Down Expand Up @@ -1575,16 +1577,13 @@ void SyncProgressNotifier::unregister_callback(uint64_t token)
}

void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
uint64_t download_version, uint64_t snapshot_version)
uint64_t snapshot_version, double download_estimate, double upload_estimate)
{
// Ignore progress messages from before we first receive a DOWNLOAD message
if (download_version == 0)
return;

std::vector<util::UniqueFunction<void()>> invocations;
{
std::lock_guard<std::mutex> lock(m_mutex);
m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, snapshot_version};
m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded,
upload_estimate, download_estimate, snapshot_version};

for (auto it = m_packages.begin(); it != m_packages.end();) {
bool should_delete = false;
Expand All @@ -1608,13 +1607,15 @@ SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current
{
uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
if (!is_streaming) {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (!is_download && snapshot_version > current_progress.snapshot_version)
return [] {};
double progress_estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate;

// If the sync client has not yet processed all of the local
jbreams marked this conversation as resolved.
Show resolved Hide resolved
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (!is_download && snapshot_version > current_progress.snapshot_version)
return [] {};

if (!is_streaming) {
// The initial download size we get from the server is the uncompacted
// size, and so the download may complete before we actually receive
// that much data. When that happens, transferrable will drop and we
Expand All @@ -1628,7 +1629,7 @@ SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current
// as were originally considered transferrable.
is_expired = !is_streaming && transferred >= transferrable;
return [=, notifier = notifier] {
notifier(transferred, transferrable);
notifier(transferred, transferrable, progress_estimate);
};
}

Expand Down
11 changes: 7 additions & 4 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ struct SyncClient;
class SyncProgressNotifier {
public:
enum class NotifierType { upload, download };
using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes);
using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes,
double progress_estimate);

uint64_t register_callback(std::function<ProgressNotifierCallback>, NotifierType direction, bool is_streaming);
void unregister_callback(uint64_t);

void set_local_version(uint64_t);
void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, uint64_t,
uint64_t);
void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
uint64_t snapshot_version, double download_estimate = 1.0, double upload_estimate = 1.0);

private:
mutable std::mutex m_mutex;
Expand All @@ -70,6 +71,8 @@ class SyncProgressNotifier {
uint64_t downloadable;
uint64_t uploaded;
uint64_t downloaded;
double upload_estimate;
double download_estimate;
uint64_t snapshot_version;
};

Expand Down Expand Up @@ -394,7 +397,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex);
enum class ShouldBackup { yes, no };
void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex);
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t);
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double);
void handle_new_flx_sync_query(int64_t version);

void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex);
Expand Down
Loading
Loading