Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
prenner committed Jan 22, 2025
1 parent c9376ad commit c4a004e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,11 @@ def get_entries(
)

def get_entries_iter(
self, *, start_timestamp: int = 0, end_timestamp: int = 0, next_: str = ""
self,
*,
start_timestamp: int = 0,
end_timestamp: int = 0,
checkpointed_paging_url: str = "",
) -> t.Iterator[GetEntriesResponse]:
"""
A simple wrapper around get_entries to keep fetching until complete.
Expand All @@ -574,6 +578,7 @@ def get_entries_iter(
much of the data you have fetched. @see get_entries
"""
has_more = True
next_ = checkpointed_paging_url
while has_more:
result = self.get_entries(
start_timestamp=start_timestamp,
Expand Down
45 changes: 28 additions & 17 deletions python-threatexchange/threatexchange/exchanges/impl/ncmec_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,26 @@ class NCMECCheckpoint(
NCMEC IDs seem to stay around forever, so no need for is_stale()
"""

PAGING_URL_EXPIRATION = 12 * 60 * 60

# The biggest value of "to", and the next "from"
get_entries_max_ts: int
next_fetch: t.Optional[str] = ""
last_fetch_time: t.Optional[int] = 0
# A url to fetch the next page of results.
# Only reference this value through get_paging_url_if_recent
paging_url: str
# a timestamp for the last fetch time, specifically used with a paging_url.
# NCMEC suggests not storing paging_urls long term so we consider them invalid
# 12hr after the last_fetch_time
last_fetch_time: int

def get_progress_timestamp(self) -> t.Optional[int]:
    """Report fetch progress as the largest entry timestamp seen so far."""
    progress = self.get_entries_max_ts
    return progress

def get_paging_url_if_recent(self) -> str:
    """Return the stored paging_url only while it is still fresh.

    NCMEC advises against long-term storage of paging URLs, so a URL
    whose last_fetch_time is at least PAGING_URL_EXPIRATION seconds old
    is treated as expired and an empty string is returned instead.
    """
    age = int(time.time()) - self.last_fetch_time
    if age >= self.PAGING_URL_EXPIRATION:
        return ""
    return self.paging_url

@classmethod
def from_ncmec_fetch(cls, response: api.GetEntriesResponse) -> "NCMECCheckpoint":
"""Synthesizes a checkpoint from the API response"""
Expand Down Expand Up @@ -242,18 +254,17 @@ def fetch_iter(
the cursor
"""
start_time = 0
next_fetch = ""
current_paging_url = ""
if checkpoint is not None:
start_time = checkpoint.get_entries_max_ts
next_fetch = checkpoint.next_fetch or ""
current_paging_url = checkpoint.get_paging_url_if_recent()
# Avoid being exactly at end time for updates showing up multiple
# times in the fetch, since entries are not ordered by time
end_time = int(time.time()) - 5

client = self.get_client(self.collab.environment)
# We could probably mutate start time and next, but new variable for clarity
# We could probably mutate start time, but new variable for clarity
current_start = start_time
current_next_fetch = next_fetch
# The range we are fetching
duration = end_time - current_start
# A counter for when we want to increase our duration
Expand Down Expand Up @@ -284,10 +295,12 @@ def log(event: str) -> None:
client.get_entries_iter(
start_timestamp=current_start,
end_timestamp=current_end,
next_=current_next_fetch,
checkpointed_paging_url=current_paging_url,
)
):
if i == 0: # First batch, check for overfetch
if (
i == 0 and not current_paging_url
): # First batch, check for overfetch when not using a checkpoint
if (
entry.estimated_entries_in_range > self.MAX_FETCH_SIZE
and duration > 1
Expand All @@ -313,20 +326,17 @@ def log(event: str) -> None:

updates.extend(entry.updates)

if i % 100 == 0:
# If we get down to one second, we can potentially be
# fetching an arbitrary large amount of data in one go,
# so store the checkpoint occasionally
log(f"large fetch ({i}), up to {len(updates)}. storing checkpoint")
if (i + 1) % 100 == 0:
# On large fetches, yield a checkpoint to avoid re-fetching later
log(f"large fetch ({i}), up to {len(updates)}. yielding checkpoint")
yield state.FetchDelta(
{f"{entry.member_id}-{entry.id}": entry for entry in updates},
NCMECCheckpoint(
get_entries_max_ts=current_start,
next_fetch=entry.next,
paging_url=entry.next,
last_fetch_time=int(time.time()),
),
)
current_next_fetch = entry.next
updates = []

else: # AKA a successful fetch
Expand All @@ -347,16 +357,17 @@ def log(event: str) -> None:
low_fetch_counter = 0
else: # Not too small, not too large, just right
low_fetch_counter = 0

current_paging_url = ""
yield state.FetchDelta(
{f"{entry.member_id}-{entry.id}": entry for entry in updates},
NCMECCheckpoint(
get_entries_max_ts=current_end,
next_fetch="",
paging_url=current_paging_url,
last_fetch_time=int(time.time()),
),
)
current_start = current_end
current_next_fetch = ""

@classmethod
def fetch_value_merge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def get_NCMECCheckpoint() -> t.Tuple[NCMECCheckpoint, t.Sequence[object]]:
max_ts = 1197433091

current = NCMECCheckpoint(
get_entries_max_ts=max_ts, next_fetch="", last_fetch_time=0
get_entries_max_ts=max_ts, paging_url="", last_fetch_time=0
)

# 1.0.x
Expand All @@ -159,7 +159,7 @@ class NCMECCheckpointWithoutNext(FetchCheckpointBase):
get_entries_max_ts: int =>
get_entries_max_ts: int
next_fetch: str
paging_url: str
last_fetch_time: int
"""

Expand Down

0 comments on commit c4a004e

Please sign in to comment.