db thread

This commit is contained in:
salvacybersec
2025-11-13 04:53:33 +03:00
parent c677983aa7
commit 1210d1bff1

View File

@@ -4,24 +4,32 @@ SQLite veritabanı yönetimi modülü
import sqlite3 import sqlite3
import os import os
import re import re
import threading
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional, List, Dict from typing import Optional, List, Dict
class Database: class Database:
"""SQLite veritabanı yönetim sınıfı""" """SQLite veritabanı yönetim sınıfı (thread-safe)"""
def __init__(self, db_path: str = "data/videos.db"): def __init__(self, db_path: str = "data/videos.db"):
self.db_path = db_path self.db_path = db_path
self.conn = None # Thread-local storage: Her thread'in kendi connection'ı olacak
self._local = threading.local()
def connect(self): def connect(self):
"""Veritabanı bağlantısı oluştur""" """Veritabanı bağlantısı oluştur (thread-safe)"""
# Her thread'in kendi connection'ı var mı kontrol et
if not hasattr(self._local, 'conn') or self._local.conn is None:
# Dizin yoksa oluştur # Dizin yoksa oluştur
os.makedirs(os.path.dirname(self.db_path), exist_ok=True) os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
self.conn = sqlite3.connect(self.db_path) # check_same_thread=False: Thread-safe çalışma için
self.conn.row_factory = sqlite3.Row self._local.conn = sqlite3.connect(
return self.conn self.db_path,
check_same_thread=False
)
self._local.conn.row_factory = sqlite3.Row
return self._local.conn
def init_database(self): def init_database(self):
"""Veritabanı şemasını oluştur""" """Veritabanı şemasını oluştur"""
@@ -86,9 +94,10 @@ class Database:
print("Database initialized successfully") print("Database initialized successfully")
def close(self): def close(self):
"""Veritabanı bağlantısını kapat""" """Veritabanı bağlantısını kapat (thread-safe)"""
if self.conn: if hasattr(self._local, 'conn') and self._local.conn:
self.conn.close() self._local.conn.close()
self._local.conn = None
def _validate_video_id(self, video_id: str) -> bool: def _validate_video_id(self, video_id: str) -> bool:
"""Video ID formatını doğrula (SQL injection koruması)""" """Video ID formatını doğrula (SQL injection koruması)"""
@@ -108,13 +117,15 @@ class Database:
"""Video işlenmiş mi kontrol et""" """Video işlenmiş mi kontrol et"""
if not self._validate_video_id(video_id): if not self._validate_video_id(video_id):
raise ValueError(f"Geçersiz video_id formatı: {video_id}") raise ValueError(f"Geçersiz video_id formatı: {video_id}")
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
cursor.execute("SELECT video_id FROM videos WHERE video_id = ?", (video_id,)) cursor.execute("SELECT video_id FROM videos WHERE video_id = ?", (video_id,))
return cursor.fetchone() is not None return cursor.fetchone() is not None
def get_pending_videos(self) -> List[Dict]: def get_pending_videos(self) -> List[Dict]:
"""İşlenmeyi bekleyen videoları getir (status=0)""" """İşlenmeyi bekleyen videoları getir (status=0)"""
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
cursor.execute(""" cursor.execute("""
SELECT * FROM videos SELECT * FROM videos
WHERE transcript_status = 0 WHERE transcript_status = 0
@@ -134,7 +145,8 @@ class Database:
if channel_id and not self._validate_channel_id(channel_id): if channel_id and not self._validate_channel_id(channel_id):
raise ValueError(f"Geçersiz channel_id formatı: {channel_id}") raise ValueError(f"Geçersiz channel_id formatı: {channel_id}")
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
cursor.execute(""" cursor.execute("""
INSERT OR IGNORE INTO videos INSERT OR IGNORE INTO videos
(video_id, channel_id, video_title, video_url, published_at_utc, transcript_status) (video_id, channel_id, video_title, video_url, published_at_utc, transcript_status)
@@ -146,7 +158,7 @@ class Database:
video_data.get('video_url', '')[:500], # Max length video_data.get('video_url', '')[:500], # Max length
video_data.get('published_at_utc') video_data.get('published_at_utc')
)) ))
self.conn.commit() conn.commit()
def update_video_transcript(self, video_id: str, raw: str, clean: str, def update_video_transcript(self, video_id: str, raw: str, clean: str,
status: int, language: Optional[str] = None): status: int, language: Optional[str] = None):
@@ -158,7 +170,8 @@ class Database:
if status not in [0, 1, 2]: if status not in [0, 1, 2]:
raise ValueError(f"Geçersiz status değeri: {status}") raise ValueError(f"Geçersiz status değeri: {status}")
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
now_utc = datetime.now(timezone.utc).isoformat() now_utc = datetime.now(timezone.utc).isoformat()
cursor.execute(""" cursor.execute("""
UPDATE videos UPDATE videos
@@ -170,7 +183,7 @@ class Database:
last_updated_utc = ? last_updated_utc = ?
WHERE video_id = ? WHERE video_id = ?
""", (raw, clean, status, language, now_utc, now_utc, video_id)) """, (raw, clean, status, language, now_utc, now_utc, video_id))
self.conn.commit() conn.commit()
def get_processed_videos(self, limit: Optional[int] = None, def get_processed_videos(self, limit: Optional[int] = None,
channel_id: Optional[str] = None) -> List[Dict]: channel_id: Optional[str] = None) -> List[Dict]:
@@ -182,7 +195,8 @@ class Database:
if limit is not None and (not isinstance(limit, int) or limit < 1 or limit > 1000): if limit is not None and (not isinstance(limit, int) or limit < 1 or limit > 1000):
raise ValueError(f"Geçersiz limit değeri: {limit} (1-1000 arası olmalı)") raise ValueError(f"Geçersiz limit değeri: {limit} (1-1000 arası olmalı)")
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
query = """ query = """
SELECT * FROM videos SELECT * FROM videos
WHERE transcript_status = 1 WHERE transcript_status = 1
@@ -208,14 +222,15 @@ class Database:
if not self._validate_video_id(video_id): if not self._validate_video_id(video_id):
raise ValueError(f"Geçersiz video_id formatı: {video_id}") raise ValueError(f"Geçersiz video_id formatı: {video_id}")
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
cursor.execute(""" cursor.execute("""
UPDATE videos UPDATE videos
SET transcript_status = 2, SET transcript_status = 2,
last_updated_utc = ? last_updated_utc = ?
WHERE video_id = ? WHERE video_id = ?
""", (datetime.now(timezone.utc).isoformat(), video_id)) """, (datetime.now(timezone.utc).isoformat(), video_id))
self.conn.commit() conn.commit()
def is_transcript_cached(self, video_id: str, cache_days: int = 3) -> bool: def is_transcript_cached(self, video_id: str, cache_days: int = 3) -> bool:
""" """
@@ -231,7 +246,8 @@ class Database:
if not self._validate_video_id(video_id): if not self._validate_video_id(video_id):
return False return False
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
# 3 gün içinde işlenmiş ve başarılı (status=1) transcript var mı? # 3 gün içinde işlenmiş ve başarılı (status=1) transcript var mı?
cursor.execute(""" cursor.execute("""
SELECT processed_at_utc, transcript_status, transcript_clean SELECT processed_at_utc, transcript_status, transcript_clean
@@ -272,7 +288,8 @@ class Database:
if not self._validate_video_id(video_id): if not self._validate_video_id(video_id):
return None return None
cursor = self.conn.cursor() conn = self.connect()
cursor = conn.cursor()
cursor.execute(""" cursor.execute("""
SELECT * FROM videos SELECT * FROM videos
WHERE video_id = ? WHERE video_id = ?