Generische Hilfsmethoden fuer Schema-Operationen (exec), parametrisierte Writes (run) und Transaktionen (transaction). Co-Authored-By: tolvitty <noreply@anthropic.com>
402 lines
13 KiB
TypeScript
402 lines
13 KiB
TypeScript
import Database from 'better-sqlite3';
|
|
import type BetterSqlite3 from 'better-sqlite3';
|
|
import { LogfireEvent, LogfireSettings, ContentSnapshot, EventType, RetentionSettings } from '../types';
|
|
|
|
const SCHEMA_VERSION = 1;
|
|
|
|
export class DatabaseManager {
|
|
private db: BetterSqlite3.Database;
|
|
|
|
constructor(dbPath: string, settings: LogfireSettings) {
|
|
this.db = new Database(dbPath);
|
|
this.configurePragmas(settings);
|
|
this.ensureSchema();
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Schema
|
|
// ---------------------------------------------------------------------------
|
|
|
|
private configurePragmas(settings: LogfireSettings): void {
|
|
const { database } = settings.advanced;
|
|
if (database.walMode) {
|
|
this.db.pragma('journal_mode = WAL');
|
|
}
|
|
this.db.pragma('synchronous = NORMAL');
|
|
this.db.pragma(`cache_size = -${database.cacheSizeMb * 1000}`);
|
|
this.db.pragma(`mmap_size = ${database.mmapSizeMb * 1024 * 1024}`);
|
|
this.db.pragma('foreign_keys = ON');
|
|
}
|
|
|
|
private ensureSchema(): void {
|
|
const current = this.getSchemaVersion();
|
|
if (current >= SCHEMA_VERSION) return;
|
|
|
|
this.db.exec(`
|
|
CREATE TABLE IF NOT EXISTS schema_version (
|
|
version INTEGER PRIMARY KEY
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS events (
|
|
id TEXT PRIMARY KEY,
|
|
timestamp INTEGER NOT NULL,
|
|
type TEXT NOT NULL,
|
|
category TEXT NOT NULL,
|
|
source TEXT,
|
|
target TEXT,
|
|
payload TEXT,
|
|
session TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_events_type ON events(type);
|
|
CREATE INDEX IF NOT EXISTS idx_events_source ON events(source);
|
|
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session);
|
|
CREATE INDEX IF NOT EXISTS idx_events_category ON events(category);
|
|
|
|
CREATE TABLE IF NOT EXISTS sessions (
|
|
id TEXT PRIMARY KEY,
|
|
start_time INTEGER NOT NULL,
|
|
end_time INTEGER,
|
|
vault_name TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS baseline (
|
|
file_path TEXT PRIMARY KEY,
|
|
scanned_at INTEGER NOT NULL,
|
|
word_count INTEGER,
|
|
char_count INTEGER,
|
|
links TEXT,
|
|
tags TEXT,
|
|
headings TEXT,
|
|
frontmatter TEXT,
|
|
embeds TEXT,
|
|
file_created INTEGER,
|
|
file_modified INTEGER,
|
|
file_size INTEGER
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS daily_stats (
|
|
date TEXT NOT NULL,
|
|
file_path TEXT,
|
|
events_count INTEGER DEFAULT 0,
|
|
words_added INTEGER DEFAULT 0,
|
|
words_removed INTEGER DEFAULT 0,
|
|
time_active_ms INTEGER DEFAULT 0,
|
|
PRIMARY KEY (date, file_path)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS monthly_stats (
|
|
month TEXT NOT NULL,
|
|
file_path TEXT,
|
|
events_count INTEGER DEFAULT 0,
|
|
words_added INTEGER DEFAULT 0,
|
|
words_removed INTEGER DEFAULT 0,
|
|
time_active_ms INTEGER DEFAULT 0,
|
|
sessions_count INTEGER DEFAULT 0,
|
|
PRIMARY KEY (month, file_path)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS retention_log (
|
|
run_date TEXT PRIMARY KEY,
|
|
events_deleted INTEGER,
|
|
events_aggregated INTEGER,
|
|
db_size_before INTEGER,
|
|
db_size_after INTEGER,
|
|
duration_ms INTEGER
|
|
);
|
|
`);
|
|
|
|
this.setSchemaVersion(SCHEMA_VERSION);
|
|
}
|
|
|
|
private getSchemaVersion(): number {
|
|
try {
|
|
const row = this.db
|
|
.prepare('SELECT MAX(version) as v FROM schema_version')
|
|
.get() as { v: number } | undefined;
|
|
return row?.v ?? 0;
|
|
} catch {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
private setSchemaVersion(version: number): void {
|
|
this.db
|
|
.prepare('INSERT OR REPLACE INTO schema_version (version) VALUES (?)')
|
|
.run(version);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Event operations
|
|
// ---------------------------------------------------------------------------
|
|
|
|
private insertStmt: BetterSqlite3.Statement | null = null;
|
|
|
|
insertEvents(events: LogfireEvent[]): void {
|
|
if (events.length === 0) return;
|
|
|
|
if (!this.insertStmt) {
|
|
this.insertStmt = this.db.prepare(`
|
|
INSERT OR IGNORE INTO events (id, timestamp, type, category, source, target, payload, session)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
`);
|
|
}
|
|
|
|
const stmt = this.insertStmt;
|
|
const tx = this.db.transaction((batch: LogfireEvent[]) => {
|
|
for (const e of batch) {
|
|
stmt.run(
|
|
e.id,
|
|
e.timestamp,
|
|
e.type,
|
|
e.category,
|
|
e.source,
|
|
e.target ?? null,
|
|
JSON.stringify(e.payload),
|
|
e.session,
|
|
);
|
|
}
|
|
});
|
|
|
|
tx(events);
|
|
}
|
|
|
|
getEventCount(): number {
|
|
const row = this.db.prepare('SELECT COUNT(*) as c FROM events').get() as { c: number };
|
|
return row.c;
|
|
}
|
|
|
|
getOldestEventTimestamp(): number | null {
|
|
const row = this.db
|
|
.prepare('SELECT MIN(timestamp) as ts FROM events')
|
|
.get() as { ts: number | null };
|
|
return row.ts;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Session operations
|
|
// ---------------------------------------------------------------------------
|
|
|
|
startSession(id: string, vaultName: string): void {
|
|
this.db
|
|
.prepare('INSERT INTO sessions (id, start_time, vault_name) VALUES (?, ?, ?)')
|
|
.run(id, Date.now(), vaultName);
|
|
}
|
|
|
|
endSession(id: string): void {
|
|
this.db
|
|
.prepare('UPDATE sessions SET end_time = ? WHERE id = ?')
|
|
.run(Date.now(), id);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Baseline operations
|
|
// ---------------------------------------------------------------------------
|
|
|
|
hasBaseline(): boolean {
|
|
const row = this.db.prepare('SELECT COUNT(*) as c FROM baseline').get() as { c: number };
|
|
return row.c > 0;
|
|
}
|
|
|
|
upsertBaseline(snapshot: ContentSnapshot, fileCreated: number, fileModified: number, fileSize: number): void {
|
|
this.db.prepare(`
|
|
INSERT OR REPLACE INTO baseline
|
|
(file_path, scanned_at, word_count, char_count, links, tags, headings, frontmatter, embeds, file_created, file_modified, file_size)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`).run(
|
|
snapshot.path,
|
|
Date.now(),
|
|
snapshot.wordCount,
|
|
snapshot.charCount,
|
|
JSON.stringify(snapshot.links),
|
|
JSON.stringify(snapshot.tags),
|
|
JSON.stringify(snapshot.headings),
|
|
JSON.stringify(snapshot.frontmatter),
|
|
JSON.stringify(snapshot.embeds),
|
|
fileCreated,
|
|
fileModified,
|
|
fileSize,
|
|
);
|
|
}
|
|
|
|
loadBaseline(): Map<string, ContentSnapshot> {
|
|
const rows = this.db.prepare('SELECT * FROM baseline').all() as Array<{
|
|
file_path: string;
|
|
word_count: number;
|
|
char_count: number;
|
|
links: string;
|
|
tags: string;
|
|
headings: string;
|
|
frontmatter: string;
|
|
embeds: string;
|
|
}>;
|
|
|
|
const map = new Map<string, ContentSnapshot>();
|
|
for (const row of rows) {
|
|
map.set(row.file_path, {
|
|
path: row.file_path,
|
|
wordCount: row.word_count,
|
|
charCount: row.char_count,
|
|
links: JSON.parse(row.links),
|
|
tags: JSON.parse(row.tags),
|
|
headings: JSON.parse(row.headings),
|
|
frontmatter: JSON.parse(row.frontmatter),
|
|
embeds: JSON.parse(row.embeds),
|
|
});
|
|
}
|
|
return map;
|
|
}
|
|
|
|
removeBaselineEntry(filePath: string): void {
|
|
this.db.prepare('DELETE FROM baseline WHERE file_path = ?').run(filePath);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Generic read-only query (for codeblocks / API)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
queryReadOnly(sql: string, params: unknown[] = []): unknown[] {
|
|
const stmt = this.db.prepare(sql);
|
|
stmt.safeIntegers(false);
|
|
return stmt.all(...params);
|
|
}
|
|
|
|
exec(sql: string): void {
|
|
this.db.exec(sql);
|
|
}
|
|
|
|
run(sql: string, ...params: unknown[]): void {
|
|
this.db.prepare(sql).run(...params);
|
|
}
|
|
|
|
transaction<T>(fn: () => T): T {
|
|
return this.db.transaction(fn)();
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Retention & Maintenance
|
|
// ---------------------------------------------------------------------------
|
|
|
|
applyRetention(retention: RetentionSettings): { eventsDeleted: number; eventsAggregated: number } {
|
|
const now = Date.now();
|
|
const rawCutoff = now - retention.rawEventsDays * 86400000;
|
|
const dailyCutoff = now - retention.dailyStatsDays * 86400000;
|
|
|
|
const eventsAggregated = this.aggregateToDailyStats(rawCutoff);
|
|
const eventsDeleted = this.deleteOldEvents(rawCutoff, retention.neverDeleteTypes);
|
|
this.aggregateToMonthlyStats(dailyCutoff);
|
|
|
|
this.db.prepare('DELETE FROM daily_stats WHERE date < ?').run(
|
|
new Date(dailyCutoff).toISOString().substring(0, 10),
|
|
);
|
|
|
|
return { eventsDeleted, eventsAggregated };
|
|
}
|
|
|
|
runMaintenance(retention: RetentionSettings): void {
|
|
const startTime = Date.now();
|
|
const dbSizeBefore = this.getDatabaseSizeBytes();
|
|
|
|
const { eventsDeleted, eventsAggregated } = this.applyRetention(retention);
|
|
|
|
this.db.pragma('optimize');
|
|
this.db.exec('VACUUM');
|
|
|
|
const dbSizeAfter = this.getDatabaseSizeBytes();
|
|
const duration = Date.now() - startTime;
|
|
|
|
this.db.prepare(`
|
|
INSERT OR REPLACE INTO retention_log (run_date, events_deleted, events_aggregated, db_size_before, db_size_after, duration_ms)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
`).run(
|
|
new Date().toISOString().substring(0, 10),
|
|
eventsDeleted,
|
|
eventsAggregated,
|
|
dbSizeBefore,
|
|
dbSizeAfter,
|
|
duration,
|
|
);
|
|
|
|
console.log(`[Logfire] Maintenance: ${eventsDeleted} gelöscht, ${eventsAggregated} aggregiert, ${formatBytes(dbSizeBefore)} → ${formatBytes(dbSizeAfter)} in ${duration}ms`);
|
|
}
|
|
|
|
private aggregateToDailyStats(cutoff: number): number {
|
|
const dateStr = "strftime('%Y-%m-%d', timestamp / 1000, 'unixepoch', 'localtime')";
|
|
|
|
const result = this.db.prepare(`
|
|
INSERT OR REPLACE INTO daily_stats (date, file_path, events_count, words_added, words_removed, time_active_ms)
|
|
SELECT
|
|
${dateStr} as date,
|
|
source as file_path,
|
|
COUNT(*) as events_count,
|
|
COALESCE(SUM(CASE WHEN json_extract(payload, '$.wordsAdded') IS NOT NULL THEN json_extract(payload, '$.wordsAdded') ELSE 0 END), 0) as words_added,
|
|
COALESCE(SUM(CASE WHEN json_extract(payload, '$.wordsRemoved') IS NOT NULL THEN json_extract(payload, '$.wordsRemoved') ELSE 0 END), 0) as words_removed,
|
|
COALESCE(SUM(CASE WHEN json_extract(payload, '$.duration') IS NOT NULL THEN json_extract(payload, '$.duration') ELSE 0 END), 0) as time_active_ms
|
|
FROM events
|
|
WHERE timestamp < ?
|
|
GROUP BY ${dateStr}, source
|
|
`).run(cutoff);
|
|
|
|
return result.changes;
|
|
}
|
|
|
|
private deleteOldEvents(cutoff: number, protectedTypes: EventType[]): number {
|
|
if (protectedTypes.length === 0) {
|
|
const result = this.db.prepare('DELETE FROM events WHERE timestamp < ?').run(cutoff);
|
|
return result.changes;
|
|
}
|
|
|
|
const placeholders = protectedTypes.map(() => '?').join(', ');
|
|
const result = this.db.prepare(
|
|
`DELETE FROM events WHERE timestamp < ? AND type NOT IN (${placeholders})`
|
|
).run(cutoff, ...protectedTypes);
|
|
return result.changes;
|
|
}
|
|
|
|
private aggregateToMonthlyStats(cutoff: number): void {
|
|
const cutoffDate = new Date(cutoff).toISOString().substring(0, 10);
|
|
|
|
this.db.prepare(`
|
|
INSERT OR REPLACE INTO monthly_stats (month, file_path, events_count, words_added, words_removed, time_active_ms, sessions_count)
|
|
SELECT
|
|
substr(date, 1, 7) as month,
|
|
file_path,
|
|
SUM(events_count) as events_count,
|
|
SUM(words_added) as words_added,
|
|
SUM(words_removed) as words_removed,
|
|
SUM(time_active_ms) as time_active_ms,
|
|
0 as sessions_count
|
|
FROM daily_stats
|
|
WHERE date < ?
|
|
GROUP BY substr(date, 1, 7), file_path
|
|
`).run(cutoffDate);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Database info
|
|
// ---------------------------------------------------------------------------
|
|
|
|
getDatabaseSizeBytes(): number {
|
|
const pageCount = (this.db.pragma('page_count') as Array<{ page_count: number }>)[0].page_count;
|
|
const pageSize = (this.db.pragma('page_size') as Array<{ page_size: number }>)[0].page_size;
|
|
return pageCount * pageSize;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Lifecycle
|
|
// ---------------------------------------------------------------------------
|
|
|
|
close(): void {
|
|
if (this.insertStmt) {
|
|
this.insertStmt = null;
|
|
}
|
|
this.db.close();
|
|
}
|
|
}
|
|
|
|
function formatBytes(bytes: number): string {
|
|
if (bytes < 1024) return `${bytes} B`;
|
|
if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
|
|
return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
|
|
}
|