# Logging pipeline

Дешёвая, дуракоустойчивая схема для аналитических логов: события из любого сервиса попадают в Redis, оттуда Vector батчит и шлёт в ClickHouse, поверх CH дашбордим в Metabase.

## Архитектура

```
producer → RPUSH "<domain>:events" → Redis LIST → Vector (BLPOP, batch) → ClickHouse → Metabase
```

* **Redis LIST** — буфер. Если CH/Vector упадут, события копятся в Redis (с `LTRIM` cap).
* **Vector** — батч-форвардер. Сам делает gzip, retry, healthcheck, batch.
* **ClickHouse** — column store. Партиционируем по месяцу, TTL по дате.
* **Metabase** — UI для графиков и ad-hoc SQL без кода.

Порты на хосте: ClickHouse `8123`, Vector API `8686`, Metabase `3030`.

## Что уже есть

| Сервис     | LIST в Redis      | CH таблица             | Категории в `category` |
| ---------- | ----------------- | ---------------------- | ---------------------- |
| `governor` | `governor:events` | `logs.governor_events` | `nsfw_classifier`      |

## Когда что добавлять

Перед тем как раскатать новый поток логов — реши, **похож ли он на существующие**:

* **Похож** (новый классификатор: toxicity, spam, policy violation, intent detection) → просто пушь в **тот же** `governor:events` с новым `category`. DDL не нужен — схема покрывает любой набор классов через `Map probs` и `Map extra`.
* **Не похож** (платежи, ошибки, действия пользователя, аудит) → **новый домен**: своя LIST, своя CH-таблица, свой Vector sink. Не запихивай разнородные события в одну таблицу — это убивает компрессию и читаемость дашбордов.

## Рецепт 1: новая категория в `governor_events`

1. В коде, где есть результат:

```python
from event_logger import EventLogger  # из governor/event_logger.py

logger.emit(EventLogger.build(
    category="toxicity",                     # новое значение
    model_id="org/toxicity-bert",
    text=raw_text,
    label="toxic",
    score=0.93,
    threshold=0.5,
    is_positive=True,
    probs={"toxic": 0.93, "neutral": 0.07},
    latency_ms=1.7,
    client_id="backend",                     # необязательно, для атрибуции
    request_id="abc",                        # необязательно, для трейсинга
    extra={"channel": "ws", "user_id": "42"},
))
```

2. Никаких миграций. В Metabase появится новый `category` в фильтрах автоматически.

## Рецепт 2: новый домен (своя таблица)

Допустим, хотим логи платежей. Шаги:

### 2.1. SQL для таблицы

`clickhouse/init/02_payment_events.sql`:

```sql
CREATE TABLE IF NOT EXISTS logs.payment_events (
    timestamp     DateTime64(3, 'UTC'),
    event_type    LowCardinality(String),    -- 'created' | 'paid' | 'refunded' | 'failed'
    payment_id    String,
    user_id       UInt64,
    amount_kop    Int64,                     -- в копейках, без Float
    currency      LowCardinality(String) DEFAULT 'RUB',
    provider      LowCardinality(String),    -- 'yookassa' | ...
    status        LowCardinality(String),
    error_code    LowCardinality(String) DEFAULT '',
    extra         Map(LowCardinality(String), String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (event_type, user_id, timestamp)
TTL toDateTime(timestamp) + INTERVAL 365 DAY;
```

> Файлы из `clickhouse/init/` выполняются **только при первом старте** (пустой volume). Для существующего CH применяй вручную: `curl -u governor:... -d "$(cat 02_*.sql)" http://localhost:8123/`.

### 2.2. Vector source + sink

В `vector/vector.toml` добавь:

```toml
[sources.payment_redis]
type = "redis"
url = "redis://redis:6379/0"
key = "payment:events"
data_type = "list"
decoding.codec = "json"

[sinks.clickhouse_payment]
type = "clickhouse"
inputs = ["payment_redis"]
endpoint = "http://clickhouse:8123"
database = "logs"
table = "payment_events"
compression = "gzip"
skip_unknown_fields = true
date_time_best_effort = true
batch.max_events = 1000
batch.timeout_secs = 5

[sinks.clickhouse_payment.auth]
strategy = "basic"
user = "governor"
password = "${CLICKHOUSE_PASSWORD}"
```

Перезапусти: `docker compose restart vector`.

### 2.3. Producer (на любом сервисе)

```python
# где-нибудь в backend/payments/services.py
import json, redis
from datetime import datetime, timezone

_r = redis.Redis.from_url(os.environ["REDIS_URL"])

def log_payment(event_type: str, payment_id: str, user_id: int, amount_kop: int,
                provider: str, status: str, error_code: str = "", extra: dict | None = None):
    payload = {
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        "event_type": event_type,
        "payment_id": payment_id,
        "user_id": user_id,
        "amount_kop": amount_kop,
        "currency": "RUB",
        "provider": provider,
        "status": status,
        "error_code": error_code,
        "extra": extra or {},
    }
    try:
        _r.rpush("payment:events", json.dumps(payload, ensure_ascii=False))
    except redis.RedisError:
        pass  # fire-and-forget; падающий Redis не должен ронять платёж
```

> ⚠ В критичных путях (платежи, аудит) лучше не блокировать на `RPUSH` синхронно. Заведи отдельный `EventLogger` с фоновым тредом по образцу `governor/event_logger.py` — он копит в `queue.Queue` и шлёт батчами, не влияя на основную транзакцию.

### 2.4. Подключи в Metabase

Browse data → Gamio Logs → найди новую таблицу. Через минуту она появится в Metabase автоматически (sync schema каждые 60 секунд по умолчанию).

## Шаблон producer'а для нового сервиса

Если новый микросервис на Python и хочешь нормально логировать без блокировок — скопируй [`governor/event_logger.py`](https://github.com/0x7o/gamio-backend/blob/master/governor/event_logger.py). Это самостоятельный модуль, зависимости только `redis-py`. Использование:

```python
from event_logger import EventLogger

logger = EventLogger(redis_url="redis://redis:6379/0", list_key="myservice:events")
logger.start()

# далее в любом обработчике
logger.emit({"timestamp": "...", "event_type": "...", ...})
```

При остановке сервиса — `logger.stop()` чтобы дренировать очередь.

## Что класть, чего не класть

**Класть можно** (текст и так в CH):

* сам пользовательский ввод (для governor — это смысл всей затеи: учить модель)
* идентификаторы (user\_id, request\_id, payment\_id)
* технические поля: latency, retry count, версия модели

**НЕ класть никогда:**

* пароли, токены, API-ключи, JWT, refresh tokens
* содержимое `Authorization`-заголовков
* e2e-зашифрованные данные (DM-сообщения и т.п.)
* payment card data, CVV
* персональные данные третьих лиц без основания (паспорта, адреса контактов)

Если случайно залили — `ALTER TABLE ... DELETE WHERE ...` (ленивая, но работает) + обновить producer чтобы не повторялось.

## Retention

TTL ставится на таблицу:

```sql
TTL toDateTime(timestamp) + INTERVAL 90 DAY  -- governor_events
TTL toDateTime(timestamp) + INTERVAL 365 DAY -- payment_events
```

Для аналитики обычно достаточно 30–90 дней. Аудит/compliance — год+. Помни: **диск дешёвый, но медленные дашборды дорогие** — лишние 100M строк в горячей партиции заметно тормозят queries без правильного `ORDER BY`.

## Куда смотреть данные

| Где                                                     | URL                                              | Кому                         |
| ------------------------------------------------------- | ------------------------------------------------ | ---------------------------- |
| **Metabase** — визуализации, dashboards                 | <http://localhost:3030>                          | команда, дизайнеры, продакты |
| **CH Play** — голый SQL                                 | <http://localhost:8123/play>                     | разработчик, отладка         |
| **Vector API** — внутренние метрики, dropped events     | <http://localhost:8686/playground>               | оператор/SRE                 |
| **Redis** — длина очереди (если 0 — Vector справляется) | `docker compose exec redis redis-cli LLEN <key>` | оператор                     |

Креды для CH (dev по умолчанию из docker-compose.yml: `CLICKHOUSE_PASSWORD`): пользователь `governor`. На проде пароль переопределить через env-переменную `CLICKHOUSE_PASSWORD` (читают и `clickhouse`, и `vector`); Metabase коннект-настройки держит в своём `metabase_data` volume и переживает смену env, поэтому пароль для Metabase'а меняй через UI (Admin → Databases) после ротации env.

## Известные ограничения текущей схемы

* **Vector читает Redis LIST, не Streams.** Один консьюмер. Если понадобится fan-out (например, второй consumer пишет в S3 для backup'а) — переезжаем на `XADD` + sidecar forwarder либо на Redpanda Connect.
* **Vector single-instance.** Если упадёт — события копятся в Redis (с `LTRIM` cap = 1M). При длительной недоступности > buffer → дроп старых.
* **CH без репликации.** Single-node. Снэпшот через `clickhouse-backup` или ручной `BACKUP TABLE ... TO Disk(...)`.
* **Schema-on-write.** При несовпадении полей JSON и колонок таблицы Vector пишет в CH с `skip_unknown_fields = true` — лишние поля **молча теряются**. Если producer пишет новое поле — **сначала ALTER TABLE**, потом релизи producer.

## Чек-лист перед добавлением нового потока

* [ ] Имя LIST в формате `<domain>:events` (не пересекается с существующими)
* [ ] CH-таблица в `logs.<domain>_events` с `PARTITION BY toYYYYMM(timestamp)`, `TTL`, осмысленным `ORDER BY`
* [ ] Vector source + sink в `vector/vector.toml`, `restart vector`
* [ ] Producer не блокируется на Redis (fire-and-forget или фоновый тред)
* [ ] Решено что класть, что не класть — никаких секретов
* [ ] Прогон smoke: 3 события → `LLEN <key>` упало до 0 → строки в CH
* [ ] Question в Metabase для базового среза, чтобы было откуда отталкиваться


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.gamio.ru/logging.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
