Commit
[HUDI-8811] Handle Kafka source configuration-related failures with specific exception handling (apache#12569)

Co-authored-by: Vamsi <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
3 people authored Jan 13, 2025
1 parent 96ab46b commit c3a7880
Showing 2 changed files with 46 additions and 0 deletions.
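
With this change, Kafka configuration errors that previously escaped as raw `KafkaException`s now surface as Hudi's `HoodieReadFromSourceException`, so a caller can separate fatal misconfiguration from transient source trouble such as timeouts. A hypothetical classifier sketch, not from this commit (the class and method names are made up for illustration):

```java
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;

// Hypothetical helper, not part of this commit: decide whether a failed
// Kafka source read is worth retrying based on the exception type.
final class KafkaReadFailureClassifier {
  static boolean isRetriable(RuntimeException e) {
    if (e instanceof HoodieReadFromSourceException) {
      // Config problem (e.g., bad bootstrap.servers or a missing
      // schema.registry.url): retrying cannot help, so fail fast.
      return false;
    }
    // The existing timeout branch throws HoodieSourceTimeoutException,
    // which a caller may reasonably retry with backoff.
    return e instanceof HoodieSourceTimeoutException;
  }
}
```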
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
@@ -31,6 +32,8 @@
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.OffsetRange;
@@ -75,6 +78,11 @@ protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
lastCheckpoint, sourceLimit));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
} catch (KafkaException ex) {
if (hasConfigException(ex)) {
throw new HoodieReadFromSourceException("kafka source config issue ", ex);
}
throw ex;
}
}

@@ -126,4 +134,16 @@ public void onCommit(String lastCkptStr) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}

private boolean hasConfigException(Throwable e) {
if (e == null) {
return false;
}

if (e instanceof ConfigException || e instanceof io.confluent.common.config.ConfigException) {
return true;
}

return hasConfigException(e.getCause());
}
}
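
The recursive cause-chain walk in `hasConfigException` is the heart of the change: Kafka clients typically bury configuration failures several layers deep in wrapper exceptions. A standalone sketch of the same pattern, not part of the commit, assuming only `kafka-clients` on the classpath (the Confluent `ConfigException` branch is omitted to keep the example dependency-free):

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;

public class ConfigExceptionProbe {
  // Mirrors KafkaSource#hasConfigException: recurse through the cause chain
  // until a ConfigException is found or the chain ends.
  static boolean hasConfigException(Throwable e) {
    if (e == null) {
      return false;
    }
    if (e instanceof ConfigException) {
      return true;
    }
    return hasConfigException(e.getCause());
  }

  public static void main(String[] args) {
    // The Kafka consumer wraps construction-time failures, so the
    // ConfigException is only reachable via getCause().
    Throwable wrapped = new KafkaException("Failed to construct kafka consumer",
        new ConfigException("Invalid url in bootstrap.servers: unknownhost"));
    System.out.println(hasConfigException(wrapped));                           // true
    System.out.println(hasConfigException(new KafkaException("broker down"))); // false
  }
}
```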
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -26,6 +26,7 @@
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -64,6 +65,7 @@
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
@@ -153,6 +155,30 @@ private Properties getProducerProperties() {
return props;
}

@Test
void testKafkaSource_InvalidHostException() throws IOException {
UtilitiesTestBase.Helpers.saveStringsToDFS(
new String[] {dataGen.generateGenericRecord().getSchema().toString()}, hoodieStorage(),
SCHEMA_PATH);
final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");

props.put("hoodie.streamer.schemaprovider.source.schema.file", SCHEMA_PATH);
SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());

AvroKafkaSource avroSourceWithConfluentConfigException = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
// this should throw io.confluent.common.config.ConfigException because of missing `schema.registry.url` config
assertThrows(HoodieReadFromSourceException.class, () -> avroSourceWithConfluentConfigException.readFromCheckpoint(Option.empty(), Long.MAX_VALUE));

props.setProperty("schema.registry.url", "schema-registry-url");
// add invalid brokers address in the props
props.setProperty("bootstrap.servers", "unknownhost");
AvroKafkaSource avroSourceWithKafkaConfiException = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
// this should throw org.apache.kafka.common.config.ConfigException because of invalid kafka broker address
assertThrows(HoodieReadFromSourceException.class, () -> avroSourceWithKafkaConfiException.readFromCheckpoint(Option.empty(), Long.MAX_VALUE));
}

@Test
public void testAppendKafkaOffsets() throws IOException {
UtilitiesTestBase.Helpers.saveStringsToDFS(
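
For context on why the second `assertThrows` in the new test holds (this demo is not part of the diff): the Kafka consumer validates `bootstrap.servers` eagerly in its constructor and, to the best of my knowledge, wraps the resulting `ConfigException` in a `KafkaException`, which the new catch block in `KafkaSource` then translates. A minimal sketch, assuming `kafka-clients` on the classpath:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;

public class BadBrokerDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    // A host with no port fails address validation inside the constructor.
    props.put("bootstrap.servers", "unknownhost");
    props.put("group.id", "demo");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    try {
      new KafkaConsumer<String, String>(props).close();
    } catch (KafkaException e) {
      // The cause chain carries the ConfigException, which is exactly what
      // KafkaSource#hasConfigException looks for before rethrowing as
      // HoodieReadFromSourceException.
      System.out.println(e.getCause() instanceof ConfigException); // true
    }
  }
}
```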