Skip to content
This repository has been archived by the owner on Mar 13, 2024. It is now read-only.

Commit

Permalink
Merge pull request #25 from openmeterio/database-example
Browse files Browse the repository at this point in the history
refactor: generic database example instead of clickhouse
  • Loading branch information
sagikazarmark committed Jan 15, 2024
2 parents 0e97e6e + 67d9d22 commit c7da2e2
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 68 deletions.
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The following examples demonstrate how to ingest events from various sources int

The examples use the custom Benthos distribution in this repository.

- [Clickhouse](clickhouse/)
- [Database](database/)
- [HTTP server](http-server/) (forwarding events to OpenMeter)
- [Kubernetes Pod execution time](kubernetes-pod-exec-time/)

Expand Down
47 changes: 0 additions & 47 deletions examples/clickhouse/docker-compose.yaml

This file was deleted.

File renamed without changes.
27 changes: 23 additions & 4 deletions examples/clickhouse/README.md → examples/database/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Clickhouse
# Database

This example demonstrates reading data from [Clickhouse](https://clickhouse.com/) database, transforming it to [CloudEvents](https://cloudevents.io/) and sending to [OpenMeter](https://openmeter.io/).
This example demonstrates reading data from a database, transforming it to [CloudEvents](https://cloudevents.io/) and sending to [OpenMeter](https://openmeter.io/).

This is a rather common use case when a system already collects some sort of data or log and you want to send that as usage data to OpenMeter for further processing.

Expand All @@ -9,6 +9,14 @@ Benthos will read messages from a message log table and send the calculated usag

The example also demonstrates that certain business logic can also be implemented during the transformation (for example: users on the enterprise plan do not get charged for message length).

Databases featured in this example:

- Postgres
- [Clickhouse](https://clickhouse.com/)

> [!TIP]
> Check out the supported database drivers in the [Benthos documentation](https://www.benthos.dev/docs/components/inputs/sql_select#drivers).
## Table of Contents <!-- omit from toc -->

- [Prerequisites](#prerequisites)
Expand All @@ -26,7 +34,7 @@ Check out this repository if you want to run the example locally:

```shell
git clone https://github.com/openmeterio/benthos-openmeter.git
cd benthos-openmeter/examples/clickhouse
cd benthos-openmeter/examples/database
```

Create a new `.env` file and add the details of your OpenMeter instance:
Expand All @@ -52,7 +60,18 @@ Create the following meters in OpenMeter with the following details:
## Launch the example

Launch the example (Clickhouse DB, event collector and seeder):
Decide which database you want to use:

```shell
export COMPOSE_PROFILES=SELECTED_DATABASE
```

Available profiles:

- `postgres`
- `clickhouse`

Launch the example (database, event collector and seeder):

```shell
docker compose up -d
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
input:
sql_select:
driver: clickhouse
dsn: "${CLICKHOUSE_DSN}"
driver: "${DATABASE_DRIVER}"
dsn: "${DATABASE_DSN}"
table: messages
columns:
- message_id
- account_id
- message
- time
where: time >= ?
args_mapping: root = [ now().ts_unix() - 30 ]
args_mapping: 'root = [ (now().ts_unix() - 30).ts_format(format: "2006-01-02 15:04:05", tz: "UTC") ]'

pipeline:
processors:
Expand Down
20 changes: 20 additions & 0 deletions examples/database/docker-compose.common.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
version: "3.9"

services:
collector:
image: ghcr.io/openmeterio/benthos-openmeter
pull_policy: always
command: benthos -c /etc/benthos/config.yaml
restart: always
env_file:
- .env
volumes:
- ./config.yaml:/etc/benthos/config.yaml:ro

seeder:
image: ghcr.io/openmeterio/benthos-openmeter
pull_policy: always
command: benthos -c /etc/benthos/config.yaml
restart: always
volumes:
- ./seed/config.yaml:/etc/benthos/config.yaml:ro
94 changes: 94 additions & 0 deletions examples/database/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
version: "3.9"

services:
postgres:
profiles:
- postgres
image: postgres:15.3
ports:
- 127.0.0.1:5432:5432
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: chat
healthcheck:
test: pg_isready -U postgres -d chat
interval: 5s
timeout: 3s
retries: 100

postgres-collector:
profiles:
- postgres
extends:
file: docker-compose.common.yaml
service: collector
depends_on:
postgres:
condition: service_healthy
environment:
DATABASE_DRIVER: postgres
DATABASE_DSN: postgres://postgres:postgres@postgres:5432/chat?sslmode=disable

postgres-seeder:
profiles:
- postgres
extends:
file: docker-compose.common.yaml
service: seeder
depends_on:
postgres:
condition: service_healthy
environment:
DATABASE_DRIVER: postgres
DATABASE_DSN: postgres://postgres:postgres@postgres:5432/chat?sslmode=disable
volumes:
- ./seed/init.postgres.sql:/etc/benthos/init.sql:ro

clickhouse:
profiles:
- clickhouse
image: clickhouse/clickhouse-server:23.8.9.54-alpine
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000
- 127.0.0.1:9009:9009
environment:
CLICKHOUSE_USER: default
CLICKHOUSE_PASSWORD: default
CLICKHOUSE_DB: chat
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1

healthcheck:
test: wget --no-verbose --tries=1 --spider http://clickhouse:8123/ping || exit 1
interval: 5s
timeout: 3s
retries: 100

clickhouse-collector:
profiles:
- clickhouse
extends:
file: docker-compose.common.yaml
service: collector
depends_on:
clickhouse:
condition: service_healthy
environment:
DATABASE_DRIVER: clickhouse
DATABASE_DSN: clickhouse://default:default@clickhouse:9000/chat?dial_timeout=200ms&max_execution_time=60

clickhouse-seeder:
profiles:
- clickhouse
extends:
file: docker-compose.common.yaml
service: seeder
depends_on:
clickhouse:
condition: service_healthy
environment:
DATABASE_DRIVER: clickhouse
DATABASE_DSN: clickhouse://default:default@clickhouse:9000/chat?dial_timeout=200ms&max_execution_time=60
volumes:
- ./seed/init.clickhouse.sql:/etc/benthos/init.sql:ro
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ output:
continue: true
output:
sql_insert:
driver: clickhouse
dsn: "${CLICKHOUSE_DSN}"
driver: "${DATABASE_DRIVER}"
dsn: "${DATABASE_DSN}"
table: messages
columns:
- message_id
Expand All @@ -39,18 +39,10 @@ output:
this.sender,
this.recipient,
this.message,
this.time.ts_format("2006-01-02 15:04:05"),
this.time.ts_format(format: "2006-01-02 15:04:05", tz: "UTC"),
]
init_statement: |
CREATE TABLE messages (
message_id UUID,
account_id String,
sender String,
recipient String,
message String,
time DateTime
) ENGINE = MergeTree()
PRIMARY KEY (message_id);
init_files:
- init.sql

- check: '"${SEEDER_LOG:false}" == "true"'
output:
Expand Down
9 changes: 9 additions & 0 deletions examples/database/seed/init.clickhouse.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE
IF NOT EXISTS messages (
message_id UUID,
account_id String,
sender String,
recipient String,
message String,
time DateTime('UTC')
) ENGINE = MergeTree() PRIMARY KEY (message_id);
9 changes: 9 additions & 0 deletions examples/database/seed/init.postgres.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE
IF NOT EXISTS messages (
message_id UUID PRIMARY KEY,
account_id TEXT,
sender TEXT,
recipient TEXT,
message TEXT,
time TIMESTAMPTZ
);

0 comments on commit c7da2e2

Please sign in to comment.