- 
                Notifications
    You must be signed in to change notification settings 
- Fork 329
Expose DataStreamWriter.Foreach API #387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
994ea3e
              b9ab9e0
              13a3d02
              6325e71
              214f1ad
              d7ec1dc
              b60fbb4
              102fc3a
              6221e92
              b0d0a27
              ed782a1
              5339306
              c2aa3be
              24fe2bf
              9cb49ef
              cfb9e37
              3a9b287
              75ed8ed
              41ebfde
              4e93d8e
              8860a44
              57a028b
              33daff3
              ec35490
              985a3af
              a62197f
              02bf06b
              66c9837
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -2,7 +2,11 @@ | |
| // The .NET Foundation licenses this file to you under the MIT license. | ||
| // See the LICENSE file in the project root for more information. | ||
|  | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using System.Linq; | ||
| using Microsoft.Spark.E2ETest.Utils; | ||
| using Microsoft.Spark.Sql; | ||
| using Microsoft.Spark.Sql.Streaming; | ||
| using Xunit; | ||
|  | @@ -59,5 +63,205 @@ public void TestSignaturesV2_3_X() | |
|  | ||
| Assert.IsType<DataStreamWriter>(dsw.Trigger(Trigger.Once())); | ||
| } | ||
|  | ||
| [SkipIfSparkVersionIsLessThan(Versions.V2_4_0)] | ||
| public void TestForeach() | ||
| { | ||
| // Temporary folder to put our test stream input. | ||
| using var srcTempDirectory = new TemporaryDirectory(); | ||
| string streamInputPath = Path.Combine(srcTempDirectory.Path, "streamInput"); | ||
|  | ||
| // [1, 2, ..., 99] | ||
| _spark.Range(1, 100).Write().Json(streamInputPath); | ||
|  | ||
| // Test a scenario where ForeachWriter runs without issues. | ||
| // If everything is working as expected, then: | ||
| // - Triggering stream will not throw an exception | ||
| // - 3 CSV files will be created in the temporary directory. | ||
| // - 0 Exception files will be created in the temporary directory. | ||
| // - The CSV files will contain valid data to read, where the | ||
| // expected entries will contain [101, 102, ..., 199] | ||
| TestAndValidateForeach( | ||
| streamInputPath, | ||
| new TestForeachWriter(), | ||
| false, | ||
| 3, | ||
| 3, | ||
| 0, | ||
| Enumerable.Range(101, 99)); | ||
|  | ||
| // Test scenario where IForeachWriter.Open returns false. | ||
| // When IForeachWriter.Open returns false, then IForeachWriter.Process | ||
| // is not called. Verify that: | ||
| // - Triggering stream will not throw an exception | ||
| // - 3 CSV files will be created in the temporary directory. | ||
| // - 0 Exception files will be created in the temporary directory. | ||
| // - The CSV files will not contain valid data to read. | ||
| TestAndValidateForeach( | ||
| streamInputPath, | ||
| new TestForeachWriterOpenFailure(), | ||
| false, | ||
| 3, | ||
| 3, | ||
| 0, | ||
| Enumerable.Empty<int>()); | ||
|  | ||
|  | ||
| // Test scenario where ForeachWriter.Process throws an Exception. | ||
| // When IForeachWriter.Process throws an Exception, then the exception | ||
| // is rethrown by ForeachWriterWrapper. We will limit the partitions | ||
| // to 1 to make validating this scenario simpler. Verify that: | ||
| // - Triggering stream throws an exception. | ||
| // - 1 CSV file will be created in the temporary directory. | ||
| // - 1 Exception will be created in the temporary directory. The | ||
| // thrown exception from Process() will be sent to Close(). | ||
| // - The CSV file will not contain valid data to read. | ||
| TestAndValidateForeach( | ||
| streamInputPath, | ||
| new TestForeachWriterProcessFailure(), | ||
| true, | ||
| 1, | ||
| 1, | ||
| 1, | ||
| Enumerable.Empty<int>()); | ||
| } | ||
|  | ||
| private void TestAndValidateForeach( | ||
| string streamInputPath, | ||
| TestForeachWriter foreachWriter, | ||
| bool foreachThrows, | ||
| int partitions, | ||
|         
                  imback82 marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| long expectedCSVFiles, | ||
| long expectedExceptionFiles, | ||
| IEnumerable<int> expectedOutput) | ||
| { | ||
| // Temporary folder the TestForeachWriter will write to. | ||
| using var dstTempDirectory = new TemporaryDirectory(); | ||
|  | ||
| foreachWriter.WritePath = dstTempDirectory.Path; | ||
|  | ||
| // Read streamInputPath, repartitions data into `partitions`, then | ||
| // calls TestForeachWriter on the data. | ||
| DataStreamWriter dsw = _spark | ||
| .ReadStream() | ||
| .Schema("id INT") | ||
| .Json(streamInputPath) | ||
| .Repartition(partitions) | ||
| .WriteStream() | ||
| .Foreach(foreachWriter); | ||
|  | ||
| // Trigger the stream batch once. | ||
| if (foreachThrows) | ||
|         
                  suhsteve marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| { | ||
| Assert.Throws<Exception>( | ||
| () => dsw.Trigger(Trigger.Once()).Start().AwaitTermination()); | ||
| } | ||
| else | ||
| { | ||
| dsw.Trigger(Trigger.Once()).Start().AwaitTermination(); | ||
| } | ||
|  | ||
| // Verify that TestForeachWriter created a unique .csv when | ||
| // ForeachWriter.Open was called on each partitionId. | ||
| Assert.Equal( | ||
| expectedCSVFiles, | ||
| Directory.GetFiles(dstTempDirectory.Path, "*.csv").Length); | ||
|  | ||
| // Only if ForeachWriter.Process(Row) throws an exception, will | ||
| // ForeachWriter.Close(Exception) create a file with the | ||
| // .exeception extension. | ||
| Assert.Equal( | ||
| expectedExceptionFiles, | ||
| Directory.GetFiles(dstTempDirectory.Path, "*.exception").Length); | ||
|  | ||
| // Read in the *.csv file(s) generated by the TestForeachWriter. | ||
| // If there are multiple input files, sorting by "id" will make | ||
| // validation simpler. Contents of the *.csv will only be populated | ||
| // on successful calls to the ForeachWriter.Process method. | ||
| DataFrame foreachWriterOutputDF = _spark | ||
| .Read() | ||
| .Schema("id INT") | ||
|         
                  imback82 marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| .Csv(dstTempDirectory.Path) | ||
| .Sort("id"); | ||
|  | ||
| // Validated expected *.csv data. | ||
| Assert.Equal( | ||
| expectedOutput.Select(i => new object[] { i }), | ||
|         
                  suhsteve marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| foreachWriterOutputDF.Collect().Select(r => r.Values)); | ||
| } | ||
|  | ||
| [Serializable] | ||
| private class TestForeachWriter : IForeachWriter | ||
| { | ||
| // Mark the StreamWriter as ThreadStatic. If there are multiple Tasks | ||
| // running this ForeachWriter, on the same Worker, and in parallel, then | ||
| // it may be possible that they may use the same StreamWriter object. This | ||
| // may cause an unintended side effect of a Task writing the output to a | ||
|          | ||
| // file meant for a different Task. | ||
| [ThreadStatic] | ||
| private static StreamWriter s_streamWriter; | ||
|         
                  imback82 marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
|  | ||
| private long _partitionId; | ||
|  | ||
| private long _epochId; | ||
|  | ||
| internal string WritePath { get; set; } | ||
|  | ||
| public void Close(Exception errorOrNull) | ||
| { | ||
| if (errorOrNull != null) | ||
| { | ||
| FileStream fs = File.Create( | ||
| Path.Combine( | ||
| WritePath, | ||
| $"Close-{_partitionId}-{_epochId}.exception")); | ||
| fs.Dispose(); | ||
| } | ||
|  | ||
| s_streamWriter?.Dispose(); | ||
| } | ||
|  | ||
| public virtual bool Open(long partitionId, long epochId) | ||
| { | ||
| _partitionId = partitionId; | ||
| _epochId = epochId; | ||
| try | ||
| { | ||
| s_streamWriter = new StreamWriter( | ||
| Path.Combine( | ||
| WritePath, | ||
| $"sink-foreachWriter-{_partitionId}-{_epochId}.csv")); | ||
| return true; | ||
| } | ||
| catch | ||
| { | ||
| return false; | ||
|         
                  imback82 marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| } | ||
| } | ||
|  | ||
| public virtual void Process(Row value) | ||
| { | ||
| s_streamWriter.WriteLine(string.Join(",", value.Values.Select(v => 100 + (int)v))); | ||
| } | ||
| } | ||
|  | ||
| [Serializable] | ||
| private class TestForeachWriterOpenFailure : TestForeachWriter | ||
| { | ||
| public override bool Open(long partitionId, long epochId) | ||
| { | ||
| base.Open(partitionId, epochId); | ||
| return false; | ||
| } | ||
| } | ||
|  | ||
| [Serializable] | ||
| private class TestForeachWriterProcessFailure : TestForeachWriter | ||
| { | ||
| public override void Process(Row value) | ||
| { | ||
| throw new Exception("TestForeachWriterProcessFailure Process(Row) failure."); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.