
feat: migrate pipeline to nnx#2885

Open
mesakhcienet wants to merge 18 commits into AI-Hypercomputer:main from CIeNET-International:test/pipeline-scan-nnx

Conversation

@mesakhcienet
Contributor

@mesakhcienet mesakhcienet commented Dec 24, 2025

Description

Implement an NNX-based pipeline.

This PR extends PR #2831.

Main changes:

  1. nnx_decoders.py: implement the missing pipeline logic.
  2. pipeline.py: add a new class, NNXPipeline, an NNX-based pipeline implementation.
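For intuition, the core of such a scan-based pipeline class is a jax.lax.scan over stacked per-stage weights. A minimal generic sketch (illustrative only; stage_fn and run_stages are made-up names, not the PR's actual NNXPipeline code):

```python
import jax
import jax.numpy as jnp


def stage_fn(carry, stage_w):
  # One pipeline stage: apply this stage's weights to the running activations.
  return carry @ stage_w, None


def run_stages(x, stacked_w):
  # stacked_w has shape (num_stages, d, d); scan threads x through each stage.
  out, _ = jax.lax.scan(stage_fn, x, stacked_w)
  return out


x = jnp.ones((2, 4))
stacked_w = jnp.stack([jnp.eye(4) for _ in range(3)])  # 3 identity stages
y = run_stages(x, stacked_w)
```

With identity stage weights the activations pass through unchanged, which makes the scan wiring easy to sanity-check.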

Tests

We ran the pipeline with the command below:

MODEL_NAME=llama2-7b
python -m MaxText.train src/maxtext/configs/base.yml \
    run_name=pipeline_test_${MODEL_NAME}_nnx \
    base_output_directory=/dev/shm/pipeline_test_nnx \
    model_name=${MODEL_NAME} \
    dataset_type=synthetic \
    steps=15 \
    debug_sharding=true \
    per_device_batch_size=2 \
    max_target_length=32 \
    ici_pipeline_parallelism=2 \
    num_pipeline_microbatches=4 \
    num_layers_per_pipeline_stage=2 \
    enable_checkpointing=false \
    enable_nnx=true \
    pure_nnx_decoder=true \
    scan_layers_per_stage=false \
    async_checkpointing=false > nnx-porting-log/pipeline/custom_${MODEL_NAME}.log 2>&1

Checklist

Before submitting this PR, please make sure (put X in square brackets):

  • I have performed a self-review of my code. For an optional AI review, add the gemini-review label.
  • I have necessary comments in my code, particularly in hard-to-understand areas.
  • I have run end-to-end tests and provided workload links above if applicable.
  • I have made or will make corresponding changes to the doc if needed, including adding new documentation pages to the relevant Table of Contents (toctree directive) as explained in our documentation.

@mesakhcienet mesakhcienet changed the title from "core: migrate pipeline to nnx" to "feat: migrate pipeline to nnx" Dec 24, 2025
@mesakhcienet mesakhcienet force-pushed the test/pipeline-scan-nnx branch 8 times, most recently from 6875da8 to f34b1a3 Compare January 15, 2026 23:43
@codecov

codecov bot commented Jan 19, 2026

Codecov Report

❌ Patch coverage is 39.00000% with 366 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
src/maxtext/layers/nnx_decoders.py | 11.98% | 237 Missing and 20 partials ⚠️
src/maxtext/layers/pipeline.py | 64.43% | 84 Missing and 17 partials ⚠️
src/maxtext/layers/decoders.py | 66.66% | 5 Missing and 3 partials ⚠️


@mesakhcienet mesakhcienet force-pushed the test/pipeline-scan-nnx branch 4 times, most recently from 12a3907 to 2c16599 Compare January 28, 2026 08:04
@mesakhcienet mesakhcienet force-pushed the test/pipeline-scan-nnx branch 2 times, most recently from 64dc147 to 9e4518e Compare February 2, 2026 01:58
@mesakhcienet mesakhcienet force-pushed the test/pipeline-scan-nnx branch from 631a73e to ac97a1d Compare March 2, 2026 08:48
@mesakhcienet mesakhcienet changed the base branch from main to xibin/nnx_all March 2, 2026 08:48
@ecnal-cienet ecnal-cienet force-pushed the xibin/nnx_all branch 12 times, most recently from 1849f0b to 669dc01 Compare March 3, 2026 19:59
Collaborator

@bvandermoon bvandermoon left a comment


@gobbleturk what testing do you recommend for migrating pipeline parallelism to NNX? I'll send over an internal doc @hsuan-lun-chiang, @mesakhcienet, and others put together that shows the tests they have already run

@bvandermoon
Collaborator

@gobbleturk what testing do you recommend for migrating pipeline parallelism to NNX? […]

@NuojCheng any thoughts here?

Collaborator

@NuojCheng NuojCheng left a comment


Some additional train compile tests for the pipeline NNX migration:

@NuojCheng
Collaborator

There is also some Linen usage in pipeline_utils.py, e.g.

I don't see it updated in this PR, but I think it probably should be?

Another thing is the usage of this function:

# TODO(chengnuojin) Remove this function and its usage after pipeline nnx migration
def remove_logically_partition(weights):
  """Removes LogicallyPartitioned wrapper from weights."""

  def _remove_logically_partition_leaf(v):
    return getattr(v, "value") if isinstance(v, LogicallyPartitioned) else v

  return jax.tree.map(
      _remove_logically_partition_leaf,
      weights,
      is_leaf=lambda v: isinstance(v, LogicallyPartitioned),
  )
I suspect the NNX migration can help us get rid of this function, since it mostly deals with Linen wrapper troubles. Take a look at this if you can. Thank you for the hard work!
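To see what this unboxing does in isolation, here is a tiny self-contained sketch. It uses a stand-in Boxed dataclass in place of flax's LogicallyPartitioned (so it runs without flax); the pattern is the same is_leaf-guarded jax.tree.map:

```python
import dataclasses

import jax


@dataclasses.dataclass
class Boxed:
  """Stand-in for flax's LogicallyPartitioned metadata wrapper."""
  value: object


def unbox(weights):
  # Treat each Boxed wrapper as a leaf and replace it with its inner value.
  return jax.tree.map(
      lambda v: v.value if isinstance(v, Boxed) else v,
      weights,
      is_leaf=lambda v: isinstance(v, Boxed),
  )


tree = {"w": Boxed(1.0), "b": 2.0}
flat = unbox(tree)
```

Without the is_leaf predicate, tree.map would not know to stop at the wrapper, which is why the original function passes it explicitly.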

@mesakhcienet
Contributor Author

There are also some linen usage in pipeline_utils.py […]

As far as I know, the current objective is to migrate the Linen pipeline to NNX while preserving the current Linen version. Please advise if any additional progress is required at this time. Thanks!

@NuojCheng
Collaborator

There are also some linen usage in pipeline_utils.py […]

As far as I know, the current objective is to migrate the Linen pipeline to NNX while preserving the current Linen version. Please advise if any additional progress is required at this time. Thanks!

Shouldn't we have an NNX version of the functions in pipeline_utils.py as well?

@bvandermoon
Collaborator

There are also some linen usage in pipeline_utils.py […]

As far as I know, the current objective is to migrate the Linen pipeline to NNX while preserving the current Linen version. Please advise if any additional progress is required at this time. Thanks!

Are we able to bridge the NNX version back to Linen at a higher layer? If so, then I think we could get rid of the old Linen code that is no longer used.

new_carry = layer_out[0] if isinstance(layer_out, tuple) else layer_out
return new_carry, nnx.state(layer)

final_carry, scanned_state = jax.lax.scan(layer_fn, inputs, (params, state))
Collaborator


You can alternatively use nnx.scan here, which already does the moveaxis for you.

Collaborator


It also avoids the use of split and merge.

Contributor Author


Thank you for your review.
As far as I know, we should avoid nnx.scan and nnx.remat, as mentioned in the internal docs chat here.

@mesakhcienet
Contributor Author

mesakhcienet commented Mar 31, 2026

There are also some linen usage in pipeline_utils.py […]

As far as I know, the current objective is to migrate the Linen pipeline to NNX while preserving the current Linen version. Please advise if any additional progress is required at this time. Thanks!

Are we able to bridge the NNX version back to Linen at a higher layer? If so, then I think we could get rid of the old Linen code that is no longer used

@bvandermoon
Yes, we are able to bridge the NNX version. Meanwhile, we are considering two solutions and would like your input on which direction to take:

Option 1: If we use nnx_wrappers.ToLinen to wrap the NNX model back to Linen at a higher layer, we can completely remove the old Linen code, but we will need to run more tests.

Option 2: Delay the full migration until nnx_decoders.py is stable. In the meantime, we retain the current Linen pipeline and use the enable_nnx and pure_nnx_decoder flags to trigger the NNX version for testing. Once the migration is stable, we can remove the flags and the Linen version of the pipeline.

Please let me know which of these two solutions you prefer. Thank you.

@NuojCheng
Apologies, I missed the first question you asked.

The NNX pipeline classes (NNXPipeline, NNXCircularPipeline) already handle these internally with JAX-native equivalents:

  • nn.remat → jax.checkpoint (pipeline.py L1551, L1870)
  • nn.scan → jax.lax.scan + nnx.split/nnx.merge (pipeline.py L1556, L1876)
  • remove_logically_partition() → inline jax.tree.map unboxing (pipeline.py L1519-1522)

So no NNX versions of those functions are needed; the NNX path bypasses them entirely. Maybe you have suggestions, or see a part where I am wrong? Please let me know. Thank you.
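As a rough illustration of the nn.remat → jax.checkpoint and nn.scan → jax.lax.scan replacements described above (a generic sketch under those assumptions, not the PR's actual NNXPipeline code):

```python
import jax
import jax.numpy as jnp


def body(carry, w):
  # Per-layer body threaded by lax.scan; tanh keeps it nonlinear.
  return jnp.tanh(carry @ w), None


# jax.checkpoint rematerializes the body's activations on the backward pass,
# standing in for Linen's nn.remat wrapper.
remat_body = jax.checkpoint(body)


def forward(x, stacked_w):
  # jax.lax.scan over stacked per-layer weights, standing in for nn.scan.
  out, _ = jax.lax.scan(remat_body, x, stacked_w)
  return out


x = jnp.ones((2, 3))
stacked_w = jnp.zeros((4, 3, 3))  # 4 layers of zero weights
y = forward(x, stacked_w)
```

With zero weights every layer outputs tanh(0) = 0, so the scan result is all zeros, which makes the composition easy to verify.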

scanned_params = jax.tree.map(lambda x: jnp.moveaxis(x, 0, scan_axis), scanned_params)
scanned_state = nnx.State.merge(scanned_params, scanned_other)

self.scanned_layers = nnx.merge(graphdef, scanned_state)
Collaborator


Suggested change:
-    self.scanned_layers = nnx.merge(graphdef, scanned_state)
+    nnx.update(self.scanned_layers, scanned_state)

@bvandermoon
Collaborator

There are also some linen usage in pipeline_utils.py […]

As far as I know, the current objective is to migrate the Linen pipeline to NNX while preserving the current Linen version. Please advise if any additional progress is required at this time. Thanks!

Are we able to bridge the NNX version back to Linen at a higher layer? If so, then I think we could get rid of the old Linen code that is no longer used

@bvandermoon Yes, we are able to bridge the NNX version. […]

@NuojCheng Apologies, I missed the first question you asked. […]

Thank you @mesakhcienet. Let's go with option 1, please. That way we can continue running unit tests along the way, and we don't need to worry about the Linen and NNX versions diverging before the migration is fully done.

if bsw_pps is not None:

-    @jax.shard_map(mesh=self.mesh, in_specs=((bsw_pps, bsw_pps), P("stage")), out_specs=bsw_pps, check_vma=True)
+    @jax.shard_map(mesh=self.mesh, in_specs=((bsw_pps, bsw_pps), P("stage")), out_specs=bsw_pps, check_vma=False)
Collaborator


Please set check_vma=True in the pipeline code.

This reverts commit 6d6bea2.
refactor: replace Linen pipeline classes with to_linen_class wrappers

Remove PipelineBaseLinen, Pipeline, CircularPipeline (~740 lines).
Add Pipeline = to_linen_class(NNXPipeline) and
CircularPipeline = to_linen_class(NNXCircularPipeline).
Update create_pipeline to accept stage_factory instead of layers.

test: update pipeline test to use NNX stage factory

refactor: switch Linen decoder pipeline path to NNX stage factory

Replace get_pipeline_stage_module with _get_nnx_decoder_block_classes
and _build_nnx_pipeline_stage. Delete SequentialBlockDecoderLayers.
Pipeline setup now uses stage_factory callable (rngs -> NNX module).
…teration

run_one_iteration now:
1. Fetches params from BSW (params-only, matching shard_map specs)
2. Gathers metrics/mutables directly for current repeat
3. Merges into full state for forward pass
4. Scatter-updates only non-params back (params static in scan)

Fixes ValueError: pytree structure error in shard_map where out_specs
had None (leaf) at RNG paths but BSW had RngCount (pytree node).
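The "params static in scan" idea in step 4 can be sketched generically: parameters are closed over by the scan body, while only mutable non-param state travels in the carry (illustrative names under that assumption, not the PR's actual run_one_iteration code):

```python
import jax
import jax.numpy as jnp


def make_step(params):
  def step(carry, _):
    acts, counter = carry
    acts = acts @ params["w"]          # params are read-only, closed over
    return (acts, counter + 1), None   # only non-param state mutates
  return step


params = {"w": jnp.eye(3)}
acts0 = jnp.ones((2, 3))
# The carry holds activations plus a step counter; params never enter the carry.
(acts, steps), _ = jax.lax.scan(make_step(params), (acts0, 0), jnp.arange(4))
```

Keeping parameters out of the carry means the scan never has to scatter them back, which mirrors the "scatter-updates only non-params" point above.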

fix(pipeline): pass params-only to weight_prefetching in scan_body

BSW prefetching only needs parameters. Non-param state (metrics, mutables)
is now passed separately to run_one_iteration for direct gathering.

fix(pipeline): clean up run_one_iteration consistency

- Filter to non-params in num_pipeline_repeats == 1 path for consistency
- Remove redundant get_microbatch_and_repeat_ids call
