66from distutils .version import LooseVersion
77from functools import partial
88
9- import dask
10- import dask .dataframe as dd
119import numpy as np
1210import tensorflow as tf
1311from tensorflow .keras .utils import to_categorical as tf_to_categorical
1412
1513from deeptables .utils import consts , dt_logging
16-
14+ from hypernets . tabular import get_tool_box , is_dask_installed
1715logger = dt_logging .get_logger (__name__ )
1816
1917TFDG_DASK_CHUNK = 100
@@ -105,6 +103,7 @@ def __call__(self, X, y=None, *, batch_size, shuffle, drop_remainder):
105103 return ds
106104
107105 def _to_ds20 (self , X , y = None , * , batch_size , shuffle , drop_remainder ):
106+ import dask
108107 ds_types = {}
109108 ds_shapes = {}
110109 meta = self ._get_meta (X )
@@ -118,6 +117,7 @@ def _to_ds20(self, X, y=None, *, batch_size, shuffle, drop_remainder):
118117 ds_types [k ] = 'int32'
119118
120119 if y is not None :
120+ import dask .dataframe as dd
121121 if isinstance (y , dd .Series ):
122122 y = y .to_dask_array (lengths = True )
123123 if self .task == consts .TASK_MULTICLASS :
@@ -149,6 +149,7 @@ def to_spec(name, dtype, idx):
149149 sig = {k : to_spec (k , dtype , idx ) for k , (dtype , idx ) in meta .items ()}
150150
151151 if y is not None :
152+ import dask .dataframe as dd
152153 if isinstance (y , dd .Series ):
153154 y = y .to_dask_array (lengths = True )
154155 if self .task == consts .TASK_MULTICLASS :
@@ -167,6 +168,7 @@ def to_spec(name, dtype, idx):
167168
168169 @staticmethod
169170 def _generate (meta , X , y , * , batch_size , shuffle , drop_remainder ):
171+ import dask
170172 total_size = dask .compute (X .shape )[0 ][0 ]
171173 chunk_size = min (total_size , batch_size * TFDG_DASK_CHUNK )
172174 fn = partial (_TFDGForDask ._compute_chunk , X , y , chunk_size )
@@ -205,6 +207,7 @@ def _generate(meta, X, y, *, batch_size, shuffle, drop_remainder):
205207
206208 @staticmethod
207209 def _to_categorical (y , * , num_classes ):
210+ import dask
208211 if len (y .shape ) == 1 :
209212 y = y .reshape (dask .compute (y .shape [0 ])[0 ], 1 )
210213 fn = partial (tf_to_categorical , num_classes = num_classes , dtype = 'float32' )
@@ -213,6 +216,7 @@ def _to_categorical(y, *, num_classes):
213216
214217 @staticmethod
215218 def _compute_chunk (X , y , chunk_size , i ):
219+ import dask
216220 try :
217221 Xc = X [i :i + chunk_size ]
218222 yc = y [i :i + chunk_size ] if y is not None else None
@@ -236,7 +240,12 @@ def _range(start, stop, step, shuffle):
236240def to_dataset (config , task , num_classes , X , y = None , * ,
237241 batch_size , shuffle , drop_remainder ,
238242 categorical_columns , continuous_columns , var_len_categorical_columns ):
239- cls = _TFDGForDask if isinstance (X , dd .DataFrame ) else _TFDGForPandas
243+
244+ if is_dask_installed :
245+ import dask .dataframe as dd
246+ cls = _TFDGForDask if isinstance (X , dd .DataFrame ) else _TFDGForPandas
247+ else :
248+ cls = _TFDGForPandas
240249 logger .info (f'create dataset generator with { cls .__name__ } , '
241250 f'batch_size={ batch_size } , shuffle={ shuffle } , drop_remainder={ drop_remainder } ' )
242251
0 commit comments