Apache Kafka Plugin
The Apache Kafka plugin (tika-pipes-kafka) provides an emitter (publishes parsed documents to a Kafka topic) and an iterator (consumes fetch requests from a Kafka topic).
| Interface | Component name | Class |
|---|---|---|
| Emitter | `kafka-emitter` | |
| Iterator | `kafka-pipes-iterator` | |
Kafka Emitter (kafka-emitter)
Publishes each parsed document as a record to a Kafka topic.
```json
{
  "emitters": {
    "kafe": {
      "kafka-emitter": {
        "topic": "tika-parsed-docs",
        "bootstrapServers": "kafka1.example.com:9092,kafka2.example.com:9092",
        "acks": "all",
        "lingerMs": 5000,
        "batchSize": 16384,
        "compressionType": "lz4",
        "enableIdempotence": true,
        "maxRequestSize": 1048576,
        "requestTimeoutMs": 30000,
        "deliveryTimeoutMs": 120000,
        "clientId": "tika-pipes-emitter"
      }
    }
  }
}
```
Configuration
Most fields map directly to standard Kafka producer settings; the defaults listed here match Kafka’s own defaults unless noted.
| Field | Default | Description |
|---|---|---|
| `topic` | required | Kafka topic to publish to (validated non-blank). |
| `bootstrapServers` | required | Comma-separated list of Kafka brokers. |
| `acks` | | Producer acks setting, for example `all`. |
| `lingerMs` | | Producer linger in milliseconds. |
| `batchSize` | | Producer batch size in bytes. |
| | `33554432` | Producer buffer memory in bytes (32 MiB). |
| `compressionType` | | Compression type, for example `lz4`. |
| | | Producer connection idle timeout. |
| `deliveryTimeoutMs` | | End-to-end delivery timeout. |
| `enableIdempotence` | | Enable the idempotent producer. Requires `acks: all`. |
| | no default | Comma-separated list of producer interceptor class names. |
| | | How long the producer may block. |
| | | In-flight requests per connection. |
| `maxRequestSize` | `1048576` | Maximum request size in bytes (1 MiB). |
| | | Metadata refresh interval. |
| `requestTimeoutMs` | | Request timeout. |
| | | Producer retries. |
| | | Backoff between retries. |
| | | Transaction timeout (only meaningful with `transactionalId`). |
| `transactionalId` | no default | Set to enable the transactional producer. |
| | no default | Fully-qualified key and value serializer class names. Leave unset to use the plugin’s defaults (string keys, JSON values). |
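To make the relationship between the plugin's fields and Kafka's producer settings concrete, the sketch below converts the camelCase field names from the example into Kafka's dotted property names. The conversion rule (a dot before each capital letter, then lower-case) is an assumption inferred from the names, not something the plugin documents, and `to_kafka_property` is a hypothetical helper. `topic` is left out because it is not a producer property.

```python
import re


def to_kafka_property(field: str) -> str:
    """Convert a camelCase field name to a dotted, lower-case property name.

    Assumption: the plugin's field names follow Kafka's property names
    with dots replaced by camelCase boundaries.
    """
    # Insert a dot before every upper-case letter (except at the start),
    # then lower-case the whole string.
    return re.sub(r"(?<!^)(?=[A-Z])", ".", field).lower()


# Field names and values taken from the emitter example above.
emitter_fields = {
    "bootstrapServers": "kafka1.example.com:9092,kafka2.example.com:9092",
    "acks": "all",
    "lingerMs": 5000,
    "compressionType": "lz4",
    "enableIdempotence": True,
}

producer_props = {to_kafka_property(k): v for k, v in emitter_fields.items()}
for name, value in producer_props.items():
    print(f"{name} = {value}")
```

The resulting names (`bootstrap.servers`, `linger.ms`, `compression.type`, `enable.idempotence`) are the ones to look up in the Kafka producer configuration reference when a default or an allowed value is in question.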
Kafka Iterator (kafka-pipes-iterator)
Consumes fetch-request messages from a Kafka topic and emits one FetchEmitTuple per message. Useful for building event-driven pipelines where some upstream system pushes work to a queue.
```json
{
  "pipes-iterator": {
    "kafka-pipes-iterator": {
      "topic": "tika-fetch-requests",
      "bootstrapServers": "kafka1.example.com:9092,kafka2.example.com:9092",
      "groupId": "tika-pipes-iterator",
      "autoOffsetReset": "earliest",
      "pollDelayMs": 100,
      "emitMax": -1,
      "fetcherId": "fsf",
      "emitterId": "kafe"
    }
  }
}
```
Configuration
| Field | Default | Description |
|---|---|---|
| `topic` | required | Kafka topic to consume from. |
| `bootstrapServers` | required | Broker list. |
| `groupId` | optional | Kafka consumer group ID. Strongly recommended in production for failover and partition reassignment. |
| | optional | Custom (de)serializer class names. |
| `autoOffsetReset` | | What to do on first connect, for example `earliest`. |
| `pollDelayMs` | | Sleep between polls, in milliseconds. |
| `emitMax` | | Maximum tuples to emit. |
| | | Initial rebalance delay for the consumer group. |
| `fetcherId`, `emitterId` | required | IDs of the fetcher and emitter to bind to each emitted tuple. See Pipes Iterators for the shared iterator contract. |
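The interplay of `pollDelayMs`, `emitMax`, `fetcherId`, and `emitterId` can be pictured with a small simulation. This is only a sketch and not the plugin's implementation: `FetchEmitPair`, `iterate`, and `max_idle_polls` are hypothetical names, the message source is an in-memory queue rather than a Kafka consumer, and it assumes that a negative `emitMax` means "no limit" and that each message body is used directly as the fetch key.

```python
import time
from collections import deque
from dataclasses import dataclass
from typing import Callable, Iterator, List


@dataclass
class FetchEmitPair:
    """Hypothetical stand-in for Tika's FetchEmitTuple; not the real class."""
    fetch_key: str
    fetcher_id: str
    emitter_id: str


def iterate(poll: Callable[[], List[str]], fetcher_id: str, emitter_id: str,
            poll_delay_ms: int = 100, emit_max: int = -1,
            max_idle_polls: int = 3) -> Iterator[FetchEmitPair]:
    """Yield one pair per polled message until a limit is reached."""
    emitted = 0
    idle_polls = 0

    def limit_reached() -> bool:
        # Assumption: a negative emit_max means "no limit".
        return 0 <= emit_max <= emitted

    # max_idle_polls exists only so this demo terminates on an empty source.
    while idle_polls < max_idle_polls and not limit_reached():
        batch = poll()
        idle_polls = 0 if batch else idle_polls + 1
        for message in batch:
            if limit_reached():
                return
            # Bind the configured fetcher and emitter IDs to every message.
            yield FetchEmitPair(message, fetcher_id, emitter_id)
            emitted += 1
        # Pause between polls, mirroring the role of pollDelayMs.
        time.sleep(poll_delay_ms / 1000.0)


# Fake message source standing in for a Kafka consumer.
batches = deque([["a.pdf", "b.pdf"], ["c.pdf"]])
pairs = list(iterate(lambda: batches.popleft() if batches else [],
                     "fsf", "kafe", poll_delay_ms=0, emit_max=2))
print([p.fetch_key for p in pairs])  # ['a.pdf', 'b.pdf']
```

With `emit_max=2` the loop stops after the first batch, so `c.pdf` is never polled; with the default of `-1` the loop would continue until the source stays empty for `max_idle_polls` consecutive polls.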
Complete Pipeline Example
The example below wires the Kafka iterator (consuming fetch requests) to a filesystem fetcher and a Kafka emitter (publishing parsed results). This arrangement is common in stream-processing-style document pipelines.
```json
{
  "content-handler-factory": {
    "basic-content-handler-factory": {
      "type": "TEXT",
      "writeLimit": -1,
      "throwOnWriteLimitReached": true
    }
  },
  "fetchers": {
    "fsf": {
      "file-system-fetcher": {
        "basePath": "/data/input",
        "extractFileSystemMetadata": false
      }
    }
  },
  "emitters": {
    "kafe": {
      "kafka-emitter": {
        "topic": "tika-parsed-docs",
        "bootstrapServers": "kafka1.example.com:9092",
        "acks": "all",
        "compressionType": "lz4",
        "enableIdempotence": true
      }
    }
  },
  "pipes-iterator": {
    "kafka-pipes-iterator": {
      "topic": "tika-fetch-requests",
      "bootstrapServers": "kafka1.example.com:9092",
      "groupId": "tika-pipes-iterator",
      "autoOffsetReset": "earliest",
      "fetcherId": "fsf",
      "emitterId": "kafe"
    }
  },
  "pipes": {
    "parseMode": "RMETA",
    "onParseException": "EMIT",
    "numClients": 4
  }
}
```
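Because the iterator refers to the fetcher and emitter by ID, a mistyped `fetcherId` or `emitterId` is an easy mistake in a configuration like the one above. The following sketch checks that both IDs are defined before the pipeline is started. `check_iterator_bindings` is a hypothetical helper written for this page, not part of Tika, and the embedded configuration is a trimmed copy of the example with unrelated settings removed.

```python
import json


def check_iterator_bindings(config: dict) -> list:
    """Return a list of problems; an empty list means the IDs line up."""
    problems = []
    fetchers = config.get("fetchers", {})
    emitters = config.get("emitters", {})
    # "pipes-iterator" holds one entry keyed by the component name.
    for component, settings in config.get("pipes-iterator", {}).items():
        fetcher_id = settings.get("fetcherId")
        emitter_id = settings.get("emitterId")
        if fetcher_id not in fetchers:
            problems.append(
                f"{component}: fetcherId {fetcher_id!r} is not defined under 'fetchers'")
        if emitter_id not in emitters:
            problems.append(
                f"{component}: emitterId {emitter_id!r} is not defined under 'emitters'")
    return problems


# Trimmed copy of the pipeline example above.
config = json.loads("""
{
  "fetchers": {"fsf": {"file-system-fetcher": {"basePath": "/data/input"}}},
  "emitters": {"kafe": {"kafka-emitter": {"topic": "tika-parsed-docs"}}},
  "pipes-iterator": {
    "kafka-pipes-iterator": {
      "topic": "tika-fetch-requests",
      "fetcherId": "fsf",
      "emitterId": "kafe"
    }
  }
}
""")

print(check_iterator_bindings(config))  # []
```

Changing `emitterId` to a name that does not appear under `emitters` makes the function return one message naming the missing ID.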
Notes
- The Kafka plugin uses the official `kafka-clients` SDK.
- The emitter is fire-and-forget at the Tika level; durability is determined by Kafka’s `acks` and broker replication factor, not by Tika.
- For exactly-once semantics, set `enableIdempotence: true` (and ensure `acks: all`); for transactional semantics, also set `transactionalId`.
- The iterator’s `groupId` controls partition assignment. Set it explicitly in production; without one, the consumer receives a transient assignment that resets on restart.
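The notes above can be turned into a few mechanical checks on a configuration. The sketch below is one way to do that, under stated assumptions: `lint_kafka_settings` is a hypothetical helper rather than a Tika feature, it inspects only the settings named in the notes, and it does not flag an unset `acks` because the plugin's default is not stated here. It treats `-1` as equivalent to `all`, which is how Kafka itself interprets that value.

```python
def lint_kafka_settings(emitter: dict, iterator: dict) -> list:
    """Return warnings derived from the notes; an empty list means none apply."""
    warnings = []

    # Idempotence needs acks=all; only an explicit, conflicting value is flagged.
    if emitter.get("enableIdempotence") and "acks" in emitter:
        if str(emitter["acks"]) not in ("all", "-1"):
            warnings.append("enableIdempotence is true but acks is not 'all'")

    # Per the notes, transactionalId is set in addition to idempotence.
    if emitter.get("transactionalId") and not emitter.get("enableIdempotence"):
        warnings.append("transactionalId is set but enableIdempotence is not true")

    # An explicit groupId is recommended for production use.
    if not iterator.get("groupId"):
        warnings.append("iterator has no groupId; set one explicitly in production")

    return warnings


# Settings from the complete pipeline example: nothing to report.
good_emitter = {"acks": "all", "enableIdempotence": True}
good_iterator = {"groupId": "tika-pipes-iterator"}
print(lint_kafka_settings(good_emitter, good_iterator))  # []

# A configuration that violates two of the notes.
risky_emitter = {"acks": "1", "enableIdempotence": True}
risky_iterator = {}
for warning in lint_kafka_settings(risky_emitter, risky_iterator):
    print(warning)
```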