[WIP] ncmec: store checkpoint occasionally when start, end diff is one second #1731

Open · wants to merge 6 commits into base: main
@@ -3,7 +3,7 @@
 """
 Simple implementation for the NCMEC hash sharing XML API
 
-You can find the complete documentation at
+You can find the complete documentation at
 https://report.cybertip.org/hashsharing/v2/documentation.pdf
 """
 
@@ -565,7 +565,11 @@ def get_entries(
         )
 
     def get_entries_iter(
-        self, *, start_timestamp: int = 0, end_timestamp: int = 0
+        self,
+        *,
+        start_timestamp: int = 0,
+        end_timestamp: int = 0,
+        checkpointed_paging_url: str = "",
     ) -> t.Iterator[GetEntriesResponse]:
         """
         A simple wrapper around get_entries to keep fetching until complete.
@@ -574,7 +578,7 @@ def get_entries_iter(
         much of the data you have fetched. @see get_entries
         """
         has_more = True
-        next_ = ""
+        next_ = checkpointed_paging_url
         while has_more:
             result = self.get_entries(
                 start_timestamp=start_timestamp,
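For orientation, here is a rough usage sketch of the new checkpointed_paging_url parameter. The client object, timestamps, and the persistence of the paging URL are invented for illustration and are not part of this diff; the parameter name and the response's updates/next members are taken from the changes above.

# Hypothetical usage sketch (not part of this PR). Assumes an
# already-constructed NCMEC API client exposing get_entries_iter
# as changed above; how the paging URL is persisted is hand-waved.
def drain_range(client, saved_next_url: str = "") -> str:
    """Returns the last paging URL seen, suitable for checkpointing."""
    for response in client.get_entries_iter(
        start_timestamp=1197433091,  # invented range
        end_timestamp=1197433092,
        checkpointed_paging_url=saved_next_url,  # "" = start at page one
    ):
        for update in response.updates:
            pass  # process each entry update here
        saved_next_url = response.next  # store this to survive a crash
    return saved_next_url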
@@ -37,23 +37,45 @@ class NCMECCheckpoint(
     NCMEC IDs seem to stay around forever, so no need for is_stale()
     """
 
+    PAGING_URL_EXPIRATION = 12 * 60 * 60
+
     # The biggest value of "to", and the next "from"
     get_entries_max_ts: int
+    # A url to fetch the next page of results.
+    # Only reference this value through get_paging_url_if_recent()
+    paging_url: str = ""
+    # A timestamp for the last fetch time, used together with paging_url.
+    # NCMEC suggests not storing paging_urls long term, so we consider them
+    # invalid 12hr after last_fetch_time
+    last_fetch_time: int = field(hash=True, default_factory=lambda: int(time.time()))
 
     def get_progress_timestamp(self) -> t.Optional[int]:
         return self.get_entries_max_ts
 
+    def get_paging_url_if_recent(self) -> str:
+        if int(time.time()) - self.last_fetch_time < self.PAGING_URL_EXPIRATION:
+            return self.paging_url
+        return ""
+
     @classmethod
     def from_ncmec_fetch(cls, response: api.GetEntriesResponse) -> "NCMECCheckpoint":
         """Synthesizes a checkpoint from the API response"""
-        return cls(response.max_timestamp)
+        return cls(response.max_timestamp, response.next, int(time.time()))

     def __setstate__(self, d: t.Dict[str, t.Any]) -> None:
         """Implemented for pickle version compatibility."""
         # 0.99.0 => 1.0.0:
         # field 'max_timestamp' renamed to 'get_entries_max_ts'
         if "max_timestamp" in d:
             d["get_entries_max_ts"] = d.pop("max_timestamp")
 
+        # 1.0.0 => 1.2.3:
+        # Added last_fetch_time.
+        # note: the default_factory value was not being set correctly when
+        # reading from pickle
+        if "last_fetch_time" not in d:
+            d["last_fetch_time"] = int(time.time())
+
         self.__dict__ = d
prenner (Contributor, Author) commented:
I was getting AttributeError: 'NCMECCheckpoint' object has no attribute 'last_fetch_time' in the test_state_compatibility test without this.

Seems sort of related to pydantic/pydantic#7821, since a plain default was working (but a default wouldn't work here, since we want to set the field to the current time).
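For readers unfamiliar with the pickle detail behind that comment, here is a minimal, self-contained sketch (with a hypothetical Checkpoint class, not the real one): pickle recreates an instance via __new__ and restores __dict__ directly, so __init__ never runs. A plain default survives because it lives on the class and attribute lookup falls back to it, but a default_factory field added after the pickle was written simply does not exist on old instances.

import time
from dataclasses import dataclass, field

@dataclass
class Checkpoint:  # hypothetical stand-in for NCMECCheckpoint
    ts: int
    tag: str = ""  # plain default: stored on the class, lookup still works
    last_fetch_time: int = field(default_factory=lambda: int(time.time()))

old_state = {"ts": 123}                    # state pickled before the new fields
restored = Checkpoint.__new__(Checkpoint)  # pickle-style: __init__ is skipped
restored.__dict__ = old_state
assert restored.tag == ""                  # class-attribute fallback
assert not hasattr(restored, "last_fetch_time")  # default_factory never ran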




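To make the 12-hour expiry rule concrete, a small hedged example against the class as changed above (the URL and timestamps are invented):

import time

thirteen_hours = 13 * 60 * 60
stale = NCMECCheckpoint(
    get_entries_max_ts=100,
    paging_url="https://hashsharing.example/next",  # invented URL
    last_fetch_time=int(time.time()) - thirteen_hours,
)
assert stale.get_paging_url_if_recent() == ""  # expired: refetch the range

fresh = NCMECCheckpoint(
    get_entries_max_ts=100,
    paging_url="https://hashsharing.example/next",
    last_fetch_time=int(time.time()),
)
assert fresh.get_paging_url_if_recent() != ""  # recent: resume paging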
@@ -240,8 +262,10 @@ def fetch_iter(
         the cursor
         """
         start_time = 0
+        current_paging_url = ""
         if checkpoint is not None:
             start_time = checkpoint.get_entries_max_ts
+            current_paging_url = checkpoint.get_paging_url_if_recent()
         # Avoid being exactly at end time for updates showing up multiple
         # times in the fetch, since entries are not ordered by time
         end_time = int(time.time()) - 5
@@ -277,10 +301,14 @@ def log(event: str) -> None:
             updates: t.List[api.NCMECEntryUpdate] = []
             for i, entry in enumerate(
                 client.get_entries_iter(
-                    start_timestamp=current_start, end_timestamp=current_end
+                    start_timestamp=current_start,
+                    end_timestamp=current_end,
+                    checkpointed_paging_url=current_paging_url,
                 )
             ):
-                if i == 0:  # First batch, check for overfetch
+                if (
+                    i == 0 and not current_paging_url
+                ):  # First batch, check for overfetch when not using a checkpoint
                     if (
                         entry.estimated_entries_in_range > self.MAX_FETCH_SIZE
                         and duration > 1
@@ -303,12 +331,22 @@ def log(event: str) -> None:
                     # Our entry estimation (based on the cursor parameters)
                     # occasionally seems to over-estimate
                     log(f"est {entry.estimated_entries_in_range} entries")
-                elif i % 100 == 0:
-                    # If we get down to one second, we can potentially be
-                    # fetching an arbitrarily large amount of data in one go,
-                    # so log something occasionally
-                    log(f"large fetch ({i}), up to {len(updates)}")
 
                 updates.extend(entry.updates)
 
+                if (i + 1) % 100 == 0:
+                    # On large fetches, yield a checkpoint to avoid re-fetching later
+                    log(f"large fetch ({i}), up to {len(updates)}, yielding checkpoint")
+                    yield state.FetchDelta(
+                        {f"{entry.member_id}-{entry.id}": entry for entry in updates},
+                        NCMECCheckpoint(
+                            get_entries_max_ts=current_start,
+                            paging_url=entry.next,
+                            last_fetch_time=int(time.time()),
+                        ),
+                    )
+                    updates = []
+
             else:  # AKA a successful fetch
                 # If we're hovering near the single-fetch limit for a period
                 # of time, we can likely safely expand our range.
@@ -327,9 +365,15 @@ def log(event: str) -> None:
                     low_fetch_counter = 0
                 else:  # Not too small, not too large, just right
                     low_fetch_counter = 0
 
+            current_paging_url = ""
             yield state.FetchDelta(
                 {f"{entry.member_id}-{entry.id}": entry for entry in updates},
-                NCMECCheckpoint(current_end),
+                NCMECCheckpoint(
+                    get_entries_max_ts=current_end,
+                    paging_url=current_paging_url,
+                    last_fetch_time=int(time.time()),
+                ),
             )
             current_start = current_end
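Net effect of the two yield sites above: on a large range the fetcher now emits a delta roughly every 100 pages, checkpointed at current_start plus that page's next URL, and emits a final delta checkpointed at current_end once the range completes. If the process dies mid-range, a stored paging_url less than 12 hours old lets the next run resume where it left off. A hedged sketch of the consuming side follows; the store object and its methods are invented, and FetchDelta is assumed to expose its two positional members as updates and checkpoint.

# Invented consumer loop, not code from this PR.
def run_fetch(api_instance, store) -> None:
    checkpoint = store.load_checkpoint()  # None on the first run
    for delta in api_instance.fetch_iter(
        [],  # supported signal types; not relevant to this sketch
        checkpoint,
    ):
        store.apply(delta.updates)               # invented persistence call
        store.save_checkpoint(delta.checkpoint)  # survives a mid-range crash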
@@ -11,7 +11,7 @@
 
 In the test cases, the class descriptions will be used for context, so
 include:
-1. The last version it was available
+1. The last version it was available
 2. The change
 
 i.e.
@@ -24,12 +24,13 @@ class SignalOpinionOwnerRemoved:
     '''
     owner: int
     category: SignalOpinionCategory
-    tags: t.Set[str]
+    tags: t.Set[str]
 """
 
 import copy
 from dataclasses import dataclass, field
 import pickle
+import time
 import typing as t
 
 import pytest
@@ -147,8 +148,25 @@ def get_NCMECCheckpoint() -> t.Tuple[NCMECCheckpoint, t.Sequence[object]]:
     ## Current
     max_ts = 1197433091
 
+    current = NCMECCheckpoint(
+        get_entries_max_ts=max_ts,
+        paging_url="",
+        last_fetch_time=int(time.time()),
+    )
+
     # 1.0.x
-    current = NCMECCheckpoint(get_entries_max_ts=max_ts)
+    @dataclass
+    class NCMECCheckpointWithoutNext(FetchCheckpointBase):
+        """
+        0.99.x => 1.2.3
+
+        get_entries_max_ts: int =>
+            get_entries_max_ts: int
+            paging_url: str
+            last_fetch_time: int
+        """
+
+        get_entries_max_ts: int
 
     # 0.99.x
     @dataclass
@@ -161,23 +179,28 @@ class NCMECCheckpointTsMoved(FetchCheckpointBase):
 
         max_timestamp: int
 
+    checkpoint_without_next = NCMECCheckpointWithoutNext(get_entries_max_ts=max_ts)
     ts_moved = NCMECCheckpointTsMoved(max_timestamp=max_ts)
 
-    return (current, [ts_moved])
+    return (current, [checkpoint_without_next, ts_moved])
 
 
 @pytest.mark.parametrize(
-    ("current_version", "historical_versions"),
+    "get_checkpoint_func",
     [
-        get_SignalOpinion(),
-        get_FBThreatExchangeOpinion(),
-        get_NCMECOpinion(),
-        get_NCMECCheckpoint(),
+        get_SignalOpinion,
+        get_FBThreatExchangeOpinion,
+        get_NCMECOpinion,
+        get_NCMECCheckpoint,
     ],
 )
 def test_previous_pickle_state(
-    current_version: object, historical_versions: t.Sequence[object]
+    get_checkpoint_func: t.Callable[[], t.Tuple[object, t.Sequence[object]]],
+    monkeypatch: pytest.MonkeyPatch,
 ):
+    monkeypatch.setattr("time.time", lambda: 10**8)
+
+    current_version, historical_versions = get_checkpoint_func()
     # Sanity
     serialized = pickle.dumps(current_version)
     assert (
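Why freeze the clock in the test? last_fetch_time now defaults to the current time, and __setstate__ backfills it at load time, so equality between a freshly built checkpoint and one upgraded from an old pickle only holds if time.time() returns the same value at both moments. A hedged sketch of the mechanism (not the actual test body; it drives __setstate__ by hand the way pickle would):

def test_setstate_backfills_last_fetch_time(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setattr("time.time", lambda: 10**8)  # freeze the clock

    current = NCMECCheckpoint(
        get_entries_max_ts=5, paging_url="", last_fetch_time=int(time.time())
    )
    old_state = {"get_entries_max_ts": 5}  # roughly what a 1.0.x pickle carried
    upgraded = NCMECCheckpoint.__new__(NCMECCheckpoint)  # pickle-style creation
    upgraded.__setstate__(old_state)  # backfills last_fetch_time via time.time()
    assert upgraded == current  # equal only because both saw the frozen clock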