Skip to content

Conversation

@asnare
Copy link
Contributor

@asnare asnare commented Dec 3, 2025

This PR factors out some common code (from databrickslabs/lakebridge#2163) needed for processing a line-based stream in realtime, such as when working with the output of subprocesses. This is currently needed by the Lakebridge CLI as well as the BB LSP Server.

Although moderately complex, this is needed because the builtin Python support for line-based processing1 lacks in several ways:

  • When a line exceeds the limit it throws an exception. Exceptions in asynchronous contexts are very hard to deal with, making it error-prone. If unhandled (eg. in a task) everything stalls.
  • When an error occurs there's no reasonable recovery path: the internal and stream state are basically unknown.

In contrast, this processing:

  • Handles lines of arbitrary length, splitting up long lines if necessary.
  • Uses bounded memory irrespective of how long the lines are; unnecessarily large buffers are not needed.

Expected usage is something like:

async def run_something() -> None:
    process = await asyncio.create_subprocess_exec(
        ...,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    assert process.stdout is not None, "stdout should be a pipe"
    assert process.stderr is not None, "stderr should be a pipe"

    async def log_stream(name: str, stream: asyncio.StreamReader, log_level: int, limit: int = 4096) -> None:
        try:
            async for line in readlines(stream=stream, limit=limit):
                logger.log(log_level, str(line))
        except Exception as e:
            logger.error(f"Error reading {name}: {e}", exc_info=True)
            # Drain to prevent blocking.
            try:
                while await stream.read(limit): pass
            except Exception:
                pass  # Exception while draining, situation seems unrecoverable.

    await asyncio.gather(
        log_stream("stdout", process.stdout, logging.DEBUG),
        log_stream("stderr", process.stderr, logging.INFO),
        process.wait(),
    )

    return process.returncode

Footnotes

  1. asyncio.StreamReader.readline()

@asnare asnare self-assigned this Dec 3, 2025
@asnare asnare requested a review from nfx as a code owner December 3, 2025 13:46
@asnare asnare added the enhancement New feature or request label Dec 3, 2025
@github-actions
Copy link

github-actions bot commented Dec 3, 2025

✅ 40/40 passed, 2 skipped, 1m44s total

Running from acceptance #376

@codecov
Copy link

codecov bot commented Dec 3, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 80.48%. Comparing base (7ebe9a3) to head (e79b8dd).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #320      +/-   ##
==========================================
+ Coverage   80.05%   80.48%   +0.43%     
==========================================
  Files          17       17              
  Lines        2396     2449      +53     
  Branches      516      526      +10     
==========================================
+ Hits         1918     1971      +53     
  Misses        354      354              
  Partials      124      124              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link

@m-abulazm m-abulazm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work. I just have a couple of questions to make sure I understand everything.

@asnare asnare requested a review from m-abulazm December 5, 2025 13:56
github-merge-queue bot pushed a commit to databrickslabs/lakebridge that referenced this pull request Dec 5, 2025
## Changes

LSP servers use stdin/stdout for the LSP calls, and stderr as a general
logging facility. Currently we mirror the stderr output as logs, but due
to the implementation details this fails when an individual line
received from stderr is longer than 64KiB in size.

Although #2160 contains a hot fix for this problem, this PR improves the
handling in a more robust manner:

- Long lines from the LSP server are now detected and broken up into
chunks, as several log entries. (This is preferable to a large limit
because it means we have an upper bound on memory usage rather than
potentially buffering forever and running out of memory.)
- The cause of the error when a long line hits has been addressed,
however if we hit some other error during stderr processing we now log
the (critical!) error so that it's at least clear something has gone
wrong. (The nature of this is that the CLI might hang, but it will be
noisy about it.)

### Relevant implementation details

The primary change here is to handle line-breaking and decoding
ourselves rather than relying on the `StreamReader.readline()`
implementation. (The behaviour of the latter cannot be reliably
controlled when a line exceeds the configured `limit`, which defaults to
64K.)

### Caveats/things to watch out for when reviewing:

Further changes are needed to the test LSP server (`lsp_server.py`) that
we use for testing, they are out of scope for this PR.

### Linked issues

Supersedes #2160, closes #2164.

This PR is the basis for databrickslabs/blueprint#320: once that's
available the changes in here can be simplified/refactored.

### Functionality

- modified existing command: `databricks labs lakebridge transpile`

### Tests

- manually tested
- added unit tests
@asnare asnare requested a review from m-abulazm December 8, 2025 11:59
@asnare asnare merged commit 917730c into main Dec 8, 2025
14 checks passed
@asnare asnare deleted the log-line-chunking branch December 8, 2025 14:42
gueniai added a commit that referenced this pull request Dec 30, 2025
- **TRACE-level logging support** [#318](#318)
    Added support for `TRACE`-level logs by mapping them to `DEBUG`, since Python lacks native `TRACE` support. Introduced a new method to translate Databricks CLI log levels to Python levels, defaulting to `INFO` when unknown. Improved error logging for better debugging and reporting.

- **Python 3.14 compatibility** [#315](#315)
    The project now supports Python 3.10–3.14. CI tests have been updated accordingly, and project metadata now reflects Python 3.14 and beta development status. Minimum requirements remain: Python 3.10+ and `databricks-sdk` 0.16.0+.

- **Line-based subprocess streaming** [#320](#320)
    Added real-time, line-based reading of subprocess output via a new `readlines` function. It decodes UTF-8 safely, manages memory efficiently, and handles very long lines without blocking. Also added `pytest-asyncio` to improve async code testing.

- **Hatch upgrade** [#311](#311)
    Upgraded Hatch from 1.9.4 → 1.14.2 to fix compatibility issues (notably with Click 8.3.0) and improve build performance. Removed the old Click version constraint so newer versions can be installed.
@gueniai gueniai mentioned this pull request Dec 30, 2025
gueniai added a commit that referenced this pull request Dec 30, 2025
# Release Notes 

- **TRACE-level logging support**
[#318](#318)
Added support for `TRACE`-level logs by mapping them to `DEBUG`, since
Python lacks native `TRACE` support. Introduced a new method to
translate Databricks CLI log levels to Python levels, defaulting to
`INFO` when unknown. Improved error logging for better debugging and
reporting.
    
- **Python 3.14 compatibility**
[#315](#315)
The project now supports Python 3.10–3.14. CI tests have been updated
accordingly, and project metadata now reflects Python 3.14 and beta
development status. Minimum requirements remain: Python 3.10+ and
`databricks-sdk` 0.16.0+.
    
- **Line-based subprocess streaming**
[#320](#320)
Added real-time, line-based reading of subprocess output via a new
`readlines` function. It decodes UTF-8 safely, manages memory
efficiently, and handles very long lines without blocking. Also added
`pytest-asyncio` to improve async code testing.
    
- **Hatch upgrade**
[#311](#311)
Upgraded Hatch from 1.9.4 → 1.14.2 to fix compatibility issues (notably
with Click 8.3.0) and improve build performance. Removed the old Click
version constraint so newer versions can be installed.
gueniai pushed a commit to databrickslabs/lakebridge that referenced this pull request Dec 30, 2025
## Changes

This code used to process stderr from the LSP server has been extracted
out to `blueprint`: this PR updates the project to use the code from
blueprint.

### Linked Issues

 - Processing originally implemented: #2163
 - Extracted to blueprint in: databrickslabs/blueprint#320

### Functionality

- modified existing command: `databricks labs lakebridge transpile`

### Tests

- manually tested
- existing unit tests
- existing integration tests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants