Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compaction Behavior and Incremental Data Visibility Issues in Hudi MOR Bucketed Tables #12619

Open
lijintao-by opened this issue Jan 11, 2025 · 2 comments

Comments

@lijintao-by
Copy link

We encountered the following three issues when using Hudi MOR bucketed tables:

  1. After synchronizing historical data using Spark's insert_bulk mode, we started a Flink task in upsert mode to write incremental data. We found that the compaction operation could only complete when each bucket had data. Additionally, after the compaction operation was completed, each bucket contained only one file.
  2. When querying the Hudi table through Hive, we found that only the data after compaction was readable. If the number of buckets is large, the time taken for compaction significantly delays the availability of data.
  3. After compaction, each bucket contains only one file, and historical files are cleaned up. This happens even though we have configured the following cleaning policies:
    options.put("hoodie.clean.automatic", "true");
    options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
    options.put("hoodie.cleaner.commits.retained", "5");
    options.put("hoodie.clean.async", "true");
    We would like to get answers to the following questions:
  4. Does the compaction operation have to wait until all buckets have files before it can complete?
  5. Is it expected behavior that Hive can only read data after the compaction operation is completed?
  6. After compaction, is it expected that each bucket contains only one file? Is there a way to retain more historical files?

To Reproduce
Steps to reproduce:

  1. Use Spark in insert_bulk mode to write historical data into the Hudi MOR bucketed table.
  2. Start Flink in upsert mode to incrementally write new data.
  3. After incremental data is written, trigger the compaction operation.
  4. Use Hive to query the Hudi table and find that only data after compaction is readable.
  5. Check the file storage and find that only one file is retained in each bucket, and historical files are cleaned up.
    Flink-related parameters:
    options.put("hoodie.write.concurrency.mode","optimistic_concurrency_control" );
    options.put("hoodie.upsert.shuffle.parallelism", "20");
    options.put("hoodie.insert.shuffle.parallelism", "20");
    options.put("write.operation", "upsert");
    options.put("write.tasks", "2");

options.put("index.type","BUCKET");
options.put("hoodie.bucket.index.num.buckets","10");
options.put("hoodie.index.bucket.engine","SIMPLE");

options.put("hoodie.clean.automatic", "true");
options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
options.put("hoodie.cleaner.commits.retained", "5");
options.put("hoodie.clean.async", "true");
options.put("hoodie.archive.min.commits", "20");
options.put("hoodie.archive.max.commits", "30");
options.put("hoodie.clean.parallelism", "20");
options.put("hoodie.archive.parallelism", "20");

options.put("hoodie.compact.inline", "false");
options.put("hoodie.compact.inline.max.delta.commits", "1");
options.put("hoodie.compact.schedule.inline", "true");

Expected behavior

  1. Can the compaction operation be executed without relying on all buckets having files? Is there any configuration to optimize this behavior?
  2. Is it possible for Hive to read incremental data from the Hudi table without waiting for the compaction to be completed?
  3. After the compaction is completed, is it possible to retain more historical files instead of having only one file per bucket?
  4. There is a table with 3 billion records and 300 buckets. The Flink job runs normally, but the compaction status remains "INFLIGHT".

Environment Description
● Hudi version: 0.14.0
● Spark version: 3.2.1
● Hive version: 3.1.2
● Hadoop version: 3.2.2
● Storage: HDFS
● Running on Docker?: No

Additional context

  1. In the scenario where historical data is written using Spark and incremental data is written using Flink, the following features are required:
  2. Faster visibility of incremental data to reduce data latency.
  3. Retention of multiple historical files after compaction to enable more flexible historical queries and failure recovery.

Stacktrace
There are no specific error logs; the issue is a question about functional behavior.

@danny0405
Copy link
Contributor

Does the compaction operation have to wait until all buckets have files before it can complete?

Not really, it depends on what the compaction plan looks like, currently the plan scheduler would scan all the partitions for all the buckets to see if there is any log file in the latest file slice, by default the pllan include all the log files.

Is it expected behavior that Hive can only read data after the compaction operation is completed?

It depends on what kind of inputformat you configured, here is a doc on how to query with Hive in Chinese: https://www.yuque.com/yuzhao-my9fz/kb/kgv2rb?singleDoc# 《Hive On Hudi》

After compaction, is it expected that each bucket contains only one file? Is there a way to retain more historical files?

If the compaction is triggered by a separate job, you may need to check the clean options specifically.

BTW, the bulk_insert write parquets in your job right?

@TheR1sing3un
Copy link
Member

TheR1sing3un commented Jan 13, 2025

  1. After compaction, is it expected that each bucket contains only one file? Is there a way to retain more historical files?

The compaction job simply creates a new file group version for the file groups in the compaction plan, and the retention of the file group version depends on the policy of the clean job. Your current clean policy is KEEP_LATEST_COMMITS , which is to make a clean judgment based on the number of commit. I guess what you need is KEEP_LATEST_FILE_VERSIONS , which is to reserve the file group version, please refer to the official document: https://hudi.apache.org/docs/cleaning for details

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: Awaiting Triage
Development

No branches or pull requests

4 participants