[Question]: Remote Caching with IngestionPipeline? #17583

Open
1 task done
davidrpugh opened this issue Jan 22, 2025 · 3 comments
Labels
question Further information is requested

Comments

@davidrpugh

Question Validation

  • I have searched both the documentation and Discord for an answer.

Question

I am trying to implement an advanced document ingestion pipeline with a Redis cache using the examples found in the documentation.

https://docs.llamaindex.ai/en/stable/module_guides/loading/ingestion_pipeline/

Examining the source code for the ingestion pipeline, in particular

cache.put(hash, nodes, collection=cache_collection)

it appears that the full list of nodes is written to the cache after each transformation. I am processing several thousand documents in my ingestion pipeline and am noticing that the Redis cache crashes when the number of documents processed during a single call of run or arun is too large. I believe this is because, once the number of nodes gets sufficiently high, the volume of data being put into the cache in a single call is too big for Redis to handle.
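
For context, a minimal sketch of the kind of setup described above, following the remote cache management example from that documentation page (the Redis host, port, collection name, and data path are placeholders):

from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionCache, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache

# Redis-backed ingestion cache, as in the remote cache management docs.
ingest_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
    collection="my_ingestion_cache",
)

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(), OpenAIEmbedding()],
    cache=ingest_cache,
)

# Placeholder loader; the real corpus is several thousand documents.
documents = SimpleDirectoryReader("./data").load_data()
nodes = pipeline.run(documents=documents)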

Is there a way to control the "batch" size of the nodes being processed so that I don't overflow the cache?

davidrpugh added the question (Further information is requested) label on Jan 22, 2025

dosubot bot commented Jan 22, 2025

To control the batch size of nodes being processed in the IngestionPipeline and prevent Redis cache overflow, you can use the _node_batcher method. This method allows you to yield successive chunks of nodes, effectively managing the load and reducing the risk of cache overflow when processing a large number of documents.

Here is the _node_batcher method (shown below with the imports it needs to run on its own):

from typing import Any, Generator, List, Sequence, Union

from llama_index.core.schema import BaseNode, Document


def _node_batcher(
    num_batches: int, nodes: Union[Sequence[BaseNode], List[Document]]
) -> Generator[Union[Sequence[BaseNode], List[Document]], Any, Any]:
    """Yield successive chunks of roughly len(nodes) / num_batches items each."""
    batch_size = max(1, int(len(nodes) / num_batches))
    for i in range(0, len(nodes), batch_size):
        yield nodes[i : i + batch_size]

By determining the number of batches you want to process, you can divide your nodes into manageable chunks using _node_batcher. This approach allows you to control the batch size and process nodes incrementally, which can help prevent cache overflow [1].
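
For illustration, a rough sketch of applying that batching by hand before the cache is hit. Note that _node_batcher is a private static method, so calling it directly is an assumption rather than a supported API, and the num_batches value here is arbitrary; pipeline and documents are assumed to be the objects from the setup sketch above.

from llama_index.core.ingestion import IngestionPipeline

# Split the documents into 10 chunks and run the pipeline chunk by chunk,
# so each cache write covers only a fraction of the corpus.
all_nodes = []
for batch in IngestionPipeline._node_batcher(num_batches=10, nodes=documents):
    all_nodes.extend(pipeline.run(documents=list(batch)))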

To continue talking to Dosu, mention @dosu.



@davidrpugh
Author

@dosu this "private" method seems to be called only when multiprocessing is enabled by setting num_workers > 1, which divides the nodes equally across the available CPUs. That doesn't help much if the number of documents is much larger than the number of CPUs available, and I also don't think it will work with an asynchronous IngestionPipeline.

Is there a way to configure the IngestionPipeline to batch the documents before applying transformations?


dosubot bot commented Jan 22, 2025

Yes, the IngestionPipeline can be configured to batch documents before applying transformations, especially when dealing with a large number of documents and using an asynchronous pipeline. This is achieved using the _node_batcher method, which divides the nodes into batches. The arun method then processes these batches in parallel using a ProcessPoolExecutor if num_workers is specified and greater than 1 [1].

Here's a relevant excerpt from the code:

@staticmethod
def _node_batcher(
    num_batches: int, nodes: Union[Sequence[BaseNode], List[Document]]
) -> Generator[Union[Sequence[BaseNode], List[Document]], Any, Any]:
    """Yield successive n-sized chunks from lst."""
    batch_size = max(1, int(len(nodes) / num_batches))
    for i in range(0, len(nodes), batch_size):
        yield nodes[i : i + batch_size]

@dispatcher.span
async def arun(
    ...
    num_workers: Optional[int] = None,
    ...
) -> Sequence[BaseNode]:
    ...
    if num_workers and num_workers > 1:
        ...
        with ProcessPoolExecutor(max_workers=num_workers) as p:
            node_batches = self._node_batcher(
                num_batches=num_workers, nodes=nodes_to_run
            )
            tasks = [
                loop.run_in_executor(
                    p,
                    partial(
                        arun_transformations_wrapper,
                        transformations=self.transformations,
                        in_place=in_place,
                        cache=self.cache if not self.disable_cache else None,
                        cache_collection=cache_collection,
                    ),
                    batch,
                )
                for batch in node_batches
            ]
            result: Sequence[Sequence[BaseNode]] = await asyncio.gather(*tasks)
            nodes: Sequence[BaseNode] = reduce(lambda x, y: x + y, result, [])  # type: ignore
    ...

This setup allows the pipeline to handle large numbers of documents efficiently by processing them in parallel batches.
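
For reference, a rough usage sketch of that path, assuming the pipeline and documents objects from the earlier setup sketch; the worker count of 4 is arbitrary:

import asyncio


async def main() -> None:
    # num_workers > 1 takes the ProcessPoolExecutor branch shown above:
    # the documents are split into num_workers batches before the
    # transformations (and their cache writes) run.
    nodes = await pipeline.arun(documents=documents, num_workers=4)
    print(f"processed {len(nodes)} nodes")


asyncio.run(main())

As davidrpugh notes above, this only splits the input into num_workers batches, so each batch (and each cache write) can still be large when the corpus is much bigger than the number of available CPUs.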

To continue talking to Dosu, mention @dosu.


