Apache Kafka Plugin

The Apache Kafka plugin (tika-pipes-kafka) provides an emitter (publishes parsed documents to a Kafka topic) and an iterator (consumes fetch requests from a Kafka topic).

Interface   Component name          Class
Emitter     kafka-emitter           KafkaEmitter
Iterator    kafka-pipes-iterator    KafkaPipesIterator

Kafka Emitter (kafka-emitter)

Publishes each parsed document as a record to a Kafka topic.

{
  "emitters": {
    "kafe": {
      "kafka-emitter": {
        "topic": "tika-parsed-docs",
        "bootstrapServers": "kafka1.example.com:9092,kafka2.example.com:9092",
        "acks": "all",
        "lingerMs": 5000,
        "batchSize": 16384,
        "compressionType": "lz4",
        "enableIdempotence": true,
        "maxRequestSize": 1048576,
        "requestTimeoutMs": 30000,
        "deliveryTimeoutMs": 120000,
        "clientId": "tika-pipes-emitter"
      }
    }
  }
}
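The Kafka Java producer requires delivery.timeout.ms to be at least linger.ms + request.timeout.ms. If delivery.timeout.ms is set explicitly to a smaller value, the client rejects the configuration. This plugin's lingerMs default (5000) is unusually high, so the arithmetic is worth checking when you tune timeouts. The sketch below is a hypothetical helper, not part of the plugin. It reads the camelCase fields from the example above and falls back to the defaults in the configuration table.

```python
import json

# The timing-related part of the emitter example above.
EMITTERS_JSON = """
{
  "emitters": {
    "kafe": {
      "kafka-emitter": {
        "topic": "tika-parsed-docs",
        "lingerMs": 5000,
        "requestTimeoutMs": 30000,
        "deliveryTimeoutMs": 120000
      }
    }
  }
}
"""

# Plugin defaults from the configuration table, used when a field is omitted.
TIMEOUT_DEFAULTS = {"lingerMs": 5000, "requestTimeoutMs": 30000, "deliveryTimeoutMs": 120000}


def delivery_timeout_ok(emitter_cfg: dict) -> bool:
    """Check Kafka's rule: delivery.timeout.ms >= linger.ms + request.timeout.ms."""
    c = {**TIMEOUT_DEFAULTS, **emitter_cfg}
    return c["deliveryTimeoutMs"] >= c["lingerMs"] + c["requestTimeoutMs"]


cfg = json.loads(EMITTERS_JSON)["emitters"]["kafe"]["kafka-emitter"]
print(delivery_timeout_ok(cfg))                   # True: 120000 >= 5000 + 30000
print(delivery_timeout_ok({"lingerMs": 100000}))  # False: 120000 < 100000 + 30000
```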

Configuration

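Most fields map directly to standard Kafka producer settings (for example, lingerMs corresponds to linger.ms). The defaults listed here match those of current kafka-clients releases, with two exceptions. lingerMs defaults to 5000 ms, far higher than Kafka's own linger.ms default. enableIdempotence defaults to false, whereas kafka-clients 3.0 and later default enable.idempotence to true.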

  • topic (required): Kafka topic to publish to. Must not be blank.

  • bootstrapServers (required): Comma-separated host:port list of Kafka brokers. Must not be blank.

  • acks (default: all): Producer acknowledgement setting: 0, 1, or all.

  • lingerMs (default: 5000): Producer linger time in milliseconds. Higher values allow larger batches at the cost of added latency.

  • batchSize (default: 16384): Producer batch size in bytes (16 KiB).

  • bufferMemory (default: 33554432): Producer buffer memory in bytes (32 MiB).

  • compressionType (default: none): One of none, gzip, snappy, lz4, zstd.

  • connectionsMaxIdleMs (default: 540000): Idle time in milliseconds (9 minutes) after which producer connections are closed.

  • deliveryTimeoutMs (default: 120000): Upper bound in milliseconds (2 minutes) on the time to report success or failure for a send, including retries.

  • enableIdempotence (default: false): Enable the idempotent producer. Requires acks=all, maxInFlightRequestsPerConnection of at most 5, and retries greater than 0.

  • interceptorClasses (no default): Comma-separated list of producer interceptor class names.

  • maxBlockMs (default: 60000): How long, in milliseconds, send() may block when the buffer is full or metadata is unavailable.

  • maxInFlightRequestsPerConnection (default: 5): Maximum number of unacknowledged requests per connection.

  • maxRequestSize (default: 1048576): Maximum request size in bytes (1 MiB).

  • metadataMaxAgeMs (default: 300000): Interval in milliseconds (5 minutes) after which cluster metadata is refreshed.

  • requestTimeoutMs (default: 30000): Time in milliseconds to wait for a broker response before retrying or failing the request.

  • retries (default: 2147483647): Producer retries. The default is Integer.MAX_VALUE; in practice, retrying stops when deliveryTimeoutMs expires.

  • retryBackoffMs (default: 100): Backoff in milliseconds between retries.

  • transactionTimeoutMs (default: 60000): Transaction timeout in milliseconds. Only meaningful when transactionalId is set.

  • transactionalId (no default): Set to enable the transactional producer.

  • clientId (no default): Value of client.id sent with each request. Brokers use it in logs and for quotas.

  • keySerializer / valueSerializer (no default): Fully qualified serializer class names. Leave unset to use the plugin’s defaults (string keys, JSON values).
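The field names in this table follow a mechanical pattern relative to Kafka's producer property names. lingerMs corresponds to linger.ms, maxInFlightRequestsPerConnection to max.in.flight.requests.per.connection, bootstrapServers to bootstrap.servers, and so on. The sketch below illustrates that correspondence. It is a hypothetical helper, not how the plugin actually builds its producer, and it assumes topic is the only field that is not a producer property.

```python
import re

# Assumption: "topic" is a plugin-level setting, not a Kafka producer property.
NON_PRODUCER_FIELDS = {"topic"}


def to_kafka_property(field: str) -> str:
    """Convert a camelCase field name (e.g. 'lingerMs') to Kafka's dotted form ('linger.ms')."""
    # Insert a dot before every uppercase letter except at the start, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", ".", field).lower()


def to_producer_properties(emitter_cfg: dict) -> dict:
    """Map an emitter config dict to Kafka producer properties with string values."""
    props = {}
    for field, value in emitter_cfg.items():
        if field in NON_PRODUCER_FIELDS:
            continue
        # Python renders booleans as "True"/"False"; use the lowercase form seen in Kafka configs.
        text = str(value).lower() if isinstance(value, bool) else str(value)
        props[to_kafka_property(field)] = text
    return props


example = {
    "topic": "tika-parsed-docs",
    "bootstrapServers": "kafka1.example.com:9092",
    "maxInFlightRequestsPerConnection": 5,
    "enableIdempotence": True,
}
for name, value in to_producer_properties(example).items():
    print(f"{name}={value}")
```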

Kafka Iterator (kafka-pipes-iterator)

Consumes fetch-request messages from a Kafka topic and emits one FetchEmitTuple per message. Useful for building event-driven pipelines where some upstream system pushes work to a queue.

{
  "pipes-iterator": {
    "kafka-pipes-iterator": {
      "topic": "tika-fetch-requests",
      "bootstrapServers": "kafka1.example.com:9092,kafka2.example.com:9092",
      "groupId": "tika-pipes-iterator",
      "autoOffsetReset": "earliest",
      "pollDelayMs": 100,
      "emitMax": -1,
      "fetcherId": "fsf",
      "emitterId": "kafe"
    }
  }
}

Configuration

  • topic (required): Kafka topic to consume from.

  • bootstrapServers (required): Comma-separated host:port list of Kafka brokers.

  • groupId (optional): Kafka consumer group ID. Strongly recommended in production so that partitions are reassigned on failover.

  • keySerializer / valueSerializer (optional): Custom key and value (de)serializer class names.

  • autoOffsetReset (default: earliest): Where to start consuming when the consumer group has no committed offset, for example on its first connect: earliest or latest. Kafka's own default is latest.

  • pollDelayMs (default: 100): Sleep in milliseconds between poll() calls when the topic is idle.

  • emitMax (default: -1): Maximum number of tuples to emit; -1 means unbounded.

  • groupInitialRebalanceDelayMs (default: 3000): Initial rebalance delay in milliseconds for the consumer group.

  • fetcherId / emitterId (required): IDs of the fetcher and emitter to bind to each emitted tuple. See Pipes Iterators for the shared iterator contract.
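To make the interaction between emitMax and pollDelayMs concrete, here is a simplified, hypothetical model of the consume loop. It does not use a real Kafka consumer. Each element of batches stands in for the records returned by one poll(), and an empty batch models an idle topic. Each record value is assumed to be a fetch key; the plugin's actual message format is not shown here. FetchEmitTuple is a minimal stand-in, not Tika's class. Unlike a real consumer, the loop also ends when the batches run out.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List


@dataclass(frozen=True)
class FetchEmitTuple:
    """Hypothetical, simplified stand-in for Tika's FetchEmitTuple."""
    fetch_key: str
    fetcher_id: str
    emitter_id: str


def iterate(batches: Iterable[List[str]], fetcher_id: str, emitter_id: str,
            emit_max: int = -1, poll_delay_ms: int = 100,
            sleep: Callable[[float], None] = time.sleep) -> Iterator[FetchEmitTuple]:
    """Yield one tuple per record, stopping after emit_max tuples (negative = unbounded)."""
    unbounded = emit_max < 0
    emitted = 0
    for batch in batches:  # each batch models the records returned by one poll()
        if not unbounded and emitted >= emit_max:
            return
        if not batch:
            # Idle topic: wait pollDelayMs before polling again.
            sleep(poll_delay_ms / 1000.0)
            continue
        for value in batch:
            if not unbounded and emitted >= emit_max:
                return
            # Assumption: the record value is the fetch key.
            yield FetchEmitTuple(value, fetcher_id, emitter_id)
            emitted += 1


for t in iterate([["a.pdf"], [], ["b.docx", "c.txt"]], "fsf", "kafe", emit_max=2):
    print(t)
```

With emitMax set to 2, the third record is never emitted. With the default of -1, every record would be.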

Complete Pipeline Example

The example below wires the Kafka iterator (consuming fetch requests) to a filesystem fetcher and a Kafka emitter (publishing parsed results). This layout is common in stream-processing document pipelines.

{
  "content-handler-factory": {
    "basic-content-handler-factory": {
      "type": "TEXT",
      "writeLimit": -1,
      "throwOnWriteLimitReached": true
    }
  },
  "fetchers": {
    "fsf": {
      "file-system-fetcher": {
        "basePath": "/data/input",
        "extractFileSystemMetadata": false
      }
    }
  },
  "emitters": {
    "kafe": {
      "kafka-emitter": {
        "topic": "tika-parsed-docs",
        "bootstrapServers": "kafka1.example.com:9092",
        "acks": "all",
        "compressionType": "lz4",
        "enableIdempotence": true
      }
    }
  },
  "pipes-iterator": {
    "kafka-pipes-iterator": {
      "topic": "tika-fetch-requests",
      "bootstrapServers": "kafka1.example.com:9092",
      "groupId": "tika-pipes-iterator",
      "autoOffsetReset": "earliest",
      "fetcherId": "fsf",
      "emitterId": "kafe"
    }
  },
  "pipes": {
    "parseMode": "RMETA",
    "onParseException": "EMIT",
    "numClients": 4
  }
}
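The pipeline works only if the iterator's fetcherId and emitterId match keys defined under fetchers and emitters (here fsf and kafe). Below is a hypothetical pre-flight check written against the JSON layout shown above. It is not part of Tika, and the JSON is trimmed to the fields the check reads.

```python
import json

PIPELINE_JSON = """
{
  "fetchers": {"fsf": {"file-system-fetcher": {"basePath": "/data/input"}}},
  "emitters": {"kafe": {"kafka-emitter": {"topic": "tika-parsed-docs"}}},
  "pipes-iterator": {
    "kafka-pipes-iterator": {
      "topic": "tika-fetch-requests",
      "fetcherId": "fsf",
      "emitterId": "kafe"
    }
  }
}
"""


def check_iterator_ids(config: dict) -> list:
    """Return problems for fetcherId/emitterId values that have no matching definition."""
    problems = []
    fetchers = config.get("fetchers", {})
    emitters = config.get("emitters", {})
    for iterator_type, it_cfg in config.get("pipes-iterator", {}).items():
        for field, defined in (("fetcherId", fetchers), ("emitterId", emitters)):
            ref = it_cfg.get(field)
            if ref not in defined:
                problems.append(f"{iterator_type}: {field} {ref!r} is not defined")
    return problems


config = json.loads(PIPELINE_JSON)
print(check_iterator_ids(config))  # []

# A typo in emitterId is reported instead of surfacing later at run time.
config["pipes-iterator"]["kafka-pipes-iterator"]["emitterId"] = "kafka"
print(check_iterator_ids(config))
```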

Notes

  • The Kafka plugin is built on the official Apache Kafka Java client library, kafka-clients.

  • The emitter is fire-and-forget at the Tika level. Durability is determined by the producer’s acks setting, the topic’s replication factor, and min.insync.replicas, not by Tika.

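  • To avoid duplicate records caused by producer retries, set enableIdempotence: true (and ensure acks: all). This gives exactly-once delivery per partition within a single producer session. For transactional (atomic, multi-partition) writes, also set transactionalId.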

  • The iterator’s groupId controls partition assignment. Set it explicitly in production — without one, the consumer receives a transient assignment that resets on restart.
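The idempotence and transactional settings in these notes interact with other producer fields. The Kafka producer rejects an idempotent configuration unless acks is all, max.in.flight.requests.per.connection is at most 5, and retries is greater than zero. Following the notes above, transactionalId is paired with enableIdempotence: true. The hypothetical check below applies those rules to the emitter's camelCase fields, using the defaults from the configuration table. It is a sketch, not the plugin's own validation.

```python
# Plugin defaults from the emitter configuration table.
SEMANTICS_DEFAULTS = {
    "acks": "all",
    "enableIdempotence": False,
    "maxInFlightRequestsPerConnection": 5,
    "retries": 2147483647,
    "transactionalId": None,
}


def delivery_semantics_problems(cfg: dict) -> list:
    """List constraint violations for idempotent/transactional producer settings."""
    c = {**SEMANTICS_DEFAULTS, **cfg}
    problems = []
    if c["transactionalId"] is not None and not c["enableIdempotence"]:
        problems.append("transactionalId requires enableIdempotence: true")
    if c["enableIdempotence"]:
        # acks may be given as a string or a number; Kafka treats -1 as a synonym for all.
        if str(c["acks"]) not in ("all", "-1"):
            problems.append("enableIdempotence requires acks: all")
        if c["maxInFlightRequestsPerConnection"] > 5:
            problems.append("enableIdempotence requires maxInFlightRequestsPerConnection <= 5")
        if c["retries"] <= 0:
            problems.append("enableIdempotence requires retries > 0")
    return problems


print(delivery_semantics_problems({"enableIdempotence": True}))  # []
print(delivery_semantics_problems({"enableIdempotence": True, "acks": 1}))
```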