FME Transformers: 2026.1
KafkaConnector
Connects to Apache Kafka, Confluent, RedPanda, or other Kafka-compatible data stream services to send and receive messages.
Typical Uses
- Sending and receiving messages
- Creating topics
How does it work?
The KafkaConnector uses your account credentials to perform various tasks:
| Action | Task |
|---|---|
| Commit | Manually commit read offsets. |
| Get Metadata | Retrieve metadata into an attribute. |
| Receive | Receive messages from topics. |
| Send | Send messages to queues or topics. |
Optional Input Port
This transformer has two modes, depending on whether anything is connected to its Input port:
- Input-driven: When input features are connected, the transformer runs once for each feature it receives in the Input port.
- Run Once: When no input features are connected, the transformer runs one time.
When the Input port is in use, the Initiator output port is also enabled.
Heartbeat
When Action is Receive and the connection is still alive (Receive Mode is Stream or a batch is not yet complete), heartbeat features are output at a specified interval after a period of inactivity.
Heartbeat features have two attributes:
- _heartbeat_id: Integer starting at 0. This is a unique identifier for each heartbeat feature output from this port.
- _heartbeat_counter: Integer starting at 0 that resets whenever the connector receives a new message and a new period of inactivity begins. This counter represents the number of consecutive heartbeat features.
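The numbering behavior of these two attributes can be sketched with a small Python model (a hypothetical event stream, not the connector's implementation):

```python
def heartbeat_features(events):
    """Model heartbeat attribute numbering.

    events: iterable of "message" (a Kafka message arrived) or
    "tick" (one heartbeat interval of inactivity elapsed).
    """
    heartbeat_id = 0   # never resets: unique across the whole run
    counter = 0        # resets whenever a new message arrives
    for event in events:
        if event == "message":
            counter = 0    # activity observed; a new inactivity period begins
        else:  # "tick"
            yield {"_heartbeat_id": heartbeat_id, "_heartbeat_counter": counter}
            heartbeat_id += 1
            counter += 1

# Two idle ticks, a message, then two more idle ticks:
out = list(heartbeat_features(["tick", "tick", "message", "tick", "tick"]))
# _heartbeat_id runs 0..3; _heartbeat_counter restarts at 0 after the message
```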
Usage Notes
- The KafkaConnector connects to a list of bootstrap servers initially, but it may connect to further servers as they are discovered. It does not connect to the ZooKeeper instance directly.
- The KafkaConnector allows for automatic or manual offset commits. See Committing offsets and reset policy.
Configuration
Input Ports
This transformer accepts any feature.
Output Ports
Features with added attributes, as specified in parameters and according to Action:
| Action | Output - Input-Driven | Output - Run Once |
|---|---|---|
| Commit | Input feature with commit details. | One new feature with commit details. |
| Get Metadata | Input feature with metadata details. | New feature with metadata details. |
| Receive | Multiple features, one for each message received, with details about the message. | Multiple features, one for each message received, with details about the message. |
| Send | Input feature with details about the sent message. | New feature with details about the sent message. |
When Action is Receive, heartbeat features are also output through this port.
Initiator
When the optional Input port is used, input features are output through this port unmodified, in addition to any other output locations (Output or <Rejected>).
<Rejected>
Features that cause the operation to fail are output through this port. An fme_rejection_code attribute describing the error category is added, along with an fme_rejection_message attribute containing more specific details about the failure.
If an input feature already has a value for fme_rejection_code, that value is removed.
Rejected Feature Handling can be set to either terminate the translation or continue running when a rejected feature is encountered. This setting is available both as a default FME option and as a workspace parameter.
Parameters
| Parameter | Description |
|---|---|
| Credential Source | Select the type of credentials to use. |
| Account | When Credential Source is Web Connection, select or create a Web Connection connecting to an Apache Kafka Web Service. |
| User Name and Password | When Credential Source is Embedded, provide a user name and password. |
| Security Protocol | Select a protocol. |
| SASL Mechanism | When Security Protocol is SASL Plaintext or SASL SSL, select an option. |
| SSL CA Certificate | (Optional) When Security Protocol is SSL or SASL SSL, provide the path and file name of a certificate. |
| Brokers | Provide one or more bootstrap servers to connect to. |
| Action | Select an operation to perform. |
Topic
| Parameter | Description |
|---|---|
| Topic | Specify the topic for which to set the commit offset. |
Commit Options
| Parameter | Description |
|---|---|
| Receiving Connector Identifier | Set to the value of the _connector_id attribute produced by the upstream receiving KafkaConnector. |
| Offset | Specify the offset to commit, typically set to the _offset attribute of the received message. |
| Partition | Specify the partition to commit, typically set to the _partition attribute of the received message. |
| Advanced Consumer Options | Provide any additional configuration options, as JSON objects. Any options set here will override settings in other parameters. |
Output Attributes
| Parameter | Description |
|---|---|
| Attributes to Add | Select any available attributes to be included on the output features. |
Metadata is retrieved into the _metadata attribute, and is formatted as JSON. It includes cluster, broker, topic and partition metadata.
```json
{
  "cluster_id": <cluster ID>,
  "controller_broker_id": <controller broker ID>,
  "brokers": [
    {
      "id": <broker ID>,
      "host": <host>,
      "port": <port>
    },
    ...
  ],
  "topics": [
    {
      "name": <topic name>,
      "partitions": [
        {
          "id": <partition ID>,
          "leader_broker_id": <leader broker ID>,
          "replica_broker_ids": [<replica broker ID>, ...],
          "in_sync_replicas": [<in-sync replica ID>, ...]
        },
        ...
      ]
    },
    ...
  ]
}
```
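Because _metadata is plain JSON, downstream transformers (for example, a PythonCaller) can parse it directly. A minimal sketch using a made-up metadata document in the shape shown above, mapping each topic partition to its leader broker's host:

```python
import json

# Hypothetical _metadata value in the shape documented above
metadata = json.loads("""
{
  "cluster_id": "abc123",
  "controller_broker_id": 1,
  "brokers": [
    {"id": 1, "host": "broker-1.example.com", "port": 9092},
    {"id": 2, "host": "broker-2.example.com", "port": 9092}
  ],
  "topics": [
    {"name": "sensor-readings",
     "partitions": [
       {"id": 0, "leader_broker_id": 1,
        "replica_broker_ids": [1, 2], "in_sync_replicas": [1, 2]},
       {"id": 1, "leader_broker_id": 2,
        "replica_broker_ids": [1, 2], "in_sync_replicas": [2]}
     ]}
  ]
}
""")

# Look up each broker's host by broker ID
hosts = {b["id"]: b["host"] for b in metadata["brokers"]}

# Map (topic, partition) to the leader broker's host
leaders = {
    (t["name"], p["id"]): hosts[p["leader_broker_id"]]
    for t in metadata["topics"]
    for p in t["partitions"]
}
```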
Metadata Options
| Parameter | Description |
|---|---|
| Advanced Metadata Options | Provide any additional configuration options, as JSON objects. Any options set here will override settings in other parameters. |
Output Attributes
| Parameter | Description |
|---|---|
| Attributes to Add | Select any available attributes to be included on the output features. |
Topics
| Parameter | Description |
|---|---|
| Topics | Specify the topics to receive from. |
Receive Behavior
| Parameter | Description |
|---|---|
| Mode | Select a message receiving method. |
| Maximum per Second | (Optional) When Mode is Stream, specify the maximum number of messages that will be received per second. If blank, no limit is applied. |
| Batch Size | When Mode is Batch, specify the maximum number of messages per batch. |
Receive Options
| Parameter | Description |
|---|---|
| Consumer Group ID | Specify the consumer group the connector belongs to. See the Kafka documentation. |
| Starting Offset | Select a starting offset position. |
| Partition | When Starting Offset is Custom, specify the partition to assign the connector to. |
| Offset | When Starting Offset is Custom, specify the offset to start receiving from. |
| Auto Commit | Select a commit option. |
| Heartbeat | See Heartbeat, above. |
| Advanced Consumer Options | Provide any additional configuration options, as JSON objects. Any options set here will override settings in other parameters. |
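For example, standard Kafka consumer configuration properties may be supplied this way (illustrative keys and values; consult the Apache Kafka consumer configuration reference for supported options):

```json
{
  "session.timeout.ms": 30000,
  "fetch.min.bytes": 1024
}
```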
Message Options
| Parameter | Description |
|---|---|
| Message Schema | Select the message schema type. |
| Encoding | When Message Schema is Simple, select the message encoding. By default, the _key and _value output attributes contain the raw bytes received from the server. If another encoding is selected, the message is decoded to a string accordingly. |
| Schema Registry | When Message Schema is Avro, provide the URL for the schema registry. |
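The Simple-schema decoding behavior can be sketched in plain Python (hypothetical message bytes; this is a model of the behavior described above, not the connector's implementation):

```python
raw_value = b"21.5\xc2\xb0C"  # hypothetical raw message bytes from Kafka

# Default: output attributes keep the raw bytes as-is
default_attr = raw_value

# With an encoding (e.g. utf-8) selected, bytes are decoded to a string
decoded_attr = raw_value.decode("utf-8")
```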
Output Attributes
| Parameter | Description |
|---|---|
| Attributes to Add | Select any available attributes to be included on the output features. |
Topics
| Parameter | Description |
|---|---|
| Topics | Specify the topics to send to. |
Send Options
| Parameter | Description |
|---|---|
| Create Missing Topics | Select an option for handling topics that do not exist. If No, a feature will be output via the <Rejected> port for any messages sent to missing topics. |
| Number of Partitions | When Create Missing Topics is Yes, specify the number of partitions for new topics. |
| Replication Factor | When Create Missing Topics is Yes, specify the replication factor for new topics. |
| Advanced Producer Options | Provide any additional configuration options, as JSON objects. Any options set here will override settings in other parameters. |
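For example, standard Kafka producer configuration properties may be supplied this way (illustrative keys and values; consult the Apache Kafka producer configuration reference for supported options):

```json
{
  "compression.type": "gzip",
  "linger.ms": 5
}
```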
Message Options
| Parameter | Description |
|---|---|
| Message Key | (Optional) Provide a message key, usually used for partitioning and log compaction. |
| Message Value | Provide the message as a string or binary value. |
| Message Schema | Select the message schema type. |
| Encoding | When Message Schema is Simple, select the message encoding. Numeric attributes are converted to text before encoding. If fme-binary is selected, strings are encoded as UTF-8, binary attributes are sent as-is, and other attribute types are not supported. |
| Schema Registry | When Message Schema is Avro, provide the URL for the schema registry. |
| Key Schema | (Optional) When Message Schema is Avro, provide a JSON-based Avro schema to use for the message key. |
| Value Schema | When Message Schema is Avro, provide a JSON-based Avro schema to use for the message value. |
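An Avro schema is itself a JSON document. A minimal hypothetical value schema for a two-field record might look like this (the record and field names are placeholders, not part of the connector):

```json
{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "sensor_id", "type": "string"},
    {"name": "value", "type": "double"}
  ]
}
```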
Output Attributes
| Parameter | Description |
|---|---|
| Attributes to Add | Select any available attributes to be included on the output features. |
Editing Transformer Parameters
Transformer parameters can be set by directly entering values, using expressions, or referencing other elements in the workspace, such as attribute values or user parameters. Various editors and context menus are available to assist. To see what is available, click the icon beside the applicable parameter.
Defining Values
There are several ways to define a value for use in a transformer. The simplest is to type in a value or string, which can include functions of various types, such as attribute references, math and string functions, and workspace parameters.
Using the Text Editor
The Text Editor provides a convenient way to construct text strings (including regular expressions) from various data sources, such as attributes, parameters, and constants, where the result is used directly inside a parameter.
Using the Arithmetic Editor
The Arithmetic Editor provides a convenient way to construct math expressions from various data sources, such as attributes, parameters, and feature functions, where the result is used directly inside a parameter.
Conditional Values
Set values depending on one or more test conditions that either pass or fail.
Parameter Condition Definition Dialog
Content
Expressions and strings can include a number of functions, characters, parameters, and more.
When setting values - whether entered directly in a parameter or constructed using one of the editors - strings and expressions containing String, Math, Date/Time or FME Feature Functions will have those functions evaluated. Therefore, the names of these functions (in the form @<function_name>) should not be used as literal string values.
| Item | Description |
|---|---|
| String Functions | These functions manipulate and format strings. |
| Special Characters | A set of control characters is available in the Text Editor. |
| Math Functions | Math functions are available in both editors. |
| Date/Time Functions | Date and time functions are available in the Text Editor. |
| Math Operators | These operators are available in the Arithmetic Editor. |
| FME Feature Functions | These return primarily feature-specific values. |
| FME and Workspace Parameters | FME and workspace-specific parameters may be used. |
| Creating and Modifying User Parameters | Create your own editable parameters. |
Table Tools
Transformers with table-style parameters have additional tools for populating and manipulating values.
| Tool | Description |
|---|---|
| Row Reordering | Enabled once you have clicked on a row item. |
| Cut, Copy, and Paste | Enabled once you have clicked on a row item. Cut, copy, and paste may be used within a transformer, or between transformers. |
| Filter | Start typing a string, and the matrix will only display rows matching those characters. Searches all columns. This only affects the display of attributes within the transformer; it does not alter which attributes are output. |
| Import | Import populates the table with a set of new attributes read from a dataset. Specific application varies between transformers. |
| Reset/Refresh | Generally resets the table to its initial state, and may provide additional options to remove invalid entries. Behavior varies between transformers. |
Note: Not all tools are available in all transformers.
For more information, see Transformer Parameter Menu Options.
Reference
| Item | Value |
|---|---|
| Processing Behavior | |
| Feature Holding | No |
| Dependencies | Apache Kafka |
| Aliases | ApacheKafkaConnector, ConfluentConnector, RedPandaConnector |
| History | Released FME 2019.0 |
FME Online Resources
The FME Community and Support Center Knowledge Base have a wealth of information, including active forums with 35,000+ members and thousands of articles.
Search for all results about the KafkaConnector on the FME Community.
Examples may contain information licensed under the Open Government Licence – Vancouver, Open Government Licence - British Columbia, and/or Open Government Licence – Canada.