Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[MASTER]
ignore=static,.git
[MAIN]
ignore=static,.git,proto,grpc_client.py
[MESSAGES CONTROL]
disable=
invalid-name,
Expand Down
9 changes: 9 additions & 0 deletions opengemini_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ def write_batch_points(self, database: str, batch_points: BatchPoints):
:return: return an error message
"""

def write_by_grpc(self, database: str, batch_points: BatchPoints, rp: str = ''):
"""
batch points to assigned database
:param database: name
:param batch_points: BatchPoints object
:param rp: retention policy
:return: return an error message
"""

@abstractmethod
def create_database(self, database: str, rp: RpConfig = None):
"""
Expand Down
83 changes: 74 additions & 9 deletions opengemini_client/client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,40 @@
import gzip
import io
import itertools
import os.path
from abc import ABC
from http import HTTPStatus
from typing import List

import grpc
import requests
from requests import HTTPError

from opengemini_client import grpc_client
from opengemini_client.client import Client
from opengemini_client.measurement import Measurement, MeasurementCondition
from opengemini_client.models import Config, BatchPoints, Query, QueryResult, Series, SeriesResult, RpConfig, \
ValuesResult, KeyValue
ValuesResult, KeyValue, AuthConfig
from opengemini_client.url_const import UrlConst
from opengemini_client.models import AuthType, TlsConfig


def check_auth_config(auth_config: AuthConfig):
if auth_config is not None:
if auth_config.auth_type == AuthType.PASSWORD:
if len(auth_config.username) == 0:
raise ValueError("invalid auth config due to empty username")
if len(auth_config.password) == 0:
raise ValueError("invalid auth config due to empty password")
if auth_config.auth_type == AuthType.TOKEN and len(auth_config.token) == 0:
raise ValueError("invalid auth config due to empty token")


def check_config(config: Config):
if len(config.address) == 0:
raise ValueError("must have at least one address")

if config.auth_config is not None:
if config.auth_config.auth_type == AuthType.PASSWORD:
if len(config.auth_config.username) == 0:
raise ValueError("invalid auth config due to empty username")
if len(config.auth_config.password) == 0:
raise ValueError("invalid auth config due to empty password")
if config.auth_config.auth_type == AuthType.TOKEN and len(config.auth_config.token) == 0:
raise ValueError("invalid auth config due to empty token")
check_auth_config(config.auth_config)

if config.tls_enabled and config.tls_config is None:
config.tls_config = TlsConfig()
Expand All @@ -60,6 +67,17 @@ def check_config(config: Config):
if config.connection_timeout is None or config.connection_timeout <= datetime.timedelta(seconds=0):
config.connection_timeout = datetime.timedelta(seconds=10)

if config.grpc_config is None:
return config

if len(config.grpc_config.address) == 0:
raise ValueError("grpc config must have at least one address")

check_auth_config(config.grpc_config.auth_config)

if config.grpc_config.tls_enable and config.grpc_config.tls_config is None:
config.grpc_config.tls_config = TlsConfig()

return config


Expand Down Expand Up @@ -95,6 +113,9 @@ def __init__(self, config: Config):
self.session.verify = config.tls_config.ca_file
self.endpoints = [f"{protocol}{addr.host}:{addr.port}" for addr in config.address]
self.endpoints_iter = itertools.cycle(self.endpoints)
if self.config.grpc_config is not None:
self.grpc_endpoints = [f"{addr.host}:{addr.port}" for addr in config.grpc_config.address]
self.grpc_endpoints_iter = itertools.cycle(self.grpc_endpoints)

def close(self):
self.session.close()
Expand All @@ -108,6 +129,31 @@ def __exit__(self, _exc_type, _exc_val, _exc_tb):
def _get_server_url(self):
return next(self.endpoints_iter)

def _get_grpc_server_url(self):
return next(self.grpc_endpoints_iter)

def _get_grpc_channel(self):
server_url = self._get_grpc_server_url()
if self.config.grpc_config.tls_enable is False:
return grpc.insecure_channel(server_url)

root_certificates = None
private_key = None
certificate_chain = None
if os.path.exists(self.config.grpc_config.tls_config.ca_file):
with open(self.config.grpc_config.tls_config.ca_file, 'rb') as fd:
root_certificates = fd.read()
if os.path.exists(self.config.grpc_config.tls_config.cert_file):
with open(self.config.grpc_config.tls_config.cert_file, 'rb') as fd:
certificate_chain = fd.read()
if os.path.exists(self.config.grpc_config.tls_config.key_file):
with open(self.config.grpc_config.tls_config.key_file, 'rb') as fd:
private_key = fd.read()
return grpc.secure_channel(
target=server_url,
credentials=grpc.ssl_channel_credentials(root_certificates, private_key, certificate_chain)
)

def _update_headers(self, method, url_path, headers=None) -> dict:
if headers is None:
headers = {}
Expand Down Expand Up @@ -191,6 +237,25 @@ def write_batch_points(self, database: str, batch_points: BatchPoints):
return
raise HTTPError(f"write_batch_points error resp, code: {resp.status_code}, body: {resp.text}")

def write_by_grpc(self, database: str, batch_points: BatchPoints, rp: str = ''):
username = ''
password = ''
if self.config.grpc_config.auth_config is not None:
username = self.config.grpc_config.auth_config.username
password = self.config.grpc_config.auth_config.password

# send grpc request
channel = self._get_grpc_channel()
grpc_client.write(
channel=channel,
database=database,
batch_points=batch_points,
rp=rp,
username=username,
password=password,
timeout=self.config.timeout.seconds,
)

def create_database(self, database: str, rp: RpConfig = None):
if not database:
raise ValueError("empty database name")
Expand Down
13 changes: 13 additions & 0 deletions opengemini_client/codec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 openGemini Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
67 changes: 67 additions & 0 deletions opengemini_client/codec/binary_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2025 openGemini Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
from dataclasses import dataclass
from typing import List
import numpy
from opengemini_client.codec.size import size_of_int64, size_of_uint16, size_of_uint32


@dataclass
class BinaryDecoder:
buf: 'bytes' = b''
offset: int = 0

def int64(self) -> int:
u = self.buf[self.offset:self.offset + size_of_int64()]
v = u[7] | u[6] << 8 | u[5] << 16 | u[4] << 24 | u[3] << 32 | u[2] << 40 | u[1] << 48 | u[0] << 56
v = (numpy.int64(v) >> 1) ^ ((numpy.int64(v) << 63) >> 63)
self.offset += size_of_int64()
return int(v)

def uint16(self) -> int:
u = self.buf[self.offset:self.offset + size_of_uint16()]
v = u[1] | u[0] << 8
self.offset += size_of_uint16()
return int(v)

def uint32(self) -> int:
u = self.buf[self.offset:self.offset + size_of_uint32()]
v = u[3] | u[2] << 8 | u[1] << 16 | u[0] << 24
self.offset += size_of_uint32()
return int(v)

def string(self) -> str:
length = self.uint16()
v = self.buf[self.offset:self.offset + length].decode("utf-8")
self.offset += length
return v

def bytes(self) -> bytes:
length = self.uint32()
if length == 0:
return b''
v = self.buf[self.offset:self.offset + length]
self.offset += length
return v

def uint32_list(self) -> List[int]:
length = self.uint32()
if length == 0:
return []
length1 = length * size_of_uint32()
v = struct.unpack('<' + 'I ' * length, self.buf[self.offset:self.offset + length1])
self.offset += length1
return [int(i) for i in v]
55 changes: 55 additions & 0 deletions opengemini_client/codec/binary_encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2025 openGemini Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
from typing import List
import numpy


def append_int64(b: bytes, v: int) -> bytes:
v = (numpy.int64(v) << 1) ^ (v >> 63)
v = numpy.uint64(v)
u = [numpy.uint8(v >> 56), numpy.uint8(v >> 48), numpy.uint8(v >> 40), numpy.uint8(v >> 32), numpy.uint8(v >> 24),
numpy.uint8(v >> 16), numpy.uint8(v >> 8), numpy.uint8(v)]
return b + bytes(u)


def append_uint16(b: bytes, v: int) -> bytes:
v = numpy.uint16(v)
u = [numpy.uint8(v >> 8), numpy.uint8(v)]
return b + bytes(u)


def append_uint32(b: bytes, v: int) -> bytes:
v = numpy.uint32(v)
u = [numpy.uint8(v >> 24), numpy.uint8(v >> 16), numpy.uint8(v >> 8), numpy.uint8(v)]
return b + bytes(u)


def append_string(b: bytes, v: str) -> bytes:
b = append_uint16(b, len(v))
return b + v.encode("utf-8")


def append_bytes(b: bytes, v: bytes) -> bytes:
b = append_uint32(b, len(v))
return b + v


def append_uint32_list(b: bytes, v: List[int]) -> bytes:
b = append_uint32(b, len(v))
if len(v) == 0:
return b
byte_data = struct.pack('<' + 'I ' * len(v), *v)
return b + byte_data
81 changes: 81 additions & 0 deletions opengemini_client/codec/size.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2025 openGemini Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

_SizeOfInt16 = 2
_SizeOfInt32 = 4
_SizeOfInt64 = 8

_SizeOfUint8 = 1
_SizeOfUint16 = 2
_SizeOfUint32 = 4
_SizeOfUint64 = 8

_SizeOfFloat32 = 4
_SizeOfFloat64 = 8

_SizeOfBool = 1


def size_of_int16():
return _SizeOfInt16


def size_of_int32():
return _SizeOfInt32


def size_of_int64():
return _SizeOfInt64


def size_of_uint8():
return _SizeOfUint8


def size_of_uint16():
return _SizeOfUint16


def size_of_uint32():
return _SizeOfUint32


def size_of_uint64():
return _SizeOfUint64


def size_of_float32():
return _SizeOfFloat32


def size_of_float64():
return _SizeOfFloat64


def size_of_bool():
return _SizeOfBool


def size_of_string(s: str) -> int:
return len(s) + size_of_uint16()


def size_of_bytes(b: bytes) -> int:
return len(b) + size_of_uint32()


def size_of_uint32_list(v: List[int]) -> int:
return len(v) * size_of_uint32() + size_of_uint32()
Loading