Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"""
import os

from pyspark.util import local_connect_and_auth
from pyspark.worker_util import get_sock_file_to_executor
from pyspark.serializers import (
write_int,
read_long,
Expand Down Expand Up @@ -90,14 +90,5 @@ def process(df_id, batch_id): # type: ignore[no-untyped-def]


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, sock) = local_connect_and_auth(conn_info, auth_secret)
# There could be a long time between each micro batch.
sock.settimeout(None)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor(timeout=None) as sock_file:
main(sock_file, sock_file)
15 changes: 3 additions & 12 deletions python/pyspark/sql/connect/streaming/worker/listener_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import os
import json

from pyspark.util import local_connect_and_auth
from pyspark.worker_util import get_sock_file_to_executor
from pyspark.serializers import (
read_int,
write_int,
Expand Down Expand Up @@ -104,14 +104,5 @@ def process(listener_event_str, listener_event_type): # type: ignore[no-untyped


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, sock) = local_connect_and_auth(conn_info, auth_secret)
# There could be a long time between each listener event.
sock.settimeout(None)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor(timeout=None) as sock_file:
main(sock_file, sock_file)
16 changes: 4 additions & 12 deletions python/pyspark/sql/streaming/python_streaming_source_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@
StructType,
)
from pyspark.sql.worker.plan_data_source_read import records_to_arrow_batches
from pyspark.util import handle_worker_exception, local_connect_and_auth
from pyspark.util import handle_worker_exception
from pyspark.worker_util import (
get_sock_file_to_executor,
check_python_version,
read_command,
pickleSer,
Expand Down Expand Up @@ -310,14 +311,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, sock) = local_connect_and_auth(conn_info, auth_secret)
# Prevent the socket from timeout error when query trigger interval is large.
sock.settimeout(None)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor(timeout=None) as sock_file:
main(sock_file, sock_file)
14 changes: 3 additions & 11 deletions python/pyspark/sql/streaming/transform_with_state_driver_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
# limitations under the License.
#

import os
import json
from typing import Any, Iterator, TYPE_CHECKING

from pyspark.util import local_connect_and_auth
from pyspark.worker_util import get_sock_file_to_executor
from pyspark.serializers import (
write_int,
read_int,
Expand Down Expand Up @@ -95,12 +94,5 @@ def process(


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, sock) = local_connect_and_auth(conn_info, auth_secret)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
15 changes: 3 additions & 12 deletions python/pyspark/sql/worker/analyze_udtf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

import inspect
import os
from textwrap import dedent
from typing import Dict, List, IO, Tuple

Expand All @@ -32,8 +31,8 @@
from pyspark.sql.types import _parse_datatype_json_string, StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
from pyspark.sql.worker.utils import worker_run
from pyspark.util import local_connect_and_auth
from pyspark.worker_util import (
get_sock_file_to_executor,
read_command,
pickleSer,
utf8_deserializer,
Expand Down Expand Up @@ -238,13 +237,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
# TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
15 changes: 3 additions & 12 deletions python/pyspark/sql/worker/commit_data_source_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from typing import IO

from pyspark.errors import PySparkAssertionError
Expand All @@ -26,8 +25,7 @@
)
from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage
from pyspark.sql.worker.utils import worker_run
from pyspark.util import local_connect_and_auth
from pyspark.worker_util import pickleSer
from pyspark.worker_util import get_sock_file_to_executor, pickleSer


def _main(infile: IO, outfile: IO) -> None:
Expand Down Expand Up @@ -78,12 +76,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
14 changes: 3 additions & 11 deletions python/pyspark/sql/worker/create_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#
import inspect
import os
from typing import IO

from pyspark.errors import PySparkAssertionError, PySparkTypeError
Expand All @@ -29,8 +28,8 @@
from pyspark.sql.datasource import DataSource, CaseInsensitiveDict
from pyspark.sql.types import _parse_datatype_json_string, StructType
from pyspark.sql.worker.utils import worker_run
from pyspark.util import local_connect_and_auth
from pyspark.worker_util import (
get_sock_file_to_executor,
read_command,
pickleSer,
utf8_deserializer,
Expand Down Expand Up @@ -146,12 +145,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
12 changes: 3 additions & 9 deletions python/pyspark/sql/worker/data_source_pushdown_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import base64
import json
import os
import typing
from dataclasses import dataclass, field
from typing import IO, Type, Union
Expand Down Expand Up @@ -47,8 +46,8 @@
from pyspark.sql.types import StructType, VariantVal, _parse_datatype_json_string
from pyspark.sql.worker.plan_data_source_read import write_read_func_and_partitions
from pyspark.sql.worker.utils import worker_run
from pyspark.util import local_connect_and_auth
from pyspark.worker_util import (
get_sock_file_to_executor,
pickleSer,
read_command,
)
Expand Down Expand Up @@ -226,10 +225,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
15 changes: 3 additions & 12 deletions python/pyspark/sql/worker/lookup_data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#
from importlib import import_module
from pkgutil import iter_modules
import os
from typing import IO

from pyspark.serializers import (
Expand All @@ -25,8 +24,7 @@
)
from pyspark.sql.datasource import DataSource
from pyspark.sql.worker.utils import worker_run
from pyspark.util import local_connect_and_auth
from pyspark.worker_util import pickleSer
from pyspark.worker_util import get_sock_file_to_executor, pickleSer


def _main(infile: IO, outfile: IO) -> None:
Expand Down Expand Up @@ -60,12 +58,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
14 changes: 3 additions & 11 deletions python/pyspark/sql/worker/plan_data_source_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#

import os
import functools
import pyarrow as pa
from itertools import islice, chain
Expand Down Expand Up @@ -44,8 +43,8 @@
StructType,
)
from pyspark.sql.worker.utils import worker_run
from pyspark.util import local_connect_and_auth
from pyspark.worker_util import (
get_sock_file_to_executor,
read_command,
pickleSer,
utf8_deserializer,
Expand Down Expand Up @@ -376,12 +375,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
14 changes: 3 additions & 11 deletions python/pyspark/sql/worker/python_streaming_sink_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#

import os
from typing import IO

from pyspark.errors import PySparkAssertionError
Expand All @@ -32,8 +31,8 @@
StructType,
)
from pyspark.sql.worker.utils import worker_run
from pyspark.util import local_connect_and_auth
from pyspark.worker_util import (
get_sock_file_to_executor,
read_command,
pickleSer,
utf8_deserializer,
Expand Down Expand Up @@ -113,12 +112,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
17 changes: 3 additions & 14 deletions python/pyspark/sql/worker/write_into_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#
import inspect
import os
from typing import IO, Iterable, Iterator

from pyspark.sql.conversion import ArrowTableToRowsConversion
Expand All @@ -24,7 +23,6 @@
from pyspark.serializers import (
read_bool,
read_int,
write_int,
)
from pyspark.sql import Row
from pyspark.sql.datasource import (
Expand All @@ -43,10 +41,8 @@
_create_row,
)
from pyspark.sql.worker.utils import worker_run
from pyspark.util import (
local_connect_and_auth,
)
from pyspark.worker_util import (
get_sock_file_to_executor,
read_command,
pickleSer,
utf8_deserializer,
Expand Down Expand Up @@ -241,12 +237,5 @@ def main(infile: IO, outfile: IO) -> None:


if __name__ == "__main__":
# Read information about how to connect back to the JVM from the environment.
conn_info = os.environ.get(
"PYTHON_WORKER_FACTORY_SOCK_PATH", int(os.environ.get("PYTHON_WORKER_FACTORY_PORT", -1))
)
auth_secret = os.environ.get("PYTHON_WORKER_FACTORY_SECRET")
(sock_file, _) = local_connect_and_auth(conn_info, auth_secret)
write_int(os.getpid(), sock_file)
sock_file.flush()
main(sock_file, sock_file)
with get_sock_file_to_executor() as sock_file:
main(sock_file, sock_file)
Loading