Skip to content

Commit 89c12aa

Browse files
committed
NIFI-15057 - Add Record Path like helper methods for Record Transform Python processors
1 parent 4346001 commit 89c12aa

File tree

4 files changed

+296
-1
lines changed

4 files changed

+296
-1
lines changed

nifi-docs/src/main/asciidoc/python-developer-guide.adoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,30 @@ If the partition has more than one field in the dictionary, all fields in the di
283283
the Records to be written to the same output FlowFile.
284284

285285

286+
==== Working with Record Paths
287+
288+
Manipulating just a small portion of an incoming record is a very common use case for `RecordTransform` processors. To make
289+
this easier, the Python API exposes the helper function `nifiapi.recordpath.evaluate(record, schema, record_path)`.
290+
291+
The helper accepts the record dictionary, the optional schema supplied by NiFi, and a simple RecordPath-like expression. The
292+
syntax supports forward-slash navigation and zero-based list indexes such as `/customer/address/street` or `/items[0]/price`.
293+
The function returns an iterable collection of field wrapper objects. Each wrapper exposes `get_value()` and `set_value(...)`
294+
methods so you can read and update the targeted value without walking the nested dictionary yourself. Any call to
295+
`set_value(...)` automatically updates the backing record. Example:
296+
297+
----
298+
path_result = recordpath.evaluate(record, schema, '/my/example')
299+
for field in path_result:
300+
value = field.get_value()
301+
if value is not None:
302+
field.set_value(hashlib.sha256(str(value).encode('utf-8')).hexdigest())
303+
----
304+
305+
Because `recordpath.evaluate` returns wrappers, the loop above will transparently update the original record. When the
306+
processor returns the modified record via `RecordTransformResult`, NiFi writes the new value using the configured Record
307+
Reader and Record Writer services.
308+
309+
286310
[[flowfile-source]]
287311
=== FlowFileSource
288312

nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.nifi.py4j;
1919

20+
import org.apache.commons.codec.digest.DigestUtils;
21+
2022
import org.apache.nifi.components.AsyncLoadedProcessor;
2123
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
2224
import org.apache.nifi.components.state.Scope;
@@ -470,8 +472,75 @@ public void testRecordTransformWithDynamicProperties() throws InitializationExce
470472
}
471473

472474

475+
@Test
476+
public void testHashRecordFieldHappyPath() throws InitializationException {
477+
final TestRunner runner = createRecordTransformRunner("HashRecordField");
478+
runner.setProperty("Record Path", "/my/example");
479+
480+
final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
481+
runner.enqueue(json);
482+
waitForValid(runner);
483+
runner.run();
484+
485+
runner.assertTransferCount("success", 1);
486+
runner.assertTransferCount("original", 1);
487+
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
488+
final String expectedHash = DigestUtils.sha256Hex("value");
489+
out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"" + expectedHash + "\"}}]");
490+
}
491+
492+
@Test
493+
public void testHashRecordFieldMissingField() throws InitializationException {
494+
final TestRunner runner = createRecordTransformRunner("HashRecordField");
495+
runner.setProperty("Record Path", "/does/not/exist");
496+
497+
final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
498+
runner.enqueue(json);
499+
waitForValid(runner);
500+
runner.run();
501+
502+
runner.assertTransferCount("success", 1);
503+
runner.assertTransferCount("original", 1);
504+
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
505+
out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]");
506+
}
507+
508+
@Test
509+
public void testHashRecordFieldNonScalar() throws InitializationException {
510+
final TestRunner runner = createRecordTransformRunner("HashRecordField");
511+
runner.setProperty("Record Path", "/my");
512+
513+
final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
514+
runner.enqueue(json);
515+
waitForValid(runner);
516+
runner.run();
517+
518+
runner.assertTransferCount("success", 1);
519+
runner.assertTransferCount("original", 1);
520+
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
521+
out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]");
522+
}
523+
524+
@Test
525+
public void testHashRecordFieldLongValue() throws InitializationException {
526+
final TestRunner runner = createRecordTransformRunner("HashRecordField");
527+
runner.setProperty("Record Path", "/count");
528+
529+
final String json = "[{\"count\":7}]";
530+
runner.enqueue(json);
531+
waitForValid(runner);
532+
runner.run();
533+
534+
runner.assertTransferCount("success", 1);
535+
runner.assertTransferCount("original", 1);
536+
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
537+
final String expectedHash = DigestUtils.sha256Hex("7");
538+
out.assertContentEquals("[{\"count\":\"" + expectedHash + "\"}]");
539+
}
540+
541+
473542
private TestRunner createRecordTransformRunner(final String type) throws InitializationException {
474-
final TestRunner runner = createProcessor("SetRecordField");
543+
final TestRunner runner = createProcessor(type);
475544
runner.setValidateExpressionUsage(false);
476545

477546
final JsonTreeReader reader = new JsonTreeReader();
@@ -487,6 +556,7 @@ private TestRunner createRecordTransformRunner(final String type) throws Initial
487556
return runner;
488557
}
489558

559+
490560
@Test
491561
public void testRecordTransformWithInnerRecord() throws InitializationException {
492562
// Create a SetRecordField Processor
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import hashlib
17+
18+
from nifiapi.properties import PropertyDescriptor, StandardValidators
19+
from nifiapi.recordtransform import RecordTransform, RecordTransformResult
20+
from nifiapi import recordpath
21+
22+
23+
class HashRecordField(RecordTransform):
    """Record transform that replaces a single field with its SHA-256 hex digest.

    The field is selected by the 'Record Path' property and resolved with
    ``recordpath.evaluate``. Records are always routed to 'success'; when the
    path matches nothing, or matches a None, dict or list value, the record is
    passed through unchanged.
    """

    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Hashes a record field using SHA-256.'
        tags = ['record', 'hash', 'security']

    def __init__(self, **kwargs):
        super().__init__()
        self.record_path = PropertyDescriptor(
            name='Record Path',
            description='The RecordPath identifying the field that should be hashed.',
            required=True,
            validators=[StandardValidators.NON_EMPTY_VALIDATOR]
        )
        self.descriptors = [self.record_path]

    def getPropertyDescriptors(self):
        """Return the property descriptors supported by this processor."""
        return self.descriptors

    def transform(self, context, record, schema, attributemap):
        """Hash the value addressed by the configured Record Path, in place.

        Returns a RecordTransformResult carrying the (possibly modified)
        record and the incoming schema on the 'success' relationship.
        """
        configured_path = context.getProperty(self.record_path.name).getValue()

        if configured_path:
            matches = recordpath.evaluate(record, schema, configured_path)
            if matches.is_empty():
                self.logger.warn('Record Path {} did not resolve to a value; record left unchanged'.format(configured_path))
            else:
                for target in matches:
                    current = target.get_value()
                    if current is None:
                        continue
                    if isinstance(current, (dict, list)):
                        self.logger.warn('Record Path {} resolved to a non-scalar value; skipping hash operation'.format(configured_path))
                        continue
                    try:
                        # Non-string scalars (numbers, booleans) are hashed via their str() form.
                        text = current if isinstance(current, str) else str(current)
                        digest = hashlib.sha256(text.encode('utf-8')).hexdigest()
                    except Exception as exc:
                        self.logger.error('Failed to hash value at {}: {}'.format(configured_path, exc))
                    else:
                        target.set_value(digest)

                matches.apply()

        # Every outcome routes the record to 'success' with the schema it arrived with.
        return RecordTransformResult(record=record, schema=schema, relationship='success')
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
def evaluate(record, schema, record_path):
    """Resolve a RecordPath-like expression against ``record``.

    Only simple '/' separated paths with optional zero-based list indexes are
    understood (for example '/parent/child[0]'). The ``schema`` argument is
    accepted but is not consulted by this implementation.

    Returns a RecordPathResult whose field wrappers can read or update the
    targeted value. The result is empty when ``record`` or ``record_path`` is
    None, or when the path does not resolve.
    """
    nothing_to_evaluate = record is None or record_path is None
    matches = [] if nothing_to_evaluate else _evaluate_simple_path(record, record_path)
    return RecordPathResult(record, None, matches, dirty=False)
29+
30+
31+
def _evaluate_simple_path(record, record_path):
    """Walk ``record`` along a '/' separated path and wrap the addressed slot.

    Each path segment is a dictionary key, optionally followed by one
    zero-based list index in square brackets (``name[0]``). Empty segments
    are ignored, so leading, trailing and doubled slashes are harmless.

    Returns a one-element list holding a _SimpleFieldValueWrapper bound to the
    container and key/index of the final segment, or an empty list when the
    path is empty or any segment cannot be resolved (missing key, non-dict
    parent, non-list indexed value, non-integer or out-of-range index).
    """
    tokens = [segment for segment in record_path.split('/') if segment]
    if not tokens:
        # '', '/' and '//' address the root, which has no containing slot to wrap.
        return []

    current = record
    for token in tokens:
        if '[' in token and token.endswith(']'):
            # 'name[3]' -> base 'name', index_text '3' (split at the first '[').
            base, _, index_text = token[:-1].partition('[')
        else:
            base, index_text = token, None

        if not isinstance(current, dict) or base not in current:
            return []
        parent, key = current, base
        current = current[base]

        if index_text is not None:
            if not isinstance(current, list):
                return []
            try:
                index = int(index_text)
            except ValueError:
                return []
            # Negative indexes are rejected rather than counted from the end.
            if not 0 <= index < len(current):
                return []
            parent, key = current, index
            current = current[index]

    # tokens is non-empty and every completed iteration assigned parent/key,
    # so both are always bound here.
    return [_SimpleFieldValueWrapper(parent, key)]
71+
72+
73+
class RecordPathResult:
    """Iterable collection of field wrappers produced by a record path evaluation.

    Holds the Python record the wrappers point into, an optional Java-side
    record reference, and a dirty flag that wrappers raise through
    ``_mark_dirty()`` when they write a value.
    """

    def __init__(self, python_record, java_record, field_wrappers, dirty=True):
        """Bind the wrappers to this result.

        ``dirty`` sets the initial state of the dirty flag. Each wrapper's
        ``_attach(self)`` is called so it can report later writes.
        """
        self._python_record = python_record
        self._java_record = java_record
        self._field_wrappers = field_wrappers
        # Honour the caller's initial state; previously this argument was ignored.
        self._dirty = dirty

        for wrapper in self._field_wrappers:
            wrapper._attach(self)

    def __iter__(self):
        return iter(self._field_wrappers)

    def is_empty(self):
        """Return True when the evaluation matched no fields."""
        return len(self._field_wrappers) == 0

    def fields(self):
        """Return a new list containing the matched field wrappers."""
        return list(self._field_wrappers)

    def apply(self):
        """Return the Python record the wrappers operate on."""
        return self._python_record

    def _mark_dirty(self):
        # Called by attached wrappers after a successful write.
        self._dirty = True
97+
98+
99+
class _SimpleFieldValueWrapper:
    """Read/write handle on one slot (dict key or list index) of a record.

    The wrapper stores the containing object and the key or index inside it,
    so writes go straight into the backing record.
    """

    def __init__(self, parent, key):
        self._parent = parent
        self._key = key
        self._parent_result = None

    def _attach(self, parent_result):
        # Remember the owning result so writes can flag it as dirty.
        self._parent_result = parent_result

    def get_value(self):
        """Return the current value of the slot, or None if it no longer exists."""
        try:
            return self._parent[self._key]
        except (LookupError, TypeError):
            # The key or index went stale after evaluation (e.g. the list shrank).
            return None

    def set_value(self, new_value):
        """Write ``new_value`` into the slot; alias of ``update_value``."""
        self.update_value(new_value)

    def update_value(self, new_value):
        """Write ``new_value`` into the backing container.

        A write to a slot that no longer exists is skipped without raising.
        The owning result, if any, is marked dirty only after a successful
        write.
        """
        try:
            self._parent[self._key] = new_value
        except (LookupError, TypeError):
            # Best effort: a stale slot is ignored, other errors propagate.
            return
        # Kept outside the try so a failure while notifying is not swallowed.
        if self._parent_result is not None:
            self._parent_result._mark_dirty()

    def get_field(self):
        """Return None; this lightweight wrapper carries no schema field."""
        return None

    def get_parent(self):
        """Return None; this lightweight wrapper exposes no parent wrapper."""
        return None

0 commit comments

Comments
 (0)