Перейти к основному содержанию
Перейти к основному содержанию

Коннектор Spark

ClickHouse Supported

Этот коннектор использует оптимизации, специфичные для ClickHouse, такие как продвинутое партиционирование и проталкивание предикатов (predicate pushdown), для повышения производительности запросов и эффективности обработки данных. Коннектор основан на официальном JDBC‑коннекторе ClickHouse и управляет собственным каталогом.

До версии Spark 3.0 в Spark отсутствовало встроенное понятие каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. При использовании этих внешних решений пользователям приходилось регистрировать таблицы источников данных вручную, прежде чем получать к ним доступ в Spark. Однако, начиная с Spark 3.0, в котором была введена концепция каталога, Spark теперь может автоматически обнаруживать таблицы за счёт регистрации плагинов каталогов.

Каталог Spark по умолчанию — это spark_catalog, а таблицы идентифицируются как {catalog name}.{database}.{table}. С появлением новой функциональности каталогов стало возможным добавлять и использовать несколько каталогов в одном приложении Spark.

Выбор между Catalog API и TableProvider API

Коннектор ClickHouse для Spark поддерживает два способа доступа: Catalog API и TableProvider API (доступ, основанный на формате). Понимание различий между ними поможет выбрать подходящий подход для вашего варианта использования.

Catalog API и TableProvider API

ХарактеристикаCatalog APITableProvider API
ConfigurationЦентрализованная через конфигурацию SparkДля каждой операции через options
Table DiscoveryАвтоматическое обнаружение через каталогРучное указание таблицы
DDL OperationsПолная поддержка (CREATE, DROP, ALTER)Ограниченная (только автоматическое создание таблицы)
Spark SQL IntegrationНативная (clickhouse.database.table)Требуется указание формата
Use CaseДолгосрочные стабильные подключения с централизованной конфигурациейРазовый, динамический или временный доступ

Требования

  • Java 8 или 17 (для Spark 4.0 требуется Java 17+)
  • Scala 2.12 или 2.13 (Spark 4.0 поддерживает только Scala 2.13)
  • Apache Spark версий 3.3, 3.4, 3.5 или 4.0

Матрица совместимости

ВерсияСовместимые версии SparkВерсия ClickHouse JDBC
mainSpark 3.3, 3.4, 3.5, 4.00.9.4
0.9.0Spark 3.3, 3.4, 3.5, 4.00.9.4
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Не зависит от версии
0.3.0Spark 3.2, 3.3Не зависит от версии
0.2.1Spark 3.2Не зависит от версии
0.1.2Spark 3.2Не зависит от версии

Установка и настройка

Для интеграции ClickHouse со Spark доступно несколько вариантов установки, подходящих для разных конфигураций проектов. Вы можете добавить коннектор ClickHouse Spark как зависимость непосредственно в файл сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). Либо вы можете поместить необходимые JAR-файлы в каталог $SPARK_HOME/jars/ или передать их напрямую как параметр Spark, используя флаг --jars в команде spark-submit. Оба подхода обеспечивают доступность коннектора ClickHouse в вашей среде Spark.

Импортировать как зависимость

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Добавьте следующий репозиторий, если хотите использовать версию SNAPSHOT.

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

Скачайте библиотеку

Имя бинарного JAR-файла имеет следующий шаблон:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

Вы можете найти все доступные релизные JAR‑файлы в Maven Central Repository и все SNAPSHOT‑JAR‑файлы ежедневных сборок в Sonatype OSS Snapshots Repository.

Справочные материалы

Крайне важно включить clickhouse-jdbc JAR с классификатором "all", так как коннектор зависит от clickhouse-http и clickhouse-client, которые оба входят в состав clickhouse-jdbc:all. В качестве альтернативы вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если предпочитаете не использовать полный JDBC‑пакет.

В любом случае убедитесь, что версии пакетов совместимы в соответствии с матрицей совместимости.

Регистрация каталога (обязательно)

Чтобы получить доступ к вашим таблицам ClickHouse, необходимо настроить новый каталог Spark со следующими параметрами:

PropertyValueDefault ValueRequired
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(пустая строка)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

Эти настройки можно задать одним из следующих способов:

  • Отредактировать или создать spark-defaults.conf.
  • Передать конфигурацию в команду spark-submit (или в CLI-команды spark-shell/spark-sql).
  • Добавить конфигурацию при инициализации контекста.
Справочные материалы

При работе с кластером ClickHouse необходимо задать уникальное имя каталога для каждого экземпляра. Например:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

Таким образом, вы сможете получить доступ к таблице <ck_db>.<ck_table> на clickhouse1 из Spark SQL как clickhouse1.<ck_db>.<ck_table>, а к таблице <ck_db>.<ck_table> на clickhouse2 как clickhouse2.<ck_db>.<ck_table>.

Использование API TableProvider (доступ на основе формата)

Помимо подхода, основанного на каталоге, коннектор ClickHouse для Spark поддерживает модель доступа, основанную на формате, через API TableProvider.

Пример чтения с использованием формата

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read from ClickHouse using format API
df = spark.read \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .load()

df.show()

Пример записи с использованием формата

# Write to ClickHouse using format API
df.write \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .mode("append") \
    .save()

Возможности TableProvider

API TableProvider предоставляет ряд мощных возможностей:

Автоматическое создание таблиц

При записи в несуществующую таблицу коннектор автоматически создаёт таблицу с соответствующей схемой. Коннектор использует разумные значения по умолчанию:

  • Движок: По умолчанию используется MergeTree(), если не указано иное. Вы можете указать другой движок с помощью опции engine (например, ReplacingMergeTree(), SummingMergeTree() и т. д.).
  • ORDER BY: Обязателен — вы должны явно указать опцию order_by при создании новой таблицы. Коннектор проверяет, что все указанные столбцы существуют в схеме.
  • Поддержка ключей с типом Nullable: Автоматически добавляет settings.allow_nullable_key=1, если ORDER BY содержит столбцы типа Nullable.
# Table will be created automatically with explicit ORDER BY (required)
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id") \
    .mode("append") \
    .save()

# Specify table creation options with custom engine
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id, timestamp") \
    .option("engine", "ReplacingMergeTree()") \
    .option("settings.allow_nullable_key", "1") \
    .mode("append") \
    .save()
Справочные материалы

ORDER BY обязательно: Параметр order_by обязателен при создании новой таблицы через TableProvider API. Вы должны явно указать, какие столбцы использовать в предложении ORDER BY. Коннектор проверяет, что все указанные столбцы присутствуют в схеме, и выдаст ошибку, если каких-либо столбцов не окажется.

Выбор движка: Движок по умолчанию — MergeTree(), но вы можете указать любой движок таблицы ClickHouse с помощью параметра engine (например, ReplacingMergeTree(), SummingMergeTree(), AggregatingMergeTree() и т. д.).

Параметры подключения TableProvider

При использовании API на основе формата доступны следующие параметры подключения:

Параметры подключения

OptionОписаниеЗначение по умолчаниюОбязательно
hostИмя хоста сервера ClickHouselocalhostДа
protocolПротокол подключения (http или https)httpНет
http_portПорт HTTP/HTTPS8123Нет
databaseИмя базы данныхdefaultДа
tableИмя таблицыN/AДа
userИмя пользователя для аутентификацииdefaultНет
passwordПароль для аутентификации(пустая строка)Нет
sslВключить подключение по SSLfalseНет
ssl_modeРежим SSL (NONE, STRICT и т. д.)STRICTНет
timezoneЧасовой пояс для операций с датой и временемserverНет

Параметры создания таблицы

Эти параметры используются, когда таблица не существует и её нужно создать:

OptionDescriptionDefault ValueRequired
order_byСтолбец(ы), используемые в клаузе ORDER BY. Для нескольких столбцов — список через запятуюN/AДа
engineДвижок таблицы ClickHouse (например, MergeTree(), ReplacingMergeTree(), SummingMergeTree(), и т. д.)MergeTree()Нет
settings.allow_nullable_keyВключить ключи с типом Nullable в ORDER BY (для ClickHouse Cloud)Определяется автоматически**Нет
settings.<key>Любая настройка таблицы ClickHouseN/AНет
clusterИмя кластера для distributed таблицN/AНет
clickhouse.column.<name>.variant_typesСписок через запятую типов ClickHouse для столбцов типа Variant (например, String, Int64, Bool, JSON). Имена типов чувствительны к регистру. Пробелы после запятых необязательны.N/AНет

* Параметр order_by обязателен при создании новой таблицы. Все указанные столбцы должны присутствовать в схеме.
** Автоматически устанавливается в 1, если ORDER BY содержит столбцы с типом Nullable и параметр не задан явно.

Совет

Рекомендация: Для ClickHouse Cloud явно указывайте settings.allow_nullable_key=1, если ваши столбцы в ORDER BY могут иметь тип Nullable, так как ClickHouse Cloud требует эту настройку.

Режимы записи

Коннектор Spark (как через TableProvider API, так и через Catalog API) поддерживает следующие режимы записи в Spark:

  • append: Добавляет данные в существующую таблицу
  • overwrite: Заменяет все данные в таблице (полностью очищает таблицу)
Справочные материалы

Перезапись партиций не поддерживается: Коннектор в настоящее время не поддерживает операции перезаписи на уровне партиций (например, режим overwrite с partitionBy). Эта функция находится в разработке. См. GitHub issue #34 для отслеживания статуса этой функции.

# Overwrite mode (truncates table first)
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .mode("overwrite") \
    .save()

Настройка параметров ClickHouse

И Catalog API, и TableProvider API поддерживают настройку параметров, специфичных для ClickHouse (а не параметров коннектора). Эти параметры передаются в ClickHouse при создании таблиц или выполнении запросов.

Параметры ClickHouse позволяют задавать такие специфические настройки, как allow_nullable_key, index_granularity и другие настройки на уровне таблицы или запроса. Они отличаются от параметров коннектора (таких как host, database, table), которые определяют, как коннектор подключается к ClickHouse.

Использование TableProvider API

При работе с TableProvider API используйте формат параметра settings.<key>:

df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .option("order_by", "id") \
    .option("settings.allow_nullable_key", "1") \
    .option("settings.index_granularity", "8192") \
    .mode("append") \
    .save()

Использование Catalog API

При работе с Catalog API используйте формат spark.sql.catalog.<catalog_name>.option.<key> в конфигурации Spark:

spark.sql.catalog.clickhouse.option.allow_nullable_key 1
spark.sql.catalog.clickhouse.option.index_granularity 8192

Или задайте их при создании таблиц в Spark SQL:

CREATE TABLE clickhouse.default.my_table (
  id INT,
  name STRING
) USING ClickHouse
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  'settings.allow_nullable_key' = '1',
  'settings.index_granularity' = '8192'
)

Настройки ClickHouse Cloud

При подключении к ClickHouse Cloud убедитесь, что включён SSL и задан соответствующий режим SSL. Например:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

Чтение данных

public static void main(String[] args) {
        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

Запись данных

Справочные материалы

Перезапись партиций не поддерживается: Catalog API в данный момент не поддерживает операции перезаписи на уровне партиций (например, режим overwrite с partitionBy). Эта функция находится в разработке. См. задачу GitHub №34 для отслеживания реализации этой функции.

 public static void main(String[] args) throws AnalysisException {

        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // Define the schema for the DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // Create a DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

Операции DDL

Вы можете выполнять операции DDL в вашем экземпляре ClickHouse с помощью Spark SQL; все изменения сразу же сохраняются в ClickHouse. Spark SQL позволяет писать запросы так же, как в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие, без каких-либо изменений, например:

Примечание

При использовании Spark SQL за один раз может быть выполнена только одна инструкция SQL.

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

Приведённые выше примеры демонстрируют запросы Spark SQL, которые вы можете выполнять в своём приложении через любой из API — Java, Scala, PySpark или shell.

Работа с VariantType

Примечание

Поддержка VariantType доступна в Spark 4.0+ и требует ClickHouse 25.3+ с включёнными экспериментальными типами JSON/Variant.

Коннектор поддерживает тип Spark VariantType для работы с полуструктурированными данными. VariantType отображается на типы ClickHouse JSON и Variant, что позволяет эффективно хранить и выполнять запросы к данным с гибкой схемой.

Примечание

В этом разделе основное внимание уделяется именно отображению и использованию VariantType. Полный обзор всех поддерживаемых типов данных см. в разделе Supported data types.

Сопоставление типов ClickHouse

Тип ClickHouseТип SparkОписание
JSONVariantTypeХранит только JSON-объекты (которые должны начинаться с {)
Variant(T1, T2, ...)VariantTypeХранит значения разных типов, включая примитивы, массивы и JSON

Чтение данных VariantType

При чтении из ClickHouse столбцы JSON и Variant автоматически отображаются в тип VariantType в Spark:

// Read JSON column as VariantType
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")

// Access variant data
df.show()

// Convert variant to JSON string for inspection
import org.apache.spark.sql.functions._
df.select(
  col("id"),
  to_json(col("data")).as("data_json")
).show()

Запись данных типа VariantType

Вы можете записывать данные типа VariantType в ClickHouse, используя типы столбцов JSON или Variant:

import org.apache.spark.sql.functions._

// Create DataFrame with JSON data
val jsonData = Seq(
  (1, """{"name": "Alice", "age": 30}"""),
  (2, """{"name": "Bob", "age": 25}"""),
  (3, """{"name": "Charlie", "city": "NYC"}""")
).toDF("id", "json_string")

// Parse JSON strings to VariantType
val variantDF = jsonData.select(
  col("id"),
  parse_json(col("json_string")).as("data")
)

// Write to ClickHouse with JSON type (JSON objects only)
variantDF.writeTo("clickhouse.default.user_data").create()

// Or specify Variant with multiple types
spark.sql("""
  CREATE TABLE clickhouse.default.mixed_data (
    id INT,
    data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'id'
  )
""")

Создание таблиц VariantType с помощью Spark SQL

Вы можете создавать таблицы VariantType с помощью DDL Spark SQL:

-- Create table with JSON type (default)
CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)
-- Create table with Variant type supporting multiple types
CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

Настройка типов Variant

При создании таблиц со столбцами типа VariantType вы можете указать, какие типы ClickHouse следует использовать:

Тип JSON (по умолчанию)

Если свойство variant_types не указано, столбец по умолчанию имеет тип JSON в ClickHouse, который принимает только объекты в формате JSON:

CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

В результате будет сформирован следующий запрос к ClickHouse:

CREATE TABLE json_table (id Int32, data JSON) ENGINE = MergeTree() ORDER BY id

Тип Variant с несколькими типами

Чтобы поддерживать примитивы, массивы и JSON-объекты, укажите эти типы в свойстве variant_types:

CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

Это создаёт следующий запрос к ClickHouse:

CREATE TABLE flexible_data (
  id Int32, 
  data Variant(String, Int64, Float64, Bool, Array(String), JSON)
) ENGINE = MergeTree() ORDER BY id

Поддерживаемые типы Variant

В Variant() могут использоваться следующие типы ClickHouse:

  • Примитивы: String, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, Bool
  • Массивы: Array(T), где T — любой поддерживаемый тип, включая вложенные массивы
  • JSON: JSON для хранения объектов JSON

Конфигурация формата чтения

По умолчанию столбцы JSON и Variant читаются как VariantType. Это поведение можно изменить, чтобы читать их как строки:

// Read JSON/Variant as strings instead of VariantType
spark.conf.set("spark.clickhouse.read.jsonAs", "string")

val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// data column will be StringType containing JSON strings

Поддержка формата записи

Поддержка записи типа VariantType зависит от формата:

ФорматПоддержкаПримечания
JSON✅ ПолнаяПоддерживает типы JSON и Variant. Рекомендуется для данных типа VariantType
Arrow⚠️ ЧастичнаяПоддерживает запись в тип ClickHouse JSON. Не поддерживает тип ClickHouse Variant. Полная поддержка появится после решения задачи https://github.com/ClickHouse/ClickHouse/issues/92752

Настройте формат записи:

spark.conf.set("spark.clickhouse.write.format", "json")  // Recommended for Variant types
Совет

Если вам нужно записывать данные в тип данных ClickHouse Variant, используйте формат JSON. Формат Arrow поддерживает запись только в тип JSON.

Рекомендации по лучшим практикам

  1. Используйте тип JSON для данных только в формате JSON: Если вы храните исключительно JSON-объекты, используйте тип JSON по умолчанию (без свойства variant_types).
  2. Явно указывайте типы: При использовании Variant() явно перечисляйте все типы, которые вы планируете хранить.
  3. Включите экспериментальные функции: Убедитесь, что в ClickHouse включён параметр allow_experimental_json_type = 1.
  4. Используйте формат JSON для записи: Формат JSON рекомендуется для данных типа VariantType для лучшей совместимости.
  5. Учитывайте характер запросов: Типы JSON/Variant поддерживают JSON path-запросы ClickHouse для эффективной фильтрации.
  6. Подсказки по столбцам для повышения производительности: При использовании полей JSON в ClickHouse добавление подсказок по столбцам улучшает производительность запросов. В настоящее время добавление подсказок по столбцам через Spark не поддерживается. Для отслеживания этой функциональности см. GitHub issue #497.

Пример: полный сценарий работы

import org.apache.spark.sql.functions._

// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")

// Create table with Variant column
spark.sql("""
  CREATE TABLE clickhouse.default.events (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'event_time'
  )
""")

// Prepare data with mixed types
val events = Seq(
  (1L, "2024-01-01 10:00:00", """{"action": "login", "user_id": 123}"""),
  (2L, "2024-01-01 10:05:00", """{"action": "purchase", "amount": 99.99}"""),
  (3L, "2024-01-01 10:10:00", """{"action": "logout", "duration": 600}""")
).toDF("event_id", "event_time", "json_data")

// Convert to VariantType and write
val variantEvents = events.select(
  col("event_id"),
  to_timestamp(col("event_time")).as("event_time"),
  parse_json(col("json_data")).as("event_data")
)

variantEvents.writeTo("clickhouse.default.events").append()

// Read and query
val result = spark.sql("""
  SELECT event_id, event_time, event_data
  FROM clickhouse.default.events
  WHERE event_time >= '2024-01-01'
  ORDER BY event_time
""")

result.show(false)

Параметры конфигурации

Ниже перечислены настраиваемые параметры конфигурации, доступные в коннекторе.

Примечание

Использование параметров конфигурации: Это параметры конфигурации на уровне Spark, которые применяются как к Catalog API, так и к TableProvider API. Их можно задать двумя способами:

  1. Глобальная конфигурация Spark (применяется ко всем операциям):

    spark.conf.set("spark.clickhouse.write.batchSize", "20000")
    spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
    
  2. Переопределение для отдельной операции (только для TableProvider API — может переопределять глобальные настройки):

    df.write \
        .format("clickhouse") \
        .option("host", "your-host") \
        .option("database", "default") \
        .option("table", "my_table") \
        .option("spark.clickhouse.write.batchSize", "20000") \
        .option("spark.clickhouse.write.compression.codec", "lz4") \
        .mode("append") \
        .save()
    

Либо задайте их в spark-defaults.conf, либо при создании сеанса Spark.


ПараметрЗначение по умолчаниюОписаниеС версии
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей сегментирования или значений партиций, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если имеет значение true, неподдерживаемые выражения игнорируются, в противном случае выполнение немедленно завершается с исключением. Обратите внимание: когда включён параметр spark.clickhouse.write.distributed.convertLocal, игнорирование неподдерживаемых ключей сегментирования может привести к повреждению данных.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для распаковки данных при чтении. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении distributed таблицы использовать локальную таблицу вместо неё. Если имеет значение true, параметр spark.clickhouse.read.distributed.useClusterNodes игнорируется.0.1.0
spark.clickhouse.read.fixedStringAsbinaryЧитать тип ClickHouse FixedString как указанный тип данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации данных при чтении. Поддерживаемые форматы: json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключить фильтр времени выполнения при чтении.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли true, конструировать входной фильтр по партициям по виртуальному столбцу _partition_id вместо значения партиции. Известны проблемы с построением SQL-предикатов по значению партиции. Эта возможность требует ClickHouse Server версии v21.6+.0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли значение параметра — true, помечать все поля схемы запроса как Nullable при создании таблицы с помощью CREATE/REPLACE TABLE ... AS SELECT .... Обратите внимание, эта настройка требует SPARK-43390 (доступна в Spark 3.5); без этого патча параметр всегда работает так, как если бы был установлен в true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей в одном пакете при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек сжатия, используемый при записи данных. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в distributed таблицу данные записываются в локальную таблицу вместо неё. Если true, игнорируется spark.clickhouse.write.distributed.useClusterNodes.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueЗаписывать данные на все узлы кластера при записи в distributed таблицу.0.1.0
spark.clickhouse.write.formatarrowФормат сериализации при записи. Поддерживаемые форматы: json, arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли имеет значение true, перед записью выполнять локальную сортировку по ключам сортировки.0.3.0
spark.clickhouse.write.localSortByPartitionзначение параметра spark.clickhouse.write.repartitionByPartitionЕсли имеет значение true, выполняет локальную сортировку по партиции перед записью. Если не задано, приравнивается к spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное число повторных попыток записи для одного пакетного задания, если оно завершилось ошибкой с кодами, допускающими повторную попытку.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueНужно ли переразбивать данные по ключам партиционирования ClickHouse, чтобы привести их к распределению данных в таблице ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Если перед записью требуется перераспределить данные в соответствии с распределением таблицы ClickHouse, используйте этот параметр, чтобы задать число партиций при перераспределении; значение меньше 1 означает отсутствие такого требования.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли true, Spark будет строго распределять входящие записи по партициям, чтобы удовлетворить требуемое распределение перед передачей записей в целевую таблицу источника данных при записи. В противном случае Spark может применять некоторые оптимизации для ускорения запроса, но при этом нарушить требуемое распределение. Обратите внимание, что для этой конфигурации требуется SPARK-37523 (доступно в Spark 3.4); без этого патча она всегда работает как при значении true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между повторными попытками записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Коды ошибок сервера ClickHouse, при которых выполняется повторная попытка записи.0.1.0

Поддерживаемые типы данных

В этом разделе описано соответствие типов данных между Spark и ClickHouse. Приведённые ниже таблицы служат кратким справочником по преобразованию типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаетсяПримитивныйПримечания
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16,Int32IntegerTypeДа
UInt32,Int64, UInt64LongTypeДа
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаОпределяется конфигурацией READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элементов массива также преобразуется
MapMapTypeНетКлючи ограничены типом StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется соответствующий тип интервала
JSON, VariantVariantTypeНетТребуется Spark 4.0+ и ClickHouse 25.3+. Можно читать как StringType с spark.clickhouse.read.jsonAs=string
Object
Nested
TupleStructTypeНетПоддерживаются как именованные, так и неименованные кортежи. Именованные кортежи отображаются на поля структуры по имени, неименованные используют _1, _2 и т. д. Поддерживаются вложенные структуры и Nullable-поля
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаетсяПримитивный типПримечания
BooleanTypeBoolДаОтображается в тип Bool (а не UInt8) начиная с версии 0.9.0
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб — до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (list, tuple, or array)ArrayНетТип элементов массива также преобразуется
MapTypeMapНетКлючи ограничены типом StringType
StructTypeTupleНетПреобразуется в именованный Tuple с именами полей.
VariantTypeJSON или VariantНетТребуется Spark 4.0+ и ClickHouse 25.3+. По умолчанию используется тип JSON. Используйте свойство clickhouse.column.<name>.variant_types, чтобы указать Variant с несколькими типами.
Object
Nested

Участие и поддержка

Если вы хотите внести вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашему участию! Посетите наш репозиторий на GitHub, чтобы создать issue, предложить улучшения или отправить pull request. Мы приветствуем любые вклады! Прежде чем начать, пожалуйста, ознакомьтесь с руководством по участию в репозитории. Спасибо, что помогаете улучшать наш коннектор ClickHouse Spark!