Автоматическое удаление группы потребителей кафки после отключения

В моем Java-приложении раз в пару секунд я назначаю определенную тему TopicPartition потребителю и пытаюсь прочитать конкретное сообщение из определенной темы + раздела. После прочтения сообщения (используя poll () ) я сразу отключаю потребителя.

Поскольку приведенный выше сценарий может работать в многопоточной среде. Имя группы потребителей имеет префикс + случайный хэш, например, my_consumer_group_EWQSV (поскольку kafka не назначит один и тот же конкретный раздел двум потребителям в одной группе).

Проблема в том, что я не мог сказать kafka удалить этих потребителей после того, как они отключились (так как эти потребители просто временные), есть ли способ сделать это? (не вручную, я имею в виду, используя конфигурацию или что-то в этом роде, я не смог найти никакой конф, такой как «auto-delete-after-consumer-disconnect»)

Спасибо :)

Всего 1 ответ


В Java, чтобы вручную потреблять определенное сообщение от kafka без создания группы потребителей:

Следующего достаточно:

kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        TopicPartition tp = new TopicPartition(topic, partition);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, offset);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
        } 

Важными частями являются:

  • установите для свойства enable.auto.commit значение false (как показано выше).
  • НЕ устанавливайте свойство group.id ( ConsumerConfig.GROUP_ID_CONFIG ), потому что идентификатор не нужен.
  • НЕ используйте метод подписки, который автоматически назначает разделы, вместо этого используйте методы assign и seek для чтения сообщения вручную, как показано выше.

Есть идеи?

10000