NiFi processors catalog

This page is a working inventory of custom processors and processor scripts shipped in this repository.

Scope:

  • Python extension processors (FlowFileTransform) in nifi/user_python_extensions/

  • script-based processors used with ExecuteStreamCommand in nifi/user_scripts/processors/

Python extension processors

Processor class

Source file

Purpose

Key properties

CogStackConvertAvroBinaryRecordFieldToBase64

nifi/user_python_extensions/convert_avro_binary_field_to_base64.py

Rewrites Avro binary field(s) to base64 string values and updates output Avro schema accordingly.

binary_field_name, operation_mode, document_id_field_name

ConvertJsonRecordSchema

nifi/user_python_extensions/convert_json_record_schema.py

Remaps JSON records using a mapping schema file, including nested and composite field handling.

json_mapper_schema_path, preserve_non_mapped_fields, composite_first_non_empty_field

CogStackConvertJsonToAttribute

nifi/user_python_extensions/convert_json_to_attribute.py

Extracts numeric IDs from JSON records and stores them in FlowFile attributes (ids_csv, counts, lengths).

field_name

CogStackConvertParquetToJson

nifi/user_python_extensions/convert_record_parquet_to_json.py

Converts Parquet FlowFile content to NDJSON output.

None

CogStackParseCogStackServiceResult

nifi/user_python_extensions/parse_service_response.py

Normalizes OCR/MedCAT service responses into a consistent JSON output shape.

service_message_type, output_text_field_name, document_id_field_name, document_text_field_name, medcat_output_mode, medcat_deid_keep_annotations

CogStackPrepareRecordForNlp

nifi/user_python_extensions/prepare_record_for_nlp.py

Prepares records for NLP service requests as {content: ...} payloads with text and footer.

document_id_field_name, document_text_field_name, process_flow_file_type

CogStackPrepareRecordForOcr

nifi/user_python_extensions/prepare_record_for_ocr.py

Prepares records for OCR service requests with binary_data and footer fields.

binary_field_name, output_text_field_name, operation_mode, document_id_field_name, process_flow_file_type

CogStackJsonRecordAddGeolocation

nifi/user_python_extensions/record_add_geolocation.py

Adds geolocation (lat/lon) to JSON records using postcode lookup data.

lookup_datafile_url, lookup_datafile_path, postcode_field_name, geolocation_field_name

CogStackJsonRecordDecompressCernerBlob

nifi/user_python_extensions/record_decompress_cerner_blob.py

Reassembles ordered blob fragments and decompresses Cerner LZW payloads.

binary_field_name, blob_sequence_order_field_name, binary_field_source_encoding, output_mode, document_id_field_name

CogStackSampleTestProcessor

nifi/user_python_extensions/sample_processor.py

Reference/sample processor template for implementing new processors.

sample_property_one, sample_property_two, sample_property_three

Script-based processors (ExecuteStreamCommand)

Script

Source file

Purpose

Typical arguments

clean_doc.py

nifi/user_scripts/processors/clean_doc.py

Cleans PII-like patterns from text fields in JSON records.

text_field_name

convert_record_parquet_to_json.py

nifi/user_scripts/processors/convert_record_parquet_to_json.py

Converts Parquet bytes from stdin to NDJSON on stdout.

None

record_decompress_cerner_blob.py

nifi/user_scripts/processors/record_decompress_cerner_blob.py

Reassembles + decompresses Cerner blob parts and emits merged JSON record.

binary_field_name, blob_sequence_order_field_name, output_mode, document_id_field_name

get_files_from_storage.py

nifi/user_scripts/processors/get_files_from_storage.py

Reads files (and optional CSV metadata) from storage folders and emits JSON records for ingestion.

root_project_data_dir, folder_to_ingest, folder_pattern, operation_mode, output_batch_size

generate_location.py

nifi/user_scripts/processors/generate_location.py

Adds random geolocation points for records using configured city polygons.

locations, subject_id_field, location_name_field

cogstack_cohort_generate_data.py

nifi/user_scripts/processors/cogstack_cohort_generate_data.py

Cohort export utility; builds cohort aggregation artifacts from patient and annotation files.

input_folder_path, file name patterns, patient/document field names

cogstack_cohort_generate_random_data.py

nifi/user_scripts/processors/cogstack_cohort_generate_random_data.py

Test/dummy cohort data generator from patient and annotation input files.

input file paths and patient/annotation field names

elastic_schema_converter.py

nifi/user_scripts/processors/elastic_schema_converter.py

Experimental schema conversion helper for Elasticsearch mappings.

input_index_name, output_index_name, json_field_mapper_schema_file_path

Notes

  • Some scripts in nifi/user_scripts/processors/ are stream processors; others are batch-style utilities.

  • sample_processor.py is a template/reference implementation, not a production flow processor.

  • Keep this page updated when adding/removing processor files so developers can discover what is available.