Internal #11163

55 changes: 32 additions & 23 deletions tensorflow_datasets/datasets/robonet/robonet_dataset_builder.py
@@ -70,6 +70,38 @@ def __init__(
self.height = height


def _process_example(filename):
"""Converts one video from hdf5 format.

Args:
filename: The path to the hdf5 file.

Returns:
A tuple containing the basename of the file and a dictionary with the
extracted features: 'video', 'actions', 'states', and 'filename'.
"""
h5py = tfds.core.lazy_imports.h5py
with h5py.File(filename) as hf:
video_bytes = hf['env']['cam0_video']['frames'][:].tobytes()
states = hf['env']['state'][:].astype(np.float32)
states = np.pad(
states, ((0, 0), (0, STATES_DIM - states.shape[1])), 'constant'
)
actions = hf['policy']['actions'][:].astype(np.float32)
actions = np.pad(
actions, ((0, 0), (0, ACTIONS_DIM - actions.shape[1])), 'constant'
)

basename = os.path.basename(filename)
features = {
'video': video_bytes,
'actions': actions,
'states': states,
'filename': basename,
}
return basename, features


class Builder(tfds.core.BeamBasedBuilder):
"""RoboNet: Large-Scale Multi-Robot Learning."""

@@ -165,28 +197,5 @@ def _build_pcollection(self, pipeline, filedir):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam

def _process_example(filename):
"""Converts one video from hdf5 format."""
h5py = tfds.core.lazy_imports.h5py
with h5py.File(filename) as hf:
video_bytes = hf['env']['cam0_video']['frames'][:].tobytes()
states = hf['env']['state'][:].astype(np.float32)
states = np.pad(
states, ((0, 0), (0, STATES_DIM - states.shape[1])), 'constant'
)
actions = hf['policy']['actions'][:].astype(np.float32)
actions = np.pad(
actions, ((0, 0), (0, ACTIONS_DIM - actions.shape[1])), 'constant'
)

basename = os.path.basename(filename)
features = {
'video': video_bytes,
'actions': actions,
'states': states,
'filename': basename,
}
return basename, features

filenames = tf.io.gfile.glob(os.path.join(filedir, '*.hdf5'))
return pipeline | beam.Create(filenames) | beam.Map(_process_example)
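The change above hoists the per-file converter to module scope, so beam.Map receives a plain top-level callable instead of a closure defined inside a builder method. A minimal sketch of why that helps with serialization, using hypothetical stand-in names (Beam ships callables to workers with its own pickler, which tolerates many closures, but a module-level function never drags the enclosing builder instance along):

import pickle


def process_example(filename):
  # Module-level stand-in for the real hdf5 converter.
  return filename, {'filename': filename}


def make_nested_converter():
  def process_example_nested(filename):
    # Defined inside another function, so it is a local object.
    return filename, {'filename': filename}

  return process_example_nested


pickle.dumps(process_example)  # works: resolved by its module path
# pickle.dumps(make_nested_converter())  # fails: cannot pickle a local (nested) function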
74 changes: 46 additions & 28 deletions tensorflow_datasets/rl_unplugged/rlu_rwrl/rlu_rwrl.py
@@ -21,7 +21,7 @@
import dataclasses
import functools
import os
from typing import Any, Dict, Generator, Optional, Sequence, Text, Tuple, Union
from typing import Any, Iterable, Optional, Sequence, Text, Union

import numpy as np
from tensorflow_datasets.core.utils.lazy_imports_utils import tensorflow as tf
@@ -203,8 +203,8 @@ def tf_example_to_feature_description(


def tree_deflatten_with_delimiter(
flat_dict: Dict[str, Any], delimiter: str = _DELIMITER
) -> Dict[str, Any]:
flat_dict: dict[str, Any], delimiter: str = _DELIMITER
) -> dict[str, Any]:
"""De-flattens a dict to its originally nested structure.

Does the opposite of {combine_nested_keys(k) :v
@@ -228,7 +228,7 @@ def tree_deflatten_with_delimiter(


def tf_feature_to_tfds_feature(
nested: Union[tf.io.FixedLenFeature, Dict[Text, Any]]
nested: Union[tf.io.FixedLenFeature, dict[Text, Any]]
):
"""Converts potentially nested tf features into tfds features."""
if isinstance(nested, tf.io.FixedLenFeature):
@@ -245,6 +245,42 @@ def tf_feature_to_tfds_feature(
raise ValueError(f'Unsupported type {type(nested)}')


def _generate_examples_one_file_fn(
path,
feature_description,
tf_example_to_step_ds_fn,
) -> Iterable[tuple[str, dict[str, Any]]]:
"""Yields examples from one file.

Args:
path: The file path to the TFRecord file.
feature_description: A dictionary describing the features in the tf.Example.
tf_example_to_step_ds_fn: A function that takes a tf.Example tensor and the
feature description, and returns a dictionary representing an episode.

Yields:
A tuple containing:
- A unique key for the episode, composed of the file base name and an
episode index.
- A dictionary containing the episode data, as returned by
`tf_example_to_step_ds_fn`.
"""
key_prefix = os.path.basename(path)
# Dataset of tf.Examples containing full episodes.
example_ds = tf.data.TFRecordDataset(filenames=str(path))
# Dataset of episodes, each represented as a dataset of steps.
episode_ds = example_ds.map(
functools.partial(
tf_example_to_step_ds_fn,
feature_description=feature_description,
),
num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
episode_ds = tfds.as_numpy(episode_ds)
for episode_id, e in enumerate(episode_ds):
yield f'{key_prefix}/{episode_id}', e


class RluRwrl(rlu_common.RLUBuilder):
"""DatasetBuilder for rlu_rwrl dataset."""

@@ -322,7 +358,7 @@ def num_shards(self):

def tf_example_to_step_ds( # pytype: disable=signature-mismatch # overriding-parameter-count-checks
self, tf_example: tf.Tensor, feature_description
) -> Dict[str, Any]:
) -> dict[str, Any]:
data = tf.io.parse_single_example(tf_example, feature_description)
data = tree_deflatten_with_delimiter(data)

@@ -368,26 +404,8 @@ def _generate_examples(self, paths):

feature_description = tf_example_to_feature_description(example_item)

def _generate_examples_one_file(
path,
) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
"""Yields examples from one file."""
counter = 0
key_prefix = os.path.basename(path)
# Dataset of tf.Examples containing full episodes.
example_ds = tf.data.TFRecordDataset(filenames=str(path))
# Dataset of episodes, each represented as a dataset of steps.
episode_ds = example_ds.map(
functools.partial(
self.tf_example_to_step_ds,
feature_description=feature_description,
),
num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
episode_ds = tfds.as_numpy(episode_ds)
for e in episode_ds:
episode_id = counter
yield f'{key_prefix}/{episode_id}', e
counter += 1

return beam.Create(file_paths) | beam.FlatMap(_generate_examples_one_file)
return beam.Create(file_paths) | beam.FlatMap(
_generate_examples_one_file_fn,
feature_description=feature_description,
tf_example_to_step_ds_fn=self.tf_example_to_step_ds,
)
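With _generate_examples_one_file_fn at module scope, the per-file arguments travel through beam.FlatMap keyword arguments instead of being captured by a closure; Beam forwards any extra args/kwargs given to FlatMap to the callable for every element, which behaves like functools.partial. A small sketch of that forwarding with made-up element values and a hypothetical helper:

import apache_beam as beam


def emit_keyed(path, prefix):
  # Yields one keyed element per input path.
  yield f'{prefix}/{path}', path


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(['file-0', 'file-1'])
      # prefix='shard' is forwarded to emit_keyed for each element;
      # functools.partial(emit_keyed, prefix='shard') would behave the same.
      | beam.FlatMap(emit_keyed, prefix='shard')
  )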
61 changes: 42 additions & 19 deletions tensorflow_datasets/robotics/dataset_importer_builder.py
@@ -18,8 +18,9 @@
from __future__ import annotations

import abc
import functools
import os
from typing import Any
from typing import Any, Callable, Iterator

from tensorflow_datasets.core import beam_utils
from tensorflow_datasets.core import dataset_builder
@@ -32,6 +33,41 @@



def _dataset_importer_converter_fn(
example: dict[str, Any],
decode_fn: Callable[[Any], Any],
keys_to_strip: list[str],
) -> Iterator[tuple[str, dict[str, Any]]]:
"""Beam converter function for DatasetImporterBuilder.

Decodes steps of an RLDS episode, converts all features to numpy, and removes
unnecessary keys.

Args:
example: A dictionary containing the encoded RLDS episode, including
potentially encoded 'steps', and 'tfds_id'.
decode_fn: A function to decode a single step from `example['steps']`.
keys_to_strip: A list of keys to remove from the output episode.

Yields:
A tuple of (example_id, episode), where episode is a dictionary of
numpy arrays with `tfds_id` and keys in `keys_to_strip` removed.
"""
# Decode the RLDS Episode and transform it to numpy.
example_out = dict(example)
steps_ds = tf.data.Dataset.from_tensor_slices(
example_out['steps']
).map(decode_fn)
steps = list(iter(steps_ds.take(-1)))
example_out['steps'] = steps
example_out = dataset_utils.as_numpy(example_out)
example_id = example_out['tfds_id'].decode('utf-8')
del example_out['tfds_id']
for key in keys_to_strip:
example_out.pop(key, None)
yield example_id, example_out


class DatasetImporterBuilder(
tfds.core.GeneratorBasedBuilder, skip_registration=True
):
@@ -118,24 +154,11 @@ def _generate_examples(

decode_fn = builder.info.features['steps'].feature.decode_example

def converter_fn(example):
# Decode the RLDS Episode and transform it to numpy.
example_out = dict(example)
example_out['steps'] = tf.data.Dataset.from_tensor_slices(
example_out['steps']
).map(decode_fn)
steps = list(iter(example_out['steps'].take(-1)))
example_out['steps'] = steps

example_out = dataset_utils.as_numpy(example_out)

example_id = example_out['tfds_id'].decode('utf-8')
del example_out['tfds_id']
for key in self.KEYS_TO_STRIP:
if key in example_out:
del example_out[key]

yield example_id, example_out
converter_fn = functools.partial(
_dataset_importer_converter_fn,
decode_fn=decode_fn,
keys_to_strip=self.KEYS_TO_STRIP,
)

return f'read_tfds_dataset@{split}' >> beam_utils.ReadFromTFDS(
builder=builder,
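Inside _dataset_importer_converter_fn, the decoded steps are materialized eagerly: from_tensor_slices turns the batched step structure back into a per-step dataset, take(-1) keeps every element, and iterating the result yields one dict per step. A self-contained sketch of that access pattern with toy values rather than the real RLDS features:

import tensorflow as tf

# Two toy steps, stored column-wise the way an encoded episode holds them.
steps = {
    'reward': tf.constant([0.0, 1.0]),
    'is_last': tf.constant([False, True]),
}

steps_ds = tf.data.Dataset.from_tensor_slices(steps)
# take(-1) keeps the whole dataset; list(...) materializes it as one dict
# of scalar tensors per step.
materialized = list(steps_ds.take(-1))
assert len(materialized) == 2
assert bool(materialized[1]['is_last'])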
53 changes: 32 additions & 21 deletions tensorflow_datasets/structured/covid19/covid19.py
@@ -20,6 +20,7 @@
response, weather, and more.
"""

import functools
import numpy as np
from tensorflow_datasets.core.utils.lazy_imports_utils import tensorflow as tf
import tensorflow_datasets.public_api as tfds
@@ -48,6 +49,29 @@
_BATCH_SIZE = 10000


def _cast_according_to_column(feature_type, v):
  """Casts numeric values to str when the target feature is tf.string."""
  if feature_type == tf.string and isinstance(v, (float, int)):
    return str(v)
  return v


def _load_shard(index: int, dl_manager, archive_path, columns, features):
"""Load a shard of the dataset."""
pd = tfds.core.lazy_imports.pandas
# There is only one file so by using the for we guarantee that the file
# will be closed.
for _, file in dl_manager.iter_archive(archive_path):
df = pd.read_csv(file, skiprows=index, nrows=_BATCH_SIZE)
result = []
for i, row in df.iterrows():
example = {
k: _cast_according_to_column(features[k].dtype, v)
for k, v in zip(columns, row.values)
}
result.append((index + i, example))
return result


class Covid19(tfds.core.GeneratorBasedBuilder):
"""DatasetBuilder for covid19 dataset."""

@@ -787,31 +811,18 @@ def _generate_examples(
pd = tfds.core.lazy_imports.pandas
beam = tfds.core.lazy_imports.apache_beam

def cast_according_to_column(feature_type, v):
if feature_type == tf.string and isinstance(v, (float, int)):
return str(v)
return v

file_handles = dl_manager.iter_archive(archive_path)
_, file = next(file_handles)

columns = pd.read_csv(file, nrows=1).columns

def load_shard(index: int):
# There is only one file so by using the for we guarantee that the file
# will be closed.
for _, file in dl_manager.iter_archive(archive_path):
df = pd.read_csv(file, skiprows=index, nrows=_BATCH_SIZE)
features = self.info.features
result = []
for i, row in df.iterrows():
example = {
k: cast_according_to_column(features[k].dtype, v)
for k, v in zip(columns, row.values)
}
result.append((index + i, example))
return result
features = self.info.features

return beam.Create(list(range(0, _N_RECORDS, _BATCH_SIZE))) | beam.FlatMap(
load_shard
functools.partial(
_load_shard,
dl_manager=dl_manager,
archive_path=archive_path,
columns=columns,
features=features,
)
)
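_load_shard re-reads the CSV with skiprows=index and nrows=_BATCH_SIZE, while the column names come from a single read done up front and are zipped against the raw row values. A minimal sketch of that pandas access pattern on an in-memory CSV (hypothetical data, not the covid19 file), showing why per-shard column names cannot be taken from each chunk once rows are skipped:

import io

import pandas as pd

csv_text = 'key,value\na,1\nb,2\nc,3\nd,4\n'

# The shard starting at row 0 sees the real header plus two data rows.
shard0 = pd.read_csv(io.StringIO(csv_text), skiprows=0, nrows=2)
assert list(shard0.columns) == ['key', 'value']

# For a later shard, skiprows drops that many leading lines (header included)
# and the first remaining line is parsed as that shard's header.
shard1 = pd.read_csv(io.StringIO(csv_text), skiprows=2, nrows=2)
assert list(shard1.columns) == ['b', '2']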
34 changes: 16 additions & 18 deletions tensorflow_datasets/structured/web_graph/web_graph.py
@@ -20,7 +20,6 @@
import dataclasses
import os
import textwrap
from typing import List

import numpy as np
from tensorflow_datasets.core.utils.lazy_imports_utils import tensorflow as tf
@@ -82,6 +81,22 @@
"""


def _get_int_feature(example: tf.train.Example, feature_name: str) -> list[int]:
return example.features.feature[feature_name].int64_list.value


def _process_example(example: bytes, is_test=False):
"""Process a single example."""
example = tf.train.Example.FromString(example)
row_tag = _get_int_feature(example, 'row_tag')[0]
col_tag = np.array(_get_int_feature(example, 'col_tag'), dtype=np.int64)
gt_tag = np.array(
_get_int_feature(example, 'gt_tag') if is_test else [], dtype=np.int64
)
return_dict = {'row_tag': row_tag, 'col_tag': col_tag, 'gt_tag': gt_tag}
return row_tag, return_dict


@dataclasses.dataclass
class WebGraphConfig(tfds.core.BuilderConfig):
"""Palmer Penguins dataset builder config."""
@@ -225,23 +240,6 @@ def _generate_examples(self, pipeline, files, split: str):
"""Yields examples."""
beam = tfds.core.lazy_imports.apache_beam

def _get_int_feature(
example: tf.train.Example, feature_name: str
) -> List[int]:
return example.features.feature[feature_name].int64_list.value

def _process_example(example: bytes, is_test=False):
example = tf.train.Example.FromString(example)
row_tag = _get_int_feature(example, 'row_tag')[0]
col_tag = np.array(_get_int_feature(example, 'col_tag'), dtype=np.int64)
if is_test:
gt_tag = _get_int_feature(example, 'gt_tag')
else:
gt_tag = []
gt_tag = np.array(gt_tag, dtype=np.int64)
return_dict = {'row_tag': row_tag, 'col_tag': col_tag, 'gt_tag': gt_tag}
return row_tag, return_dict

return (
pipeline
| f'{split}_create' >> beam.Create(files)
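_process_example deserializes each tf.train.Example proto directly and reads its int64 feature lists rather than going through tf.io.parse_single_example. A small sketch of that proto round trip with made-up tag values and a hypothetical helper:

import numpy as np
import tensorflow as tf


def _int64_feature(values):
  return tf.train.Feature(int64_list=tf.train.Int64List(value=values))


# Build and serialize a toy example with the same field names as above.
example = tf.train.Example(
    features=tf.train.Features(
        feature={
            'row_tag': _int64_feature([7]),
            'col_tag': _int64_feature([1, 2, 3]),
        }
    )
)
serialized = example.SerializeToString()

# The reverse path used by _process_example: FromString, then
# int64_list.value per feature.
parsed = tf.train.Example.FromString(serialized)
row_tag = parsed.features.feature['row_tag'].int64_list.value[0]
col_tag = np.array(parsed.features.feature['col_tag'].int64_list.value,
                   dtype=np.int64)
assert row_tag == 7 and col_tag.tolist() == [1, 2, 3]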