From 1210d1bff137effb7cf53e7d578997930d0bb27f Mon Sep 17 00:00:00 2001 From: salvacybersec Date: Thu, 13 Nov 2025 04:53:33 +0300 Subject: [PATCH] db thread --- src/database.py | 61 +++++++++++++++++++++++++++++++------------------ 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/src/database.py b/src/database.py index f165eda..afa9812 100644 --- a/src/database.py +++ b/src/database.py @@ -4,24 +4,32 @@ SQLite veritabanı yönetimi modülü import sqlite3 import os import re +import threading from datetime import datetime, timezone from typing import Optional, List, Dict class Database: - """SQLite veritabanı yönetim sınıfı""" + """SQLite veritabanı yönetim sınıfı (thread-safe)""" def __init__(self, db_path: str = "data/videos.db"): self.db_path = db_path - self.conn = None + # Thread-local storage: Her thread'in kendi connection'ı olacak + self._local = threading.local() def connect(self): - """Veritabanı bağlantısı oluştur""" - # Dizin yoksa oluştur - os.makedirs(os.path.dirname(self.db_path), exist_ok=True) - self.conn = sqlite3.connect(self.db_path) - self.conn.row_factory = sqlite3.Row - return self.conn + """Veritabanı bağlantısı oluştur (thread-safe)""" + # Her thread'in kendi connection'ı var mı kontrol et + if not hasattr(self._local, 'conn') or self._local.conn is None: + # Dizin yoksa oluştur + os.makedirs(os.path.dirname(self.db_path), exist_ok=True) + # check_same_thread=False: Thread-safe çalışma için + self._local.conn = sqlite3.connect( + self.db_path, + check_same_thread=False + ) + self._local.conn.row_factory = sqlite3.Row + return self._local.conn def init_database(self): """Veritabanı şemasını oluştur""" @@ -86,9 +94,10 @@ class Database: print("Database initialized successfully") def close(self): - """Veritabanı bağlantısını kapat""" - if self.conn: - self.conn.close() + """Veritabanı bağlantısını kapat (thread-safe)""" + if hasattr(self._local, 'conn') and self._local.conn: + self._local.conn.close() + self._local.conn = None def _validate_video_id(self, video_id: str) -> bool: """Video ID formatını doğrula (SQL injection koruması)""" @@ -108,13 +117,15 @@ class Database: """Video işlenmiş mi kontrol et""" if not self._validate_video_id(video_id): raise ValueError(f"Geçersiz video_id formatı: {video_id}") - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() cursor.execute("SELECT video_id FROM videos WHERE video_id = ?", (video_id,)) return cursor.fetchone() is not None def get_pending_videos(self) -> List[Dict]: """İşlenmeyi bekleyen videoları getir (status=0)""" - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() cursor.execute(""" SELECT * FROM videos WHERE transcript_status = 0 @@ -134,7 +145,8 @@ class Database: if channel_id and not self._validate_channel_id(channel_id): raise ValueError(f"Geçersiz channel_id formatı: {channel_id}") - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() cursor.execute(""" INSERT OR IGNORE INTO videos (video_id, channel_id, video_title, video_url, published_at_utc, transcript_status) @@ -146,7 +158,7 @@ class Database: video_data.get('video_url', '')[:500], # Max length video_data.get('published_at_utc') )) - self.conn.commit() + conn.commit() def update_video_transcript(self, video_id: str, raw: str, clean: str, status: int, language: Optional[str] = None): @@ -158,7 +170,8 @@ class Database: if status not in [0, 1, 2]: raise ValueError(f"Geçersiz status değeri: {status}") - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() now_utc = datetime.now(timezone.utc).isoformat() cursor.execute(""" UPDATE videos @@ -170,7 +183,7 @@ class Database: last_updated_utc = ? WHERE video_id = ? """, (raw, clean, status, language, now_utc, now_utc, video_id)) - self.conn.commit() + conn.commit() def get_processed_videos(self, limit: Optional[int] = None, channel_id: Optional[str] = None) -> List[Dict]: @@ -182,7 +195,8 @@ class Database: if limit is not None and (not isinstance(limit, int) or limit < 1 or limit > 1000): raise ValueError(f"Geçersiz limit değeri: {limit} (1-1000 arası olmalı)") - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() query = """ SELECT * FROM videos WHERE transcript_status = 1 @@ -208,14 +222,15 @@ class Database: if not self._validate_video_id(video_id): raise ValueError(f"Geçersiz video_id formatı: {video_id}") - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() cursor.execute(""" UPDATE videos SET transcript_status = 2, last_updated_utc = ? WHERE video_id = ? """, (datetime.now(timezone.utc).isoformat(), video_id)) - self.conn.commit() + conn.commit() def is_transcript_cached(self, video_id: str, cache_days: int = 3) -> bool: """ @@ -231,7 +246,8 @@ class Database: if not self._validate_video_id(video_id): return False - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() # 3 gün içinde işlenmiş ve başarılı (status=1) transcript var mı? cursor.execute(""" SELECT processed_at_utc, transcript_status, transcript_clean @@ -272,7 +288,8 @@ class Database: if not self._validate_video_id(video_id): return None - cursor = self.conn.cursor() + conn = self.connect() + cursor = conn.cursor() cursor.execute(""" SELECT * FROM videos WHERE video_id = ?