Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions src/main/java/cz/smarteon/loxone/LoxoneWebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class LoxoneWebSocket {
private CountDownLatch authSeqLatch;
private CountDownLatch visuLatch;

private SyncCommandGuard<?> syncCommandGuard;

private int authTimeoutSeconds = 3;
private int visuTimeoutSeconds = 3;
private int retries = 5;
Expand Down Expand Up @@ -115,7 +117,7 @@ public void registerListener(@NotNull final LoxoneEventListener listener) {
eventListeners.add(listener);
}

public void sendCommand(@NotNull final Command<?> command) {
public synchronized void sendCommand(@NotNull final Command<?> command) {
requireNonNull(command, "command can't be null");
if (command.isWsSupported()) {
sendWithRetry(command, retries);
Expand All @@ -124,10 +126,27 @@ public void sendCommand(@NotNull final Command<?> command) {
}
}

public void sendSecureCommand(@NotNull final ControlCommand<?> command) {
public synchronized void sendSecureCommand(@NotNull final ControlCommand<?> command) {
sendSecureWithRetry(command, retries);
}

@SuppressWarnings("unchecked")
public synchronized <T> T commandRequest(@NotNull final Command<T> command) {
requireNonNull(command, "command can't be null");
if (command.isWsSupported()) {
try {
syncCommandGuard = new SyncCommandGuard<>(command);
sendWithRetry(command, retries);

return (T) syncCommandGuard.waitForResponse(retries * authTimeoutSeconds);
} finally {
syncCommandGuard = null;
}
} else {
throw new IllegalArgumentException("Only websocket commands are supported");
}
}

public void close() {
scheduler.shutdownNow();
closeWebSocket();
Expand Down Expand Up @@ -328,7 +347,7 @@ void sendInternal(final Command<?> command) {
LOG.debug("Sending websocket message: " + command.getCommand());
webSocketClient.send(command.getCommand());
// KEEP_ALIVE command has no response at all
if (!KEEP_ALIVE.getCommand().equals(command.getCommand())) {
if (!KEEP_ALIVE.getCommand().equals(command.getCommand()) && syncCommandGuard == null) {
commands.add(command);
}
}
Expand All @@ -339,7 +358,12 @@ void sendInternal(final Command<?> command) {
*/
void processMessage(final String message) {
try {
final Command<?> command = commands.remove();
Command<?> command;
if (syncCommandGuard != null) {
command = syncCommandGuard.getCommand();
} else {
command= commands.remove();
}
if (!Void.class.equals(command.getResponseType())) {
final Object parsedMessage = Codec.readMessage(message, command.getResponseType());
if (parsedMessage instanceof LoxoneMessage) {
Expand Down Expand Up @@ -467,26 +491,29 @@ private boolean checkLoxoneMessage(final Command<?> command, final LoxoneMessage

@SuppressWarnings("unchecked")
private void processCommand(final Command<?> command, final Object message, final boolean isError) {
CommandResponseListener.State commandState = CommandResponseListener.State.IGNORED;
final Iterator<CommandResponseListener<?>> listeners = commandResponseListeners.iterator();
while (listeners.hasNext() && commandState != CommandResponseListener.State.CONSUMED) {
@SuppressWarnings("rawtypes")
final CommandResponseListener next = listeners.next();
if (isError && next instanceof LoxoneMessageCommandResponseListener) {
if (((LoxoneMessageCommandResponseListener) next).acceptsErrorResponses()) {
if (syncCommandGuard != null) {
syncCommandGuard.receive(message);
} else {
CommandResponseListener.State commandState = CommandResponseListener.State.IGNORED;
final Iterator<CommandResponseListener<?>> listeners = commandResponseListeners.iterator();
while (listeners.hasNext() && commandState != CommandResponseListener.State.CONSUMED) {
@SuppressWarnings("rawtypes") final CommandResponseListener next = listeners.next();
if (isError && next instanceof LoxoneMessageCommandResponseListener) {
if (((LoxoneMessageCommandResponseListener) next).acceptsErrorResponses()) {
commandState = commandState.fold(next.onCommand(command, message));
}
} else if (next.accepts(message.getClass())) {
commandState = commandState.fold(next.onCommand(command, message));
}
} else if (next.accepts(message.getClass())) {
commandState = commandState.fold(next.onCommand(command, message));
}
}

if (commandState == CommandResponseListener.State.IGNORED) {
LOG.warn("No command listener registered, ignoring command=" + command);
}
if (commandState == CommandResponseListener.State.IGNORED) {
LOG.warn("No command listener registered, ignoring command=" + command);
}

if (command != null && command.getCommand().startsWith(C_SYS_ENC)) {
LOG.warn("Encrypted message receive is not supported");
if (command != null && command.getCommand().startsWith(C_SYS_ENC)) {
LOG.warn("Encrypted message receive is not supported");
}
}
}

Expand Down
56 changes: 56 additions & 0 deletions src/main/java/cz/smarteon/loxone/SyncCommandGuard.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cz.smarteon.loxone;

import cz.smarteon.loxone.message.LoxoneMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SyncCommandGuard<T> {

private static final Logger LOG = LoggerFactory.getLogger(SyncCommandGuard.class);

private final CountDownLatch latch;

private final Command<T> command;

private Object response;

SyncCommandGuard(final Command<T> command) {
this.command = command;
latch = new CountDownLatch(1);
}

@SuppressWarnings("unchecked")
T waitForResponse(int seconds) {
try {
if (latch.await(seconds, TimeUnit.SECONDS)) {
try {
return (T) response;
} catch (ClassCastException cce) {
if (response instanceof LoxoneMessage<?>) {
LoxoneMessage<?> error = (LoxoneMessage<?>) response;
throw new LoxoneException("Error received of " + error.getControl() + " code " + error.getCode());
} else {
throw new LoxoneException("Unrecognizable error received to " + command.getCommand());
}
}
} else {
throw new LoxoneException("Timeout waiting for sync command response " + command.getCommand());
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for sync command request completion", e);
throw new LoxoneException("Interrupted while waiting for sync command request completion");
}
}

void receive(final Object response) {
this.response = response;
latch.countDown();
}

Command<T> getCommand() {
return command;
}
}
17 changes: 16 additions & 1 deletion src/test/kotlin/LoxoneAT.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cz.smarteon.loxone

import cz.smarteon.loxone.app.SwitchControl
import cz.smarteon.loxone.message.ControlCommand
import cz.smarteon.loxone.message.JsonValue
import cz.smarteon.loxone.message.LoxoneMessage
import io.mockk.every
Expand Down Expand Up @@ -101,6 +102,20 @@ class LoxoneAT {

@Test
@Order(4)
fun `should pulse on switch sync`() {
val response = device?.let { device ->
loxone.webSocket().commandRequest(ControlCommand.genericControlCommand(device.uuid.toString(), "Pulse"))
}

expectThat(response){
isA<LoxoneMessage<*>>()
.get { value }.isA<JsonValue>()
.get { jsonNode.textValue() }.isEqualTo("1")
}
}

@Test
@Order(5)
fun `should pulse on secured switch`() {
val latch = commands.expectCommand(".*${secDevice?.uuid}/Pulse")
secDevice?.let {secDevice -> loxone.sendControlPulse(secDevice) }
Expand All @@ -119,7 +134,7 @@ class LoxoneAT {
}

@Test
@Order(5)
@Order(6)
fun `should refresh token`() {
val evaluator = mockk<TokenStateEvaluator> {
every { evaluate(any()) } answers { mockk {
Expand Down