Перейти к основному содержимому
Перейти к основному содержимому

Интеграция Apache Beam и ClickHouse

ClickHouse Supported

Apache Beam — это модель программирования с открытым исходным кодом, которая позволяет разработчикам определять и выполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam проявляется в его способности поддерживать широкий спектр сценариев обработки данных, от операций ETL (Извлечение, Преобразование, Загрузка) до сложной обработки событий и аналитики в реальном времени. Эта интеграция использует официальный JDBC соединитель ClickHouse для нижележащего слоя вставки.

Пакет интеграции

Пакет интеграции, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций с множеством популярных систем хранения данных и баз данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO находится в репозитории Apache Beam.

Настройка пакета Apache Beam ClickHouse

Установка пакета

Добавьте следующую зависимость в свою систему управления пакетами:

Рекомендуемая версия Beam

Рекомендуется использовать соединитель ClickHouseIO, начиная с версии Apache Beam 2.59.0. Ранее версии могут не полностью поддерживать функциональность соединителя.

Артефакты можно найти в официальном репозитории maven.

Пример кода

Следующий пример считывает CSV файл с именем input.csv как PCollection, преобразует его в объект Row (используя определенную схему) и вставляет его в локальный экземпляр ClickHouse, используя ClickHouseIO:

Поддерживаемые типы данных

ClickHouseApache BeamПоддерживаетсяПримечания
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes — это LogicalType, представляющий фиксированный по длине
массив байт, расположенный в
org.apache.beam.sdk.schemas.logicaltypes
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

Параметры ClickHouseIO.Write

Вы можете настроить конфигурацию ClickHouseIO.Write с помощью следующих сеттеров:

Функция установки параметраТип аргументаЗначение по умолчаниюОписание
withMaxInsertBlockSize(long maxInsertBlockSize)1000000Максимальный размер блока строк для вставки.
withMaxRetries(int maxRetries)5Максимальное количество попыток повторной вставки в случае ошибки.
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)Максимальная кумулятивная длительность откладывания для повторных попыток.
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)Начальная длительность откладывания перед первой попыткой повторной вставки.
withInsertDistributedSync(Boolean sync)trueЕсли истинно, синхронизирует операции вставки для распределенных таблиц.
withInsertQuorum(Long quorum)nullКоличество реплик, необходимых для подтверждения операции вставки.
withInsertDeduplicate(Boolean deduplicate)trueЕсли истинно, включена дедупликация для операций вставки.
withTableSchema(TableSchema schema)nullСхема целевой таблицы ClickHouse.

Ограничения

Пожалуйста, учитывайте следующие ограничения при использовании соединителя:

  • На сегодняшний день поддерживается только операция Sink. Соединитель не поддерживает операцию Source.
  • ClickHouse выполняет дедупликацию при вставке в ReplicatedMergeTree или в распределённую таблицу, построенную на основе ReplicatedMergeTree. Без репликации вставка в обычный MergeTree может привести к дубликатам, если вставка не удалась, а затем была успешно повторена. Однако каждый блок вставляется атомарно, и размер блока можно настроить с помощью ClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается с использованием контрольных сумм вставленных блоков. Для получения дополнительной информации о дедупликации, пожалуйста, посетите Дедупликация и Настройки дедупликации вставки.
  • Соединитель не выполняет никаких DDL операторов; поэтому целевая таблица должна существовать до вставки.