[WIP] ncmec: store checkpoint occasionally when start, end diff is one second #1731

Status: Open. Wants to merge 6 commits into base: main. Showing changes from 4 commits.
@@ -3,7 +3,7 @@
"""
Simple implementation for the NCMEC hash sharing XML API

You can find the complete documentation at
https://report.cybertip.org/hashsharing/v2/documentation.pdf
"""

@@ -565,7 +565,7 @@ def get_entries(
)

def get_entries_iter(
- self, *, start_timestamp: int = 0, end_timestamp: int = 0
+ self, *, start_timestamp: int = 0, end_timestamp: int = 0, next_: str = ""
) -> t.Iterator[GetEntriesResponse]:
"""
A simple wrapper around get_entries to keep fetching until complete.
@@ -574,7 +574,6 @@ def get_entries_iter(
much of the data you have fetched. @see get_entries
"""
has_more = True
- next_ = ""
while has_more:
result = self.get_entries(
start_timestamp=start_timestamp,
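The signature change above lets a caller resume pagination from a previously stored cursor instead of restarting the range. A minimal sketch of the pattern, with a hypothetical `fetch_page` callable standing in for the NCMEC client:

```python
import typing as t

def get_entries_iter(
    fetch_page: t.Callable[[str], t.Tuple[t.List[str], str]],
    next_: str = "",
) -> t.Iterator[t.List[str]]:
    # fetch_page(cursor) returns (entries, next_cursor); an empty
    # next_cursor means the range is exhausted. Passing a non-empty
    # `next_` resumes mid-range instead of starting from scratch.
    has_more = True
    while has_more:
        entries, next_ = fetch_page(next_)
        has_more = bool(next_)
        yield entries
```

Resuming is then just a matter of threading the last yielded cursor back in on the next run.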
@@ -39,14 +39,16 @@ class NCMECCheckpoint(

# The biggest value of "to", and the next "from"
get_entries_max_ts: int
next_fetch: t.Optional[str] = ""
last_fetch_time: t.Optional[int] = 0

def get_progress_timestamp(self) -> t.Optional[int]:
return self.get_entries_max_ts

@classmethod
def from_ncmec_fetch(cls, response: api.GetEntriesResponse) -> "NCMECCheckpoint":
"""Synthesizes a checkpoint from the API response"""
- return cls(response.max_timestamp)
+ return cls(response.max_timestamp, response.next, int(time.time()))

def __setstate__(self, d: t.Dict[str, t.Any]) -> None:
"""Implemented for pickle version compatibility."""
@@ -240,15 +242,18 @@ def fetch_iter(
the cursor
"""
start_time = 0
next_fetch = ""
if checkpoint is not None:
start_time = checkpoint.get_entries_max_ts
next_fetch = checkpoint.next_fetch or ""
# Avoid being exactly at end time for updates showing up multiple
# times in the fetch, since entries are not ordered by time
end_time = int(time.time()) - 5

client = self.get_client(self.collab.environment)
- # We could probably mutate start time, but new variable for clarity
+ # We could probably mutate start time and next, but new variable for clarity
current_start = start_time
current_next_fetch = next_fetch
# The range we are fetching
duration = end_time - current_start
# A counter for when we want to increase our duration
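The duration and counter above drive the probing behavior discussed later in the review. As a hedged sketch (the names and thresholds here are illustrative, not the repo's actual values): shrink the window when the server reports too many entries in range, and widen it again after a run of comfortably small fetches:

```python
def adjust_window(
    duration: int, overfetched: bool, low_fetch_counter: int
) -> "tuple[int, int]":
    # Returns (new_duration, new_low_fetch_counter).
    if overfetched:
        # Too many entries in range: halve the window, restart the streak.
        return max(1, duration // 2), 0
    low_fetch_counter += 1
    if low_fetch_counter >= 5:
        # Several small fetches in a row: likely safe to widen again.
        return duration * 2, 0
    return duration, low_fetch_counter
```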
@@ -277,7 +282,9 @@ def log(event: str) -> None:
updates: t.List[api.NCMECEntryUpdate] = []
for i, entry in enumerate(
client.get_entries_iter(
- start_timestamp=current_start, end_timestamp=current_end
+ start_timestamp=current_start,
+ end_timestamp=current_end,
+ next_=current_next_fetch,
Contributor:
blocking: Danger! It's actually very easy to mess up this argument and accidentally trigger an endless loop. It may be that you have done so in the current code, but it's hard to tell.

The only time current_next_fetch should be populated is when you are resuming from a checkpoint, and you need to explicitly disable the overfetch check (L290) then.

There might be a refactoring of this code that makes this easier, or, now that we are switching over to the next-pointer version, we could get rid of the probing behavior, which would simplify the implementation quite a bit.
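To make the hazard concrete, a sketch (hypothetical names) of the rule this comment describes: the stored cursor must be consumed exactly once, for the first window after resuming, and later windows must start from the beginning; feeding the same stored cursor into every outer iteration would keep refetching the same page:

```python
import typing as t

def fetch_windows(
    windows: t.List[int],
    fetch_window: t.Callable[[int, str], t.List[int]],
    resume_cursor: str = "",
) -> t.List[int]:
    # resume_cursor applies only to the first window; reusing it for
    # later windows (the bug warned about above) would restart each
    # window at the same stale page.
    results: t.List[int] = []
    cursor = resume_cursor
    for window in windows:
        results.extend(fetch_window(window, cursor))
        cursor = ""  # consumed: later windows start from the beginning
    return results
```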

Contributor Author:
Yeah, as I mentioned in Slack, it looks like we need the probing behavior, so I wasn't able to simplify. I added a check to disable the overfetch when resuming from a checkpoint.

)
):
if i == 0: # First batch, check for overfetch
Contributor:
As a comment: it turns out my implementation for estimating the entries in range was completely off, so this is basically always overly cautious. Not sure what to do about it, since the alternatives I can think of are complicated.

@@ -303,12 +310,25 @@ def log(event: str) -> None:
# Our entry estimation (based on the cursor parameters)
# occasionally seems to over-estimate
log(f"est {entry.estimated_entries_in_range} entries")
- elif i % 100 == 0:
+
+ updates.extend(entry.updates)
+
+ if i % 100 == 0:
Contributor:
blocking: by changing this from elif to if, I think it will now print the large-update warning on every update, which is incorrect, no?

Contributor Author (@prenner, Jan 22, 2025):
It would print for the 0th, which we would not want. I updated this to be (i + 1) % 100 == 0, so it fires every 100th iteration.

We need to extend updates every time, regardless of i, so this was cleaner than the other options I thought of, but please suggest alternatives.
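The cadence the thread settles on is easy to check in isolation; with (i + 1) % 100 the message fires on iterations 99, 199, and so on, and never on the first batch:

```python
def should_log_large_fetch(i: int) -> bool:
    # Fire on every 100th batch; the +1 keeps the 0th batch quiet.
    return (i + 1) % 100 == 0
```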

# If we get down to one second, we can potentially be
# fetching an arbitrary large amount of data in one go,
- # so log something occasionally
- log(f"large fetch ({i}), up to {len(updates)}")
- updates.extend(entry.updates)
+ # so store the checkpoint occasionally
+ log(f"large fetch ({i}), up to {len(updates)}. storing checkpoint")
Contributor:
nit: you don't actually store the checkpoint by yielding; technically the caller can decide whether to keep calling or to store.

Contributor Author (@prenner, Jan 21, 2025):

ah, so the original elif block doesn't need to change? The only real change that's needed is to use the next_url in the for loop on L283?

edit: I think the yield is still needed; just the comment might be incorrect. Let me know if not.

Contributor Author:

updated the comment 👍
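The nit above hinges on the generator contract: the fetcher only yields (updates, checkpoint) deltas, and persistence is the consumer's decision. A hedged sketch of such a consumer (names are illustrative, not the repo's API):

```python
import typing as t

def consume_fetch(
    deltas: t.Iterable[t.Tuple[t.Dict[str, int], str]],
    store_checkpoint: t.Callable[[str], None],
) -> t.Dict[str, int]:
    # The fetcher never persists anything itself; the caller commits the
    # checkpoint only after each delta has been durably applied.
    merged: t.Dict[str, int] = {}
    for updates, checkpoint in deltas:
        merged.update(updates)
        store_checkpoint(checkpoint)
    return merged
```

Yielding more often, as this PR does on one-second windows, simply gives the caller more chances to commit.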

yield state.FetchDelta(
{f"{entry.member_id}-{entry.id}": entry for entry in updates},
NCMECCheckpoint(
get_entries_max_ts=current_start,
next_fetch=entry.next,
last_fetch_time=int(time.time()),
),
)
current_next_fetch = entry.next
updates = []

else: # AKA a successful fetch
# If we're hovering near the single-fetch limit for a period
# of time, we can likely safely expand our range.
@@ -329,9 +349,14 @@ def log(event: str) -> None:
low_fetch_counter = 0
yield state.FetchDelta(
{f"{entry.member_id}-{entry.id}": entry for entry in updates},
- NCMECCheckpoint(current_end),
+ NCMECCheckpoint(
+     get_entries_max_ts=current_end,
+     next_fetch="",
+     last_fetch_time=int(time.time()),
+ ),
)
current_start = current_end
current_next_fetch = ""

@classmethod
def fetch_value_merge(
@@ -11,7 +11,7 @@

In the test cases, the class descriptions will be used for context, so
include:
1. The last version it was available
2. The change

i.e.
@@ -24,7 +24,7 @@ class SignalOpinionOwnerRemoved:
'''
owner: int
category: SignalOpinionCategory
tags: t.Set[str]
"""

import copy
@@ -147,8 +147,23 @@ def get_NCMECCheckpoint() -> t.Tuple[NCMECCheckpoint, t.Sequence[object]]:
## Current
max_ts = 1197433091

+ current = NCMECCheckpoint(
+     get_entries_max_ts=max_ts, next_fetch="", last_fetch_time=0
+ )
- current = NCMECCheckpoint(get_entries_max_ts=max_ts)

+ # 1.0.x
@dataclass
class NCMECCheckpointWithoutNext(FetchCheckpointBase):
"""
0.99.x => 1.0.0

get_entries_max_ts: int =>
get_entries_max_ts: int
next_fetch: str
last_fetch_time: int
"""

get_entries_max_ts: int

# 0.99.x
@dataclass
@@ -161,9 +176,10 @@ class NCMECCheckpointTsMoved(FetchCheckpointBase):

max_timestamp: int

checkpoint_without_next = NCMECCheckpointWithoutNext(get_entries_max_ts=max_ts)
ts_moved = NCMECCheckpointTsMoved(max_timestamp=max_ts)

- return (current, [ts_moved])
+ return (current, [checkpoint_without_next, ts_moved])
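The fixtures above exercise unpickling checkpoints written by older versions. A simplified stand-in showing the `__setstate__` backfill pattern the real class relies on (field names follow the diff; the class itself is a sketch, not the repo's code):

```python
import typing as t
from dataclasses import dataclass

@dataclass
class Checkpoint:
    get_entries_max_ts: int
    next_fetch: t.Optional[str] = ""
    last_fetch_time: t.Optional[int] = 0

    def __setstate__(self, d: t.Dict[str, t.Any]) -> None:
        # Pickles written before the new fields existed lack them;
        # backfill their defaults so old checkpoints still load.
        d.setdefault("next_fetch", "")
        d.setdefault("last_fetch_time", 0)
        self.__dict__.update(d)
```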


@pytest.mark.parametrize(