
ksqlDB

ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka.

Available on the Enterprise Premier plan. Contact us for details.

See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:

In this video, the SQL API is used to connect to Power BI. Currently, the DAX API is recommended instead.

Prerequisites

  • Hostname for the ksqlDB server
  • Username and password (or an API key) to connect to the ksqlDB server

Confluent Cloud

If you are using Confluent Cloud, you need to generate an API key and use the API key name as your username and the API key secret as your password.

You can generate an API key by installing confluent-cli and running the following commands in the command line:

```shell
brew install --cask confluent-cli
confluent login
confluent environment use <YOUR-ENVIRONMENT-ID>
confluent ksql cluster list
confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>
```

Setup

Manual

Add the following to a .env file in your Cube project:

```
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=password
```

Environment Variables

| Environment Variable | Description | Possible Values | Required |
| --- | --- | --- | --- |
| `CUBEJS_DB_URL` | The host URL for ksqlDB, with port | A valid database host URL | ✅ |
| `CUBEJS_DB_USER` | The username used to connect to the ksqlDB server. API key for Confluent Cloud. | A valid database username | ✅ |
| `CUBEJS_DB_PASS` | The password used to connect to the ksqlDB server. API secret for Confluent Cloud. | A valid database password | ✅ |
| `CUBEJS_DB_KAFKA_HOST` | Kafka broker host(s) for Kafka streams mode. Multiple brokers can be comma-separated. | A valid Kafka broker URL | |
| `CUBEJS_DB_KAFKA_USER` | Username for Kafka broker authentication (SASL PLAIN) | A valid Kafka username | |
| `CUBEJS_DB_KAFKA_PASS` | Password for Kafka broker authentication (SASL PLAIN) | A valid Kafka password | |
| `CUBEJS_DB_KAFKA_USE_SSL` | If `true`, enables SASL_SSL for the Kafka connection | `true`, `false` | |
| `CUBEJS_CONCURRENCY` | The number of concurrent queries to the data source | A valid number | |

Pre-Aggregations Support

ksqlDB supports only streaming pre-aggregations.

Kafka streams mode

By default, Cube connects to ksqlDB via its REST API. ksqlDB uses its REST API both for metadata (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds.

In this default mode, Cube may create tables and streams in ksqlDB as part of the pre-aggregation build process (e.g., CREATE TABLE ... AS SELECT statements for non-read-only pre-aggregations).
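For illustration, a non-read-only pre-aggregation build in this default mode may issue a statement along these lines. This is a hypothetical sketch: the object name and projection are generated by Cube's pre-aggregation builder, and `EVENTS` is an assumed existing stream.

```sql
-- Hypothetical sketch of a statement Cube might issue in default mode.
-- The table name and the SELECT list are generated by the pre-aggregation
-- builder; EVENTS is an assumed existing ksqlDB stream.
CREATE TABLE EVENTS_MAIN AS
  SELECT NAME, COUNT(*) AS COUNT
  FROM EVENTS
  GROUP BY NAME;
```

Creating such objects requires the ksqlDB user to have the corresponding permissions, which is one reason to consider Kafka streams mode below.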

When Kafka streams mode is enabled, Cube reads data directly from the underlying Kafka topics instead of going through the ksqlDB REST API for data streaming. ksqlDB is still used for metadata operations such as discovering tables, streams, and their schemas, but Cube Store subscribes to the backing Kafka topic directly.

In this mode, Cube does not create any tables or streams in ksqlDB. All pre-aggregations use the read-only refresh path: Cube discovers the existing ksqlDB objects and their backing Kafka topics, then streams data directly from Kafka into Cube Store.

When to use Kafka streams mode

Kafka streams mode is useful when:

  • You want to prevent Cube from creating any objects in ksqlDB
  • You need higher throughput for data ingestion by reading Kafka directly
  • Your ksqlDB environment has restricted permissions that don’t allow creating tables or streams
  • You prefer Cube Store to consume from Kafka topics without an intermediary

Enabling Kafka streams mode

Set the CUBEJS_DB_KAFKA_HOST environment variable to the address of your Kafka broker(s). This activates Kafka streams mode automatically:

```
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=ksql_username
CUBEJS_DB_PASS=ksql_password
CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DB_KAFKA_USER=kafka_api_key
CUBEJS_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DB_KAFKA_USE_SSL=true
```

Multiple Kafka brokers can be specified as a comma-separated list:

```
CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092
```

When using Confluent Cloud, the Kafka credentials are separate from the ksqlDB credentials. Generate an API key for the Kafka cluster (not the ksqlDB cluster) and use it as CUBEJS_DB_KAFKA_USER and CUBEJS_DB_KAFKA_PASS.

How it works

With Kafka streams mode enabled:

  1. Cube uses the ksqlDB REST API to discover available tables and streams and to retrieve their schemas via DESCRIBE.
  2. For each table or stream, Cube resolves the backing Kafka topic name from the ksqlDB metadata.
  3. Instead of streaming data through ksqlDB, Cube Store connects directly to the Kafka broker(s) and consumes from the resolved topic.
  4. Pre-aggregation builds use the read-only refresh strategy. Cube does not issue any CREATE TABLE or CREATE STREAM statements to ksqlDB.
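The topic resolution in step 2 relies on standard ksqlDB metadata. As a sketch, assuming an existing stream named `EVENTS`, the backing topic is reported in the output of `DESCRIBE EXTENDED`:

```sql
-- Hypothetical sketch: the "Kafka topic" field in the DESCRIBE EXTENDED
-- output identifies the backing topic that Cube Store consumes directly.
DESCRIBE EXTENDED EVENTS;
```

Cube performs this lookup automatically; the statement is shown only to illustrate where the topic name comes from.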

Data modeling

ksqlDB is typically used as an additional data source alongside a primary data warehouse. To use Kafka streams mode, configure ksqlDB as a named data source using decorated environment variables and point your cubes to it with the data_source property.

First, declare the data sources and configure the ksqlDB connection with Kafka credentials:

```
CUBEJS_DATASOURCES=default,ksql

# Default (primary) data source
CUBEJS_DB_TYPE=postgres
CUBEJS_DB_HOST=my.postgres.host
CUBEJS_DB_NAME=my_database
CUBEJS_DB_USER=postgres_user
CUBEJS_DB_PASS=postgres_password

# ksqlDB data source with Kafka streams mode enabled
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql_api_key
CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=true
```

Then, create a cube that references an existing ksqlDB stream or table. Set data_source to the name you declared above so that Cube uses the ksqlDB connection for this cube:

```yaml
cubes:
  - name: events
    data_source: ksql
    sql_table: EVENTS-STREAM

    measures:
      - name: count
        type: count

    dimensions:
      - name: id
        sql: ID
        type: number
        primary_key: true

      - name: name
        sql: NAME
        type: string

      - name: created_at
        sql: CREATED_AT
        type: time

    pre_aggregations:
      - name: main
        type: rollup
        measures:
          - CUBE.count
        dimensions:
          - CUBE.name
        time_dimension: CUBE.created_at
        granularity: day
        partition_granularity: day
```
```javascript
cube("events", {
  data_source: "ksql",
  sql_table: `EVENTS-STREAM`,

  measures: {
    count: {
      type: `count`,
    },
  },

  dimensions: {
    id: {
      sql: `ID`,
      type: `number`,
      primary_key: true,
    },

    name: {
      sql: `NAME`,
      type: `string`,
    },

    created_at: {
      sql: `CREATED_AT`,
      type: `time`,
    },
  },

  pre_aggregations: {
    main: {
      type: `rollup`,
      measures: [CUBE.count],
      dimensions: [CUBE.name],
      time_dimension: CUBE.created_at,
      granularity: `day`,
      partition_granularity: `day`,
    },
  },
});
```

The sql_table value should match the name of an existing ksqlDB stream or table. Cube will discover its schema automatically. With Kafka streams mode enabled, the pre-aggregation is built by reading the backing Kafka topic directly — no objects are created in ksqlDB.
