o
    xq@g@                     @   s   d dl Z d dlmZ d dlZd dlZd dlZd dlm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZmZ d dlmZ d dlZd dlZd dlmZ d d	lmZ d dlZe  G d
d dZdS )    N)embedding_functions)ListDictOptional)CrossEncoder)RecursiveCharacterTextSplitter)ThreadPoolExecutoras_completed)PyPDFLoader)datetime)load_dotenvc                   @   s  e Zd Zd8dedee ded	ed
edededefddZdd Z	deeef fddZ
dd Zdedee fddZdd Zd9deded ed!ed"ed#ed$edefd%d&Zd:d(ed)ee d*ee defd+d,Zd-d. Zd)edd'fd/d0Zd;d3ed)ed4ed5edee f
d6d7Zd'S )<HybridSearch	search.dbpdf_uuids.json           ?collection_nameuserfolder_nameduckdb_pathuuid_storage_path
chunk_sizechunk_overlapalphac	                 C   s   t dd t|j d t| }	|| _|| _|| _|	d | | _tddd| _	| 
 | _|	d }
t j|
s=t |
 tj|
d| _tjt dd	d
}| jj||d| _t|	d | | _|   t||tdd| _dS )z5
        Initialize the hybrid search system
        DB_PATH/z$cross-encoder/ms-marco-MiniLM-L-6-v2i   )
max_lengthz
/chroma_db)pathOPENAI_API_KEYztext-embedding-3-small)api_key
model_name)nameembedding_functionF)r   r   length_functionis_separator_regexN)osgetenvstr	user_coder   r   r   r   r   reranker_model_load_uuid_storage	pdf_uuidsr   existsmakedirschromadbPersistentClientchroma_clientr   OpenAIEmbeddingFunctionget_or_create_collectionchroma_collectionduckdbconnectconn_setup_duckdbr   lentext_splitter)selfr   r   r   r   r   r   r   r   r   persist_directory	openai_ef r>   9/var/www/html/answerit/server/dependencies/chromautils.py__init__   s6   $

zHybridSearch.__init__c                    s:   | j  fdd|D }t||D ]\}}||d< q|S )Nc                    s   g | ]} |d  fqS )contentr>   ).0chunkqueryr>   r?   
<listcomp>C   s    z)HybridSearch.reranker.<locals>.<listcomp>reranker_score)r*   predictzip)r;   rE   resultsscoresscorerC   r>   rD   r?   rerankerA   s   
zHybridSearch.rerankerreturnc                 C   sH   t j| jr"t| jd}t|W  d   S 1 sw   Y  i S )z8
        Load the UUID storage from a JSON file
        rN)r&   r   r-   r   openjsonloadr;   filer>   r>   r?   r+   H   s
    zHybridSearch._load_uuid_storagec                 C   sD   t | jd}tj| j|dd W d   dS 1 sw   Y  dS )z6
        Save the UUID storage to a JSON file
        w   )indentN)rP   r   rQ   dumpr,   rS   r>   r>   r?   _save_uuid_storageQ   s   "zHybridSearch._save_uuid_storagepdf_namec                 C   s,   | j  D ]\}}|d |kr|  S qdS )z8
        Retrieve the UUID for a given PDF name
        rZ   N)r,   items)r;   rZ   uuiddetailsr>   r>   r?   get_pdf_uuidX   s
   zHybridSearch.get_pdf_uuidc                    st    j d  j d  j d  j d  j d z j jd fddd	d
 W dS  tjy9   Y dS w )z2
        Set up DuckDB tables and indexes
        aG  
            CREATE TABLE IF NOT EXISTS documents (
                chunk_id VARCHAR PRIMARY KEY,
                pdf_uuid VARCHAR,
                content TEXT,
                page_number INTEGER,
                chunk_number INTEGER,
                created_at TIMESTAMP,
                metadata JSON
            )
        z
            CREATE TABLE IF NOT EXISTS terms (
                term VARCHAR,
                chunk_id VARCHAR,
                tf INTEGER,
                UNIQUE(term, chunk_id)
            )
        z
            CREATE TABLE IF NOT EXISTS doc_stats (
                collection_name VARCHAR PRIMARY KEY,
                total_chunks INTEGER,
                avg_chunk_length DOUBLE
            )
        z>CREATE INDEX IF NOT EXISTS idx_pdf_uuid ON documents(pdf_uuid)z2CREATE INDEX IF NOT EXISTS idx_term ON terms(term)
bm25_scorec                    s     | ||||S )N)_bm25_score)tfdfdoc_lenavg_doc_len
total_docsr;   r>   r?   <lambda>   s   z,HybridSearch._setup_duckdb.<locals>.<lambda>DOUBLE)return_typeN)r7   executecreate_functionr5   CatalogExceptionrf   r>   rf   r?   r8   a   s   
	
zHybridSearch._setup_duckdb      ?      ?ra   rb   rc   rd   re   k1bc           
      C   sR   t || d |d  d }||d  ||d| || |     }	t||	 S )z-Calculate BM25 score for a term in a documentr      )nplogfloat)
r;   ra   rb   rc   rd   re   ro   rp   idftf_adjustedr>   r>   r?   r`      s   (zHybridSearch._bm25_scoreNpdf_pathpdf_uuidmetadatac                 C   sF  |du r
t t }|du ri }tj|dd }|||d| j|< |   t	|}|
 }| jd|g | jd|g | jjd|idd	 }|rT| jj|d
 g }t ,}	g }
t|D ]\}}|
|	| j||||| q`t|
D ]}|  qwW d   n1 sw   Y  t|}| jd | jd| jg |S )zG
        Process and upsert a PDF document into both databases
        N.r   )rZ   rw   ry   (DELETE FROM documents WHERE pdf_uuid = ?WDELETE FROM terms WHERE chunk_id IN (SELECT chunk_id FROM documents WHERE pdf_uuid = ?)rx   whereidsr   z-INSERT INTO documents SELECT * FROM chunks_dfz
            INSERT OR REPLACE INTO doc_stats 
            SELECT 
                ? as collection_name,
                COUNT(*) as total_chunks,
                AVG(LENGTH(content)) as avg_chunk_length
            FROM documents
        )r(   r\   uuid4r&   r   basenamesplitr,   rY   r
   rR   r7   rj   r4   getdeleter   	enumerateappendsubmitprocess_pager	   resultpd	DataFramer   )r;   rw   rx   ry   rZ   loaderpagesexisting_ids
all_chunksexecutorfuturespage_numpagefuture	chunks_dfr>   r>   r?   
upsert_pdf   sL   

	zHybridSearch.upsert_pdfc                 C   s   | j |j}t|D ]d\}}| d| d| }	t }
||||
 d|}| jj|	g|g|gd |	|	|||||
|d |
  }t| }g }| D ]\}}|	||	|d qT|rot|}| jd qdS )	z|
        Process a single page and add chunks to databases.
        This method will run in parallel for each page.
        z-pz-c)rx   page_numberchunk_number
created_at)r   	documents	metadatas)chunk_idrx   rA   r   r   r   ry   )termr   ra   z'INSERT INTO terms SELECT * FROM term_dfN)r:   
split_textpage_contentr   r   now	isoformatr4   addr   lowerr   r   Seriesvalue_countsr[   r   r7   rj   )r;   r   r   rx   ry   r   chunks	chunk_numrC   r   r   chunk_metadataterms	term_freq	term_datar   freqterm_dfr>   r>   r?   r      sN   	

zHybridSearch.process_pagec              
   C   s   | j d|g | j d|g z| jjd|idd }|r%| jj|d W n ty? } ztd|  W Y d}~nd}~ww td	| d
 dS )z
        Delete all data associated with a given PDF UUID from DuckDB and ChromaDB.
        
        Args:
            pdf_uuid (str): The unique identifier of the PDF to delete.
        r{   r|   rx   r}   r   r   z$Error while deleting from ChromaDB: Nz"All data associated with PDF UUID z has been deleted.)r7   rj   r4   r   r   	Exceptionprint)r;   rx   r   er>   r>   r?   delete_collection_by_uuid  s,   z&HybridSearch.delete_collection_by_uuid      rE   top_kfinal_nc                    s  | j j|gd|i|d}|  }dddd |D }| j||| jg| |||g 	 }	|d d }
|
rTt
|
t|
fd	d
t|d d |
D }ni }dd
 |	 D }|rt
| t|   kr~ fdd
| D }ndd
 |D }t| t| B }g }|D ]?}||d}||d}|| d| |  }| jd|g	 jd }|||d t|d t|d |d |||d q|r| ||}t|dd ddd| S )z=
        Perform hybrid search within a specific PDF
        rx   )query_textsr~   	n_resultsa#  
        WITH pdf_stats AS (
            -- Calculate statistics specific to the PDF
            SELECT COUNT(*) AS total_chunks, AVG(LENGTH(content)) AS avg_chunk_length
            FROM documents
            WHERE pdf_uuid = ?  -- Filter by the specific PDF
        ),
        doc_lengths AS (
            -- Get the length of each chunk within the specific PDF
            SELECT d.chunk_id, LENGTH(d.content) AS doc_len
            FROM documents d
            WHERE d.pdf_uuid = ?  -- Filter by the specific PDF
        ),
        term_stats AS (
            -- Calculate term statistics specific to the PDF
            SELECT 
                t.term,
                t.chunk_id,
                t.tf,
                COUNT(*) OVER (PARTITION BY t.term) AS df  -- Document frequency within this PDF
            FROM terms t
            JOIN documents d ON t.chunk_id = d.chunk_id
            WHERE t.term IN ({}) AND d.pdf_uuid = ?  -- Filter terms by the specific PDF
        ),
        scores AS (
            -- Calculate BM25 score for each chunk in the specific PDF
            SELECT 
                t.chunk_id,
                SUM(bm25_score(
                    t.tf, 
                    t.df, 
                    dl.doc_len, 
                    ps.avg_chunk_length,   -- Use PDF-level average chunk length
                    ps.total_chunks        -- Use PDF-level total chunks
                )) AS bm25_score
            FROM term_stats t
            JOIN doc_lengths dl ON t.chunk_id = dl.chunk_id
            CROSS JOIN pdf_stats ps  -- Join with PDF-specific statistics
            GROUP BY t.chunk_id
        )
        SELECT 
            d.chunk_id,
            d.content,
            d.page_number,
            d.chunk_number,
            d.metadata,
            COALESCE(s.bm25_score, 0) AS bm25_score  -- Default to 0 if no BM25 score
        FROM documents d
        LEFT JOIN scores s ON d.chunk_id = s.chunk_id
        WHERE d.pdf_uuid = ?  -- Filter by the specific PDF
        ORDER BY bm25_score DESC  -- Sort by BM25 score
        LIMIT ?
        ,c                 S   s   g | ]}d qS )?r>   )rB   _r>   r>   r?   rF   u  s    z'HybridSearch.search.<locals>.<listcomp>	distancesr   c                    s&   i | ]\}}|d |     qS )rq   r>   )rB   r   distance)max_distancemin_distancer>   r?   
<dictcomp>  s    z'HybridSearch.search.<locals>.<dictcomp>r   c                 S   s   i | ]\}}|d  |d qS )r   r_   r>   )rB   r   rowr>   r>   r?   r     s    c                    s"   i | ]\}}||    qS r>   r>   )rB   r   rL   )max_bm25_scoremin_bm25_scorer>   r?   r     s    c                 S   s   i | ]}|d qS )r   r>   )rB   r   r>   r>   r?   r     s    rq   z*SELECT * FROM documents WHERE chunk_id = ?rA   r   r   ry   )r   rA   r   r   ry   combined_scorevector_scorer_   c                 S   s   | d S )NrG   r>   )xr>   r>   r?   rg     s    z%HybridSearch.search.<locals>.<lambda>T)keyreverseN)r4   rE   r   r   formatjoinr7   rj   r   fetchdfminmaxrI   iterrowsvaluesr[   setkeysr   ilocr   intrM   sorted)r;   rE   rx   r   r   r   vector_resultsquery_terms
bm25_querybm25_resultsvector_distancesvector_scoresbm25_scoresr   combined_resultsr   r   r_   r   chunk_detailsr>   )r   r   r   r   r?   search3  sv   46


zHybridSearch.search)r   r   r   r   r   )rm   rn   )NN)r   r   r   )__name__
__module____qualname__r(   r   r   r   rt   r@   rM   r+   rY   r   r^   r8   r`   r   r   r   r   r>   r>   r>   r?   r      s    0,		,5$D6(r   )r/   chromadb.utilsr   r5   pandasr   numpyrr   typingr   r   r   sentence_transformersr   langchain.text_splitterr   concurrent.futuresr   r	   langchain.document_loadersr
   r\   r&   r   dotenvr   rQ   r   r>   r>   r>   r?   <module>   s"    