Skip to content

Commit 68ee217

Browse files
committed
Merge branch 'RTDE-Driver' of https://github.com/Harrier-SA/amrc-connectivity-stack into RTDE-Driver
2 parents 4bb3236 + 7125040 commit 68ee217

File tree

2 files changed

+76
-54
lines changed

2 files changed

+76
-54
lines changed

acs-edge/lib/devices/MTConnect.ts

Lines changed: 75 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ export class MTConnectConnection extends (
1818
#connDetails: restConnDetails
1919
#cancelToken: CancelTokenStatic;
2020
#source: CancelTokenSource
21+
#nextSequence: number | null = null;
22+
#pollInterval: NodeJS.Timeout | null = null;
2123

2224
constructor(type: string, connDetails: restConnDetails) {
2325
super(type, connDetails);
@@ -38,66 +40,79 @@ export class MTConnectConnection extends (
3840
}
3941

4042
/**
41-
* Opens http connection stream and emits data response
42-
* @param pollInt Connection Polling interval
43+
* Polls MTConnect endpoints at regular intervals
44+
* @param metrics Metrics object containing endpoint addresses
45+
* @param interval Polling interval in milliseconds
4346
*/
4447
async sample(metrics: Metrics, interval: number) {
45-
let res = await this.get(
46-
`/sample?interval=0&heartbeat=1000`,
47-
{
48-
responseType: "stream",
49-
cancelToken: this.#source.token
50-
}
51-
);
52-
let headersCompleted = false;
53-
let bodyCompleted = false;
54-
let buffer = Buffer.alloc(0);
55-
let receivedBodyChunk = 0;
56-
let contentLength = 0;
57-
const pass = new PassThrough();
58-
pass.on('data', (chunk) => {
59-
if (!headersCompleted) {
60-
const headers = chunk.toString().split(/\r?\n/);
61-
if (headers.length >= 3) {
62-
try {
63-
contentLength = parseInt(headers[2].split(":")[1]);
64-
if (!isNaN(contentLength)) {
65-
buffer = Buffer.alloc(contentLength);
66-
headersCompleted = true;
67-
}
68-
} catch (e) {
69-
console.log(e);
70-
}
48+
// Get all unique addresses
49+
const endpoints = [...new Set(metrics.addresses)];
50+
51+
log(`[MTConnect] Starting polling of ${endpoints.length} endpoint(s) every ${interval}ms`);
52+
53+
// Function to fetch data from a specific endpoint
54+
const pollEndpoint = async (endpoint: string) => {
55+
try {
56+
// Build URL with sequence parameter if available
57+
let url = endpoint;
58+
59+
// Add sequence parameter for /sample endpoints
60+
if (endpoint.includes('/sample') && this.#nextSequence) {
61+
url = endpoint.includes('?')
62+
? `${endpoint}&from=${this.#nextSequence}`
63+
: `${endpoint}?from=${this.#nextSequence}`;
7164
}
72-
}
73-
if (receivedBodyChunk < contentLength || isNaN(contentLength)) {
74-
const contentStartOffset = Math.max(chunk.indexOf("<?xml", 0, 'utf8'), 0);
75-
try {
76-
chunk.copy(buffer, receivedBodyChunk, contentStartOffset, chunk.byteLength);
77-
} catch (err) {
78-
console.log(err);
65+
66+
log(`[MTConnect] Polling: ${url}`);
67+
68+
let res = await this.get(url, {
69+
cancelToken: this.#source.token
70+
});
71+
72+
// Convert response to string
73+
const xmlData = typeof res.data === 'string'
74+
? res.data
75+
: res.data.toString();
76+
77+
// Extract and update nextSequence for future requests
78+
const nextSeqMatch = xmlData.match(/nextSequence="(\d+)"/);
79+
if (nextSeqMatch) {
80+
this.#nextSequence = parseInt(nextSeqMatch[1]);
81+
log(`[MTConnect] Updated sequence to: ${this.#nextSequence}`);
7982
}
80-
receivedBodyChunk += chunk.byteLength - contentStartOffset;
8183

82-
if (receivedBodyChunk === contentLength) {
83-
bodyCompleted = true;
84+
// Emit data only for this specific endpoint
85+
let obj: any = {};
86+
obj[endpoint] = Buffer.from(xmlData);
87+
this.emit('data', obj);
88+
89+
} catch (err: any) {
90+
// Log detailed error information including the problematic address
91+
if (err.response) {
92+
// HTTP error (4xx, 5xx)
93+
log(`[MTConnect] ERROR - Invalid address or HTTP error for endpoint: ${endpoint}`);
94+
log(`[MTConnect] HTTP Status: ${err.response.status} ${err.response.statusText}`);
95+
log(`[MTConnect] Response: ${JSON.stringify(err.response.data)}`);
96+
} else if (err.request) {
97+
// Request was made but no response received (network error, timeout, etc.)
98+
log(`[MTConnect] ERROR - No response from endpoint: ${endpoint}`);
99+
log(`[MTConnect] Possible causes: incorrect hostname/IP, network issue, or service not running`);
100+
log(`[MTConnect] Details: ${err.message}`);
101+
} else {
102+
// Something else went wrong
103+
log(`[MTConnect] ERROR - Failed to poll endpoint: ${endpoint}`);
104+
log(`[MTConnect] Error: ${err.message}`);
84105
}
85106
}
86-
if (bodyCompleted) {
87-
// this.emit("asyncData", buffer);
88-
let payload = Buffer.from(buffer);
89-
metrics.addresses.forEach(addr => {
90-
let obj: any = {};
91-
obj[addr] = payload;
92-
this.emit('data', obj);
93-
})
94-
headersCompleted = false;
95-
bodyCompleted = false;
96-
buffer = Buffer.alloc(0);
97-
receivedBodyChunk = 0;
98-
}
99-
});
100-
res.data.pipe(pass);
107+
};
108+
109+
// Poll all endpoints immediately on start
110+
await Promise.all(endpoints.map(endpoint => pollEndpoint(endpoint)));
111+
112+
// Then poll all endpoints at regular intervals
113+
this.#pollInterval = setInterval(async () => {
114+
await Promise.all(endpoints.map(endpoint => pollEndpoint(endpoint)));
115+
}, interval);
101116
}
102117

103118
open() {
@@ -107,6 +122,13 @@ export class MTConnectConnection extends (
107122

108123
close(): void {
109124
log("Closing MTConnectConnection");
125+
126+
// Clear the polling interval
127+
if (this.#pollInterval) {
128+
clearInterval(this.#pollInterval);
129+
this.#pollInterval = null;
130+
}
131+
110132
this.#source.cancel("Stream Close Requested");
111133
this.emit("close");
112134
}

acs-edge/package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)