
    *#h\6                        d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlmZmZ d dlmZ d dlmZ ej:                  j<                  j?                  e       Z!erd dl"Z"e G d	 d
ejF                               Z$dddee%   fdZ&dddee%   fdZ' G d de      Z( G d dejR                        Z*y)    N)	dataclass)TYPE_CHECKINGIterableListOptionalTupleUnion)ArrowWriterParquetWriter)MAX_SHARD_SIZE)is_remote_filesystemrename)_BaseExamplesIterable)convert_file_size_to_intc                   :    e Zd ZU dZdZeej                     ed<   y)SparkConfigzBuilderConfig for Spark.Nfeatures)	__name__
__module____qualname____doc__r   r   datasetsFeatures__annotations__     b/var/www/html/sandstorm/venv/lib/python3.12/site-packages/datasets/packaged_modules/spark/spark.pyr   r      s    ",0Hhx(()0r   r   dfpyspark.sql.DataFramenew_partition_orderc                     | j                  d      j                  d|d          }|dd  D ]6  }| j                  d      j                  d|       }|j                  |      }8 |S )N*z
part_id = r      )selectwhereunion)r   r    df_combinedpartition_idpartition_dfs        r   _reorder_dataframe_by_partitionr*   "   su    ))C.&&4G4J3K'LMK+AB/ 6yy~++j,GH!''56 r   partition_orderc                 "     dd l  fd}|S )Nr   c               3   d  K   j                  d
j                  j                  j                         j	                  d            } t        | 	      }d}|j                  d      }d}|D ]A  }|j                         }|d   }|j                  d       ||k7  r|}d}| d| |f |dz  }C y w)	Nr"   part_idr   T)prefetchPartitions_r#   )	r$   sql	functionsspark_partition_idaliasr*   toLocalIteratorasDictpop)df_with_partition_idr)   row_idrowscurr_partitionrowrow_as_dictr.   r   r+   pysparks           r   generate_fnz0_generate_iterable_examples.<locals>.generate_fn0   s     !yygkk.C.C.V.V.X.^.^_h.ij67K_]++t+D 	C**,K!),GOOI&(!(IQvh'44aKF	s   B-B0)r?   )r   r+   r@   r?   s   `` @r   _generate_iterable_examplesrA   *   s     " r   c                       e Zd Z	 d	 ddZd Zdej                  j                  dd fdZde	de	dd fd	Z
ede	fd
       Zy)SparkExamplesIterableNc                     || _         |xs- t        | j                   j                  j                               | _        t        | j                   | j                        | _        y N)r   rangerddgetNumPartitionsr+   rA   generate_examples_fn)selfr   r+   s      r   __init__zSparkExamplesIterable.__init__E   sG    
 .W%8T8T8V2W$?I]I]$^!r   c              #   @   K   | j                         E d {    y 7 wrE   )rI   rJ   s    r   __iter__zSparkExamplesIterable.__iter__N   s     ,,...s   	generatorreturnc                     t        t        | j                  j                  j	                                     }|j                  |       t        | j                  |      S N)r+   )listrF   r   rG   rH   shufflerC   )rJ   rO   r+   s      r   shuffle_data_sourcesz*SparkExamplesIterable.shuffle_data_sourcesQ   sA    uTWW[[%A%A%CDE/*$TWWoNNr   	worker_idnum_workersc                 T    | j                  ||      }t        | j                  |      S rR   )split_shard_indices_by_workerrC   r   )rJ   rV   rW   r+   s       r   shard_data_sourcesz(SparkExamplesIterable.shard_data_sourcesV   s%    <<YT$TWWoNNr   c                 ,    t        | j                        S rE   )lenr+   rM   s    r   n_shardszSparkExamplesIterable.n_shardsZ   s    4''((r   rE   )r   r   )r   r   r   rK   rN   nprandom	GeneratorrU   intrZ   propertyr]   r   r   r   rC   rC   D   sv     _#_/Obii.A.A OF] O
OC Oc OF] O )# ) )r   rC   c                   
    e Zd ZeZ	 	 ddddedef fdZd Zd Zde	j                  j                  j                  fd	Zd
 Zdedededeeeeeeef   f      fdZ	 	 	 ddddedeeeef      dee   fdZdddefdZ xZS )Sparkr   r   	cache_dirworking_dirc                     dd l }|j                  j                  j                  j	                         | _        || _        || _        t        | $  d|t        | j                  j                               d| y )Nr   )re   config_namer   )r?   r2   SparkSessionbuildergetOrCreate_sparkr   _working_dirsuperrK   strsemanticHash)rJ   r   re   rf   config_kwargsr?   	__class__s         r   rK   zSpark.__init__b   sk     	kk..66BBD' 	
DGG0023	
 	
r   c                    | j                   fd}| j                  j                  j                  dd      j	                  d      ry | j                   ro| j                  j
                  j                  t        d      d      j                  |      j                         }t        j                  j                  |d         ry t        d      )Nc                     t        j                  d       t         j                  j                  dt	        j
                         j                  z         }t        |d       |gS )NT)exist_okfs_testa)osmakedirspathjoinuuiduuid4hexopen)context
probe_filere   s     r   create_cache_and_write_probez?Spark._validate_cache_dir.<locals>.create_cache_and_write_probe{   sL     KK	D1iTZZ\=M=M1MNJ S!<r   zspark.master localr#   r   ztWhen using Dataset.from_spark on a multi-node cluster, the driver and all workers should be able to access cache_dir)
_cache_dirrl   confget
startswithsparkContextparallelizerF   mapPartitionscollectrx   rz   isfile
ValueError)rJ   r   probere   s      @r   _validate_cache_dirzSpark._validate_cache_diru   s     OO		  ;;3>>wG
 ??((44U1XqAOOPlmuuw  ww~~eAh' C
 	
r   c                 V    t        j                  | j                  j                        S )N)r   )r   DatasetInfoconfigr   rM   s    r   _infozSpark._info   s    ##T[[-A-ABBr   
dl_managerc                 `    t        j                  t         j                  j                        gS )N)name)r   SplitGeneratorSplitTRAIN)rJ   r   s     r   _split_generatorszSpark._split_generators   s     ''X^^-A-ABCCr   c                    dd l }d }| j                  j                         }|dk  r|nd}| j                  j                  |      j	                  d      j                  |d      j                  |j                  j                  j                  d      j                  d            j                         d   j                  |z  }||z  }||kD  r9t        |t        ||z              }| j                  j	                  |      | _        y y )Nr   c              3   v   K   | D ]0  }t         j                  j                  d|j                  gi       2 y w)Nbatch_bytes)paRecordBatchfrom_pydictnbytes)itbatchs     r   get_arrow_batch_sizez=Spark._repartition_df_if_needed.<locals>.get_arrow_batch_size   s7      Rnn00-%,,1PQQRs   79d   r#   zbatch_bytes: longr   sample_bytes)r?   r   countlimitrepartition
mapInArrowaggr2   r3   sumr5   r   r   minra   )	rJ   max_shard_sizer?   r   df_num_rowssample_num_rowsapprox_bytes_per_rowapprox_total_sizenew_num_partitionss	            r   _repartition_df_if_neededzSpark._repartition_df_if_needed   s    	R ggmmo)4);+ GGMM/*[^Z,.ABS&&**=9??OPWYq	
 \ 	 1;>~-!$[#6G.6X2Y!Zgg))*<=DG .r   fpathfile_formatr   rP   c           	   #   2  	
K   dd l 	|dk(  rt        nt        | j                  rGt        j
                  j                  | j                  t        j
                  j                              n|dk(  | j                  j                  | j                  | j                  j                  
	
f	d}| j                  j                  |d      j                  d      j!                  	j"                  j$                  j'                  d      j)                  d      	j"                  j$                  j'                  d      j)                  d	      	j"                  j$                  j+                  d      j)                  d
      	j"                  j$                  j-                  d      j)                  d            j/                         }|D ]>  }|j0                  |j2                  |j4                  |j6                  |j8                  ff @ y w)Nr   parquetc           	   3   D  	K    j                          j                         }t        | d       }|)t        j                  j                  |gdgdggg d      S d} j                  d|d      j                  d|d            }t        j                  j                  |g      }|j                  |       | D ]  }|j                  k\  r|j                         \  }}|j                          t        j                  j                  |g|g|ggg d       |dz  } |j                  j                  d|d      j                  d|d            }t        j                  j                  |g      }|j                  |        |j                  dkD  rN|j                         \  }}|j                          t        j                  j                  |g|g|ggg d       k7  rt        j                  t        j                   j#                              D ]r  }	t        j                   j%                  t        j                   j#                        t        j                   j'                  |	            }
t(        j+                  |	|
       t y y w)	Nr   )task_idnum_examples	num_bytes)namesSSSSS05dTTTTT)r   rz   writer_batch_sizestorage_optionsembed_local_filesr#   )TaskContexttaskAttemptIdnextr   r   from_arraysreplaceTablefrom_batcheswrite_table
_num_bytesfinalizeclose	_featuresrx   listdirrz   dirnamer{   basenameshutilmove)r   r   first_batchshard_idwritertabler   r   r   filedestr   r   r   r   r?   r   working_fpathr   writer_classs              r   write_arrowz0Spark._prepare_split_single.<locals>.write_arrow   sx    )g))+99;Gr4.K"~~11YaS)B 2   H!!"**7xnFNNw[bcfZgi"3 /"3F HH));-8Eu% *!-&2C2C~2U.4oo.?+L)LLN..44!\NYK@F 5   MH)!'!1!1*227xnNVVW^cjknboq*;(7*;F --ug6""5)#*&   1$*0//*;'inn00Y<B 1  
 %JJrww}'EF ,D77<<(>@P@PQU@VWDKKd+, &s   JJ z2task_id: long, num_examples: long, num_bytes: longr   r   total_num_examplesr   total_num_bytes
num_shardsshard_lengths)r?   r   r
   rm   rx   rz   r{   r   r   r   _writer_batch_size_fsr   r   r   groupByr   r2   r3   r   r5   r   collect_listr   r   r   r   r   r   )rJ   r   r   r   r   statsr=   r   r   r?   r   r   r   r   s    ` `   @@@@@@@r   _prepare_split_singlezSpark._prepare_split_single   s     	(3y(@}kTXTeTeT%6%68H8H8OPkp'94 ;;'' 33((222	, 2	,j GG{,`aWYS%%)).9??@TU%%))+6<<=NO%%++K8>>|L%%22>BHHY	 WY 	  	pC++ 6 68K8KS^^]`]n]nooo	ps   HHsplit_generatorzdatasets.SplitGeneratornum_procc                    | j                          t        |xs t              }| j                  |       t	        | j
                         }|rt        j                  j                  nt        j                  }d}| j                   d|j                   | d| }	 || j                  |	      d}
d}dg }g }| j                  ||      D ]E  \  }}|\  }}}}|dkD  s|
|z  }
||z  }|z  |j                  ||f       |j                  |       G |
|j                  _        ||j                  _        t$        j'                  d d       dkD  r||j                  _        | j
                  dt*        d	t*        d
t*        ffdg }d}t-        t/        |            D ]3  }||   \  }}t-        |      D ]  }|j                  |||g       |dz  } 5 | j0                  j2                  j5                  |t/        |            j7                  fd      j9                          y d}|d   d   }| j;                  j=                  d|d      j=                  d|d      j=                  |d             y )Nz-TTTTT-SSSSS-of-NNNNN-.r   z	Renaming z shards.r#   r   r   global_shard_idc           	          t        j                  d|d      j                  d| d      j                  d|d      j                  dd             y )Nr   r   r   zTTTTT-SSSSSNNNNN)r   r   )r   r   r   r   fstotal_shardss      r   _rename_shardz+Spark._prepare_split.<locals>._rename_shard=  s^    
 MM'hs^=EEgRYZ]Q^`MM-OC3HJRRSZ_klo^prr   c                      |  S rE   r   )argsr   s    r   <lambda>z&Spark._prepare_split.<locals>.<lambda>O  s    S`bfSg r   r   r   r   r   )r   r   r   r   r   r   rx   rz   r{   	posixpathr   _output_dirr   appendextend
split_infor   r   loggerdebugr   ra   rF   r\   rl   r   r   mapr   _renamer   )rJ   r   r   r   r   kwargsis_local	path_joinSUFFIXfnamer   r   task_id_and_num_shardsall_shard_lengthsr   contentr   r   r   r   r   r   ir   r   r   r   r   s                           @@@@r   _prepare_splitzSpark._prepare_split	  s    	  "1.2RNS&&~6+DHH55$,BGGLL)..	(99+Q334VHAk]K$**E2!# $ : :5+~ ^ 	8GW 1}"l2"9,
*&--w
.CD!((7	8 3E""//>"", 	yh78!7HO&&4 B			 "%	 DO3567 )&<Q&?# %j 1 )HKK(O DE#q(O))
 KK$$00s4yAEEFghppr H,Q/2GLLg(39AA'gVY]\fb)r   c                 ,    t        | j                        S rE   )rC   r   )rJ   r   s     r    _get_examples_iterable_for_splitz&Spark._get_examples_iterable_for_splitY  s     %TWW--r   )NN)arrowNN)r   r   r   r   BUILDER_CONFIG_CLASSro   rK   r   r   r   downloaddownload_managerDownloadManagerr   r   ra   r   r   boolr	   tupler   r   r	  rC   r  __classcell__)rr   s   @r   rd   rd   _   s   &
 	
#
 
 	
&
BCDH,=,=,N,N,^,^ D>2RpRp Rp 	Rp
 
%T5e#445	6Rpn #48"&N2N N !sCx1	N
 3-N`.2. 
.r   rd   )+rx   r   r|   dataclassesr   typingr   r   r   r   r   r	   numpyr^   pyarrowr   r   datasets.arrow_writerr
   r   datasets.configr   datasets.filesystemsr   r   datasets.iterable_datasetr   datasets.utils.py_utilsr   utilslogging
get_loggerr   r   r?   BuilderConfigr   ra   r*   rA   rC   DatasetBuilderrd   r   r   r   <module>r"     s    	   ! H H    < * < < 
			*	*8	4 1((( 1 1(? VZ[^V_ #Y4)1 )6~.H## ~.r   