diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json index ab05f7bdc634..b08e2bb81150 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json @@ -1,4 +1,5 @@ { "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support", - "https://github.com/apache/beam/pull/34830": "testing" + "https://github.com/apache/beam/pull/34830": "testing", + "trigger-2026-04-04": "portable_runner expand_sdf opt-in" } diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json index cccf60ad1fc6..bd7b284625bb 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Spark.json @@ -1,4 +1,5 @@ { "https://github.com/apache/beam/pull/34830": "testing", - "https://github.com/apache/beam/issues/35429": "testing" + "https://github.com/apache/beam/issues/35429": "testing", + "trigger-2026-04-04": "portable_runner expand_sdf opt-in" } diff --git a/.github/trigger_files/beam_PostCommit_XVR_Flink.json b/.github/trigger_files/beam_PostCommit_XVR_Flink.json index 2d8ad3760b4b..a926b3314ed6 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_Flink.json +++ b/.github/trigger_files/beam_PostCommit_XVR_Flink.json @@ -1,3 +1,4 @@ { - "modification": 2 + "modification": 2, + "trigger-2026-04-04": "portable_runner expand_sdf opt-in" } diff --git a/.github/trigger_files/beam_PostCommit_XVR_Spark3.json b/.github/trigger_files/beam_PostCommit_XVR_Spark3.json index 0967ef424bce..03bb52a7ef46 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_Spark3.json +++ b/.github/trigger_files/beam_PostCommit_XVR_Spark3.json @@ -1 +1,3 @@ -{} +{ + "trigger-2026-04-04": "portable_runner expand_sdf opt-in" +} diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 94a467d5a249..e0f9a9dec34c 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -332,7 +332,7 @@ def _optimize_pipeline( phases = [] for phase_name in pre_optimize.split(','): # For now, these are all we allow. - if phase_name in ('pack_combiners', 'lift_combiners'): + if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'): phases.append(getattr(translations, phase_name)) else: raise ValueError( diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 31293a4d43ec..2fd63b822e96 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -460,6 +460,104 @@ def create_options(self): return options +class PortableRunnerOptimizationTest(unittest.TestCase): + """Tests for PortableRunner._optimize_pipeline.""" + @staticmethod + def _transform_urns(proto, options): + optimized = PortableRunner._optimize_pipeline(proto, options) + return { + t.spec.urn + for t in optimized.components.transforms.values() if t.spec.urn + } + + def test_custom_optimize_expand_sdf(self): + """Verify that expand_sdf can be requested explicitly. + + See https://github.com/apache/beam/issues/24422. 
+ """ + from apache_beam.io import restriction_trackers + from apache_beam.portability import common_urns + + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): + def initial_restriction(self, element): + return restriction_trackers.OffsetRange(0, len(element)) + + def create_tracker(self, restriction): + return restriction_trackers.OffsetRestrictionTracker(restriction) + + def restriction_size(self, element, restriction): + return restriction.size() + + class ExpandingStringsDoFn(beam.DoFn): + def process( + self, + element, + restriction_tracker=beam.DoFn.RestrictionParam( + ExpandStringsProvider())): + cur = restriction_tracker.current_restriction().start + while restriction_tracker.try_claim(cur): + yield element[cur] + cur += 1 + + p = beam.Pipeline() + _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn())) + proto = p.to_runner_api() + + transform_urns = self._transform_urns( + proto, PipelineOptions(['--experiments=pre_optimize=expand_sdf'])) + + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + + def test_custom_optimize_expands_bounded_read(self): + """Verify that iobase.Read(BoundedSource) expands with explicit expand_sdf. + + This is the end-to-end scenario from + https://github.com/apache/beam/issues/24422: Read transforms like + ReadFromParquet use SDFs internally. Without explicit expand_sdf, these + would reach the Spark job server as a single ParDo, executing on one + partition with no parallelization. 
+ """ + from apache_beam.io import iobase + from apache_beam.portability import common_urns + + class _FakeBoundedSource(iobase.BoundedSource): + def get_range_tracker(self, start_position, stop_position): + return None + + def read(self, range_tracker): + return iter([]) + + def estimate_size(self): + return 0 + + p = beam.Pipeline() + _ = p | beam.io.Read(_FakeBoundedSource()) + proto = p.to_runner_api() + + transform_urns = self._transform_urns( + proto, PipelineOptions(['--experiments=pre_optimize=expand_sdf'])) + + # The SDFBoundedSourceReader DoFn should have been expanded into + # SDF component stages. + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + # Reshuffle should be present to enable parallelization. + self.assertIn(common_urns.composites.RESHUFFLE.urn, transform_urns) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()