-
Notifications
You must be signed in to change notification settings - Fork 328
Expose DataStreamWriter.Foreach API #387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
994ea3e
b9ab9e0
13a3d02
6325e71
214f1ad
d7ec1dc
b60fbb4
102fc3a
6221e92
b0d0a27
ed782a1
5339306
c2aa3be
24fe2bf
9cb49ef
cfb9e37
3a9b287
75ed8ed
41ebfde
4e93d8e
8860a44
57a028b
33daff3
ec35490
985a3af
a62197f
02bf06b
66c9837
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,11 @@ | |
// The .NET Foundation licenses this file to you under the MIT license. | ||
// See the LICENSE file in the project root for more information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.IO; | ||
using System.Linq; | ||
using Microsoft.Spark.E2ETest.Utils; | ||
using Microsoft.Spark.Sql; | ||
using Microsoft.Spark.Sql.Streaming; | ||
using Xunit; | ||
|
@@ -59,5 +63,204 @@ public void TestSignaturesV2_3_X() | |
|
||
Assert.IsType<DataStreamWriter>(dsw.Trigger(Trigger.Once())); | ||
} | ||
|
||
[SkipIfSparkVersionIsLessThan(Versions.V2_4_0)] | ||
public void TestForeach() | ||
{ | ||
// Temporary folder to put our test stream input. | ||
using var srcTempDirectory = new TemporaryDirectory(); | ||
string streamInputPath = Path.Combine(srcTempDirectory.Path, "streamInput"); | ||
|
||
// [1, 2, ..., 99] | ||
_spark.Range(1, 100).Write().Json(streamInputPath); | ||
|
||
// Test a scenario where ForeachWriter runs without issues. | ||
// If everything is working as expected, then: | ||
// - Triggering stream will not throw an exception | ||
// - 3 CSV files will be created in the temporary directory. | ||
// - 0 Exception files will be created in the temporary directory. | ||
// - The CSV files will contain valid data to read, where the | ||
// expected entries will contain [101, 102, ..., 199] | ||
TestAndValidateForeach( | ||
streamInputPath, | ||
new TestForeachWriter(), | ||
false, | ||
3, | ||
3, | ||
0, | ||
Enumerable.Range(101, 99)); | ||
|
||
// Test scenario where IForeachWriter.Open returns false. | ||
// When IForeachWriter.Open returns false, then IForeachWriter.Process | ||
// is not called. Verify that: | ||
// - Triggering stream will not throw an exception | ||
// - 3 CSV files will be created in the temporary directory. | ||
// - 0 Exception files will be created in the temporary directory. | ||
// - The CSV files will not contain valid data to read. | ||
TestAndValidateForeach( | ||
streamInputPath, | ||
new TestForeachWriterOpenFailure(), | ||
false, | ||
3, | ||
3, | ||
0, | ||
Enumerable.Empty<int>()); | ||
|
||
|
||
// Test scenario where ForeachWriter.Process throws an Exception. | ||
// When IForeachWriter.Process throws an Exception, then the exception | ||
// is rethrown by ForeachWriterWrapper. Verify that: | ||
// - Triggering stream throws an exception. | ||
// - 1 CSV file will be created in the temporary directory. | ||
// - 1 Exception will be created in the temporary directory. The | ||
// thrown exception from Process() will be sent to Close(). | ||
// - The CSV file will not contain valid data to read. | ||
TestAndValidateForeach( | ||
streamInputPath, | ||
new TestForeachWriterProcessFailure(), | ||
true, | ||
1, | ||
1, | ||
1, | ||
Enumerable.Empty<int>()); | ||
} | ||
|
||
private void TestAndValidateForeach( | ||
string streamInputPath, | ||
TestForeachWriter foreachWriter, | ||
bool foreachThrows, | ||
int partitions, | ||
imback82 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
long expectedCSVFiles, | ||
long expectedExceptionFiles, | ||
IEnumerable<int> expectedOutput) | ||
{ | ||
// Temporary folder the TestForeachWriter will write to. | ||
using var dstTempDirectory = new TemporaryDirectory(); | ||
|
||
foreachWriter.WritePath = dstTempDirectory.Path; | ||
|
||
// Read streamInputPath, repartitions data into `partitions`, then | ||
// calls TestForeachWriter on the data. | ||
DataStreamWriter dsw = _spark | ||
.ReadStream() | ||
.Schema("id INT") | ||
.Json(streamInputPath) | ||
.Repartition(partitions) | ||
.WriteStream() | ||
.Foreach(foreachWriter); | ||
|
||
// Trigger the stream batch once. | ||
if (foreachThrows) | ||
suhsteve marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
{ | ||
Assert.Throws<Exception>( | ||
() => dsw.Trigger(Trigger.Once()).Start().AwaitTermination()); | ||
} | ||
else | ||
{ | ||
dsw.Trigger(Trigger.Once()).Start().AwaitTermination(); | ||
} | ||
|
||
// Verify that TestForeachWriter created a unique .csv for each | ||
// partition. | ||
Assert.Equal( | ||
expectedCSVFiles, | ||
Directory.GetFiles(dstTempDirectory.Path, "*.csv").Length); | ||
|
||
// When ForeachWriter.Process(Row) throws an exception, | ||
// ForeachWriter.Close(Exception) will create a file with the | ||
// .exeception extension. | ||
Assert.Equal( | ||
expectedExceptionFiles, | ||
Directory.GetFiles(dstTempDirectory.Path, "*.exception").Length); | ||
|
||
// Read in the *.csv file(s) generated by the TestForeachWriter. | ||
// If there are multiple input files, sorting by "id" will make | ||
// validation simpler. | ||
DataFrame foreachWriterOutputDF = _spark | ||
.Read() | ||
.Schema("id INT") | ||
imback82 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.Csv(dstTempDirectory.Path) | ||
.Sort("id"); | ||
|
||
// TestForeachWriterProcessFailure.Process(Row) did not write anything to | ||
// the .csv files. | ||
Assert.Equal( | ||
expectedOutput.Select(i => new object[] { i }), | ||
suhsteve marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
foreachWriterOutputDF.Collect().Select(r => r.Values)); | ||
} | ||
|
||
[Serializable] | ||
private class TestForeachWriter : IForeachWriter | ||
{ | ||
// Mark the StreamWriter as ThreadStatic. If there are multiple Tasks | ||
// running this ForeachWriter, on the same Worker, and in parallel, then | ||
// it may be possible that they may use the same StreamWriter object. This | ||
// may cause an unintended side effect of a Task writing the output to a | ||
|
||
// file meant for a different Task. | ||
[ThreadStatic] | ||
private static StreamWriter s_streamWriter; | ||
imback82 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
private long _partitionId; | ||
|
||
private long _epochId; | ||
|
||
internal string WritePath { get; set; } | ||
|
||
public void Close(Exception errorOrNull) | ||
{ | ||
if (errorOrNull != null) | ||
{ | ||
FileStream fs = File.Create( | ||
Path.Combine( | ||
WritePath, | ||
$"Close-{_partitionId}-{_epochId}.exception")); | ||
fs.Dispose(); | ||
} | ||
|
||
s_streamWriter?.Dispose(); | ||
} | ||
|
||
public virtual bool Open(long partitionId, long epochId) | ||
{ | ||
_partitionId = partitionId; | ||
_epochId = epochId; | ||
try | ||
{ | ||
s_streamWriter = new StreamWriter( | ||
Path.Combine( | ||
WritePath, | ||
$"sink-foreachWriter-{_partitionId}-{_epochId}.csv")); | ||
return true; | ||
} | ||
catch | ||
{ | ||
return false; | ||
imback82 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
public virtual void Process(Row value) | ||
{ | ||
s_streamWriter.WriteLine(string.Join(",", value.Values.Select(v => 100 + (int)v))); | ||
} | ||
} | ||
|
||
[Serializable] | ||
private class TestForeachWriterOpenFailure : TestForeachWriter | ||
{ | ||
public override bool Open(long partitionId, long epochId) | ||
{ | ||
base.Open(partitionId, epochId); | ||
return false; | ||
} | ||
} | ||
|
||
[Serializable] | ||
private class TestForeachWriterProcessFailure : TestForeachWriter | ||
{ | ||
public override void Process(Row value) | ||
{ | ||
throw new Exception("TestForeachWriterProcessFailure Process(Row) failure."); | ||
} | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.