NiFi processor scripting guide

This guide covers two ways to script custom logic in this repository:

  1. Python scripts executed by ExecuteStreamCommand (stdin -> stdout).

  2. Python extension processors (FlowFileTransform) that appear as processors in the NiFi UI.

Where code should live

  • nifi/user_scripts/processors/: Python scripts executed by ExecuteStreamCommand.

  • nifi/user_python_extensions/: Python FlowFileTransform processors loaded by NiFi.

  • nifi/user_scripts/utils/: shared helpers used by both approaches.

Option 1: ExecuteStreamCommand stream scripts

Use this option when you want a fast way to run Python on FlowFile content without creating a full NiFi extension processor.

Input/output contract

  • NiFi sends FlowFile content to your script on stdin.

  • Your script must write transformed content to stdout.

  • Use stderr for errors/logs and exit non-zero on failure.

  • The content written to stdout becomes the outgoing FlowFile content.

For text payloads, use sys.stdin.read() and sys.stdout.write(...). For binary payloads, use sys.stdin.buffer.read() and sys.stdout.buffer.write(...).

Minimal script template (with args)

#!/usr/bin/env python3
import json
import sys
import traceback


def parse_args(argv: list[str]) -> tuple[str, bool]:
    # Common repo pattern: key=value arguments
    text_field_name = "text"
    uppercase = False
    for token in argv:
        if "=" not in token:
            continue
        key, value = token.split("=", 1)
        if key == "text_field_name":
            text_field_name = value
        elif key == "uppercase":
            uppercase = value.lower() in {"1", "true", "yes"}
    return text_field_name, uppercase


def main() -> None:
    text_field_name, uppercase = parse_args(sys.argv[1:])
    records = json.loads(sys.stdin.read())

    if isinstance(records, dict):
        records = [records]

    for record in records:
        current_value = str(record.get(text_field_name, ""))
        record[text_field_name] = current_value.upper() if uppercase else current_value

    sys.stdout.buffer.write(
        json.dumps(records, ensure_ascii=False).encode("utf-8")
    )


if __name__ == "__main__":
    try:
        main()
    except Exception:
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)

How args are passed to .py scripts

This repository primarily uses key=value tokens in Command Arguments, for example:

  • text_field_name=text

  • output_mode=base64

  • binary_field_name=binarydoc

You can see this pattern in scripts such as:

  • nifi/user_scripts/processors/clean_doc.py

  • nifi/user_scripts/processors/record_decompress_cerner_blob.py

  • nifi/user_scripts/processors/get_files_from_storage.py

Configure ExecuteStreamCommand

Recommended baseline properties:

  • Command Path: python3.11 (or your Python binary)

  • Command Arguments Strategy: Command Arguments Property

  • Working Directory: /opt/nifi/user_scripts/processors/

  • Ignore STDIN: false

  • Output Destination Attribute: leave empty (so output stays in FlowFile content)

Argument examples:

  • If Argument Delimiter is ;: clean_doc.py;text_field_name=text

  • If delimiter is whitespace: clean_doc.py text_field_name=text

Concrete example using clean_doc.py:

Processor: ExecuteStreamCommand
Command Path: python3.11
Command Arguments Strategy: Command Arguments Property
Working Directory: /opt/nifi/user_scripts/processors/
Argument Delimiter: ;
Command Arguments: clean_doc.py;text_field_name=text
Ignore STDIN: false
Output Destination Attribute: (empty)

Option 2: Python extension processors (FlowFileTransform)

Use this option when you want a first-class processor in the NiFi UI with explicit properties and relationships.

At minimum, extension processors in this repo follow this shape:

  • Class implements org.apache.nifi.python.processor.FlowFileTransform via nested Java class.

  • Nested ProcessorDetails with version metadata.

  • transform(...) entrypoint returns FlowFileTransformResult.

Reference implementations:

  • nifi/user_python_extensions/sample_processor.py

  • nifi/user_scripts/utils/nifi/base_nifi_processor.py

Imports, packaging, and runtime notes

  • PYTHONPATH is set to /opt/nifi/nifi-current/python/framework in deployment.

  • The image build installs package dependencies and local nifi modules into that framework path.

  • If you import shared modules from nifi.user_scripts, rebuild the NiFi image after code/dependency changes.

Quick local test before wiring in NiFi

Run a processor script locally with sample input:

echo '[{"text":"hello"}]' | python3 nifi/user_scripts/processors/clean_doc.py text_field_name=text

For binary streams, prefer:

sys.stdin.buffer.read()
sys.stdout.buffer.write(...)

Troubleshooting

  • Script not found: verify Command Path, Working Directory, and script filename.

  • Import errors: verify PYTHONPATH and rebuild image if package contents changed.

  • Unexpected empty output: ensure script writes to stdout and not only logs.

  • Routing to error/nonzero relationship: ensure exceptions print to stderr and process exits with non-zero code.