Commit ea01ea0

Throw exception if Dataflow template file writing fails.
baeminbo committed Jan 21, 2025
1 parent 5e5e147 commit ea01ea0
Showing 5 changed files with 223 additions and 126 deletions.
CHANGES.md (2 additions & 2 deletions)
@@ -95,12 +95,12 @@

## Bugfixes

-* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))
* Prism
  * Fixed an edge case where Bundle Finalization might not become enabled. ([#33493](https://github.com/apache/beam/issues/33493)).
  * Fixed session window aggregation, which wasn't being performed per-key. ([#33542](https://github.com/apache/beam/issues/33542)).
+* Fixed a Dataflow template creation issue that ignores template file creation errors (Java) ([#33636](https://github.com/apache/beam/issues/33636))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
runners/google-cloud-dataflow-java/build.gradle (1 addition & 1 deletion)
@@ -128,7 +128,7 @@ dependencies {
testImplementation project(path: ":sdks:java:extensions:python", configuration: "testRuntimeMigration")
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.jackson_dataformat_yaml
-testImplementation library.java.mockito_core
+testImplementation library.java.mockito_inline
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
validatesRunner library.java.hamcrest
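Why swap mockito_core for mockito_inline? The default mock maker in mockito-core cannot stub static methods, while the inline mock maker can, and the runner reaches the file system through static calls such as FileSystems.create. Below is a hypothetical sketch of the kind of stubbing this unlocks; the class name, test name, and matchers are illustrative, not the commit's actual test code (which is presumably among the two changed files whose diffs are not shown):

```java
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;

import java.io.IOException;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.MimeTypes;
import org.junit.Test;
import org.mockito.MockedStatic;

public class FileSystemsStubbingSketch {
  @Test
  public void canForceFileSystemsCreateToFail() {
    // mockStatic requires the inline mock maker; with plain mockito-core it
    // fails at runtime. Inside this scope, FileSystems' statics are stubbed.
    try (MockedStatic<FileSystems> fileSystems = mockStatic(FileSystems.class)) {
      fileSystems
          .when(() -> FileSystems.create(any(ResourceId.class), any(String.class)))
          .thenThrow(new IOException("simulated write failure"));

      ResourceId resource = mock(ResourceId.class);
      // Any code path that reaches FileSystems.create, such as the runner's
      // template write, now sees an IOException it must surface.
      assertThrows(IOException.class, () -> FileSystems.create(resource, MimeTypes.TEXT));
    }
  }
}
```

MockedStatic arrived in Mockito 3.4.0 and works only when the inline mock maker is active, which is exactly what the mockito_inline artifact provides.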
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -39,12 +39,10 @@
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
@@ -1524,15 +1522,9 @@ public DataflowPipelineJob run(Pipeline pipeline) {
fileLocation.startsWith("/") || fileLocation.startsWith("gs://"),
"Location must be local or on Cloud Storage, got %s.",
fileLocation);
-ResourceId fileResource = FileSystems.matchNewResource(fileLocation, false /* isDirectory */);
-String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
-try (PrintWriter printWriter =
-    new PrintWriter(
-        new BufferedWriter(
-            new OutputStreamWriter(
-                Channels.newOutputStream(FileSystems.create(fileResource, MimeTypes.TEXT)),
-                UTF_8)))) {
-  printWriter.print(workSpecJson);
+
+try {
+  printWorkSpecJsonToFile(fileLocation, newJob);
LOG.info("Printed job specification to {}", fileLocation);
} catch (IOException ex) {
String error = String.format("Cannot create output file at %s", fileLocation);
@@ -1542,6 +1534,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
LOG.warn(error, ex);
}
}
+
if (isTemplate) {
LOG.info("Template successfully created.");
return new DataflowTemplateJob();
@@ -1629,6 +1622,18 @@ public DataflowPipelineJob run(Pipeline pipeline) {
return dataflowPipelineJob;
}

+private static void printWorkSpecJsonToFile(String fileLocation, Job job) throws IOException {
+  String workSpecJson = DataflowPipelineTranslator.jobToString(job);
+  ResourceId fileResource = FileSystems.matchNewResource(fileLocation, false /* isDirectory */);
+  try (OutputStreamWriter writer =
+      new OutputStreamWriter(
+          Channels.newOutputStream(FileSystems.create(fileResource, MimeTypes.TEXT)), UTF_8)) {
+    // Not using PrintWriter as it swallows IOException.
+    // Not using BufferedWriter as this invokes write() only once.
+    writer.write(workSpecJson);
+  }
+}
+
private static EnvironmentInfo getEnvironmentInfoFromEnvironmentId(
String environmentId, RunnerApi.Pipeline pipelineProto) {
RunnerApi.Environment environment =
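The two comments inside the new helper carry the rationale for the whole fix: PrintWriter never throws IOException; its write and flush methods swallow the exception and set an internal flag that callers must poll via checkError(), so the old try-with-resources block could complete "successfully" while writing nothing. A minimal, self-contained demonstration of the difference (the always-failing stream below is a stand-in for an unwritable template location):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

public class SwallowedExceptionDemo {
  /** A sink that always fails, standing in for an unwritable template location. */
  static class FailingOutputStream extends OutputStream {
    @Override
    public void write(int b) throws IOException {
      throw new IOException("disk full");
    }
  }

  public static void main(String[] args) {
    // PrintWriter swallows the IOException: print() and flush() return
    // normally, and the failure is only visible via checkError().
    PrintWriter printWriter = new PrintWriter(new FailingOutputStream());
    printWriter.print("{\"name\": \"job-spec\"}");
    printWriter.flush();
    System.out.println("PrintWriter reported nothing; checkError() = " + printWriter.checkError());

    // OutputStreamWriter propagates the same failure as an IOException,
    // which the runner can now catch and rethrow.
    try (OutputStreamWriter writer =
        new OutputStreamWriter(new FailingOutputStream(), StandardCharsets.UTF_8)) {
      writer.write("{\"name\": \"job-spec\"}");
      writer.flush();
    } catch (IOException e) {
      System.out.println("OutputStreamWriter threw: " + e.getMessage());
    }
  }
}
```

Dropping BufferedWriter has a simpler justification, also noted in the helper's comment: the job spec goes out in a single write() call, so an extra buffering layer buys nothing.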
Diffs for the remaining two changed files are not shown.

0 comments on commit ea01ea0
