From af29df3798e54fb0c847632ec6fbc205251c679a Mon Sep 17 00:00:00 2001 From: yehao <944207160@qq.com> Date: Sun, 11 May 2025 22:30:06 +0800 Subject: [PATCH] feat: add record data type and use grpc protocol for transmission Signed-off-by: yehao <944207160@qq.com> --- .pylintrc | 4 +- opengemini_client/client.py | 9 ++ opengemini_client/client_impl.py | 83 +++++++++-- opengemini_client/codec/__init__.py | 13 ++ opengemini_client/codec/binary_decoder.py | 67 +++++++++ opengemini_client/codec/binary_encoder.py | 55 ++++++++ opengemini_client/codec/size.py | 81 +++++++++++ opengemini_client/grpc_client.py | 55 ++++++++ opengemini_client/grpc_client_test.py | 55 ++++++++ opengemini_client/models.py | 35 +++-- opengemini_client/proto/__init__.py | 13 ++ opengemini_client/proto/write.proto | 58 ++++++++ opengemini_client/proto/write_pb2.py | 60 ++++++++ opengemini_client/proto/write_pb2.pyi | 84 +++++++++++ opengemini_client/proto/write_pb2_grpc.py | 156 ++++++++++++++++++++ opengemini_client/record/__init__.py | 13 ++ opengemini_client/record/colval.py | 146 +++++++++++++++++++ opengemini_client/record/field.py | 51 +++++++ opengemini_client/record/record.py | 78 ++++++++++ opengemini_client/record_transform.py | 164 ++++++++++++++++++++++ opengemini_client/test_utils.py | 5 +- requirements.txt | 3 + 22 files changed, 1266 insertions(+), 22 deletions(-) create mode 100644 opengemini_client/codec/__init__.py create mode 100644 opengemini_client/codec/binary_decoder.py create mode 100644 opengemini_client/codec/binary_encoder.py create mode 100644 opengemini_client/codec/size.py create mode 100644 opengemini_client/grpc_client.py create mode 100644 opengemini_client/grpc_client_test.py create mode 100644 opengemini_client/proto/__init__.py create mode 100644 opengemini_client/proto/write.proto create mode 100644 opengemini_client/proto/write_pb2.py create mode 100644 opengemini_client/proto/write_pb2.pyi create mode 100644 opengemini_client/proto/write_pb2_grpc.py create mode 100644 opengemini_client/record/__init__.py create mode 100644 opengemini_client/record/colval.py create mode 100644 opengemini_client/record/field.py create mode 100644 opengemini_client/record/record.py create mode 100644 opengemini_client/record_transform.py diff --git a/.pylintrc b/.pylintrc index 3f2ab37..952525c 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,5 +1,5 @@ -[MASTER] -ignore=static,.git +[MAIN] +ignore=static,.git,proto,grpc_client.py [MESSAGES CONTROL] disable= invalid-name, diff --git a/opengemini_client/client.py b/opengemini_client/client.py index d260068..e29450f 100644 --- a/opengemini_client/client.py +++ b/opengemini_client/client.py @@ -52,6 +52,15 @@ def write_batch_points(self, database: str, batch_points: BatchPoints): :return: return an error message """ + def write_by_grpc(self, database: str, batch_points: BatchPoints, rp: str = ''): + """ + batch points to assigned database + :param database: name + :param batch_points: BatchPoints object + :param rp: retention policy + :return: return an error message + """ + @abstractmethod def create_database(self, database: str, rp: RpConfig = None): """ diff --git a/opengemini_client/client_impl.py b/opengemini_client/client_impl.py index f8d4acc..f809c60 100644 --- a/opengemini_client/client_impl.py +++ b/opengemini_client/client_impl.py @@ -17,33 +17,40 @@ import gzip import io import itertools +import os.path from abc import ABC from http import HTTPStatus from typing import List +import grpc import requests from requests import HTTPError +from opengemini_client import grpc_client from opengemini_client.client import Client from opengemini_client.measurement import Measurement, MeasurementCondition from opengemini_client.models import Config, BatchPoints, Query, QueryResult, Series, SeriesResult, RpConfig, \ - ValuesResult, KeyValue + ValuesResult, KeyValue, AuthConfig from opengemini_client.url_const import UrlConst from opengemini_client.models import AuthType, TlsConfig +def check_auth_config(auth_config: AuthConfig): + if auth_config is not None: + if auth_config.auth_type == AuthType.PASSWORD: + if len(auth_config.username) == 0: + raise ValueError("invalid auth config due to empty username") + if len(auth_config.password) == 0: + raise ValueError("invalid auth config due to empty password") + if auth_config.auth_type == AuthType.TOKEN and len(auth_config.token) == 0: + raise ValueError("invalid auth config due to empty token") + + def check_config(config: Config): if len(config.address) == 0: raise ValueError("must have at least one address") - if config.auth_config is not None: - if config.auth_config.auth_type == AuthType.PASSWORD: - if len(config.auth_config.username) == 0: - raise ValueError("invalid auth config due to empty username") - if len(config.auth_config.password) == 0: - raise ValueError("invalid auth config due to empty password") - if config.auth_config.auth_type == AuthType.TOKEN and len(config.auth_config.token) == 0: - raise ValueError("invalid auth config due to empty token") + check_auth_config(config.auth_config) if config.tls_enabled and config.tls_config is None: config.tls_config = TlsConfig() @@ -60,6 +67,17 @@ def check_config(config: Config): if config.connection_timeout is None or config.connection_timeout <= datetime.timedelta(seconds=0): config.connection_timeout = datetime.timedelta(seconds=10) + if config.grpc_config is None: + return config + + if len(config.grpc_config.address) == 0: + raise ValueError("grpc config must have at least one address") + + check_auth_config(config.grpc_config.auth_config) + + if config.grpc_config.tls_enable and config.grpc_config.tls_config is None: + config.grpc_config.tls_config = TlsConfig() + return config @@ -95,6 +113,9 @@ def __init__(self, config: Config): self.session.verify = config.tls_config.ca_file self.endpoints = [f"{protocol}{addr.host}:{addr.port}" for addr in config.address] self.endpoints_iter = itertools.cycle(self.endpoints) + if self.config.grpc_config is not None: + self.grpc_endpoints = [f"{addr.host}:{addr.port}" for addr in config.grpc_config.address] + self.grpc_endpoints_iter = itertools.cycle(self.grpc_endpoints) def close(self): self.session.close() @@ -108,6 +129,31 @@ def __exit__(self, _exc_type, _exc_val, _exc_tb): def _get_server_url(self): return next(self.endpoints_iter) + def _get_grpc_server_url(self): + return next(self.grpc_endpoints_iter) + + def _get_grpc_channel(self): + server_url = self._get_grpc_server_url() + if self.config.grpc_config.tls_enable is False: + return grpc.insecure_channel(server_url) + + root_certificates = None + private_key = None + certificate_chain = None + if os.path.exists(self.config.grpc_config.tls_config.ca_file): + with open(self.config.grpc_config.tls_config.ca_file, 'rb') as fd: + root_certificates = fd.read() + if os.path.exists(self.config.grpc_config.tls_config.cert_file): + with open(self.config.grpc_config.tls_config.cert_file, 'rb') as fd: + certificate_chain = fd.read() + if os.path.exists(self.config.grpc_config.tls_config.key_file): + with open(self.config.grpc_config.tls_config.key_file, 'rb') as fd: + private_key = fd.read() + return grpc.secure_channel( + target=server_url, + credentials=grpc.ssl_channel_credentials(root_certificates, private_key, certificate_chain) + ) + def _update_headers(self, method, url_path, headers=None) -> dict: if headers is None: headers = {} @@ -191,6 +237,25 @@ def write_batch_points(self, database: str, batch_points: BatchPoints): return raise HTTPError(f"write_batch_points error resp, code: {resp.status_code}, body: {resp.text}") + def write_by_grpc(self, database: str, batch_points: BatchPoints, rp: str = ''): + username = '' + password = '' + if self.config.grpc_config.auth_config is not None: + username = self.config.grpc_config.auth_config.username + password = self.config.grpc_config.auth_config.password + + # send grpc request + channel = self._get_grpc_channel() + grpc_client.write( + channel=channel, + database=database, + batch_points=batch_points, + rp=rp, + username=username, + password=password, + timeout=self.config.timeout.seconds, + ) + def create_database(self, database: str, rp: RpConfig = None): if not database: raise ValueError("empty database name") diff --git a/opengemini_client/codec/__init__.py b/opengemini_client/codec/__init__.py new file mode 100644 index 0000000..c6860dc --- /dev/null +++ b/opengemini_client/codec/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opengemini_client/codec/binary_decoder.py b/opengemini_client/codec/binary_decoder.py new file mode 100644 index 0000000..4f8035d --- /dev/null +++ b/opengemini_client/codec/binary_decoder.py @@ -0,0 +1,67 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +from dataclasses import dataclass +from typing import List +import numpy +from opengemini_client.codec.size import size_of_int64, size_of_uint16, size_of_uint32 + + +@dataclass +class BinaryDecoder: + buf: 'bytes' = b'' + offset: int = 0 + + def int64(self) -> int: + u = self.buf[self.offset:self.offset + size_of_int64()] + v = u[7] | u[6] << 8 | u[5] << 16 | u[4] << 24 | u[3] << 32 | u[2] << 40 | u[1] << 48 | u[0] << 56 + v = (numpy.int64(v) >> 1) ^ ((numpy.int64(v) << 63) >> 63) + self.offset += size_of_int64() + return int(v) + + def uint16(self) -> int: + u = self.buf[self.offset:self.offset + size_of_uint16()] + v = u[1] | u[0] << 8 + self.offset += size_of_uint16() + return int(v) + + def uint32(self) -> int: + u = self.buf[self.offset:self.offset + size_of_uint32()] + v = u[3] | u[2] << 8 | u[1] << 16 | u[0] << 24 + self.offset += size_of_uint32() + return int(v) + + def string(self) -> str: + length = self.uint16() + v = self.buf[self.offset:self.offset + length].decode("utf-8") + self.offset += length + return v + + def bytes(self) -> bytes: + length = self.uint32() + if length == 0: + return b'' + v = self.buf[self.offset:self.offset + length] + self.offset += length + return v + + def uint32_list(self) -> List[int]: + length = self.uint32() + if length == 0: + return [] + length1 = length * size_of_uint32() + v = struct.unpack('<' + 'I ' * length, self.buf[self.offset:self.offset + length1]) + self.offset += length1 + return [int(i) for i in v] diff --git a/opengemini_client/codec/binary_encoder.py b/opengemini_client/codec/binary_encoder.py new file mode 100644 index 0000000..8831827 --- /dev/null +++ b/opengemini_client/codec/binary_encoder.py @@ -0,0 +1,55 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +from typing import List +import numpy + + +def append_int64(b: bytes, v: int) -> bytes: + v = (numpy.int64(v) << 1) ^ (v >> 63) + v = numpy.uint64(v) + u = [numpy.uint8(v >> 56), numpy.uint8(v >> 48), numpy.uint8(v >> 40), numpy.uint8(v >> 32), numpy.uint8(v >> 24), + numpy.uint8(v >> 16), numpy.uint8(v >> 8), numpy.uint8(v)] + return b + bytes(u) + + +def append_uint16(b: bytes, v: int) -> bytes: + v = numpy.uint16(v) + u = [numpy.uint8(v >> 8), numpy.uint8(v)] + return b + bytes(u) + + +def append_uint32(b: bytes, v: int) -> bytes: + v = numpy.uint32(v) + u = [numpy.uint8(v >> 24), numpy.uint8(v >> 16), numpy.uint8(v >> 8), numpy.uint8(v)] + return b + bytes(u) + + +def append_string(b: bytes, v: str) -> bytes: + b = append_uint16(b, len(v)) + return b + v.encode("utf-8") + + +def append_bytes(b: bytes, v: bytes) -> bytes: + b = append_uint32(b, len(v)) + return b + v + + +def append_uint32_list(b: bytes, v: List[int]) -> bytes: + b = append_uint32(b, len(v)) + if len(v) == 0: + return b + byte_data = struct.pack('<' + 'I ' * len(v), *v) + return b + byte_data diff --git a/opengemini_client/codec/size.py b/opengemini_client/codec/size.py new file mode 100644 index 0000000..e8ca253 --- /dev/null +++ b/opengemini_client/codec/size.py @@ -0,0 +1,81 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List + +_SizeOfInt16 = 2 +_SizeOfInt32 = 4 +_SizeOfInt64 = 8 + +_SizeOfUint8 = 1 +_SizeOfUint16 = 2 +_SizeOfUint32 = 4 +_SizeOfUint64 = 8 + +_SizeOfFloat32 = 4 +_SizeOfFloat64 = 8 + +_SizeOfBool = 1 + + +def size_of_int16(): + return _SizeOfInt16 + + +def size_of_int32(): + return _SizeOfInt32 + + +def size_of_int64(): + return _SizeOfInt64 + + +def size_of_uint8(): + return _SizeOfUint8 + + +def size_of_uint16(): + return _SizeOfUint16 + + +def size_of_uint32(): + return _SizeOfUint32 + + +def size_of_uint64(): + return _SizeOfUint64 + + +def size_of_float32(): + return _SizeOfFloat32 + + +def size_of_float64(): + return _SizeOfFloat64 + + +def size_of_bool(): + return _SizeOfBool + + +def size_of_string(s: str) -> int: + return len(s) + size_of_uint16() + + +def size_of_bytes(b: bytes) -> int: + return len(b) + size_of_uint32() + + +def size_of_uint32_list(v: List[int]) -> int: + return len(v) * size_of_uint32() + size_of_uint32() diff --git a/opengemini_client/grpc_client.py b/opengemini_client/grpc_client.py new file mode 100644 index 0000000..5c9b516 --- /dev/null +++ b/opengemini_client/grpc_client.py @@ -0,0 +1,55 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from requests import HTTPError + +from opengemini_client.models import BatchPoints +from opengemini_client.proto import write_pb2, write_pb2_grpc +from opengemini_client.record_transform import RecordTransform + + +def write(channel, database: str, batch_points: BatchPoints, rp: str = '', username: str = '', password: str = '', + timeout: int = 0): + # generate grpc request records + record_transforms = {} + for point in batch_points.points: + rt = record_transforms.get(point.measurement) + if rt is None: + rt = RecordTransform() + rt.add_point(point) + record_transforms[point.measurement] = rt + records = [] + for measurement, rt in record_transforms.items(): + record = write_pb2.Record( + measurement=measurement, + min_time=rt.min_time, + max_time=rt.max_time, + block=rt.convert_to_record().marshal(b''), + ) + records.append(record) + + # send grpc request + response = write_pb2_grpc.WriteServiceStub(channel).Write( + write_pb2.WriteRequest( + database=database, + retention_policy=rp, + username=username, + password=password, + records=records, + ), + timeout=timeout + ) + if response.code == 0: + return + raise HTTPError(f"write_by_grpc error resp, code: {response.code}") diff --git a/opengemini_client/grpc_client_test.py b/opengemini_client/grpc_client_test.py new file mode 100644 index 0000000..1967acf --- /dev/null +++ b/opengemini_client/grpc_client_test.py @@ -0,0 +1,55 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest +from datetime import datetime + +from opengemini_client import models +from opengemini_client import test_utils + + +class GrpcClientTest(unittest.TestCase): + + def test_write_success(self): + with test_utils.get_test_default_client() as cli: + cli.create_database('grpc_write_test') + point1 = models.Point( + measurement='write_mm', + precision=models.Precision.PrecisionSecond, + fields={'x': 12.0, 'y': 4.0}, + tags={'a': 'ax', 'b': 'bx'}, + timestamp=datetime.now(), + ) + point2 = models.Point( + measurement='write_mm', + precision=models.Precision.PrecisionSecond, + fields={'x': 15.0, 'y': 5.0, 'z': 8, 'name': 'xx'}, + tags={'a': 'ax', 'b': 'bx', 'c': 'cz'}, + timestamp=datetime.now(), + ) + point3 = models.Point( + measurement='write_nn', + precision=models.Precision.PrecisionSecond, + fields={'name': 'xx', 'age': 20}, + tags={'sex': 'man'}, + timestamp=datetime.now(), + ) + cli.write_by_grpc("grpc_write_test", models.BatchPoints(points=[point1, point2, point3])) + time.sleep(5) + qr = cli.query(models.Query(database='grpc_write_test', command='select * from write_mm', + retention_policy='')) + print(qr) + self.assertNotEqual(len(qr.results), 0) + cli.drop_database('grpc_write_test') diff --git a/opengemini_client/models.py b/opengemini_client/models.py index a380d70..56a5fe0 100644 --- a/opengemini_client/models.py +++ b/opengemini_client/models.py @@ -51,6 +51,14 @@ class BatchConfig: batch_size: int +@dataclass +class GrpcConfig: + address: List[Address] + auth_config: AuthConfig = None + tls_enable: bool = False + tls_config: TlsConfig = None + + @dataclass class Config: address: List[Address] @@ -61,6 +69,7 @@ class Config: tls_enabled: bool = False auth_config: AuthConfig = None tls_config: TlsConfig = None + grpc_config: GrpcConfig = None @dataclass @@ -200,23 +209,29 @@ def write_fields(self, writer: io.StringIO): else: writer.write('F') - def write_timestamp(self, writer: io.StringIO): + def generate_timestamp(self): + ts = 0 if self.timestamp is None: - return - writer.write(' ') + return ts if self.precision == Precision.PrecisionMicrosecond: - ts_str = str(round_datetime(self.timestamp, timedelta(microseconds=1))) + ts = round_datetime(self.timestamp, timedelta(microseconds=1)) elif self.precision == Precision.PrecisionMillisecond: - ts_str = str(round_datetime(self.timestamp, timedelta(milliseconds=1))) + ts = round_datetime(self.timestamp, timedelta(milliseconds=1)) elif self.precision == Precision.PrecisionSecond: - ts_str = str(round_datetime(self.timestamp, timedelta(seconds=1))) + ts = round_datetime(self.timestamp, timedelta(seconds=1)) elif self.precision == Precision.PrecisionMinute: - ts_str = str(round_datetime(self.timestamp, timedelta(minutes=1))) + ts = round_datetime(self.timestamp, timedelta(minutes=1)) elif self.precision == Precision.PrecisionHour: - ts_str = str(round_datetime(self.timestamp, timedelta(hours=1))) + ts = round_datetime(self.timestamp, timedelta(hours=1)) else: - ts_str = str(self.timestamp.timestamp() * 1000 * 1000 * 1000) - writer.write(ts_str) + ts = self.timestamp.timestamp() * 1000 * 1000 * 1000 + return ts + + def write_timestamp(self, writer: io.StringIO): + if self.timestamp is None: + return + writer.write(' ') + writer.write(str(self.generate_timestamp())) @dataclass diff --git a/opengemini_client/proto/__init__.py b/opengemini_client/proto/__init__.py new file mode 100644 index 0000000..c6860dc --- /dev/null +++ b/opengemini_client/proto/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opengemini_client/proto/write.proto b/opengemini_client/proto/write.proto new file mode 100644 index 0000000..94ef92d --- /dev/null +++ b/opengemini_client/proto/write.proto @@ -0,0 +1,58 @@ +syntax = "proto3"; +package proto; + +// WriteService represents a openGemini RPC write service. +service WriteService { + // Write writes the given records to the specified database and retention policy. + rpc Write (WriteRequest) returns (WriteResponse) {} + // Ping is used to check if the server is alive + rpc Ping(PingRequest) returns (PingResponse) {} +} + +message WriteRequest { + uint32 version = 1; + string database = 2; + string retention_policy = 3; + string username = 4; + string password = 5; + repeated Record records = 6; +} + +message WriteResponse { + ResponseCode code = 1; +} + +message Record { + string measurement = 1; + int64 min_time = 2; + int64 max_time = 3; + CompressMethod compress_method = 4; + bytes block = 5; +} + +enum CompressMethod { + UNCOMPRESSED = 0; + LZ4_FAST = 1; + ZSTD_FAST = 2; + SNAPPY = 3; +} + +enum ResponseCode { + Success = 0; + Partial = 1; + Failed = 2; +} + +message PingRequest { + string client_id = 1; +} + +enum ServerStatus { + Up = 0; + Down = 1; + Unknown = 99; +} + +message PingResponse { + ServerStatus status = 1; +} diff --git a/opengemini_client/proto/write_pb2.py b/opengemini_client/proto/write_pb2.py new file mode 100644 index 0000000..41f9f06 --- /dev/null +++ b/opengemini_client/proto/write_pb2.py @@ -0,0 +1,60 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 1, + '', + 'write.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bwrite.proto\x12\x05proto\"\x8f\x01\n\x0cWriteRequest\x12\x0f\n\x07version\x18\x01 \x01(\r\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x18\n\x10retention_policy\x18\x03 \x01(\t\x12\x10\n\x08username\x18\x04 \x01(\t\x12\x10\n\x08password\x18\x05 \x01(\t\x12\x1e\n\x07records\x18\x06 \x03(\x0b\x32\r.proto.Record\"2\n\rWriteResponse\x12!\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x13.proto.ResponseCode\"\x80\x01\n\x06Record\x12\x13\n\x0bmeasurement\x18\x01 \x01(\t\x12\x10\n\x08min_time\x18\x02 \x01(\x03\x12\x10\n\x08max_time\x18\x03 \x01(\x03\x12.\n\x0f\x63ompress_method\x18\x04 \x01(\x0e\x32\x15.proto.CompressMethod\x12\r\n\x05\x62lock\x18\x05 \x01(\x0c\" \n\x0bPingRequest\x12\x11\n\tclient_id\x18\x01 \x01(\t\"3\n\x0cPingResponse\x12#\n\x06status\x18\x01 \x01(\x0e\x32\x13.proto.ServerStatus*K\n\x0e\x43ompressMethod\x12\x10\n\x0cUNCOMPRESSED\x10\x00\x12\x0c\n\x08LZ4_FAST\x10\x01\x12\r\n\tZSTD_FAST\x10\x02\x12\n\n\x06SNAPPY\x10\x03*4\n\x0cResponseCode\x12\x0b\n\x07Success\x10\x00\x12\x0b\n\x07Partial\x10\x01\x12\n\n\x06\x46\x61iled\x10\x02*-\n\x0cServerStatus\x12\x06\n\x02Up\x10\x00\x12\x08\n\x04\x44own\x10\x01\x12\x0b\n\x07Unknown\x10\x63\x32w\n\x0cWriteService\x12\x34\n\x05Write\x12\x13.proto.WriteRequest\x1a\x14.proto.WriteResponse\"\x00\x12\x31\n\x04Ping\x12\x12.proto.PingRequest\x1a\x13.proto.PingResponse\"\x00\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'write_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_COMPRESSMETHOD']._serialized_start=438 + _globals['_COMPRESSMETHOD']._serialized_end=513 + _globals['_RESPONSECODE']._serialized_start=515 + _globals['_RESPONSECODE']._serialized_end=567 + _globals['_SERVERSTATUS']._serialized_start=569 + _globals['_SERVERSTATUS']._serialized_end=614 + _globals['_WRITEREQUEST']._serialized_start=23 + _globals['_WRITEREQUEST']._serialized_end=166 + _globals['_WRITERESPONSE']._serialized_start=168 + _globals['_WRITERESPONSE']._serialized_end=218 + _globals['_RECORD']._serialized_start=221 + _globals['_RECORD']._serialized_end=349 + _globals['_PINGREQUEST']._serialized_start=351 + _globals['_PINGREQUEST']._serialized_end=383 + _globals['_PINGRESPONSE']._serialized_start=385 + _globals['_PINGRESPONSE']._serialized_end=436 + _globals['_WRITESERVICE']._serialized_start=616 + _globals['_WRITESERVICE']._serialized_end=735 +# @@protoc_insertion_point(module_scope) diff --git a/opengemini_client/proto/write_pb2.pyi b/opengemini_client/proto/write_pb2.pyi new file mode 100644 index 0000000..d134c6e --- /dev/null +++ b/opengemini_client/proto/write_pb2.pyi @@ -0,0 +1,84 @@ +from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class CompressMethod(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + UNCOMPRESSED: _ClassVar[CompressMethod] + LZ4_FAST: _ClassVar[CompressMethod] + ZSTD_FAST: _ClassVar[CompressMethod] + SNAPPY: _ClassVar[CompressMethod] + +class ResponseCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + Success: _ClassVar[ResponseCode] + Partial: _ClassVar[ResponseCode] + Failed: _ClassVar[ResponseCode] + +class ServerStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + Up: _ClassVar[ServerStatus] + Down: _ClassVar[ServerStatus] + Unknown: _ClassVar[ServerStatus] +UNCOMPRESSED: CompressMethod +LZ4_FAST: CompressMethod +ZSTD_FAST: CompressMethod +SNAPPY: CompressMethod +Success: ResponseCode +Partial: ResponseCode +Failed: ResponseCode +Up: ServerStatus +Down: ServerStatus +Unknown: ServerStatus + +class WriteRequest(_message.Message): + __slots__ = ("version", "database", "retention_policy", "username", "password", "records") + VERSION_FIELD_NUMBER: _ClassVar[int] + DATABASE_FIELD_NUMBER: _ClassVar[int] + RETENTION_POLICY_FIELD_NUMBER: _ClassVar[int] + USERNAME_FIELD_NUMBER: _ClassVar[int] + PASSWORD_FIELD_NUMBER: _ClassVar[int] + RECORDS_FIELD_NUMBER: _ClassVar[int] + version: int + database: str + retention_policy: str + username: str + password: str + records: _containers.RepeatedCompositeFieldContainer[Record] + def __init__(self, version: _Optional[int] = ..., database: _Optional[str] = ..., retention_policy: _Optional[str] = ..., username: _Optional[str] = ..., password: _Optional[str] = ..., records: _Optional[_Iterable[_Union[Record, _Mapping]]] = ...) -> None: ... + +class WriteResponse(_message.Message): + __slots__ = ("code",) + CODE_FIELD_NUMBER: _ClassVar[int] + code: ResponseCode + def __init__(self, code: _Optional[_Union[ResponseCode, str]] = ...) -> None: ... + +class Record(_message.Message): + __slots__ = ("measurement", "min_time", "max_time", "compress_method", "block") + MEASUREMENT_FIELD_NUMBER: _ClassVar[int] + MIN_TIME_FIELD_NUMBER: _ClassVar[int] + MAX_TIME_FIELD_NUMBER: _ClassVar[int] + COMPRESS_METHOD_FIELD_NUMBER: _ClassVar[int] + BLOCK_FIELD_NUMBER: _ClassVar[int] + measurement: str + min_time: int + max_time: int + compress_method: CompressMethod + block: bytes + def __init__(self, measurement: _Optional[str] = ..., min_time: _Optional[int] = ..., max_time: _Optional[int] = ..., compress_method: _Optional[_Union[CompressMethod, str]] = ..., block: _Optional[bytes] = ...) -> None: ... + +class PingRequest(_message.Message): + __slots__ = ("client_id",) + CLIENT_ID_FIELD_NUMBER: _ClassVar[int] + client_id: str + def __init__(self, client_id: _Optional[str] = ...) -> None: ... + +class PingResponse(_message.Message): + __slots__ = ("status",) + STATUS_FIELD_NUMBER: _ClassVar[int] + status: ServerStatus + def __init__(self, status: _Optional[_Union[ServerStatus, str]] = ...) -> None: ... diff --git a/opengemini_client/proto/write_pb2_grpc.py b/opengemini_client/proto/write_pb2_grpc.py new file mode 100644 index 0000000..51e3761 --- /dev/null +++ b/opengemini_client/proto/write_pb2_grpc.py @@ -0,0 +1,156 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc + +import opengemini_client.proto.write_pb2 as write__pb2 + +GRPC_GENERATED_VERSION = '1.68.1' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in write_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class WriteServiceStub(object): + """WriteService represents a openGemini RPC write service. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Write = channel.unary_unary( + '/proto.WriteService/Write', + request_serializer=write__pb2.WriteRequest.SerializeToString, + response_deserializer=write__pb2.WriteResponse.FromString, + _registered_method=True) + self.Ping = channel.unary_unary( + '/proto.WriteService/Ping', + request_serializer=write__pb2.PingRequest.SerializeToString, + response_deserializer=write__pb2.PingResponse.FromString, + _registered_method=True) + + +class WriteServiceServicer(object): + """WriteService represents a openGemini RPC write service. + """ + + def Write(self, request, context): + """Write writes the given records to the specified database and retention policy. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Ping(self, request, context): + """Ping is used to check if the server is alive + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_WriteServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Write': grpc.unary_unary_rpc_method_handler( + servicer.Write, + request_deserializer=write__pb2.WriteRequest.FromString, + response_serializer=write__pb2.WriteResponse.SerializeToString, + ), + 'Ping': grpc.unary_unary_rpc_method_handler( + servicer.Ping, + request_deserializer=write__pb2.PingRequest.FromString, + response_serializer=write__pb2.PingResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'proto.WriteService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('proto.WriteService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class WriteService(object): + """WriteService represents a openGemini RPC write service. + """ + + @staticmethod + def Write(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/proto.WriteService/Write', + write__pb2.WriteRequest.SerializeToString, + write__pb2.WriteResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Ping(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/proto.WriteService/Ping', + write__pb2.PingRequest.SerializeToString, + write__pb2.PingResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/opengemini_client/record/__init__.py b/opengemini_client/record/__init__.py new file mode 100644 index 0000000..c6860dc --- /dev/null +++ b/opengemini_client/record/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opengemini_client/record/colval.py b/opengemini_client/record/colval.py new file mode 100644 index 0000000..26d489c --- /dev/null +++ b/opengemini_client/record/colval.py @@ -0,0 +1,146 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +from dataclasses import dataclass, field +from typing import List +from opengemini_client.codec.binary_encoder import (append_int64, append_bytes, append_uint32_list) +from opengemini_client.codec.binary_decoder import BinaryDecoder +from opengemini_client.codec.size import size_of_int64, size_of_bytes, size_of_uint32_list + +_BitMask = [1, 2, 4, 8, 16, 32, 64, 128] +_FlippedBitMask = [254, 253, 251, 247, 239, 223, 191, 127] + + +@dataclass +class ColVal: + Val: bytes = b'' + Offset: List[int] = field(default_factory=list) + Bitmap: bytes = b'' + BitmapOffset: int = 0 + Len: int = 0 + NilCount: int = 0 + + def _set_bitmap(self, index: int): + if (self.Len + self.BitmapOffset) >> 3 >= len(self.Bitmap): + self.Bitmap += struct.pack('> 3 + self.Bitmap = (self.Bitmap[:index1] + bytes([self.Bitmap[index1] | _BitMask[index & 0x07]]) + + self.Bitmap[index1 + 1:]) + + def _reset_bitmap(self, index: int): + if (self.Len + self.BitmapOffset) >> 3 >= len(self.Bitmap): + self.Bitmap = struct.pack('> 3 + self.Bitmap = (self.Bitmap[:index1] + bytes([self.Bitmap[index1] & _FlippedBitMask[index & 0x07]]) + + self.Bitmap[index1 + 1:]) + + def _append_null(self): + self._reset_bitmap(self.Len) + self.Len += 1 + self.NilCount += 1 + + def append_integer(self, v: int): + self.Val += struct.pack(' bytes: + buf = append_int64(buf, self.Len) + buf = append_int64(buf, self.NilCount) + buf = append_int64(buf, self.BitmapOffset) + buf = append_bytes(buf, self.Val) + buf = append_bytes(buf, self.Bitmap) + buf = append_uint32_list(buf, self.Offset) + return buf + + def unmarshal(self, buf: bytes): + if len(buf) == 0: + return + dec = BinaryDecoder(buf=buf, offset=0) + self.Len = dec.int64() + self.NilCount = dec.int64() + self.BitmapOffset = dec.int64() + self.Val = dec.bytes() + self.Bitmap = dec.bytes() + self.Offset = dec.uint32_list() + + def codec_size(self) -> int: + size = 0 + # Len + size += size_of_int64() + # NilCount + size += size_of_int64() + # BitmapOffset + size += size_of_int64() + # Val + size += size_of_bytes(self.Val) + # Bitmap + size += size_of_bytes(self.Bitmap) + # Offset + size += size_of_uint32_list(self.Offset) + return size diff --git a/opengemini_client/record/field.py b/opengemini_client/record/field.py new file mode 100644 index 0000000..fb182aa --- /dev/null +++ b/opengemini_client/record/field.py @@ -0,0 +1,51 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from opengemini_client.codec.size import size_of_string, size_of_int64 +from opengemini_client.codec.binary_encoder import append_string, append_int64 +from opengemini_client.codec.binary_decoder import BinaryDecoder + +Field_Type_Unknown = 0 +Field_Type_Int = 1 +Field_Type_UInt = 2 +Field_Type_Float = 3 +Field_Type_String = 4 +Field_Type_Boolean = 5 +Field_Type_Tag = 6 +Field_Type_Last = 7 + + +@dataclass +class Field: + Type: int = 0 + Name: str = '' + + def marshal(self, buf: bytes) -> bytes: + buf = append_string(buf, self.Name) + buf = append_int64(buf, self.Type) + return buf + + def unmarshal(self, buf: bytes): + if len(buf) == 0: + return + dec = BinaryDecoder(buf=buf, offset=0) + self.Name = dec.string() + self.Type = dec.int64() + + def codec_size(self) -> int: + size = 0 + size += size_of_string(self.Name) + size += size_of_int64() + return size diff --git a/opengemini_client/record/record.py b/opengemini_client/record/record.py new file mode 100644 index 0000000..f2b8d49 --- /dev/null +++ b/opengemini_client/record/record.py @@ -0,0 +1,78 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field +from typing import List + +from opengemini_client.record.colval import ColVal +from opengemini_client.record.field import Field +from opengemini_client.codec.size import size_of_uint32 +from opengemini_client.codec.binary_encoder import append_uint32 +from opengemini_client.codec.binary_decoder import BinaryDecoder + +TimeField = "time" + + +@dataclass +class Record: + ColVals: List[ColVal] = field(default_factory=list) + Fields: List[Field] = field(default_factory=list) + + def marshal(self, buf: bytes) -> bytes: + # Fields + buf = append_uint32(buf, len(self.Fields)) + for f in self.Fields: + buf = append_uint32(buf, f.codec_size()) + buf = f.marshal(buf) + + # ColVals + buf = append_uint32(buf, len(self.ColVals)) + for col in self.ColVals: + buf = append_uint32(buf, col.codec_size()) + buf = col.marshal(buf) + return buf + + def unmarshal(self, buf: bytes): + if len(buf) == 0: + return + dec = BinaryDecoder(buf=buf, offset=0) + + # Fields + fl = dec.uint32() + for _ in range(fl): + fd = Field() + fd.unmarshal(dec.bytes()) + self.Fields.append(fd) + + # ColVals + cl = dec.uint32() + for _ in range(cl): + col = ColVal() + col.unmarshal(dec.bytes()) + self.ColVals.append(col) + + def code_size(self) -> int: + size = 0 + # Fields + size += size_of_uint32() + for f in self.Fields: + size += size_of_uint32() + size += f.codec_size() + + # ColVals + size += size_of_uint32() + for col in self.ColVals: + size += size_of_uint32() + size += col.codec_size() + return size diff --git a/opengemini_client/record_transform.py b/opengemini_client/record_transform.py new file mode 100644 index 0000000..7f9c07e --- /dev/null +++ b/opengemini_client/record_transform.py @@ -0,0 +1,164 @@ +# Copyright 2025 openGemini Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field +from typing import Dict, Union + +from opengemini_client.models import Point +from opengemini_client.record.colval import ColVal +from opengemini_client.record.field import Field, Field_Type_Int, Field_Type_Float, Field_Type_Boolean, \ + Field_Type_String, Field_Type_Tag +from opengemini_client.record.record import TimeField, Record + +err_invalid_field_value = ValueError("invalid field value type") +err_empty_record = ValueError("empty record") + + +def get_field_type(v): + if isinstance(v, int): + t = Field_Type_Int + elif isinstance(v, float): + t = Field_Type_Float + elif isinstance(v, bool): + t = Field_Type_Boolean + elif isinstance(v, str): + t = Field_Type_String + else: + raise err_invalid_field_value + return t + + +@dataclass +class Column: + field: Field + col: ColVal + + def add_nulls(self, row_count): + if self.field.Type in (Field_Type_String, Field_Type_Tag): + self.col.append_string_nulls(row_count) + elif self.field.Type == Field_Type_Int: + self.col.append_integer_nulls(row_count) + elif self.field.Type == Field_Type_Float: + self.col.append_float_nulls(row_count) + elif self.field.Type == Field_Type_Boolean: + self.col.append_boolean_nulls(row_count) + else: + raise err_invalid_field_value + + def add_value(self, value): + if self.field.Type in (Field_Type_String, Field_Type_Tag): + self.col.append_string(value) + elif self.field.Type == Field_Type_Int: + self.col.append_integer(value) + elif self.field.Type == Field_Type_Float: + self.col.append_float(value) + elif self.field.Type == Field_Type_Boolean: + self.col.append_boolean(value) + else: + raise err_invalid_field_value + + +@dataclass +class RecordTransform: + row_count: int = 0 + min_time: int = 0 + max_time: int = 0 + columns: Dict[str, Column] = field(default_factory=dict) + fills: Dict[str, bool] = field(default_factory=dict) + + def add_tag_columns(self, tags: Dict[str, str]): + for name, value in tags.items(): + column = self.columns.get(name) + if column is None: + column = Column( + field=Field(Name=name, Type=Field_Type_Tag), + col=ColVal(), + ) + column.add_nulls(self.row_count) + column.add_value(value) + self.fills[name] = True + self.columns[name] = column + + def add_field_columns(self, fields: Dict[str, Union[str, int, float, bool]]): + for name, value in fields.items(): + column = self.columns.get(name) + if column is None: + column = Column( + field=Field(Name=name, Type=get_field_type(value)), + col=ColVal(), + ) + column.add_nulls(self.row_count) + column.add_value(value) + self.fills[name] = True + self.columns[name] = column + + def add_timestamp(self, timestamp): + column = self.columns.get(TimeField) + if column is None: + column = Column( + field=Field(Name=TimeField, Type=Field_Type_Int), + col=ColVal(), + ) + column.add_nulls(self.row_count) + column.add_value(timestamp) + self.columns[TimeField] = column + + self.min_time = min(self.min_time, timestamp) + self.max_time = max(self.max_time, timestamp) + + def add_miss_value_columns(self): + for name, ok in self.fills.items(): + if ok is True: + continue + column = self.columns[name] + if column is None: + continue + count = self.row_count - column.col.Len + if count <= 0: + continue + column.add_nulls(count) + + for name in self.fills: + self.fills[name] = False + + def add_point(self, point: Point): + self.add_tag_columns(point.tags) + self.add_field_columns(point.fields) + self.add_timestamp(point.generate_timestamp()) + self.row_count += 1 + self.add_miss_value_columns() + + def convert_to_record(self): + if len(self.columns) == 0: + raise err_empty_record + tags_field = [] + tags_col = [] + fields_field = [] + fields_col = [] + timestamps_field = [] + timestamps_col = [] + for column in self.columns.values(): + if column.field.Name == TimeField: + timestamps_field.append(column.field) + timestamps_col.append(column.col) + continue + if column.field.Type == Field_Type_Tag: + tags_field.append(column.field) + tags_col.append(column.col) + continue + fields_field.append(column.field) + fields_col.append(column.col) + fields = fields_field + tags_field + timestamps_field + cols = fields_col + tags_col + timestamps_col + return Record(Fields=fields, ColVals=cols) diff --git a/opengemini_client/test_utils.py b/opengemini_client/test_utils.py index 54a5a64..7d79888 100644 --- a/opengemini_client/test_utils.py +++ b/opengemini_client/test_utils.py @@ -17,6 +17,9 @@ def get_test_default_client(): - cfg = models.Config(address=[models.Address(host='127.0.0.1', port=8086)]) + cfg = models.Config(address=[models.Address(host='127.0.0.1', port=8086)], + grpc_config=models.GrpcConfig( + address=[models.Address(host='127.0.0.1', port=8305)], + )) cli = client_impl.OpenGeminiDBClient(cfg) return cli diff --git a/requirements.txt b/requirements.txt index 0eb8cae..96f6e88 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ requests>=2.31.0 +grpcio>=1.68.1 +protobuf>=5.29.2 +numpy>=2.0.2