""" SQLite veritabanı yönetimi modülü """ import sqlite3 import os import re from datetime import datetime, timezone from typing import Optional, List, Dict class Database: """SQLite veritabanı yönetim sınıfı""" def __init__(self, db_path: str = "data/videos.db"): self.db_path = db_path self.conn = None def connect(self): """Veritabanı bağlantısı oluştur""" # Dizin yoksa oluştur os.makedirs(os.path.dirname(self.db_path), exist_ok=True) self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row return self.conn def init_database(self): """Veritabanı şemasını oluştur""" conn = self.connect() cursor = conn.cursor() # Channels tablosu cursor.execute(""" CREATE TABLE IF NOT EXISTS channels ( channel_id TEXT PRIMARY KEY, channel_name TEXT, channel_url TEXT, last_checked_utc TEXT, created_at_utc TEXT DEFAULT (datetime('now')) ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_channels_last_checked ON channels(last_checked_utc) """) # Videos tablosu cursor.execute(""" CREATE TABLE IF NOT EXISTS videos ( video_id TEXT PRIMARY KEY, channel_id TEXT, video_title TEXT, video_url TEXT, published_at_utc TEXT, processed_at_utc TEXT, transcript_status INTEGER DEFAULT 0, transcript_language TEXT, transcript_raw TEXT, transcript_clean TEXT, last_updated_utc TEXT DEFAULT (datetime('now')) ) """) # Index'ler cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_videos_channel_id ON videos(channel_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_videos_published_at_utc ON videos(published_at_utc) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_videos_transcript_status ON videos(transcript_status) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_videos_processed_at_utc ON videos(processed_at_utc) """) conn.commit() print("Database initialized successfully") def close(self): """Veritabanı bağlantısını kapat""" if self.conn: self.conn.close() def _validate_video_id(self, video_id: str) -> bool: """Video ID formatını doğrula (SQL injection koruması)""" if not video_id or len(video_id) > 20: return False # YouTube video ID: 11 karakter, alfanumerik + _ - return bool(re.match(r'^[a-zA-Z0-9_-]{11}$', video_id)) def _validate_channel_id(self, channel_id: str) -> bool: """Channel ID formatını doğrula (SQL injection koruması)""" if not channel_id or len(channel_id) > 50: return False # YouTube channel ID: UC ile başlayan 24 karakter return bool(re.match(r'^UC[a-zA-Z0-9_-]{22}$', channel_id)) def is_video_processed(self, video_id: str) -> bool: """Video işlenmiş mi kontrol et""" if not self._validate_video_id(video_id): raise ValueError(f"Geçersiz video_id formatı: {video_id}") cursor = self.conn.cursor() cursor.execute("SELECT video_id FROM videos WHERE video_id = ?", (video_id,)) return cursor.fetchone() is not None def get_pending_videos(self) -> List[Dict]: """İşlenmeyi bekleyen videoları getir (status=0)""" cursor = self.conn.cursor() cursor.execute(""" SELECT * FROM videos WHERE transcript_status = 0 ORDER BY published_at_utc DESC """) return [dict(row) for row in cursor.fetchall()] def add_video(self, video_data: Dict): """Yeni video ekle (status=0 olarak)""" # Input validation video_id = video_data.get('video_id') channel_id = video_data.get('channel_id') if not self._validate_video_id(video_id): raise ValueError(f"Geçersiz video_id formatı: {video_id}") if channel_id and not self._validate_channel_id(channel_id): raise ValueError(f"Geçersiz channel_id formatı: {channel_id}") cursor = self.conn.cursor() cursor.execute(""" INSERT OR IGNORE INTO videos (video_id, channel_id, video_title, video_url, published_at_utc, transcript_status) VALUES (?, ?, ?, ?, ?, 0) """, ( video_id, channel_id, video_data.get('video_title', '')[:500], # Max length video_data.get('video_url', '')[:500], # Max length video_data.get('published_at_utc') )) self.conn.commit() def update_video_transcript(self, video_id: str, raw: str, clean: str, status: int, language: Optional[str] = None): """Video transcript'ini güncelle""" # Input validation if not self._validate_video_id(video_id): raise ValueError(f"Geçersiz video_id formatı: {video_id}") if status not in [0, 1, 2]: raise ValueError(f"Geçersiz status değeri: {status}") cursor = self.conn.cursor() now_utc = datetime.now(timezone.utc).isoformat() cursor.execute(""" UPDATE videos SET transcript_raw = ?, transcript_clean = ?, transcript_status = ?, transcript_language = ?, processed_at_utc = ?, last_updated_utc = ? WHERE video_id = ? """, (raw, clean, status, language, now_utc, now_utc, video_id)) self.conn.commit() def get_processed_videos(self, limit: Optional[int] = None, channel_id: Optional[str] = None) -> List[Dict]: """İşlenmiş videoları getir (status=1)""" # Input validation if channel_id and not self._validate_channel_id(channel_id): raise ValueError(f"Geçersiz channel_id formatı: {channel_id}") if limit is not None and (not isinstance(limit, int) or limit < 1 or limit > 1000): raise ValueError(f"Geçersiz limit değeri: {limit} (1-1000 arası olmalı)") cursor = self.conn.cursor() query = """ SELECT * FROM videos WHERE transcript_status = 1 """ params = [] if channel_id: query += " AND channel_id = ?" params.append(channel_id) query += " ORDER BY published_at_utc DESC" if limit: query += " LIMIT ?" params.append(limit) cursor.execute(query, params) return [dict(row) for row in cursor.fetchall()] def mark_video_failed(self, video_id: str, reason: Optional[str] = None): """Video'yu başarısız olarak işaretle (status=2)""" # Input validation if not self._validate_video_id(video_id): raise ValueError(f"Geçersiz video_id formatı: {video_id}") cursor = self.conn.cursor() cursor.execute(""" UPDATE videos SET transcript_status = 2, last_updated_utc = ? WHERE video_id = ? """, (datetime.now(timezone.utc).isoformat(), video_id)) self.conn.commit()