
I am using Apache Kafka 3.1 in Docker and trying to orchestrate it with Nomad, but I am having trouble creating a distributed cluster.

The goal is to have 3 combined broker/controller nodes on 3 EC2 instances:

:~$ nslookup broker.service.brain.consul
Server:         127.0.0.1
Address:        127.0.0.1#53

Name:   broker.service.brain.consul
Address: 30.10.12.52
Name:   broker.service.brain.consul
Address: 30.10.11.8
Name:   broker.service.brain.consul
Address: 30.10.13.172

From inside one of the Nomad client instances:

IPv4 address for docker0: 172.17.0.1
IPv4 address for ens5:    30.10.13.172
IPv4 address for nomad:   172.26.64.1

Here is the relevant Nomad job configuration:

job "kafka" {
  datacenters = ["stream"]
  type = "service"
  group "broker" {
    count = 3
    service {
      name = "broker"
      port = "9092"
      tags = ["kafka","broker"]
      connect {
        sidecar_service {}
      }
    }
    network {
      mode = "bridge"
      hostname = "${attr.unique.hostname}"
      dns {
        servers = ["172.17.0.1"]
      }
      port "broker" {
        static = 9092
        to     = 9092
      }
      port "controler" {
        static = 9093
        to     = 9093
      }
    }
...
    task "broker" {

      driver = "docker"
      config {

        image = "registry.gitlab.com/.../kafka"
        volumes = ["files/server.properties:/kafka/config/kraft/server.properties"]
        

        ports = [
          "broker",
          "controler"
        ]
...

After rendering from the template, server.properties looks as follows (node.id differs across the 3 brokers):

process.roles=broker,controller
node.id=2
controller.quorum.voters=1@30.10.11.8:9093,2@30.10.12.52:9093,3@30.10.13.172:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://:9092
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
num.network.threads=3
num.io.threads=8
request.timeout.ms=60000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/logs/kraft-combined-logs

However, the cluster is not able to start, and it seems to be a connection issue.


[2022-01-24 01:31:15,405] ERROR [BrokerLifecycleManager id=2] Shutting down because we were unable to register with the controller quorum. (kafka.server.BrokerLifecycleManager)
[2022-01-24 01:31:15,407] INFO [BrokerLifecycleManager id=2] registrationTimeout: shutting down event queue. (org.apache.kafka.queue.KafkaEventQueue)
[2022-01-24 01:31:15,407] INFO [BrokerLifecycleManager id=2] Transitioning from STARTING to SHUTTING_DOWN. (kafka.server.BrokerLifecycleManager)
[2022-01-24 01:31:15,408] INFO [BrokerServer id=2] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2022-01-24 01:31:15,408] INFO [BrokerToControllerChannelManager broker=2 name=heartbeat]: Shutting down (kafka.server.BrokerToControllerRequestThread)
[2022-01-24 01:31:15,409] INFO [BrokerToControllerChannelManager broker=2 name=heartbeat]: Stopped (kafka.server.BrokerToControllerRequestThread)
[2022-01-24 01:31:15,410] INFO [BrokerToControllerChannelManager broker=2 name=heartbeat]: Shutdown completed (kafka.server.BrokerToControllerRequestThread)
[2022-01-24 01:31:15,412] ERROR [BrokerServer id=2] Fatal error during broker startup. Prepare to shutdown (kafka.server.BrokerServer)
java.util.concurrent.CancellationException
    at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2396)
    at kafka.server.BrokerLifecycleManager$ShutdownEvent.run(BrokerLifecycleManager.scala:478)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:174)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2022-01-24 01:31:15,417] INFO [BrokerServer id=2] Transition from STARTED to SHUTTING_DOWN (kafka.server.BrokerServer)

...

Also:

...

[2022-01-24 02:02:19,304] INFO [RaftManager nodeId=2] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 10341 ms. (org.apache.kafka.clients.NetworkClient)
[2022-01-24 02:02:19,306] INFO [RaftManager nodeId=2] Disconnecting from node 3 due to socket connection setup timeout. The timeout value is 11036 ms. (org.apache.kafka.clients.NetworkClient)
[2022-01-24 02:02:20,100] INFO [RaftManager nodeId=2] Re-elect as candidate after election backoff has completed (org.apache.kafka.raft.KafkaRaftClient)

I tried setting the listeners to the new Docker hostname (hostname = "${attr.unique.hostname}") and to the EC2 host IP, but neither helped.

I've spent a few days on this puzzle and am currently out of ideas. I would appreciate any help with this issue.

Kingindanord
  • Did this issue get resolved? I am facing the same issue. Can you help if you have fixed it? – bharathi Oct 12 '22 at 04:05
  • I remember that I switched to host network mode and it worked. If you don't want to use host mode, you need to configure an overlay network (e.g. Calico). – Kingindanord Oct 14 '22 at 13:12

1 Answer


I had a similar problem in K8s when image pulling was very slow. The first instance to start has to wait for the other instances, but it kept restarting because the others were missing: they were still pulling their images.

In my case it helped to set the Kafka setting initial.broker.registration.timeout.ms to 240000 (4 minutes). The first Kafka instance then waits up to 4 minutes for the other instances and does not exit with the error mentioned above. The default value is 60000 (1 minute), which was too low for me.

If you still get the error message, I suspect a connection misconfiguration where the Kafka instances cannot see each other.
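One way to test that suspicion is to check, from inside each broker container, whether the controller port of every quorum voter accepts a TCP connection. The following is a minimal sketch, not part of Kafka itself: the helper names (parse_quorum_voters, can_connect, check_voters) are made up for illustration, and it only assumes the id@host:port format used by controller.quorum.voters.

```python
import socket


def parse_quorum_voters(value):
    """Parse 'id@host:port,...' into a list of (node_id, host, port) tuples."""
    voters = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        node_id, _, endpoint = entry.partition("@")
        host, _, port = endpoint.rpartition(":")
        voters.append((int(node_id), host, int(port)))
    return voters


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


def check_voters(value, timeout=3.0):
    """Map each voter's node id to whether its controller port is reachable."""
    return {
        node_id: can_connect(host, port, timeout)
        for node_id, host, port in parse_quorum_voters(value)
    }
```

Calling check_voters with the controller.quorum.voters value from server.properties returns a dict such as {1: False, 2: True, 3: False}. A False for a remote node only shows that the TCP connection could not be opened from that container; it does not say whether the cause is a firewall rule, a port mapping, or the network mode.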

I used the Bitnami image, version 3.3.2, with these parameters:

- name: KAFKA_ENABLE_KRAFT
  value: "yes"
- name: KAFKA_KRAFT_CLUSTER_ID # must be generated and unique for each Kafka cluster
  value: "<<REDACTED>>"
- name: KAFKA_CFG_PROCESS_ROLES
  value: "controller,broker"
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
  value: 0@kafka-inst-0:9093,1@kafka-inst-1:9093,2@kafka-inst-2:9093,3@kafka-inst-3:9093,...
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
  value: CONTROLLER
- name: KAFKA_CFG_LISTENERS
  value: "PLAINTEXT://:9092,CONTROLLER://:9093"
- name: BROKER_ID_COMMAND # needed to derive the broker ID from the hostname
  value: "hostname | awk -F'-' '{print $NF}'"
- name: KAFKA_HEAP_OPTS
  value: "-Xmx1G -Xms1G"
- name: ALLOW_PLAINTEXT_LISTENER
  value: "yes"
- name: KAFKA_CFG_INITIAL_BROKER_REGISTRATION_TIMEOUT_MS
  value: "240000"
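The parameters above rely on the broker ID taken from the hostname matching the ID used for that host in the voters list. The sketch below illustrates that relationship; the function names are hypothetical, and converting the suffix to an integer is my addition (the awk command simply prints the last dash-separated field).

```python
def broker_id_from_hostname(hostname):
    """Mimic `hostname | awk -F'-' '{print $NF}'`: take the last dash-separated field."""
    return int(hostname.rsplit("-", 1)[-1])


def quorum_voters(prefix, replicas, port=9093):
    """Build an 'id@host:port' voters string for hosts named <prefix>-0, <prefix>-1, ..."""
    return ",".join(f"{i}@{prefix}-{i}:{port}" for i in range(replicas))


def voters_match_hostnames(voters):
    """Check that every voter id equals the numeric suffix of its hostname."""
    for entry in voters.split(","):
        node_id, _, endpoint = entry.partition("@")
        host = endpoint.rpartition(":")[0]
        if int(node_id) != broker_id_from_hostname(host):
            return False
    return True
```

For three replicas, quorum_voters("kafka-inst", 3) yields 0@kafka-inst-0:9093,1@kafka-inst-1:9093,2@kafka-inst-2:9093, and voters_match_hostnames confirms that each ID lines up with what BROKER_ID_COMMAND would produce on that host.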
Adrian Mole
Adavan