Streams

Use the Streams page to manage jobs you want to run continuously. When a job runs as a stream, it restarts automatically whenever it ends. This applies whether the job completed successfully or failed, and whether the server crashed or was shut down.
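The restart behavior can be pictured as a loop that resubmits the job each time it ends, regardless of the outcome, until the stream is stopped. The following is a minimal conceptual sketch. `run_job`, `should_stop`, and `Outcome` are hypothetical stand-ins, not part of FME Flow. It also shows why a workspace without a Mode: Stream transformer is a poor fit: such a job ends quickly, so the loop keeps resubmitting it.

```python
from enum import Enum
from typing import Callable, List


class Outcome(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    INTERRUPTED = "interrupted"  # for example, the server crashed or was shut down


def run_stream(run_job: Callable[[], Outcome],
               should_stop: Callable[[int], bool]) -> List[Outcome]:
    """Resubmit the job each time it ends, whatever the outcome, until stopped.

    Hypothetical model: run_job and should_stop are stand-ins, not FME Flow APIs.
    """
    history: List[Outcome] = []
    while not should_stop(len(history)):
        history.append(run_job())  # the outcome is recorded but never ends the loop
    return history


# Usage: a success, a failure, and an interruption each lead to another run.
outcomes = iter([Outcome.SUCCESS, Outcome.FAILURE, Outcome.INTERRUPTED, Outcome.SUCCESS])
runs = run_stream(lambda: next(outcomes), should_stop=lambda n: n >= 4)
print([o.value for o in runs])  # ['success', 'failure', 'interrupted', 'success']
```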

Workspaces that run as streams usually read from an event queue or message broker. Messages are handled by transformers that can run in Mode: Stream, such as AzureQueueStorageConnector or KafkaConnector. A stream lets you integrate and analyze data in real time as it is produced. In stream mode, jobs do not incur the overhead of repeatedly starting and stopping, so a very high volume of data can be processed with low latency.

Note  Any type of workspace can run as a stream. However, streams are intended for workspaces that contain a transformer set to Mode: Stream, such as AzureQueueStorageConnector or KafkaConnector. Other workspaces may cause unexpected behavior because the job is resubmitted every time it finishes. For example, excess logging can lead to storage issues.

Streams provide the following additional advantages over running jobs continuously through other interfaces:

  • When you create a stream, the FME Engines you assign to run its jobs are dedicated to that stream.
    Note  If the FME Engines you specify for the stream belong to any queues, they are removed from those queues so that they are dedicated to the stream.
  • You can scale processing throughput by running jobs from the same workspace simultaneously on multiple engines. The number of jobs a stream runs at once equals the number of engines assigned to the stream.
  • You can configure a stream and then stop it until you are ready to run it.
  • The Streams page shows all the jobs you run continuously, so you can manage them in one place.
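The engine-dedication rules above can be summarized in a small model. Assigning engines removes them from their queues, and the stream runs one job per assigned engine. Engines already dedicated to another stream cannot be assigned, as described under Getting Started with Streams. This is a hypothetical sketch of those rules. `Engine`, `assign_engines`, and the engine and stream names are illustrative, not FME Flow APIs.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Engine:
    name: str
    queues: set[str] = field(default_factory=set)
    stream: str | None = None  # stream this engine is dedicated to, if any


def assign_engines(stream: str, engines: list[Engine]) -> int:
    """Dedicate engines to a stream and return how many jobs it runs at once.

    Hypothetical model of the documented rules, not an FME Flow API.
    """
    # Only engines that are not already dedicated to another stream can be assigned.
    # All engines are checked first, so a rejected request changes nothing.
    for engine in engines:
        if engine.stream not in (None, stream):
            raise ValueError(f"{engine.name} is already dedicated to {engine.stream}")
    for engine in engines:
        engine.queues.clear()  # removed from any queues so it serves only this stream
        engine.stream = stream
    return len(engines)  # one concurrent job per assigned engine


e1 = Engine("engine1", queues={"Default"})
e2 = Engine("engine2")
print(assign_engines("kafka-orders", [e1, e2]))  # 2
print(e1.queues, e1.stream)  # set() kafka-orders
```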

Getting Started with Streams

To create a stream, select Streams > Create Stream. Or, select Streams > Manage Streams and click Create.

  1. Complete the following fields:
    • Name: Provide a name for the stream.
    • Description: (Optional) Provide a meaningful description of the stream, such as the workspace it runs and why you want it to run continuously. Use the Markdown tools as desired.
    • Repository: Specify the repository that contains the workspace you want to run continuously.
    • Workspace: Specify the workspace you want to run continuously.
      Note  The workspace should contain a transformer set to Mode: Stream, such as AzureQueueStorageConnector or KafkaConnector.
    • Published Parameters: If the workspace has any published parameters, specify them here.
  2. Click OK.
  3. A page opens that summarizes the properties of the stream you created. The stream is ready to run as soon as you assign FME Engines to run its jobs. If you do not want the stream to run immediately, click Stop.
  4. To assign FME Engines to run the jobs associated with the stream, click Manage under Assigned Engines. When you assign engines to a stream, keep in mind the following:
    • Assigning multiple engines lets you scale processing throughput by running jobs from the same workspace simultaneously. The number of jobs a stream runs at once equals the number of engines assigned to the stream.
    • You can only assign engines that are not already dedicated to other streams.
    • If the FME Engines you specify for the stream belong to any queues, they are removed from those queues so that they are dedicated to the stream.
    • If an engine disconnects from FME Flow (for example, because the machine that hosts a separate FME Engine crashes), any job it was running reverts to a Queued state. The job is returned to the engine when the engine becomes available.
  5. In the Manage Engine Rule dialog, specify:
    • Type: How you want to assign FME Engines to the stream:
      • Name: Engines are assigned to the stream by engine name.
      • Property: Engines are assigned to the stream based on their properties.
    • Engines: If Type is Name, select one or more engines by name to assign to the stream.
    • Rule: If Type is Property, use boolean logic to specify the properties that determine how engines are assigned to the stream.
    • The following properties are available to select without manual entry:

      To add one of these properties, click + Rule and select it.

      Additionally, you can add other engine properties as they appear on the Engines tab, such as the name or operating system of the host machine. To add other properties to the Rule drop-down, click + Create Property. Beside Type, specify Custom, and enter a Property exactly as it appears on the Engines tab.

      As soon as you specify a property, the rule appears beneath the Rule drop-down. To add more properties, click + Rule. To nest properties in the rule, click + Ruleset.

      For example, to specify any dynamic engine on server WHISTLER (a Custom property), specify Dynamic AND WHISTLER.

      When finished, click OK.

  6. After you assign engines, the stream begins to run unless you stopped it previously. If you stopped it, click Start when you are ready to run it. Alternatively, select the stream on the Manage Streams page and click Actions > Start.
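A property rule, such as Dynamic AND WHISTLER in step 5, selects every engine whose properties satisfy the boolean expression. Rulesets nest expressions inside one another. The following minimal sketch evaluates such rules against hypothetical engines. It assumes that a rule is either a single property name or an (operator, sub-rules) ruleset and that the operators are AND and OR. The engine names, their property sets, and the `matches` and `select_engines` helpers are illustrative, not FME Flow's implementation.

```python
from __future__ import annotations

from typing import Union

# A rule is a property name, or a ruleset: (operator, [sub-rules]).
Rule = Union[str, tuple]


def matches(rule: Rule, properties: set[str]) -> bool:
    """Evaluate a hypothetical property rule against one engine's properties."""
    if isinstance(rule, str):
        return rule in properties
    operator, children = rule
    results = [matches(child, properties) for child in children]
    if operator == "AND":
        return all(results)
    if operator == "OR":
        return any(results)
    raise ValueError(f"unsupported operator: {operator}")


def select_engines(rule: Rule, engines: dict[str, set[str]]) -> list[str]:
    """Return the sorted names of engines whose properties satisfy the rule."""
    return sorted(name for name, props in engines.items() if matches(rule, props))


# Hypothetical engines and their properties.
engines = {
    "WHISTLER_Engine1": {"Dynamic", "WHISTLER"},
    "WHISTLER_Engine2": {"Standard", "WHISTLER"},
    "BLACKCOMB_Engine1": {"Dynamic", "BLACKCOMB"},
}
print(select_engines(("AND", ["Dynamic", "WHISTLER"]), engines))  # ['WHISTLER_Engine1']
```

A nested ruleset such as ("OR", [("AND", ["Dynamic", "WHISTLER"]), "BLACKCOMB"]) would also match BLACKCOMB_Engine1.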

Managing Streams

Select Streams > Manage Streams.

To start, stop, or remove streams

Select one or more streams, click Actions, and then select Start, Stop, or Remove.

When a stream is stopped, it does not submit any jobs for its workspace. Any jobs that are queued or running are canceled.

To view the jobs that are running or in queue through a stream

Click the stream to open it, and scroll to Jobs. Click a job to open it on the Jobs page.

Tip  Use the Status drop-down to filter the jobs that appear, as follows:
  • All: Queued and Running jobs.
  • Running: Jobs that are currently being run by an engine.
  • Queued: Jobs that are submitted but not yet picked up by an engine to run.

To view all the engines that are assigned to streams

You can view all the engines that are assigned to streams on the Engines page.

To change the engines assigned to a stream

Click the stream to open it. Under Assigned Engines, click Manage to assign engines. Newly assigned engines begin running jobs shortly after assignment.