Skip to content

Commit 271cf1d

Browse files
committed
NIFI-15024 - Add executed query as a FlowFile attribute for ExecuteSQL and ExecuteSQLRecord
1 parent 4346001 commit 271cf1d

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
6565
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
6666
public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
6767
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
68+
public static final String RESULT_QUERY_STATEMENT = "executesql.query";
6869
public static final String RESULTSET_INDEX = "executesql.resultset.index";
6970
public static final String RESULT_ERROR_MESSAGE = "executesql.error.message";
7071
public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
@@ -272,6 +273,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
272273
selectQuery = queryContents.toString();
273274
}
274275

276+
if (fileToProcess != null) {
277+
fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_STATEMENT, selectQuery);
278+
}
279+
275280
int resultCount = 0;
276281
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
277282
final boolean isAutoCommit = con.getAutoCommit();
@@ -378,6 +383,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
378383
attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
379384
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
380385
attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
386+
attributesToAdd.put(RESULT_QUERY_STATEMENT, selectQuery);
381387
attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
382388
if (inputFileUUID != null) {
383389
attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
@@ -479,14 +485,14 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
479485
session.transfer(fileToProcess, REL_SUCCESS);
480486
} else {
481487
// Set Empty Results as the default behavior based on strategy or null property
482-
session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS);
488+
session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter, selectQuery), REL_SUCCESS);
483489
}
484490
}
485491
} else if (resultCount == 0) {
486492
// If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
487493
// Then generate an empty Output FlowFile
488494
FlowFile resultSetFF = session.create();
489-
session.transfer(setFlowFileEmptyResults(session, resultSetFF, sqlWriter), REL_SUCCESS);
495+
session.transfer(setFlowFileEmptyResults(session, resultSetFF, sqlWriter, selectQuery), REL_SUCCESS);
490496
}
491497
}
492498
} catch (final ProcessException | SQLException e) {
@@ -505,16 +511,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
505511
context.yield();
506512
}
507513
session.putAttribute(fileToProcess, RESULT_ERROR_MESSAGE, e.getMessage());
514+
session.putAttribute(fileToProcess, RESULT_QUERY_STATEMENT, selectQuery);
508515
session.transfer(fileToProcess, REL_FAILURE);
509516
}
510517
}
511518
}
512519

513-
protected FlowFile setFlowFileEmptyResults(final ProcessSession session, FlowFile flowFile, SqlWriter sqlWriter) {
520+
protected FlowFile setFlowFileEmptyResults(final ProcessSession session, FlowFile flowFile, SqlWriter sqlWriter, final String selectQuery) {
514521
flowFile = session.write(flowFile, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
515522
final Map<String, String> attributesToAdd = new HashMap<>();
516523
attributesToAdd.put(RESULT_ROW_COUNT, "0");
517524
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
525+
if (selectQuery != null) {
526+
attributesToAdd.put(RESULT_QUERY_STATEMENT, selectQuery);
527+
}
518528
return session.putAllAttributes(flowFile, attributesToAdd);
519529
}
520530

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
@WritesAttributes({
8484
@WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned by the query. "
8585
+ "If 'Max Rows Per Flow File' is set, then this number will reflect the number of rows in the Flow File instead of the entire result set."),
86+
@WritesAttribute(attribute = "executesql.query", description = "SQL query executed for the FlowFile."),
8687
@WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds. "
8788
+ "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
8889
@WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds. "

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
})
8383
@WritesAttributes({
8484
@WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"),
85+
@WritesAttribute(attribute = "executesql.query", description = "SQL query executed for the FlowFile."),
8586
@WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
8687
@WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
8788
@WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),

0 commit comments

Comments
 (0)