Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions nifi-docs/src/main/asciidoc/python-developer-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,30 @@ If the partition has more than one field in the dictionary, all fields in the di
the Records to be written to the same output FlowFile.


==== Working with Record Paths

Manipulating just a small portion of an incoming record is a very common use case for `RecordTransform` processors. To make
this easier the Python API exposes the helper function `nifiapi.recordpath.evaluate(record, schema, record_path)`.

The helper accepts the record dictionary, the optional schema supplied by NiFi, and a simple RecordPath-like expression. The
syntax supports forward-slash navigation and zero-based list indexes such as `/customer/address/street` or `/items[0]/price`.
The function returns an iterable collection of field wrapper objects. Each wrapper exposes `get_value()` and `set_value(...)`
methods so you can read and update the targeted value without walking the nested dictionary yourself. Any call to
`set_value(...)` automatically updates the backing record. Example:

----
path_result = recordpath.evaluate(record, schema, '/my/example')
for field in path_result:
value = field.get_value()
if value is not None:
field.set_value(hashlib.sha256(str(value).encode('utf-8')).hexdigest())
----

Because `recordpath.evaluate` returns wrappers, the loop above will transparently update the original record. When the
processor returns the modified record via `RecordTransformResult`, NiFi writes the new value using the configured Record
Reader and Record Writer services.


[[flowfile-source]]
=== FlowFileSource

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.nifi.py4j;

import org.apache.commons.codec.digest.DigestUtils;

import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
import org.apache.nifi.components.state.Scope;
Expand Down Expand Up @@ -470,8 +472,75 @@ public void testRecordTransformWithDynamicProperties() throws InitializationExce
}


@Test
public void testHashRecordFieldHappyPath() throws InitializationException {
final TestRunner runner = createRecordTransformRunner("HashRecordField");
runner.setProperty("Record Path", "/my/example");

final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
runner.enqueue(json);
waitForValid(runner);
runner.run();

runner.assertTransferCount("success", 1);
runner.assertTransferCount("original", 1);
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
final String expectedHash = DigestUtils.sha256Hex("value");
out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"" + expectedHash + "\"}}]");
}

@Test
public void testHashRecordFieldMissingField() throws InitializationException {
final TestRunner runner = createRecordTransformRunner("HashRecordField");
runner.setProperty("Record Path", "/does/not/exist");

final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
runner.enqueue(json);
waitForValid(runner);
runner.run();

runner.assertTransferCount("success", 1);
runner.assertTransferCount("original", 1);
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]");
}

@Test
public void testHashRecordFieldNonScalar() throws InitializationException {
final TestRunner runner = createRecordTransformRunner("HashRecordField");
runner.setProperty("Record Path", "/my");

final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
runner.enqueue(json);
waitForValid(runner);
runner.run();

runner.assertTransferCount("success", 1);
runner.assertTransferCount("original", 1);
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]");
}

@Test
public void testHashRecordFieldLongValue() throws InitializationException {
final TestRunner runner = createRecordTransformRunner("HashRecordField");
runner.setProperty("Record Path", "/count");

final String json = "[{\"count\":7}]";
runner.enqueue(json);
waitForValid(runner);
runner.run();

runner.assertTransferCount("success", 1);
runner.assertTransferCount("original", 1);
final MockFlowFile out = runner.getFlowFilesForRelationship("success").get(0);
final String expectedHash = DigestUtils.sha256Hex("7");
out.assertContentEquals("[{\"count\":\"" + expectedHash + "\"}]");
}


private TestRunner createRecordTransformRunner(final String type) throws InitializationException {
final TestRunner runner = createProcessor("SetRecordField");
final TestRunner runner = createProcessor(type);
runner.setValidateExpressionUsage(false);

final JsonTreeReader reader = new JsonTreeReader();
Expand All @@ -487,6 +556,7 @@ private TestRunner createRecordTransformRunner(final String type) throws Initial
return runner;
}


@Test
public void testRecordTransformWithInnerRecord() throws InitializationException {
// Create a SetRecordField Processor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib

from nifiapi.properties import PropertyDescriptor, StandardValidators
from nifiapi.recordtransform import RecordTransform, RecordTransformResult
from nifiapi import recordpath


class HashRecordField(RecordTransform):
class Java:
implements = ['org.apache.nifi.python.processor.RecordTransform']

class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = 'Hashes a record field using SHA-256.'
tags = ['record', 'hash', 'security']

def __init__(self, **kwargs):
super().__init__()
self.record_path = PropertyDescriptor(
name='Record Path',
description='The RecordPath identifying the field that should be hashed.',
required=True,
validators=[StandardValidators.NON_EMPTY_VALIDATOR]
)
self.descriptors = [self.record_path]

def getPropertyDescriptors(self):
return self.descriptors

def transform(self, context, record, schema, attributemap):
record_path_value = context.getProperty(self.record_path.name).getValue()
if not record_path_value:
return RecordTransformResult(record=record, schema=schema, relationship='success')

path_result = recordpath.evaluate(record, schema, record_path_value)
if path_result.is_empty():
self.logger.warn('Record Path {} did not resolve to a value; record left unchanged'.format(record_path_value))
return RecordTransformResult(record=record, schema=schema, relationship='success')

for field in path_result:
value = field.get_value()
if value is None:
continue
if isinstance(value, (dict, list)):
self.logger.warn('Record Path {} resolved to a non-scalar value; skipping hash operation'.format(record_path_value))
continue
try:
value_as_string = value if isinstance(value, str) else str(value)
hashed = hashlib.sha256(value_as_string.encode('utf-8')).hexdigest()
except Exception as exc:
self.logger.error('Failed to hash value at {}: {}'.format(record_path_value, exc))
continue
field.set_value(hashed)

path_result.apply()

return RecordTransformResult(record=record, schema=schema, relationship='success')
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def evaluate(record, schema, record_path):
"""Evaluate a RecordPath-like expression against the provided record.

This lightweight implementation supports simple '/' separated paths with
optional zero-based list indexes (for example '/parent/child[0]'). It
returns wrappers that allow callers to read or update the targeted value.
"""
if record is None or record_path is None:
return RecordPathResult(record, None, [], dirty=False)

wrappers = _evaluate_simple_path(record, record_path)
return RecordPathResult(record, None, wrappers, dirty=False)


def _evaluate_simple_path(record, record_path):
tokens = [segment for segment in record_path.split('/') if segment]
if not tokens:
return []

current = record
parent = None
key = None

for token in tokens:
if '[' in token and token.endswith(']'):
base, index_text = token.split('[', 1)
index_text = index_text[:-1]
else:
base, index_text = token, None

if not isinstance(current, dict) or base not in current:
return []

parent = current
key = base
current = current[base]

if index_text is not None:
if not isinstance(current, list):
return []
try:
index = int(index_text)
except ValueError:
return []
if index < 0 or index >= len(current):
return []
parent = current
key = index
current = current[index]

if parent is None:
return []

return [_SimpleFieldValueWrapper(parent, key)]


class RecordPathResult:
def __init__(self, python_record, java_record, field_wrappers, dirty=True):
self._python_record = python_record
self._java_record = java_record
self._field_wrappers = field_wrappers
self._dirty = False

for wrapper in self._field_wrappers:
wrapper._attach(self)

def __iter__(self):
return iter(self._field_wrappers)

def is_empty(self):
return len(self._field_wrappers) == 0

def fields(self):
return list(self._field_wrappers)

def apply(self):
return self._python_record

def _mark_dirty(self):
self._dirty = True


class _SimpleFieldValueWrapper:
def __init__(self, parent, key):
self._parent = parent
self._key = key
self._parent_result = None

def _attach(self, parent_result):
self._parent_result = parent_result

def get_value(self):
try:
return self._parent[self._key]
except Exception:
return None

def set_value(self, new_value):
self.update_value(new_value)

def update_value(self, new_value):
try:
self._parent[self._key] = new_value
if self._parent_result is not None:
self._parent_result._mark_dirty()
except Exception:
pass

def get_field(self):
return None

def get_parent(self):
return None