Перейти к основному содержимому
Перейти к основному содержимому

Использование движка таблиц Kafka

Not supported in ClickHouse Cloud
примечание

Движок таблиц Kafka не поддерживается в ClickHouse Cloud. Пожалуйста, рассмотрите ClickPipes или Kafka Connect

Kafka в ClickHouse

Чтобы использовать движок таблиц Kafka, вы должны быть в целом знакомы с материализованными представлениями ClickHouse.

Обзор

Сначала мы сосредоточимся на самом распространенном варианте использования: использовании движка таблиц Kafka для вставки данных в ClickHouse из Kafka.

Движок таблиц Kafka позволяет ClickHouse читать напрямую из темы Kafka. Хотя это полезно для просмотра сообщений в теме, движок по своей сути допускает только одноразовое извлечение, т.е. когда запрос выдается к таблице, он потребляет данные из очереди и увеличивает смещение потребителя перед тем, как вернуть результаты вызывающему. Данные не могут быть повторно прочитаны без сброса этих смещений.

Чтобы сохранить эти данные из чтения движка таблиц, нам нужен способ захвата данных и вставки их в другую таблицу. Материализованные представления на основе триггеров обеспечивают эту функциональность. Материализованное представление инициирует чтение на движке таблиц, получая партии документов. Условие TO определяет назначение данных - обычно это таблица семейства Merge Tree. Этот процесс визуализирован ниже:

Шаги

1. Подготовка

Если у вас есть данные, заполненные в целевой теме, вы можете адаптировать следующее для использования в вашем наборе данных. В качестве альтернативы, образец набора данных Github предоставлен здесь. Этот набор данных используется в приведенных ниже примерах и использует сокращенную схему и подмножество строк (в частности, мы ограничиваемся событиями Github, касающимися репозитория ClickHouse), по сравнению с полным набором данных, доступным здесь, для краткости. Это все еще достаточно для большинства запросов опубликованных с набором данных.

2. Настройка ClickHouse

Этот шаг необходим, если вы подключаетесь к защищенному Kafka. Эти настройки не могут быть переданы через команды SQL DDL и должны быть настроены в конфигурационном файле ClickHouse config.xml. Мы предполагаем, что вы подключаетесь к экземпляру, защищенному SASL. Это самый простой способ взаимодействия с Confluent Cloud.

Разместите приведенный выше фрагмент внутри нового файла в каталоге conf.d/ или объедините его с существующими конфигурационными файлами. Для настроек, которые можно настроить, смотрите здесь.

Мы также создадим базу данных с именем KafkaEngine, чтобы использовать ее в этом учебнике:

После создания базы данных вам нужно будет переключиться на нее:

3. Создание целевой таблицы

Подготовьте вашу целевую таблицу. В приведенном ниже примере мы используем сокращенную схему GitHub для краткости. Обратите внимание, что хотя мы используем движок таблиц MergeTree, этот пример легко можно адаптировать для любого члена семьи MergeTree.

4. Создание и заполнение темы

Далее мы создадим тему. Для этого мы можем использовать несколько инструментов. Если мы запускаем Kafka локально на нашем компьютере или внутри контейнера Docker, RPK отлично подойдет. Мы можем создать тему с именем github с 5 партициями, выполнив следующую команду:

Если мы запускаем Kafka в Confluent Cloud, нам может потребоваться использовать Confluent CLI:

Теперь нам нужно заполнить эту тему данными, что мы сделаем, используя kcat. Мы можем выполнить команду, подобную следующей, если мы запускаем Kafka локально с отключенной аутентификацией:

Или следующую, если наш кластер Kafka использует SASL для аутентификации:

Набор данных содержит 200,000 строк, поэтому он должен быть загружен всего за несколько секунд. Если вы хотите работать с более крупным набором данных, обратите внимание на раздел больших наборов данных репозитория ClickHouse/kafka-samples.

5. Создание движка таблиц Kafka

Приведенный ниже пример создает движок таблиц с той же схемой, что и таблица слияния. Это не обязательно, так как вы можете иметь псевдонимы или эфемерные колонки в целевой таблице. Тем не менее, настройки важны; обратите внимание на использование JSONEachRow в качестве типа данных для потребления JSON из темы Kafka. Значения github и clickhouse представляют собой имя темы и имена групп потребителей соответственно. Темы могут на самом деле быть списком значений.

Мы обсудим настройки движка и настройку производительности ниже. На этом этапе простой выбор из таблицы github_queue должен читать несколько строк. Обратите внимание, что это переместит смещения потребителей вперед, предотвращая повторное чтение этих строк без сброса. Обратите внимание на ограничение и необходимый параметр stream_like_engine_allow_direct_select.

6. Создание материализованного представления

Материализованное представление свяжет две ранее созданные таблицы, читая данные из движка таблиц Kafka и вставляя их в целевую таблицу слияния. Мы можем выполнить несколько преобразований данных. Мы выполним простое чтение и вставку. Использование * предполагает, что имена колонок идентичны (чувствительны к регистру).

На момент создания материализованное представление подключается к движку Kafka и начинает чтение: вставляя строки в целевую таблицу. Этот процесс будет продолжаться бесконечно, с последующими вставками сообщений в Kafka, которые будут потребляться. Не стесняйтесь повторно выполнять скрипт вставки, чтобы вставить дополнительные сообщения в Kafka.

7. Подтверждение вставки строк

Подтвердите существование данных в целевой таблице:

Вы должны увидеть 200,000 строк:

Общие операции

Остановка и перезапуск потребления сообщений

Чтобы остановить потребление сообщений, вы можете отсоединить таблицу движка Kafka:

Это не повлияет на смещения группы потребителей. Чтобы перезапустить потребление и продолжить с предыдущего смещения, повторно присоедините таблицу.

Добавление метаданных Kafka

Полезно отслеживать метаданные из оригинальных сообщений Kafka после их ingested в ClickHouse. Например, мы можем захотеть знать, сколько определенного топика или партиции мы потребили. Для этой цели движок таблиц Kafka предоставляет несколько виртуальных колонок. Эти колонки могут быть сохранены как колонки в нашей целевой таблице, изменив нашу схему и оператор выбора материализованного представления.

Сначала мы выполняем операцию остановки, описанную выше, перед добавлением колонок в нашу целевую таблицу.

Ниже мы добавляем информационные колонки, чтобы идентифицировать исходный топик и партицию, из которой произошла строка.

Затем нам нужно убедиться, что виртуальные колонки отображаются так, как требуется. Виртуальные колонки имеют префикс _. Полный список виртуальных колонок можно найти здесь.

Чтобы обновить нашу таблицу с виртуальными колонками, нам нужно будет удалить материализованное представление, повторно присоединить таблицу движка Kafka и заново создать материализованное представление.

Новые потребляемые строки должны содержать метаданные.

Результат выглядит следующим образом:

actor_loginevent_typecreated_attopicpartition
IgorMinarCommitCommentEvent2011-02-12 02:22:00github0
queeupCommitCommentEvent2011-02-12 02:23:23github0
IgorMinarCommitCommentEvent2011-02-12 02:23:24github0
IgorMinarCommitCommentEvent2011-02-12 02:24:50github0
IgorMinarCommitCommentEvent2011-02-12 02:25:20github0
dapiCommitCommentEvent2011-02-12 06:18:36github0
sourcerebelsCommitCommentEvent2011-02-12 06:34:10github0
jamierumbelowCommitCommentEvent2011-02-12 12:21:40github0
jpnCommitCommentEvent2011-02-12 12:24:31github0
OxoniumCommitCommentEvent2011-02-12 12:31:28github0
Изменение настроек движка Kafka

Рекомендуем удалить таблицу движка Kafka и заново создать ее с новыми настройками. Материализованное представление не нужно изменять в процессе - потребление сообщений возобновится после того, как таблица движка Kafka будет воссоздана.

Устранение проблем

Ошибки, такие как проблемы с аутентификацией, не сообщаются в ответах DDL движка Kafka. Для диагностики проблем мы рекомендуем использовать основной журнал ClickHouse clickhouse-server.err.log. Дополнительный трассировочный журнал для базовой библиотеки клиента Kafka librdkafka может быть включен через конфигурацию.

Работа с неправильно сформированными сообщениями

Kafka часто используется как "помойка" для данных. Это приводит к тому, что темы содержат смешанные форматы сообщений и несоответствующие имена полей. Избегайте этого и используйте возможности Kafka, такие как Kafka Streams или ksqlDB, чтобы убедиться, что сообщения хорошо сформированы и согласованны перед вставкой в Kafka. Если эти варианты невозможны, ClickHouse имеет некоторые функции, которые могут помочь.

  • Рассматривайте поле сообщения как строки. Функции могут использоваться в операторе материализованного представления для выполнения очистки и преобразования формата при необходимости. Это не должно представлять собой решение для производства, но может помочь при одноразовом приеме.
  • Если вы потребляете JSON из темы, используя формат JSONEachRow, используйте настройку input_format_skip_unknown_fields. При записи данных по умолчанию ClickHouse выбрасывает исключение, если входные данные содержат колонки, которые не существуют в целевой таблице. Тем не менее, если этот параметр включен, эти лишние колонки будут игнорироваться. Опять же, это не решение уровня производства и может ввести в заблуждение других.
  • Обратите внимание на настройку kafka_skip_broken_messages. Это требует от пользователя указать уровень терпимости на блок за неправильно сформированные сообщения - учитывается в контексте kafka_max_block_size. Если это терпение превышено (измеряется в абсолютных сообщениях), обычное поведение исключения изменится, и другие сообщения будут пропущены.
Семантика доставки и проблемы с дубликатами

Движок таблиц Kafka имеет семантику «по крайней мере раз». В редких известных обстоятельствах возможны дубликаты. Например, сообщения могут быть прочитаны из Kafka и успешно вставлены в ClickHouse. Прежде чем новое смещение может быть зафиксировано, соединение с Kafka теряется. В таком случае требуется повторная попытка блока. Блок может быть дедуплицирован с использованием распределенной таблицы или ReplicatedMergeTree как целевой таблицы. Хотя это снижает вероятность дублирования строк, это зависит от идентичных блоков. События, такие как перераспределение Kafka, могут аннулировать это предположение, вызывая дубликаты в редких обстоятельствах.

Вставки на основе кворума

Вам могут понадобиться вставки на основе кворума в случаях, когда требуются более высокие гарантии доставки в ClickHouse. Это не может быть установлено на материализованное представление или целевую таблицу. Тем не менее, это можно установить для пользовательских профилей, например:

ClickHouse в Kafka

Хотя это редкий случай использования, данные ClickHouse также могут быть сохранены в Kafka. Например, мы вручную вставим строки в движок таблиц Kafka. Эти данные будут прочитаны тем же движком Kafka, материализованное представление которого поместит данные в таблицу Merge Tree. Наконец, мы продемонстрируем применение материализованных представлений при вставке в Kafka для чтения таблиц из существующих исходных таблиц.

Шаги

Наша первоначальная цель лучше всего иллюстрируется:

Мы предполагаем, что у вас есть таблицы и представления, созданные на первых этапах для Kafka в ClickHouse, и что тема была полностью потреблена.

1. Вставка строк напрямую

Сначала подтвердите количество в целевой таблице.

Вы должны иметь 200,000 строк:

Теперь вставим строки из целевой таблицы GitHub обратно в движок таблиц Kafka github_queue. Обратите внимание, как мы используем формат JSONEachRow и ограничиваем выборку до 100 строк.

Подсчитайте снова строки в GitHub, чтобы подтвердить, что их количество увеличилось на 100. Как показано на приведенной выше диаграмме, строки были вставлены в Kafka через движок таблиц Kafka, прежде чем быть повторно прочитанными тем же движком и вставленными в целевую таблицу GitHub материализованным представлением!

Вы должны увидеть 100 дополнительных строк:

2. Использование материализованных представлений

Мы можем использовать материализованные представления для отправки сообщений в движок Kafka (и тему), когда документы вставляются в таблицу. Когда строки вставляются в таблицу GitHub, срабатывает материализованное представление, что вызывает вставку строк обратно в движок Kafka и в новую тему. Как мы уже сказали, это лучше всего иллюстрируется:

Создайте новую тему Kafka github_out или эквивалент. Убедитесь, что движок таблиц Kafka github_out_queue указывает на эту тему.

Теперь создайте новое материализованное представление github_out_mv, чтобы оно указывало на таблицу GitHub, вставляя строки в вышеописанный движок, когда оно срабатывает. Добавления в таблицу GitHub, как результат, будут отправлены в нашу новую тему Kafka.

Если вы вставите в исходную тему github, созданную в рамках Kafka в ClickHouse, документы магически появятся в теме "github_clickhouse". Подтвердите это с помощью встроенных средств Kafka. Например, ниже мы вставляем 100 строк в тему github, используя kcat для темы, размещенной в Confluent Cloud:

Чтение из темы github_out должно подтвердить доставку сообщений.

Хотя это сложный пример, он иллюстрирует силу материализованных представлений, когда они используются вместе с движком Kafka.

Кластеры и производительность

Работа с кластерами ClickHouse

Через группы потребителей Kafka несколько экземпляров ClickHouse могут потенциально читать из одной и той же темы. Каждый потребитель будет назначен партиции темы в 1:1 отображении. При масштабировании потребления ClickHouse с использованием движка таблиц Kafka имейте в виду, что общее количество потребителей в кластере не может превышать количество партиций в теме. Поэтому убедитесь, что партиционирование настроено соответствующим образом для темы заранее.

Несколько экземпляров ClickHouse могут быть настроены для чтения из темы, используя один и тот же идентификатор группы потребителей - указанного во время создания движка таблиц Kafka. Таким образом, каждый экземпляр будет читать из одной или нескольких партиций, вставляя сегменты в свою локальную целевую таблицу. Целевые таблицы могут, в свою очередь, быть настроены на использование ReplicatedMergeTree для обработки дублирования данных. Этот подход позволяет масштабировать чтение из Kafka вместе с кластером ClickHouse, при условии, что партиций Kafka достаточно.

Настройка производительности

Рассмотрите следующее, когда вы хотите увеличить производительность пропускной способности движка таблиц Kafka:

  • Производительность будет варьироваться в зависимости от размера сообщения, формата и типов целевых таблиц. 100,000 строк/с на одном движке таблиц следует считать достижимым. По умолчанию сообщения читаются блоками, контролируемыми параметром kafka_max_block_size. По умолчанию он установлен на max_insert_block_size, по умолчанию равным 1,048,576. Если только сообщения не очень большие, это следует почти всегда увеличивать. Значения от 500к до 1М не редкость. Тестируйте и оценивайте влияние на производительность.
  • Число потребителей для движка таблиц можно увеличить, используя kafka_num_consumers. Однако по умолчанию вставки будут линейные в одном потоке, если kafka_thread_per_consumer не изменен с значения по умолчанию 1. Установите это значение на 1, чтобы обеспечить выполнение сбросов параллельно. Обратите внимание, что создание таблицы движка Kafka с N потребителями (и kafka_thread_per_consumer=1) логически эквивалентно созданию N движков Kafka, каждый с материализованным представлением и kafka_thread_per_consumer=0.
  • Увеличение количества потребителей не является бесплатной операцией. Каждый потребитель поддерживает свои собственные буферы и потоки, увеличивая нагрузку на сервер. Будьте осторожны с нагрузкой от потребителей и сначала масштабируйте линейно по вашему кластеру, если это возможно.
  • Если пропускная способность сообщений Kafka переменная и задержки допустимы, рассмотрите возможность увеличения stream_flush_interval_ms, чтобы обеспечить сброс больших блоков.
  • background_message_broker_schedule_pool_size устанавливает количество потоков, выполняющих фоновые задачи. Эти потоки используются для потоковой передачи Kafka. Эта настройка применяется при запуске сервера ClickHouse и не может быть изменена в пользовательской сессии, по умолчанию равной 16. Если вы видите тайм-ауты в журналах, возможно, имеет смысл увеличить это значение.
  • Для связи с Kafka используется библиотека librdkafka, которая сама создает потоки. Большое количество таблиц Kafka или потребителей может привести к большому количеству переключений контекста. Либо распределите эту нагрузку по кластеру, повторно реплицируя целевые таблицы, если это возможно, либо рассмотрите возможность использования движка таблиц для чтения из нескольких тем - поддерживается список значений. Несколько материализованных представлений могут читаться из одной таблицы, каждое фильтруя данные из конкретной темы.

Любые изменения настроек должны быть протестированы. Мы рекомендуем контролировать задержки потребителей Kafka, чтобы убедиться, что вы правильно масштабированы.

Дополнительные настройки

Помимо настройки, обсужденной выше, могут быть интересны следующие:

  • Kafka_max_wait_ms - Время ожидания в миллисекундах для чтения сообщений из Kafka перед повторной попыткой. Устанавливается на уровне пользовательского профиля и по умолчанию составляет 5000.

Все настройки из базовой librdkafka также могут быть помещены в конфигурационные файлы ClickHouse внутри элемента kafka - имена настроек должны быть XML элементами с точками, замененными на символы подчеркивания, например:

Это специальные настройки, и мы рекомендуем вам обратиться к документации Kafka для более детального объяснения.