Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions alexia/api/decorators.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
from functools import wraps

from django.core.exceptions import PermissionDenied
from modernrpc.core import REQUEST_KEY


def manager_required(f):
@wraps(f)
def wrap(request, *args, **kwargs):
def wrap(*args, **kwargs):
request = kwargs.get(REQUEST_KEY)
if not request.user.is_authenticated or not request.user.is_superuser and (
not request.organization or not request.user.profile.is_manager(request.organization)):
not request.organization or not request.user.profile.is_manager(request.organization)
):
raise PermissionDenied
return f(request, *args, **kwargs)
return f(*args, **kwargs)
return wrap

def login_required(f):
@wraps(f)
def wrap(*args, **kwargs):
request = kwargs.get(REQUEST_KEY)
if not request.user.is_authenticated:
raise PermissionDenied
return f(*args, **kwargs)
return wrap
32 changes: 19 additions & 13 deletions alexia/api/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
from jsonrpc.exceptions import Error
from modernrpc.exceptions import RPCException, RPC_INVALID_PARAMS, RPC_CUSTOM_ERROR_BASE


class ForbiddenError(Error):
class ForbiddenError(RPCException):
""" The token was not recognized. """
code = 403
status = 403
message = 'Forbidden.'
def __init__(self, message=None):
super(ForbiddenError, self).__init__(
code=(RPC_CUSTOM_ERROR_BASE + 3),
message='Forbidden.' if message is None else message
)


class ObjectNotFoundError(Error):
class ObjectNotFoundError(RPCException):
""" The requested object does not exist. """
code = 404
message = 'Object not found.'
status = 404
def __init__(self, message=None):
super(ObjectNotFoundError, self).__init__(
code=404,
message='Object not found.' if message is None else message
)


class InvalidParamsError(Error):
class InvalidParamsError(RPCException):
""" Invalid method parameters. """
code = -32602
message = 'Invalid params.'
status = 422
def __init__(self, message=None):
super(InvalidParamsError, self).__init__(
code=RPC_INVALID_PARAMS,
message='Invalid params.' if message is None else message
)
53 changes: 53 additions & 0 deletions alexia/api/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import json

from modernrpc.handlers import JSONRPCHandler
from modernrpc.handlers.jsonhandler import JsonResult, JsonSuccessResult, JsonErrorResult

from alexia.api.v1 import version


class AlexiaJSONRPCHandler(JSONRPCHandler):

# Override content types because older apps send an initialization request with the content type
# text/plain, so we need to add that to the allowed content types for JSON-RPC. -- albertskja 2025-05-12
@staticmethod
def valid_content_types():
return [
"text/plain",
"application/json",
"application/json-rpc",
"application/jsonrequest",
]

def process_single_request(self, request_data, context):
# Older apps send the jsonrpc version as a float, but the library expects it as a string.
# So we need to overwrite that attribute to the correct type if it is a float. -- albertskja 2025-05-12
if request_data and isinstance(request_data, dict) and "jsonrpc" in request_data and isinstance(request_data['jsonrpc'], float):
request_data['jsonrpc'] = str(request_data['jsonrpc'])

# Older apps may set the jsonrpc attribute to "1.0", but the library expects "2.0".
# Because there is no functional difference otherwise, and to maintain backwards compatibility
# we can just override it to "2.0" in those cases. -- albertskja 2025-05-12
version_overridden = False
if request_data and isinstance(request_data, dict) and "jsonrpc" in request_data and request_data['jsonrpc'] == "1.0":
request_data['jsonrpc'] = "2.0"
version_overridden = True

result_data = super().process_single_request(request_data=request_data, context=context)

# Put back the version "1.0" if it was previously overridden -- albertskja 2025-05-12
if version_overridden:
result_data.version = "1.0"

return result_data

def dumps_result(self, result: JsonResult) -> str:
# The old API backend included the "error" attribute set to None, even if the request was successful.
# Also, it included the "result" attribute set to None, even if the request failed.
# ModernRPC does not, so we need to add those back to maintain backwards compatibility -- albertskja 2025-05-12
result_json = json.loads(super().dumps_result(result))
if isinstance(result, JsonSuccessResult) and "error" not in result_json.keys():
result_json["error"] = None
if isinstance(result, JsonErrorResult) and "result" not in result_json.keys():
result_json["result"] = None
return json.dumps(result_json)
13 changes: 7 additions & 6 deletions alexia/api/urls.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from django.conf.urls import url
from django.urls import path

from .v1 import APIv1BrowserView, APIv1DocumentationView, api_v1_site
from .views import APIInfoView

from modernrpc.core import Protocol
from modernrpc.views import RPCEntryPoint

urlpatterns = [
url(r'^$', APIInfoView.as_view(), name='api'),
path('', APIInfoView.as_view(), name='api'),

url(r'^1/$', api_v1_site.dispatch, name='api_v1_mountpoint'),
url(r'^1/browse/$', APIv1BrowserView.as_view(), name='api_v1_browse'),
url(r'^1/doc/$', APIv1DocumentationView.as_view(), name='api_v1_doc'),
path('1/', RPCEntryPoint.as_view(protocol=Protocol.JSON_RPC, entry_point="v1"), name="jsonrpc_mountpoint"),
path('1/doc/', RPCEntryPoint.as_view(enable_doc=True, enable_rpc=False, template_name="api/v1/doc.html", entry_point="v1"), name="jsonrpc_docs"),
]
2 changes: 0 additions & 2 deletions alexia/api/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
from .config import * # NOQA
from .methods import * # NOQA
from .views import * # NOQA
4 changes: 0 additions & 4 deletions alexia/api/v1/config.py

This file was deleted.

152 changes: 102 additions & 50 deletions alexia/api/v1/methods/authorization.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,58 @@
from typing import List, Dict, Optional

from django.contrib.auth.models import User
from django.db import transaction
from django.utils import timezone
from jsonrpc import jsonrpc_method
from modernrpc.core import rpc_method, REQUEST_KEY

from alexia.api.decorators import manager_required
from alexia.api.exceptions import InvalidParamsError
from alexia.apps.billing.models import Authorization
from alexia.auth.backends import OIDC_BACKEND_NAME

from ..common import format_authorization
from ..config import api_v1_site


@jsonrpc_method('authorization.list(radius_username=String) -> Array', site=api_v1_site, authenticated=True, safe=True)
@rpc_method(name='authorization.list', entry_point='v1')
@manager_required
def authorization_list(request, radius_username=None):
def authorization_list(radius_username: Optional[str] = None, **kwargs) -> List[Dict]:
"""
Retrieve registered authorizations for the current selected organization.
**Signature**: `authorization.list(radius_username)`

**Arguments**:

- `radius_username` : `str` -- *(optional)* Username to search for.

**Return type**: List of `dict`

**Idempotent**: yes

Required user level: Manager
**Required user level**: Manager

**Documentation**:

Retrieve registered authorizations for the current selected organization.

Provide radius_username to select only authorizations of the provided user.

Returns an array of accounts of registered authorizations.

radius_username -- (optional) Username to search for.
**Example return value**:

Example return value:
[
{
[
{
"id": 1,
"end_date": null,
"start_date": "2014-09-21T14:16:06+00:00",
"user": "s0000000"
}
]
}
]

Raises error -32602 (Invalid params) if the username does not exist.
**Raises errors**:

- `-32602` (Invalid params) if the username does not exist.
"""
request = kwargs.get(REQUEST_KEY)
result = []
authorizations = Authorization.objects.filter(organization=request.organization)

Expand All @@ -58,30 +73,44 @@ def authorization_list(request, radius_username=None):
return result


@jsonrpc_method('authorization.get(radius_username=String) -> Array', site=api_v1_site, authenticated=True, safe=True)
@rpc_method(name='authorization.get', entry_point='v1')
@manager_required
def authorization_get(request, radius_username):
def authorization_get(radius_username: str, **kwargs) -> List[Dict]:
"""
**Signature**: `authorization.get(radius_username)`

**Arguments**:

- `radius_username` : `str` -- Username to search for.

**Return type**: `dict`

**Idempotent**: yes

**Required user level**: Manager

**Documentation**:

Retrieve registered authorizations for a specified user and current selected
organization.

Required user level: Manager

Returns an array of accounts of registered authorizations.

radius_username -- Username to search for.
**Example return value**:

Example return value:
[
{
[
{
"id": 1,
"end_date": null,
"start_date": "2014-09-21T14:16:06+00:00"
}
]
}
]

**Raises errors**:

Raises error -32602 (Invalid params) if the username does not exist.
- `-32602` (Invalid params) if the username does not exist.
"""
request = kwargs.get(REQUEST_KEY)
result = []

try:
Expand All @@ -102,33 +131,44 @@ def authorization_get(request, radius_username):
return result


@jsonrpc_method(
'authorization.add(radius_username=String, account=String) -> Object',
site=api_v1_site,
authenticated=True
)
@rpc_method(name='authorization.add', entry_point='v1')
@manager_required
@transaction.atomic
def authorization_add(request, radius_username, account):
def authorization_add(radius_username: str, account: str, **kwargs) -> Dict:
"""
Add a new authorization to the specified user.
**Signature**: `authorization.add(radius_username, account)`

**Arguments**:

- `radius_username` : `str` -- Username to search for.
- `account` : `str` -- Unused

**Return type**: `dict`

Required user level: Manager
**Idempotent**: no

**Required user level**: Manager

**Documentation**:

Add a new authorization to the specified user.

Returns the authorization on success.

radius_username -- Username to search for.
**Example return value**:

Example return value:
{
"id": 1,
"end_date": null,
"start_date": "2014-09-21T14:16:06+00:00",
"user": "s0000000"
}
{
"id": 1,
"end_date": null,
"start_date": "2014-09-21T14:16:06+00:00",
"user": "s0000000"
}

**Raises errors**:

Raises error -32602 (Invalid params) if the username does not exist.
- `-32602` (Invalid params) if the username does not exist.
"""
request = kwargs.get(REQUEST_KEY)
try:
user = User.objects.get(authenticationdata__backend=OIDC_BACKEND_NAME,
authenticationdata__username=radius_username)
Expand All @@ -141,24 +181,36 @@ def authorization_add(request, radius_username, account):
return format_authorization(authorization)


@jsonrpc_method('authorization.end(radius_username=String, authorization_id=Number) -> Boolean', site=api_v1_site,
authenticated=True)
@rpc_method(name='authorization.end', entry_point='v1')
@manager_required
@transaction.atomic
def authorization_end(request, radius_username, authorization_id):
def authorization_end(radius_username: str, authorization_id: int, **kwargs) -> bool:
"""
End an authorization from the specified user.
**Signature**: `authorization.end(radius_username, authorization_id)`

**Arguments**:

- `radius_username` : `str` -- Username to search for.
- `authorization_id` : `int` -- ID of the authorization to end

Required user level: Manager
**Return type**: `bool`

**Idempotent**: no

**Required user level**: Manager

**Documentation**:

End an authorization from the specified user.

Returns true when successful. Returns false when the authorization was already ended.

radius_username -- Username to search for.
identifier -- RFID card hardware identifier (max. 16 chars)
**Raises errors**:

Raises error -32602 (Invalid params) if the username does not exist.
Raises error -32602 (Invalid params) if provided authorization cannot be found.
- `-32602` (Invalid params) if the username does not exist.
- `-32602` (Invalid params) if provided authorization cannot be found.
"""
request = kwargs.get(REQUEST_KEY)
try:
user = User.objects.get(authenticationdata__backend=OIDC_BACKEND_NAME,
authenticationdata__username=radius_username)
Expand Down
Loading