Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Cluster metadata mixin for KafkaAdminClient."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.admin import DescribeLogDirsRequest

if TYPE_CHECKING:
from kafka.net.manager import KafkaConnectionManager

log = logging.getLogger(__name__)


class ClusterAdminMixin:
    """Mixin providing cluster management methods for KafkaAdminClient."""
    # Supplied by the concrete KafkaAdminClient that mixes this in.
    _manager: KafkaConnectionManager

    async def _get_cluster_metadata(self, topics):
        """Fetch cluster metadata including authorized-operation fields.

        Arguments:
            topics: [] to request metadata for no topics, None for all topics.

        Returns:
            The MetadataResponse as a dict, with ACL operation fields
            post-processed for the cluster and for each topic entry.
        """
        if topics is None:
            requested_topics = None
        else:
            requested_topics = [
                MetadataRequest.MetadataRequestTopic(name=name)
                for name in topics
            ]
        request = MetadataRequest(
            topics=requested_topics,
            allow_auto_topic_creation=False,
            include_cluster_authorized_operations=True,
            include_topic_authorized_operations=True,
        )
        response = await self._manager.send(request)
        metadata = response.to_dict()
        self._process_acl_operations(metadata)
        for topic_metadata in metadata['topics']:
            self._process_acl_operations(topic_metadata)
        return metadata

    def describe_cluster(self):
        """Fetch cluster-wide metadata such as the list of brokers, the controller ID,
        and the cluster ID.

        Returns:
            A dict with cluster-wide metadata, excluding topic details.
        """
        cluster = self._manager.run(self._get_cluster_metadata, [])
        # Request was made with topics=[], so only the (empty) topic list
        # needs stripping before returning cluster-level data.
        del cluster['topics']
        return cluster

    async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None):
        """Query each broker for its log-dir details; one result dict per broker."""
        request = DescribeLogDirsRequest(topics=topic_partitions)
        if brokers is None:
            brokers = [b.node_id for b in self._manager.cluster.brokers()]
        per_broker = []
        for node_id in brokers:
            response = await self._manager.send(request, node_id=node_id)
            per_broker.append({
                "broker": node_id,
                "log_dirs": [entry.to_dict() for entry in response.results],
            })
        return per_broker

    def describe_log_dirs(self, topic_partitions=None, brokers=None):
        """Fetch broker log directory and topic/partition stats

        Keyword Arguments:
            topic_partitions (dict, list, optional):
                Either a dict of {topic_name: [partition ids]},
                or a list of topic names (all partitions per topic),
                or None to query every topic and partition.
                Default: None
            brokers (list, optional): node ids of the brokers to query.
                When None, every broker is queried. Default: None

        Returns:
            list of dicts, containing per-broker log-dir data
        """
        return self._manager.run(
            self._async_describe_log_dirs,
            self._get_topic_partitions(topic_partitions),
            brokers,
        )
46 changes: 0 additions & 46 deletions kafka/admin/_metadata.py

This file was deleted.

29 changes: 1 addition & 28 deletions kafka/admin/_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import kafka.errors as Errors
from kafka.errors import UnknownTopicOrPartitionError
from kafka.protocol.admin import DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType
from kafka.protocol.admin import DeleteRecordsRequest, ElectLeadersRequest, ElectionType
from kafka.structs import TopicPartition

if TYPE_CHECKING:
Expand Down Expand Up @@ -166,30 +166,3 @@ def response_errors(r):
ignore_errors = (Errors.ElectionNotNeededError,)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors)

async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None):
    """Collect log-dir details from the given brokers (all brokers when None)."""
    request = DescribeLogDirsRequest(topics=topic_partitions)
    if brokers is None:
        brokers = [b.node_id for b in self._manager.cluster.brokers()]
    per_broker = []
    for node_id in brokers:
        response = await self._manager.send(request, node_id=node_id)
        per_broker.append({
            "broker": node_id,
            "log_dirs": [entry.to_dict() for entry in response.results],
        })
    return per_broker

def describe_log_dirs(self, topic_partitions=None, brokers=None):
    """Send a DescribeLogDirsRequest request to a broker.

    Keyword Arguments:
        topic_partitions (dict, list, optional):
            Either a dict of {topic_name: [partition ids]},
            or a list of topic names (all partitions per topic),
            or None to query every topic and partition.
            Default: None
        brokers (list, optional): node ids of the brokers to query.
            When None, the query is sent to all brokers. Default: None

    Returns:
        list of dicts, containing per-broker log-dir data
    """
    return self._manager.run(
        self._async_describe_log_dirs,
        self._get_topic_partitions(topic_partitions),
        brokers,
    )
4 changes: 2 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from kafka.version import __version__

from kafka.admin._acls import ACLAdminMixin
from kafka.admin._cluster import ClusterAdminMixin
from kafka.admin._configs import ConfigAdminMixin
from kafka.admin._groups import GroupAdminMixin
from kafka.admin._metadata import MetadataAdminMixin
from kafka.admin._records import RecordAdminMixin
from kafka.admin._topics import TopicAdminMixin

Expand All @@ -25,7 +25,7 @@

class KafkaAdminClient(
TopicAdminMixin,
MetadataAdminMixin,
ClusterAdminMixin,
ACLAdminMixin,
ConfigAdminMixin,
GroupAdminMixin,
Expand Down
32 changes: 8 additions & 24 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from .cluster import ClusterSubCommand
from .configs import ConfigsSubCommand
from .consumer_groups import ConsumerGroupsSubCommand
from .log_dirs import LogDirsSubCommand
from .topics import TopicsSubCommand

def main_parser():
Expand Down Expand Up @@ -60,7 +59,7 @@ def build_kwargs(props):
def run_cli(args=None):
parser = main_parser()
subparsers = parser.add_subparsers(help='subcommands')
for cmd in [ClusterSubCommand, ConfigsSubCommand, LogDirsSubCommand,
for cmd in [ClusterSubCommand, ConfigsSubCommand,
TopicsSubCommand, ConsumerGroupsSubCommand]:
cmd.add_subparser(subparsers)
config = parser.parse_args(args)
Expand Down Expand Up @@ -114,30 +113,11 @@ def run_cli(args=None):
# alter
# IncrementalAlterConfigs (not supported yet)

# [partitions]
# create
# alter-reassignments (AlterPartitionReassignments - not supported yet)
# list-reassignments (ListPartitionReassignments - not supported yet)

# [records]
# delete

# [consumer-groups]
# remove-members (not supported yet)
# delete-offsets (not supported yet)
# alter-offsets (not supported yet)

# [offsets]
# list (not supported yet)
# delete (OffsetDelete - not supported yet)

# leader-election
# perform_leader_election

# [log-dirs]
# describe (currently broken)
# alter (AlterReplicaLogDirs - not supported yet)

# [client-quotas]
# describe (DescribeClientQuotas - not supported yet)
# alter (AlterClientQuotas - not supported yet)
Expand All @@ -154,12 +134,16 @@ def run_cli(args=None):

# [topics]
# describe-partitions (DescribeTopicPartitions - not supported yet)
# list-offsets (not supported yet)
# delete-offsets (OffsetDelete - not supported yet)
# alter-reassignments (AlterPartitionReassignments - not supported yet)
# list-reassignments (ListPartitionReassignments - not supported yet)
# create-partitions
# elect-leaders

# [cluster]
# describe-features (DescribeFeatures - not supported yet)
# update-features (UpdateFeatures - not supported yet)
# version
# api-versions



# alter-log-dirs (AlterReplicaLogDirs - not supported yet)
3 changes: 2 additions & 1 deletion kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys

from .describe import DescribeCluster
from .log_dirs import DescribeLogDirs


class ClusterSubCommand:
Expand All @@ -9,6 +10,6 @@ class ClusterSubCommand:
def add_subparser(cls, subparsers):
    """Register the `cluster` command group and its subcommands.

    The pasted diff residue left both the pre-change and post-change
    `for cmd in [...]` lines in place, which would run the loop twice;
    only the updated line (including DescribeLogDirs) is kept.
    """
    parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster')
    commands = parser.add_subparsers()
    for cmd in [DescribeCluster, DescribeLogDirs]:
        cmd.add_subparser(commands)
    # Print help and exit non-zero when no subcommand is given.
    parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ class DescribeLogDirs:

@classmethod
def add_subparser(cls, subparsers):
    """Register the `log-dirs` subcommand under the cluster command group.

    The pasted diff residue kept both old and new versions of the
    `add_parser` and `-t/--topic` lines; executing the span literally
    would register `-t/--topic` twice and raise argparse.ArgumentError.
    Only the post-change lines are kept.
    """
    parser = subparsers.add_parser('log-dirs', help='Get topic log directories and stats')
    parser.add_argument('-b', '--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)')
    parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get data about specific topic(s)')
    parser.set_defaults(command=lambda cli, args: cli.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers))
14 changes: 0 additions & 14 deletions kafka/cli/admin/log_dirs/__init__.py

This file was deleted.

Loading