ksqlDB
ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka .
Available on the Enterprise Premier plan . Contact us for details.
See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:
In this video, the SQL API is used to connect to Power BI. Currently, it’s recommended to use the DAX API.
Prerequisites
- Hostname for the ksqlDB server
- Username and password (or an API key) to connect to ksqlDB server
Confluent Cloud
If you are using Confluent Cloud , you need to generate an API key and use the API key name as your username and the API key secret as your password.
You can generate an API key by installing confluent-cli and running the
following commands in the command line:
brew install --cask confluent-cli
confluent login
confluent environment use <YOUR-ENVIRONMENT-ID>
confluent ksql cluster list
confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>Setup
Manual
Add the following to a .env file in your Cube project:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=passwordEnvironment Variables
| Environment Variable | Description | Possible Values | Required |
|---|---|---|---|
CUBEJS_DB_URL | The host URL for ksqlDB with port | A valid database host URL | ✅ |
CUBEJS_DB_USER | The username used to connect to the ksqlDB. API key for Confluent Cloud. | A valid database username | ✅ |
CUBEJS_DB_PASS | The password used to connect to the ksqlDB. API secret for Confluent Cloud. | A valid database password | ✅ |
CUBEJS_DB_KAFKA_HOST | Kafka broker host(s) for Kafka streams mode. Multiple brokers can be comma-separated. | A valid Kafka broker URL | ❌ |
CUBEJS_DB_KAFKA_USER | Username for Kafka broker authentication (SASL PLAIN) | A valid Kafka username | ❌ |
CUBEJS_DB_KAFKA_PASS | Password for Kafka broker authentication (SASL PLAIN) | A valid Kafka password | ❌ |
CUBEJS_DB_KAFKA_USE_SSL | If true, enables SASL_SSL for the Kafka connection | true, false | ❌ |
CUBEJS_CONCURRENCY | The number of concurrent queries to the data source | A valid number | ❌ |
Pre-Aggregations Support
ksqlDB supports only streaming pre-aggregations.
Kafka streams mode
By default, Cube connects to ksqlDB via its REST API. ksqlDB uses its REST API both for metadata (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds.
In this default mode, Cube may create tables and streams in ksqlDB as part
of the pre-aggregation build process (e.g., CREATE TABLE ... AS SELECT
statements for non-read-only pre-aggregations).
When Kafka streams mode is enabled, Cube reads data directly from the underlying Kafka topics instead of going through the ksqlDB REST API for data streaming. ksqlDB is still used for metadata operations such as discovering tables, streams, and their schemas, but Cube Store subscribes to the backing Kafka topic directly.
In this mode, Cube does not create any tables or streams in ksqlDB. All pre-aggregations use the read-only refresh path: Cube discovers the existing ksqlDB objects and their backing Kafka topics, then streams data directly from Kafka into Cube Store.
When to use Kafka streams mode
Kafka streams mode is useful when:
- You want to prevent Cube from creating any objects in ksqlDB
- You need higher throughput for data ingestion by reading Kafka directly
- Your ksqlDB environment has restricted permissions that don’t allow creating tables or streams
- You prefer Cube Store to consume from Kafka topics without an intermediary
Enabling Kafka streams mode
Set the CUBEJS_DB_KAFKA_HOST environment variable to the address of your
Kafka broker(s). This activates Kafka streams mode automatically:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=ksql_username
CUBEJS_DB_PASS=ksql_password
CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DB_KAFKA_USER=kafka_api_key
CUBEJS_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DB_KAFKA_USE_SSL=trueMultiple Kafka brokers can be specified as a comma-separated list:
CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092When using Confluent Cloud ,
the Kafka credentials are separate from the ksqlDB credentials. Generate
an API key for the Kafka cluster (not the ksqlDB cluster) and use it as
CUBEJS_DB_KAFKA_USER and CUBEJS_DB_KAFKA_PASS.
How it works
With Kafka streams mode enabled:
- Cube uses the ksqlDB REST API to discover available tables and streams
and to retrieve their schemas via
DESCRIBE. - For each table or stream, Cube resolves the backing Kafka topic name from the ksqlDB metadata.
- Instead of streaming data through ksqlDB, Cube Store connects directly to the Kafka broker(s) and consumes from the resolved topic.
- Pre-aggregation builds use the read-only refresh strategy. Cube does
not issue any
CREATE TABLEorCREATE STREAMstatements to ksqlDB.
Data modeling
ksqlDB is typically used as an additional data source alongside a primary
data warehouse. To use Kafka streams mode, configure ksqlDB as a named
data source using decorated environment variables
and point your cubes to it with the
data_source property.
First, declare the data sources and configure the ksqlDB connection with Kafka credentials:
CUBEJS_DATASOURCES=default,ksql
CUBEJS_DB_TYPE=postgres
CUBEJS_DB_HOST=my.postgres.host
CUBEJS_DB_NAME=my_database
CUBEJS_DB_USER=postgres_user
CUBEJS_DB_PASS=postgres_password
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql_api_key
CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=trueThen, create a cube that references an existing ksqlDB stream or table.
Set data_source to the name you declared above so that Cube uses the
ksqlDB connection for this cube:
cubes:
- name: events
data_source: ksql
sql_table: EVENTS-STREAM
measures:
- name: count
type: count
dimensions:
- name: id
sql: ID
type: number
primary_key: true
- name: name
sql: NAME
type: string
- name: created_at
sql: CREATED_AT
type: time
pre_aggregations:
- name: main
type: rollup
measures:
- CUBE.count
dimensions:
- CUBE.name
time_dimension: CUBE.created_at
granularity: day
partition_granularity: daycube("events", {
data_source: "ksql",
sql_table: `EVENTS-STREAM`,
measures: {
count: {
type: `count`,
},
},
dimensions: {
id: {
sql: `ID`,
type: `number`,
primary_key: true,
},
name: {
sql: `NAME`,
type: `string`,
},
created_at: {
sql: `CREATED_AT`,
type: `time`,
},
},
pre_aggregations: {
main: {
type: `rollup`,
measures: [CUBE.count],
dimensions: [CUBE.name],
time_dimension: CUBE.created_at,
granularity: `day`,
partition_granularity: `day`,
},
},
});The sql_table value should match the name of an existing ksqlDB stream
or table. Cube will discover its schema automatically. With Kafka streams
mode enabled, the pre-aggregation is built by reading the backing Kafka
topic directly — no objects are created in ksqlDB.
Was this page useful?