KafkaConnector
Connects to an Apache Kafka cluster. Supports sending (producing) and receiving (consuming) messages.
Typical Uses
- Geoenrichment of data on a Kafka topic
- Aggregation of data from Kafka topics and other message streams
- Interoperability with Big Data analytics platforms
Usage Notes
- The KafkaConnector connects to a list of bootstrap servers initially, but it may connect to further servers as they are discovered. It does not connect to the ZooKeeper instance directly.
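The bootstrap list described above corresponds to the librdkafka-style "bootstrap.servers" client option: a comma-separated string of host:port pairs. A minimal sketch (the host names below are hypothetical, not part of the connector):

```python
# Hypothetical broker addresses; a client only needs a subset of the
# cluster to bootstrap, and discovers the remaining brokers from there.
brokers = ["broker1.example.com:9092", "broker2.example.com:9092"]

# librdkafka-style configuration: a single comma-separated string.
config = {"bootstrap.servers": ",".join(brokers)}

print(config["bootstrap.servers"])
# broker1.example.com:9092,broker2.example.com:9092
```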
Configuration
Input Ports
This transformer accepts any feature.
Output Ports
The output of this transformer will vary depending on the action performed. Message data are presented as feature attributes.
- After a Receive action, output features represent messages received from the cluster.
- After a Send action, output features represent messages sent to the cluster.
- After a Get Metadata action, the output feature represents the state of the cluster.
The incoming feature is output through this port.
Features that cause the operation to fail are output through this port. An fme_rejection_code attribute with the value ERROR_DURING_PROCESSING will be added, along with an fme_rejection_message attribute containing more specific details about the reason for the failure.
Note: If a feature arrives at the KafkaConnector already having a value for fme_rejection_code, this value will be removed.
Rejected Feature Handling can be set either to terminate the translation or to continue running when a rejected feature is encountered. This setting is available both as a default FME option and as a workspace parameter.
Parameters
Credential Source |
In addition to anonymous access, the KafkaConnector can authenticate with the cluster. Using a web connection integrates best with FME, but in some cases, you may wish to use one of the other sources. |
Account |
Available when the credential source is Web Connection. To create an Apache Kafka connection, click the 'Account' drop-down box and select 'Add Web Connection...'. The connection can then be managed via Tools -> FME Options... -> Web Connections. |
User Name and Password | Available when the credential source is Embedded. A user name and password can be specified directly in the transformer instead of in a web connection. |
Brokers |
The list of bootstrap servers (host and port) to connect to. See the “bootstrap.servers” configuration option at https://kafka.apache.org/documentation.html for more information. |
Action |
The type of operation to perform: Receive, Send, or Get Metadata. |
The remaining parameters available depend on the value of the Request > Action parameter. Parameters for each Action are detailed below.
Topics
Topics |
The list of topics to receive from. Topics can be entered manually, or selected interactively by clicking the ellipsis in each row of the table. |
Receive Behavior
Mode |
Two message receiving modes are available: Stream and Batch. |
Maximum per Second |
In Stream mode, this option limits the number of messages that will be received per second. Leaving the field blank will allow the connector to receive as many messages as possible. |
Batch Size |
In Batch mode, specifies the number of messages to read per batch. |
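As an illustration of the Batch mode semantics (a sketch of the concept, not of FME internals), reading a fixed number of messages per batch looks like this:

```python
def batches(messages, batch_size):
    """Yield successive fixed-size batches from an ordered message list."""
    for i in range(0, len(messages), batch_size):
        yield messages[i:i + batch_size]

# Seven queued messages read with a batch size of 3; the final batch
# may be smaller than the configured size.
print(list(batches(["m0", "m1", "m2", "m3", "m4", "m5", "m6"], 3)))
# [['m0', 'm1', 'm2'], ['m3', 'm4', 'm5'], ['m6']]
```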
Receive Options
Consumer Group ID |
The consumer group the KafkaConnector is part of. For more information about consumer groups, consult the Kafka documentation: https://kafka.apache.org/documentation/#intro_consumers |
Starting Offset |
A default starting offset can be specified. When set to Custom, a specific partition and offset can be given. |
Partition |
When Starting Offset is set to Custom, specifies the partition to assign the connector to. |
Offset |
When Starting Offset is set to Custom, specifies the offset to start receiving from. |
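Conceptually (a sketch, not connector internals), a Kafka partition is an ordered log, and a custom offset selects the position where consumption begins:

```python
# A partition modeled as an ordered log of messages (illustrative only).
partition_log = ["m0", "m1", "m2", "m3", "m4"]

# Starting Offset = Custom with Offset 2: consumption begins at index 2
# and continues to the end of the log.
starting_offset = 2
received = partition_log[starting_offset:]
print(received)  # ['m2', 'm3', 'm4']
```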
Topics
Topics |
The list of topics to send to. Topics can be entered manually, or selected interactively by clicking the ellipsis in each row of the table. |
Send Options
Create Missing Topics |
If Yes is selected, topics will be created automatically if they do not exist. |
Number of Partitions |
When creating topics, new topics will be created with this number of partitions. |
Replication Factor |
When creating topics, new topics will be created with this replication factor. |
Message Options
The Message Options section is available for both sending and receiving messages, but some options are only available when sending.
Message Key |
The contents to set for the message key. The message key is usually used for partitioning and log compaction. Only available for sending. |
Message Value |
The actual message data. This must be a string or binary value. Only available for sending. |
Message Schema |
The message schema type can be either Simple or Avro.
For more information about Avro schemas, see https://avro.apache.org/docs/current/spec.html. |
Encoding |
When using Simple schema, sets the encoding used to encode or decode the message key and value. |
Schema Registry |
When using Avro encoding, the URL for the schema registry. See https://docs.confluent.io/current/schema-registry/index.html for more information on schema management using Confluent Schema Registry. |
Key Schema |
The JSON-based Avro schema to use for the message key. Only available for sending. |
Value Schema |
The JSON-based Avro schema to use for the message value. Only available for sending. |
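A minimal JSON-based Avro record schema, of the kind the Key Schema and Value Schema parameters expect, might look like the following. The record and field names here are purely illustrative:

```python
import json

# Hypothetical Avro "record" schema for a message value.
value_schema = json.loads("""
{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "temperature", "type": "double"}
  ]
}
""")

print(value_schema["name"], [f["name"] for f in value_schema["fields"]])
# SensorReading ['id', 'temperature']
```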
Metadata Options
Getting metadata produces only one output attribute: _metadata. This attribute contains JSON-formatted metadata. The format is an aggregation of cluster, broker, topic and partition metadata.
{
  "cluster_id": <cluster ID>,
  "controller_broker_id": <controller broker ID>,
  "brokers": [
    {
      "id": <broker ID>,
      "host": <host>,
      "port": <port>
    },
    ...
  ],
  "topics": [
    {
      "name": <topic name>,
      "partitions": [
        {
          "id": <partition ID>,
          "leader_broker_id": <leader broker ID>,
          "replica_broker_ids": [<replica broker ID>, ...],
          "in_sync_replicas": [<in-sync replica ID>, ...]
        },
        ...
      ]
    },
    ...
  ]
}
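Because _metadata is plain JSON, it can be consumed downstream with any JSON tooling. A sketch in Python, using entirely hypothetical cluster values:

```python
import json

# A hypothetical _metadata value following the structure documented above.
metadata = json.loads("""
{
  "cluster_id": "abc123",
  "controller_broker_id": 1,
  "brokers": [
    {"id": 1, "host": "broker1.example.com", "port": 9092},
    {"id": 2, "host": "broker2.example.com", "port": 9092}
  ],
  "topics": [
    {"name": "readings",
     "partitions": [
       {"id": 0, "leader_broker_id": 1,
        "replica_broker_ids": [1, 2], "in_sync_replicas": [1, 2]}
     ]}
  ]
}
""")

# Derive a broker list and per-partition leaders from the aggregate.
broker_hosts = ["%s:%d" % (b["host"], b["port"]) for b in metadata["brokers"]]
leaders = {(t["name"], p["id"]): p["leader_broker_id"]
           for t in metadata["topics"] for p in t["partitions"]}
print(broker_hosts)  # ['broker1.example.com:9092', 'broker2.example.com:9092']
print(leaders)       # {('readings', 0): 1}
```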
For all actions, it is possible to supply additional advanced options. For sending and receiving, these options are called producer and consumer, respectively.
The options are supplied as a JSON object, and match the options documented at https://docs.confluent.io/current/clients/librdkafka/CONFIGURATION_8md.html. Note that not all options are available for use in the connector.
Example
{
"socket.timeout.ms": 100,
"check.crcs": true
}
If any user-supplied options conflict with those generated internally by the connector, the user-supplied options will be used. For example, FME usually sets "bootstrap.servers" from the Brokers parameter, but if the advanced options contain "bootstrap.servers" as well, that value will be used instead.
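This precedence rule can be pictured as a dictionary merge in which user-supplied advanced options overwrite the connector's internally generated ones. The option values below are hypothetical:

```python
# Options the connector might generate internally (hypothetical values).
internal_options = {"bootstrap.servers": "broker1.example.com:9092",
                    "socket.timeout.ms": 60000}

# User-supplied advanced options, as in the JSON example above.
user_options = {"bootstrap.servers": "broker2.example.com:9092",
                "check.crcs": True}

# Later keys win, so user-supplied options override internal ones,
# while unrelated internal options are preserved.
effective = {**internal_options, **user_options}
print(effective["bootstrap.servers"])  # broker2.example.com:9092
print(effective["check.crcs"])         # True
```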
The following attributes can be selected for inclusion on the output features. Each output feature represents a message that was either sent or received.
_key |
The key of the message |
_value |
The value of the message |
_len |
The length of the message in bytes |
_offset |
The offset of the message in the partition |
_timestamp |
The message timestamp from the cluster. Depending on server configuration, this timestamp may have slightly different meanings. |
_topic |
The topic the message was received from or sent to |
_partition |
The partition the message is stored in |
_headers{}.name |
Structured list attributes containing the header keys and values of a received message. Note that headers for sent messages are not available. |
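As a sketch of how these attributes relate to a received message (the message contents below are invented for illustration):

```python
# A hypothetical received Kafka message.
message = {"key": b"k1", "value": b"hello", "offset": 7,
           "topic": "readings", "partition": 0}

# The corresponding output-attribute view of the feature.
feature = {
    "_key": message["key"].decode("utf-8"),
    "_value": message["value"].decode("utf-8"),
    "_len": len(message["value"]),   # length of the message value in bytes
    "_offset": message["offset"],
    "_topic": message["topic"],
    "_partition": message["partition"],
}
print(feature["_value"], feature["_len"])  # hello 5
```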
Editing Transformer Parameters
Using a set of menu options, transformer parameters can be assigned by referencing other elements in the workspace. More advanced functions, such as an advanced editor and an arithmetic editor, are also available in some transformers. To access a menu of these options, click beside the applicable parameter. For more information, see Transformer Parameter Menu Options.
Defining Values
There are several ways to define a value for use in a transformer. The simplest is to type in a value or string, which can include functions of various types, such as attribute references, math and string functions, and workspace parameters. A number of tools and shortcuts can assist in constructing values, generally available from the drop-down context menu adjacent to the value field.
Using the Text Editor
The Text Editor provides a convenient way to construct text strings (including regular expressions) from various data sources, such as attributes, parameters, and constants, where the result is used directly inside a parameter.
Using the Arithmetic Editor
The Arithmetic Editor provides a convenient way to construct math expressions from various data sources, such as attributes, parameters, and feature functions, where the result is used directly inside a parameter.
Conditional Values
Set values depending on one or more test conditions that either pass or fail.
Parameter Condition Definition Dialog
Content
Expressions and strings can include a number of functions, characters, parameters, and more.
When setting values - whether entered directly in a parameter or constructed using one of the editors - strings and expressions containing String, Math, Date/Time or FME Feature Functions will have those functions evaluated. Therefore, the names of these functions (in the form @<function_name>) should not be used as literal string values.
String Functions | These functions manipulate and format strings. |
Special Characters |
A set of control characters is available in the Text Editor. |
Math Functions | Math functions are available in both editors. |
Date/Time Functions | Date and time functions are available in the Text Editor. |
Math Operators | These operators are available in the Arithmetic Editor. |
FME Feature Functions | These return primarily feature-specific values. |
FME and Workspace Parameters | FME and workspace-specific parameters may be used. |
Creating and Modifying User Parameters | Create your own editable parameters. |
Reference
Processing Behavior |
|
Feature Holding | No |
Dependencies | Apache Kafka |
FME Licensing Level | FME Base Edition and above |
Aliases | ApacheKafkaConnector |
History | Released FME 2019.0 |
Categories |
FME Community
The FME Community is the place for demos, how-tos, articles, FAQs, and more. Get answers to your questions, learn from other users, and suggest, vote, and comment on new features.
Search for all results about the KafkaConnector on the FME Community.
Examples may contain information licensed under the Open Government Licence – Vancouver