NiFi processor scripting guide
This guide covers two ways to script custom logic in this repository:
- Python scripts executed by ExecuteStreamCommand (stdin -> stdout).
- Python extension processors (FlowFileTransform) that appear as processors in the NiFi UI.
Where code should live
- nifi/user_scripts/processors/: Python scripts executed by ExecuteStreamCommand.
- nifi/user_python_extensions/: Python FlowFileTransform processors loaded by NiFi.
- nifi/user_scripts/utils/: shared helpers used by both approaches.
Option 1: ExecuteStreamCommand stream scripts
Use this option when you want a fast way to run Python on FlowFile content without creating a full NiFi extension processor.
Input/output contract
- NiFi sends FlowFile content to your script on stdin.
- Your script must write transformed content to stdout.
- Use stderr for errors/logs and exit non-zero on failure.
- The content written to stdout becomes the outgoing FlowFile content.
For text payloads, use sys.stdin.read() and sys.stdout.write(...).
For binary payloads, use sys.stdin.buffer.read() and sys.stdout.buffer.write(...).
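As a minimal sketch of the binary variant, the helper below reads raw bytes from one stream and writes a transformed copy to another. The name `transform_stream` and the gzip step are illustrative assumptions, not code from this repository.

```python
import gzip
import io


def transform_stream(src, dst) -> int:
    """Read all bytes from src, gzip-compress them, write them to dst.

    Returns the number of bytes written.
    """
    payload = src.read()              # raw bytes, no text decoding applied
    result = gzip.compress(payload)   # placeholder transformation
    dst.write(result)
    dst.flush()
    return len(result)


# Local demonstration with in-memory streams. Under ExecuteStreamCommand you
# would pass sys.stdin.buffer and sys.stdout.buffer instead.
source = io.BytesIO(b"\x00\x01 raw bytes")
sink = io.BytesIO()
written = transform_stream(source, sink)
```

Keeping the transformation behind a function that takes stream objects makes it possible to exercise the logic with in-memory buffers before wiring it to the real standard streams.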
Minimal script template (with args)
#!/usr/bin/env python3
import json
import sys
import traceback


def parse_args(argv: list[str]) -> tuple[str, bool]:
    # Common repo pattern: key=value arguments
    text_field_name = "text"
    uppercase = False
    for token in argv:
        if "=" not in token:
            continue
        key, value = token.split("=", 1)
        if key == "text_field_name":
            text_field_name = value
        elif key == "uppercase":
            uppercase = value.lower() in {"1", "true", "yes"}
    return text_field_name, uppercase


def main() -> None:
    text_field_name, uppercase = parse_args(sys.argv[1:])
    records = json.loads(sys.stdin.read())
    if isinstance(records, dict):
        records = [records]
    for record in records:
        current_value = str(record.get(text_field_name, ""))
        record[text_field_name] = current_value.upper() if uppercase else current_value
    sys.stdout.buffer.write(
        json.dumps(records, ensure_ascii=False).encode("utf-8")
    )


if __name__ == "__main__":
    try:
        main()
    except Exception:
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)
How args are passed to .py scripts
This repository primarily uses key=value tokens in Command Arguments, for example:
- text_field_name=text
- output_mode=base64
- binary_field_name=binarydoc
You can see this pattern in scripts such as:
- nifi/user_scripts/processors/clean_doc.py
- nifi/user_scripts/processors/record_decompress_cerner_blob.py
- nifi/user_scripts/processors/get_files_from_storage.py
Configure ExecuteStreamCommand
Recommended baseline properties:
- Command Path: python3.11 (or your Python binary)
- Command Arguments Strategy: Command Arguments Property
- Working Directory: /opt/nifi/user_scripts/processors/
- Ignore STDIN: false
- Output Destination Attribute: leave empty (so output stays in FlowFile content)
Argument examples:
- If Argument Delimiter is ";": clean_doc.py;text_field_name=text
- If the delimiter is whitespace: clean_doc.py text_field_name=text
Concrete example using clean_doc.py:
Processor: ExecuteStreamCommand
Command Path: python3.11
Command Arguments Strategy: Command Arguments Property
Working Directory: /opt/nifi/user_scripts/processors/
Argument Delimiter: ;
Command Arguments: clean_doc.py;text_field_name=text
Ignore STDIN: false
Output Destination Attribute: (empty)
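To make the relationship between Command Arguments and what the script sees more concrete, the sketch below approximates the delimiter split and the resulting argument list. Both helper names are hypothetical, and the split is a simplification: it does not model any quoting rules NiFi may apply.

```python
def split_command_arguments(command_arguments: str, delimiter: str = ";") -> list[str]:
    """Approximate how a Command Arguments string becomes separate tokens."""
    return [token for token in command_arguments.split(delimiter) if token]


def parse_key_values(tokens: list[str]) -> dict[str, str]:
    """Collect key=value tokens into a dict, ignoring tokens without '='."""
    options: dict[str, str] = {}
    for token in tokens:
        if "=" in token:
            key, value = token.split("=", 1)  # split once so values may contain '='
            options[key] = value
    return options


tokens = split_command_arguments("clean_doc.py;text_field_name=text")
# The first token is the script handed to the interpreter; the remainder is
# what the script would read from sys.argv[1:].
script_name, script_args = tokens[0], tokens[1:]
options = parse_key_values(script_args)
print(script_name, options)  # clean_doc.py {'text_field_name': 'text'}
```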
Option 2: Python extension processors (FlowFileTransform)
Use this option when you want a first-class processor in the NiFi UI with explicit properties and relationships.
At minimum, extension processors in this repo follow this shape:
- Class implements org.apache.nifi.python.processor.FlowFileTransform via nested Java class.
- Nested ProcessorDetails with version metadata.
- transform(...) entrypoint returns FlowFileTransformResult.
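The shape described above can be sketched roughly as follows. This is an illustrative skeleton, not one of the repository's processors: the class name `UppercaseContent`, the version string, and the attribute name are assumptions, and it does not use the shared base class in nifi/user_scripts/utils/. The fallback stubs exist only so the sketch can be exercised outside NiFi.

```python
try:
    # Provided by NiFi's Python extension runtime.
    from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
except ImportError:
    # Hypothetical stand-ins for running this sketch outside NiFi.
    class FlowFileTransform:
        pass

    class FlowFileTransformResult:
        def __init__(self, relationship, attributes=None, contents=None):
            self.relationship = relationship
            self.attributes = attributes
            self.contents = contents


class UppercaseContent(FlowFileTransform):
    class Java:
        # Declares which Java interface this Python class implements.
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "0.0.1-SNAPSHOT"  # assumed placeholder version
        description = "Illustrative processor that upper-cases FlowFile text."

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowfile):
        # Read the incoming content, transform it, and route to "success".
        text = flowfile.getContentsAsBytes().decode("utf-8")
        return FlowFileTransformResult(
            relationship="success",
            contents=text.upper(),
            attributes={"uppercased": "true"},
        )
```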
Reference implementations:
- nifi/user_python_extensions/sample_processor.py
- nifi/user_scripts/utils/nifi/base_nifi_processor.py
Imports, packaging, and runtime notes
- PYTHONPATH is set to /opt/nifi/nifi-current/python/framework in deployment.
- The image build installs package dependencies and local nifi modules into that framework path.
- If you import shared modules from nifi.user_scripts, rebuild the NiFi image after code/dependency changes.
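When an import fails, it can help to confirm whether the module is resolvable at all from the interpreter NiFi uses. The helper below is a small sketch for that check. The name `module_available` is hypothetical, and it only reports whether the import system can locate the module, not whether importing it succeeds.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if `name` can be located on the current sys.path."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # find_spec imports parent packages first, so a missing parent
        # (for example `nifi` when checking `nifi.user_scripts`) raises here.
        return False


print(module_available("json"))
print(module_available("nifi.user_scripts"))
```

Running this with the same Python binary and PYTHONPATH as the deployment shows whether the shared modules are visible before you debug the script itself.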
Quick local test before wiring in NiFi
Run a processor script locally with sample input:
echo '[{"text":"hello"}]' | python3 nifi/user_scripts/processors/clean_doc.py text_field_name=text
For binary streams, prefer:
sys.stdin.buffer.read()
sys.stdout.buffer.write(...)
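The same local check can be scripted so that the exit code, stdout, and stderr are all captured in one place. The helper below is a sketch under that assumption. `run_stream_script` is a hypothetical name, and it launches the script with the current interpreter rather than the binary configured in Command Path.

```python
import subprocess
import sys


def run_stream_script(script_path: str, payload: bytes, *args: str) -> subprocess.CompletedProcess:
    """Run a stream script with `payload` on stdin and `args` as its arguments."""
    return subprocess.run(
        [sys.executable, script_path, *args],
        input=payload,          # delivered to the script's stdin
        capture_output=True,    # collect stdout and stderr as bytes
        check=False,            # inspect returncode instead of raising
    )


# Example call (paths as used elsewhere in this guide):
# result = run_stream_script(
#     "nifi/user_scripts/processors/clean_doc.py",
#     b'[{"text":"hello"}]',
#     "text_field_name=text",
# )
# result.returncode, result.stdout, result.stderr
```

A non-zero `returncode` together with content in `stderr` mirrors the failure signal described in the input/output contract.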
Troubleshooting
- Script not found: verify Command Path, Working Directory, and script filename.
- Import errors: verify PYTHONPATH and rebuild image if package contents changed.
- Unexpected empty output: ensure script writes to stdout and not only logs.
- Routing to error/nonzero relationship: ensure exceptions print to stderr and process exits with non-zero code.