KafkaConnector
Connects to an Apache Kafka cluster to send and receive messages.
Typical Uses
- Geoenrichment of data on a Kafka topic
- Aggregation of data from Kafka topics and other message streams
- Interoperability with Big Data analytics platforms
Usage Notes
- The KafkaConnector connects to a list of bootstrap servers initially, but it may connect to further servers as they are discovered. It does not connect to the ZooKeeper instance directly.
- The KafkaConnector allows for automatic or manual offset commits. For more information about this concept, see "Commits and Offsets" at https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html.
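The difference between automatic and manual offset commits can be illustrated with a small simulation. This is only a sketch: `SimulatedConsumer`, its methods, and the in-memory `store` dictionary are hypothetical names invented here, and nothing in it talks to a real cluster or reflects how the KafkaConnector is implemented. The sketch assumes the offset store keeps the position of the next message to read.

```python
from typing import Dict, List, Optional, Tuple


class SimulatedConsumer:
    """Toy consumer for a single partition; not a real Kafka client."""

    def __init__(self, log: List[str], committed: Dict[str, int],
                 group_id: str, auto_commit: bool) -> None:
        self.log = log                # messages of one partition, index = offset
        self.committed = committed    # stand-in for the cluster's offset store
        self.group_id = group_id
        self.auto_commit = auto_commit
        # Resume from the group's committed position, or from the start.
        self.position = committed.get(group_id, 0)

    def receive(self) -> Optional[Tuple[int, str]]:
        """Return (offset, message), or None when the log is exhausted."""
        if self.position >= len(self.log):
            return None
        offset = self.position
        self.position += 1
        if self.auto_commit:
            self.commit(offset)
        return offset, self.log[offset]

    def commit(self, offset: int) -> None:
        """Mark `offset` as processed; a later consumer resumes after it."""
        self.committed[self.group_id] = offset + 1


log = ["a", "b", "c"]
store: Dict[str, int] = {}

manual = SimulatedConsumer(log, store, "group-1", auto_commit=False)
manual.receive()          # offset 0
manual.receive()          # offset 1, read but never committed
manual.commit(0)          # only offset 0 is acknowledged

# A consumer that starts later in the same group picks up after the commit.
restarted = SimulatedConsumer(log, store, "group-1", auto_commit=False)
resumed = restarted.receive()   # offset 1 is delivered again
```

With manual commits, a message that was read but not committed is delivered again after a restart, which is the usual reason to commit only once processing has succeeded.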
Configuration
Input Ports
This transformer accepts any feature.
Output Ports
The output of this transformer will vary depending on the action performed. Message data are presented as feature attributes.
- After a Receive action, output features represent messages received from the cluster.
- After a Send action, output features represent messages sent to the cluster.
- After a Get Metadata action, the output feature represents the state of the cluster.
- After a Commit action, the output feature represents the committed partition and offset.
The incoming feature is output through this port.
Features that cause the operation to fail are output through this port. An fme_rejection_code attribute with the value ERROR_DURING_PROCESSING is added, along with an fme_rejection_message attribute that describes the reason for the failure.
Note: If a feature comes in to the KafkaConnector already having a value for fme_rejection_code, this value will be removed.
Rejected Feature Handling: can be set to either terminate the translation or continue running when it encounters a rejected feature. This setting is available both as a default FME option and as a workspace parameter.
Parameters
Credential Source |
In addition to anonymous access, the KafkaConnector can authenticate with the cluster. Using a web connection integrates best with FME, but in some cases, you may wish to use one of the other sources.
|
Account |
Available when the credential source is Web Connection. To create an Apache Kafka connection, click the 'Account' drop-down box and select 'Add Web Connection...'. The connection can then be managed via Tools -> FME Options... -> Web Connections. |
User Name and Password | Available when the credential source is Embedded. A user name and password can be specified directly in the transformer instead of in a web connection. |
Security Protocol |
|
SASL Mechanism |
|
SSL CA Certificate | When using one of the SSL security protocols, an optional path to an SSL CA certificate used to verify the cluster. |
Brokers | The list of bootstrap servers (host and port) to connect to. See the “bootstrap.servers” configuration option at https://kafka.apache.org/documentation.html for more information. |
Action |
The type of operation to perform. Choices are Receive, Send, Get Metadata, and Commit.
|
The remaining parameters available depend on the value of the Request > Action parameter. Parameters for each Action are detailed below.
Topics
Topics |
The list of topics to receive from. Topics can be entered manually, or selected interactively by clicking the ellipses in each row of the table. |
Receive Behavior
Mode |
Two message receiving modes are available: Stream and Batch.
|
Maximum per Second |
In Stream mode, this option limits the number of messages that will be received per second. Leaving the field blank will allow the connector to receive as many messages as possible. |
Batch Size |
In Batch mode, specifies the number of messages to read per batch. |
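The two receive behaviors above can be pictured with the following sketch. It is an illustration only: `throttle` and `batches` are hypothetical helpers, not part of FME, and details such as whether a final partial batch is emitted are assumptions made here. The clock and sleep functions are parameters so that the pacing logic can be exercised without waiting.

```python
import time
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def throttle(messages: Iterable[T], max_per_second: Optional[float] = None,
             clock: Callable[[], float] = time.monotonic,
             sleep: Callable[[float], None] = time.sleep) -> Iterator[T]:
    """Yield messages no faster than max_per_second; None means unlimited."""
    if max_per_second is None:
        yield from messages
        return
    interval = 1.0 / max_per_second
    next_allowed = clock()
    for message in messages:
        now = clock()
        if now < next_allowed:
            # Too early for the next message: wait out the remainder.
            sleep(next_allowed - now)
        next_allowed = max(now, next_allowed) + interval
        yield message


def batches(messages: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group messages into lists of batch_size; the last list may be shorter."""
    batch: List[T] = []
    for message in messages:
        batch.append(message)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch
```

Leaving `max_per_second` as `None` mirrors a blank Maximum per Second field, where messages pass through as fast as they arrive.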
Receive Options
Consumer Group ID |
The consumer group the KafkaConnector is part of. For more information about consumer groups, consult the Kafka documentation: https://kafka.apache.org/documentation/#intro_consumers |
Starting Offset |
A default starting offset can be specified:
|
Partition |
When Starting Offset is set to Custom, specifies the partition to assign the connector to. |
Offset |
When Starting Offset is set to Custom, specifies the offset to start receiving from. |
Auto Commit |
Whether to automatically commit the offsets of messages that have been read.
|
Topics
Topics |
The list of topics to send to. Topics can be entered manually, or selected interactively by clicking the ellipses in each row of the table. |
Send Options
Create Missing Topics |
If Yes is selected, topics will be created automatically if they do not exist. |
Number of Partitions |
When creating topics, new topics will be created with this number of partitions. |
Replication Factor |
When creating topics, new topics will be created with this replication factor. |
The "message options" section is available for sending and receiving messages, but some options are only available when sending.
Message Key |
The contents to set for the message key. The message key is usually used for partitioning and log compaction. Only available for sending. |
Message Value |
The actual message data. This must be a string or binary value. Only available for sending. |
Message Schema |
The message schema type can be either Simple or Avro.
For more information about Avro schemas, see https://avro.apache.org/docs/current/spec.html. |
Encoding |
For sending, the encoding used to convert the text input value to binary. Numeric attributes are converted to a text representation of the number before encoding. If fme-binary is selected, strings are encoded as UTF-8, binary attributes are sent as-is, and other attribute types are not supported. For receiving, the _key and _value attributes contain the raw bytes received from the server by default. If an encoding is selected, the connector instead attempts to decode the value of the received message to a string using the selected encoding. Available when Simple is selected for Message Schema. |
Schema Registry |
When using Avro schema, the URL for the schema registry. See https://docs.confluent.io/current/schema-registry/index.html for more information on schema management using Confluent Schema Registry. |
Key Schema |
The JSON-based Avro schema to use for the message key. Only available for sending. |
Value Schema |
The JSON-based Avro schema to use for the message value. Only available for sending. |
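The Encoding rules described above for the Simple message schema can be sketched as follows. This is a minimal illustration, not the connector's code: `encode_for_send` and `decode_received` are hypothetical names, Python codec names stand in for FME encoding names, and Python's `str()` stands in for whatever text representation FME uses for numbers.

```python
from typing import Optional, Union

Value = Union[str, bytes, bytearray, int, float]


def encode_for_send(value: Value, encoding: str) -> bytes:
    """Convert an attribute value to the bytes that would be sent."""
    if encoding == "fme-binary":
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)              # binary is sent as-is
        if isinstance(value, str):
            return value.encode("utf-8")     # strings become UTF-8
        raise TypeError("fme-binary supports only string and binary values")
    if isinstance(value, (int, float)):
        value = str(value)                   # numbers become text first
    if isinstance(value, str):
        return value.encode(encoding)
    # Binary input with a text encoding is not covered by the description
    # above, so this sketch simply rejects it.
    raise TypeError("unsupported value type for a text encoding")


def decode_received(raw: bytes, encoding: Optional[str] = None) -> Union[bytes, str]:
    """Return raw bytes by default, or a decoded string if an encoding is set."""
    if encoding is None:
        return raw
    # A failed decode raises here; how the connector reacts is not specified.
    return raw.decode(encoding)
```

For example, `encode_for_send(42, "ascii")` yields the two bytes of the text "42", while `decode_received(raw)` with no encoding leaves the received bytes untouched.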
Metadata Options
Getting metadata produces only one output attribute: _metadata. This attribute contains JSON-formatted metadata. The format is an aggregation of cluster, broker, topic and partition metadata.
{
  "cluster_id": <cluster ID>,
  "controller_broker_id": <controller broker ID>,
  "brokers": [
    {
      "id": <broker ID>,
      "host": <host>,
      "port": <port>
    },
    ...
  ],
  "topics": [
    {
      "name": <topic name>,
      "partitions": [
        {
          "id": <partition ID>,
          "leader_broker_id": <leader broker ID>,
          "replica_broker_ids": [<replica broker ID>, ...],
          "in_sync_replicas": [<in-sync replica ID>, ...]
        },
        ...
      ]
    },
    ...
  ]
}
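Downstream of the transformer, the _metadata value can be parsed like any other JSON document. The sketch below assumes the structure shown above; the sample values, including the host names and the use of integer IDs, are invented for illustration, and `partition_leaders` and `under_replicated` are hypothetical helper names.

```python
import json
from typing import Dict, List, Tuple


def partition_leaders(metadata_json: str) -> Dict[Tuple[str, int], str]:
    """Map (topic name, partition ID) to the leader broker's host:port."""
    metadata = json.loads(metadata_json)
    addresses = {b["id"]: f'{b["host"]}:{b["port"]}' for b in metadata["brokers"]}
    leaders = {}
    for topic in metadata["topics"]:
        for partition in topic["partitions"]:
            leader = addresses.get(partition["leader_broker_id"], "unknown")
            leaders[(topic["name"], partition["id"])] = leader
    return leaders


def under_replicated(metadata_json: str) -> List[Tuple[str, int]]:
    """List partitions with fewer in-sync replicas than assigned replicas."""
    metadata = json.loads(metadata_json)
    return [(t["name"], p["id"])
            for t in metadata["topics"]
            for p in t["partitions"]
            if len(p["in_sync_replicas"]) < len(p["replica_broker_ids"])]


# Invented sample in the documented shape.
SAMPLE = json.dumps({
    "cluster_id": "example-cluster",
    "controller_broker_id": 1,
    "brokers": [
        {"id": 1, "host": "kafka-1.example.com", "port": 9092},
        {"id": 2, "host": "kafka-2.example.com", "port": 9092},
    ],
    "topics": [
        {"name": "orders", "partitions": [
            {"id": 0, "leader_broker_id": 1,
             "replica_broker_ids": [1, 2], "in_sync_replicas": [1, 2]},
            {"id": 1, "leader_broker_id": 2,
             "replica_broker_ids": [1, 2], "in_sync_replicas": [2]},
        ]},
    ],
})
```

Calling `under_replicated(SAMPLE)` flags partition 1 of the invented "orders" topic, because only one of its two replicas is in sync.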
Topic
Topic |
The topic for which to set the commit offset. |
Commit Options
Receiving Connector Identifier |
This parameter must be set to the value of the _connector_id attribute produced by the reading KafkaConnector. |
Offset |
The offset to commit, typically set from the _offset attribute associated with the message. |
Partition |
The partition to commit, typically set from the _partition attribute associated with the message. |
For all actions, it is possible to supply additional advanced options. For sending and receiving, these options are called producer and consumer, respectively.
The options are supplied as a JSON object, and match the options documented at https://docs.confluent.io/current/clients/librdkafka/CONFIGURATION_8md.html. Note that not all options are available for use in the connector.
Example
{
"socket.timeout.ms": 100,
"check.crcs": true
}
If any user-supplied options conflict with those generated internally by the connector, the user-supplied options will be used. For example, FME usually sets "bootstrap.servers" from the Brokers parameter, but if the advanced options contain "bootstrap.servers" as well, that value will be used instead.
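The precedence rule just described amounts to a dictionary merge in which user-supplied keys win. The following sketch shows that idea only; `effective_options` is a hypothetical function, and the broker addresses are placeholders rather than values the connector produces.

```python
import json
from typing import Any, Dict


def effective_options(generated: Dict[str, Any], advanced_json: str) -> Dict[str, Any]:
    """Merge internally generated options with the user's advanced options."""
    user_options = json.loads(advanced_json) if advanced_json.strip() else {}
    if not isinstance(user_options, dict):
        raise ValueError("advanced options must be a JSON object")
    # Later keys override earlier ones, so user-supplied values take priority.
    return {**generated, **user_options}


generated = {"bootstrap.servers": "broker-a.example.com:9092", "check.crcs": False}
advanced = '{"bootstrap.servers": "broker-b.example.com:9092", "socket.timeout.ms": 100}'
merged = effective_options(generated, advanced)
```

Here `merged["bootstrap.servers"]` is the user-supplied address, while `check.crcs` keeps its generated value because the advanced options do not mention it.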
The following attributes can be selected for inclusion on the output features. Each output feature represents a message that was either sent or received.
_key |
The key of the message |
_value |
The value of the message |
_len |
The length of the message in bytes |
_offset |
The offset of the message in the partition |
_timestamp |
The message timestamp from the cluster. Depending on server configuration, this timestamp may have slightly different meanings. |
_topic |
The topic the message was received from or sent to |
_partition |
The partition the message is stored in |
_headers{}.name |
Structured list attributes containing the header keys and values of a received message. Note that headers for sent messages are not available. |
Editing Transformer Parameters
Using a set of menu options, transformer parameters can be assigned by referencing other elements in the workspace. More advanced functions, such as an advanced editor and an arithmetic editor, are also available in some transformers. To access a menu of these options, click beside the applicable parameter. For more information, see Transformer Parameter Menu Options.
Defining Values
There are several ways to define a value for use in a Transformer. The simplest is to type in a value or string, which can include functions of various types such as attribute references, math and string functions, and workspace parameters. There are a number of tools and shortcuts that can assist in constructing values, generally available from the drop-down context menu adjacent to the value field.
Using the Text Editor
The Text Editor provides a convenient way to construct text strings (including regular expressions) from various data sources, such as attributes, parameters, and constants, where the result is used directly inside a parameter.
Using the Arithmetic Editor
The Arithmetic Editor provides a convenient way to construct math expressions from various data sources, such as attributes, parameters, and feature functions, where the result is used directly inside a parameter.
Conditional Values
Set values depending on one or more test conditions that either pass or fail.
Parameter Condition Definition Dialog
Content
Expressions and strings can include a number of functions, characters, parameters, and more.
When setting values - whether entered directly in a parameter or constructed using one of the editors - strings and expressions containing String, Math, Date/Time or FME Feature Functions will have those functions evaluated. Therefore, the names of these functions (in the form @<function_name>) should not be used as literal string values.
These functions manipulate and format strings. | |
Special Characters |
A set of control characters is available in the Text Editor. |
Math functions are available in both editors. | |
Date/Time Functions | Date and time functions are available in the Text Editor. |
These operators are available in the Arithmetic Editor. | |
These return primarily feature-specific values. | |
FME and workspace-specific parameters may be used. | |
Creating and Modifying User Parameters | Create your own editable parameters. |
Dialog Options - Tables
Transformers with table-style parameters have additional tools for populating and manipulating values.
Row Reordering
|
Enabled once you have clicked on a row item. Choices include:
|
Cut, Copy, and Paste
|
Enabled once you have clicked on a row item. Choices include:
Cut, copy, and paste may be used within a transformer, or between transformers. |
Filter
|
Start typing a string, and the matrix will only display rows matching those characters. Searches all columns. This only affects the display of attributes within the transformer - it does not alter which attributes are output. |
Import
|
Import populates the table with a set of new attributes read from a dataset. Specific application varies between transformers. |
Reset/Refresh
|
Generally resets the table to its initial state, and may provide additional options to remove invalid entries. Behavior varies between transformers. |
Note: Not all tools are available in all transformers.
Reference
Processing Behavior |
|
Feature Holding |
No |
Dependencies | Apache Kafka |
FME Licensing Level | FME Base Edition and above |
Aliases | ApacheKafkaConnector |
History | Released FME 2019.0 |
Examples may contain information licensed under the Open Government Licence – Vancouver and/or the Open Government Licence – Canada.