Skip to content

Commit 5227280

Browse files
authored
Merge branch 'grpc:master' into error_details
2 parents e946cbf + 164d14f commit 5227280

File tree

6 files changed

+102
-38
lines changed

6 files changed

+102
-38
lines changed

packages/grpc-js/src/channel-options.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export interface ChannelOptions {
3636
'grpc.max_send_message_length'?: number;
3737
'grpc.max_receive_message_length'?: number;
3838
'grpc.enable_http_proxy'?: number;
39+
'grpc.http_proxy'?: string;
3940
/* http_connect_target and http_connect_creds are used for passing data
4041
* around internally, and should not be documented as public-facing options
4142
*/

packages/grpc-js/src/http_proxy.ts

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,30 +41,36 @@ interface ProxyInfo {
4141
creds?: string;
4242
}
4343

44-
function getProxyInfo(): ProxyInfo {
45-
let proxyEnv = '';
44+
function getProxyInfo(options: ChannelOptions): ProxyInfo {
45+
let proxyUrlString = '';
4646
let envVar = '';
47-
/* Prefer using 'grpc_proxy'. Fallback on 'http_proxy' if it is not set.
47+
/* Prefer using 'grpc.http_proxy' option. Fallback on 'grpc_proxy' env var if it is not set.
4848
* Also prefer using 'https_proxy' with fallback on 'http_proxy'. The
4949
* fallback behavior can be removed if there's a demand for it.
5050
*/
51-
if (process.env.grpc_proxy) {
51+
if (options['grpc.http_proxy']) {
52+
proxyUrlString = options['grpc.http_proxy'];
53+
} else if (process.env.grpc_proxy) {
5254
envVar = 'grpc_proxy';
53-
proxyEnv = process.env.grpc_proxy;
55+
proxyUrlString = process.env.grpc_proxy;
5456
} else if (process.env.https_proxy) {
5557
envVar = 'https_proxy';
56-
proxyEnv = process.env.https_proxy;
58+
proxyUrlString = process.env.https_proxy;
5759
} else if (process.env.http_proxy) {
5860
envVar = 'http_proxy';
59-
proxyEnv = process.env.http_proxy;
61+
proxyUrlString = process.env.http_proxy;
6062
} else {
6163
return {};
6264
}
6365
let proxyUrl: URL;
6466
try {
65-
proxyUrl = new URL(proxyEnv);
67+
proxyUrl = new URL(proxyUrlString);
6668
} catch (e) {
67-
log(LogVerbosity.ERROR, `cannot parse value of "${envVar}" env var`);
69+
if (envVar) {
70+
log(LogVerbosity.ERROR, `cannot parse value of "${envVar}" env var`);
71+
} else {
72+
log(LogVerbosity.ERROR, `cannot parse value of "grpc.http_proxy" channel option`);
73+
}
6874
return {};
6975
}
7076
if (proxyUrl.protocol !== 'http:') {
@@ -97,9 +103,15 @@ function getProxyInfo(): ProxyInfo {
97103
if (userCred) {
98104
result.creds = userCred;
99105
}
100-
trace(
101-
'Proxy server ' + result.address + ' set by environment variable ' + envVar
102-
);
106+
if (envVar) {
107+
trace(
108+
'Proxy server ' + result.address + ' set by environment variable ' + envVar
109+
);
110+
} else {
111+
trace(
112+
'Proxy server ' + result.address + ' set by channel option grpc.http_proxy'
113+
);
114+
}
103115
return result;
104116
}
105117

@@ -190,7 +202,7 @@ export function mapProxyName(
190202
if (target.scheme === 'unix') {
191203
return noProxyResult;
192204
}
193-
const proxyInfo = getProxyInfo();
205+
const proxyInfo = getProxyInfo(options);
194206
if (!proxyInfo.address) {
195207
return noProxyResult;
196208
}

packages/grpc-js/src/retrying-call.ts

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -760,11 +760,10 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
760760
this.maybeStartHedgingTimer();
761761
}
762762

763-
private handleChildWriteCompleted(childIndex: number) {
764-
const childCall = this.underlyingCalls[childIndex];
765-
const messageIndex = childCall.nextMessageToSend;
763+
private handleChildWriteCompleted(childIndex: number, messageIndex: number) {
766764
this.getBufferEntry(messageIndex).callback?.();
767765
this.clearSentMessages();
766+
const childCall = this.underlyingCalls[childIndex];
768767
childCall.nextMessageToSend += 1;
769768
this.sendNextChildMessage(childIndex);
770769
}
@@ -774,19 +773,33 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
774773
if (childCall.state === 'COMPLETED') {
775774
return;
776775
}
777-
if (this.getBufferEntry(childCall.nextMessageToSend)) {
778-
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
776+
const messageIndex = childCall.nextMessageToSend;
777+
if (this.getBufferEntry(messageIndex)) {
778+
const bufferEntry = this.getBufferEntry(messageIndex);
779779
switch (bufferEntry.entryType) {
780780
case 'MESSAGE':
781781
childCall.call.sendMessageWithContext(
782782
{
783783
callback: error => {
784784
// Ignore error
785-
this.handleChildWriteCompleted(childIndex);
785+
this.handleChildWriteCompleted(childIndex, messageIndex);
786786
},
787787
},
788788
bufferEntry.message!.message
789789
);
790+
// Optimization: if the next entry is HALF_CLOSE, send it immediately
791+
// without waiting for the message callback. This is safe because the message
792+
// has already been passed to the underlying transport.
793+
const nextEntry = this.getBufferEntry(messageIndex + 1);
794+
if (nextEntry.entryType === 'HALF_CLOSE') {
795+
this.trace(
796+
'Sending halfClose immediately after message to child [' +
797+
childCall.call.getCallNumber() +
798+
'] - optimizing for unary/final message'
799+
);
800+
childCall.nextMessageToSend += 1;
801+
childCall.call.halfClose();
802+
}
790803
break;
791804
case 'HALF_CLOSE':
792805
childCall.nextMessageToSend += 1;
@@ -813,7 +826,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
813826
};
814827
this.writeBuffer.push(bufferEntry);
815828
if (bufferEntry.allocated) {
816-
context.callback?.();
829+
// Run this in next tick to avoid suspending the current execution context
830+
// otherwise it might cause half closing the call before sending message
831+
process.nextTick(() => {
832+
context.callback?.();
833+
});
817834
for (const [callIndex, call] of this.underlyingCalls.entries()) {
818835
if (
819836
call.state === 'ACTIVE' &&
@@ -823,7 +840,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
823840
{
824841
callback: error => {
825842
// Ignore error
826-
this.handleChildWriteCompleted(callIndex);
843+
this.handleChildWriteCompleted(callIndex, messageIndex);
827844
},
828845
},
829846
message
@@ -843,7 +860,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
843860
{
844861
callback: error => {
845862
// Ignore error
846-
this.handleChildWriteCompleted(this.committedCallIndex!);
863+
this.handleChildWriteCompleted(this.committedCallIndex!, messageIndex);
847864
},
848865
},
849866
message
@@ -868,12 +885,21 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
868885
allocated: false,
869886
});
870887
for (const call of this.underlyingCalls) {
871-
if (
872-
call?.state === 'ACTIVE' &&
873-
call.nextMessageToSend === halfCloseIndex
874-
) {
875-
call.nextMessageToSend += 1;
876-
call.call.halfClose();
888+
if (call?.state === 'ACTIVE') {
889+
// Send halfClose to call when either:
890+
// - nextMessageToSend === halfCloseIndex - 1: last message sent, callback pending (optimization)
891+
// - nextMessageToSend === halfCloseIndex: all messages sent and acknowledged
892+
if (call.nextMessageToSend === halfCloseIndex
893+
|| call.nextMessageToSend === halfCloseIndex - 1) {
894+
this.trace(
895+
'Sending halfClose immediately to child [' +
896+
call.call.getCallNumber() +
897+
'] - all messages already sent'
898+
);
899+
call.nextMessageToSend += 1;
900+
call.call.halfClose();
901+
}
902+
// Otherwise, halfClose will be sent by sendNextChildMessage when message callbacks complete
877903
}
878904
}
879905
}
@@ -895,4 +921,4 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
895921
return null;
896922
}
897923
}
898-
}
924+
}

packages/grpc-js/src/server.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,7 +1609,7 @@ export class Server {
16091609
if (err) {
16101610
this.keepaliveTrace('Ping failed with error: ' + err.message);
16111611
sessionClosedByServer = true;
1612-
session.close();
1612+
session.destroy();
16131613
} else {
16141614
this.keepaliveTrace('Received ping response');
16151615
maybeStartKeepalivePingTimer();
@@ -1631,7 +1631,7 @@ export class Server {
16311631
'Connection dropped due to ping send error: ' + pingSendError
16321632
);
16331633
sessionClosedByServer = true;
1634-
session.close();
1634+
session.destroy();
16351635
return;
16361636
}
16371637

@@ -1640,7 +1640,7 @@ export class Server {
16401640
this.keepaliveTrace('Ping timeout passed without response');
16411641
this.trace('Connection dropped by keepalive timeout');
16421642
sessionClosedByServer = true;
1643-
session.close();
1643+
session.destroy();
16441644
}, this.keepaliveTimeoutMs);
16451645
keepaliveTimer.unref?.();
16461646
};
@@ -1803,7 +1803,7 @@ export class Server {
18031803
duration
18041804
);
18051805
sessionClosedByServer = true;
1806-
session.close();
1806+
session.destroy();
18071807
} else {
18081808
this.keepaliveTrace('Received ping response');
18091809
maybeStartKeepalivePingTimer();
@@ -1826,7 +1826,7 @@ export class Server {
18261826
'Connection dropped due to ping send error: ' + pingSendError
18271827
);
18281828
sessionClosedByServer = true;
1829-
session.close();
1829+
session.destroy();
18301830
return;
18311831
}
18321832

@@ -1840,7 +1840,7 @@ export class Server {
18401840
'Connection dropped by keepalive timeout from ' + clientAddress
18411841
);
18421842
sessionClosedByServer = true;
1843-
session.close();
1843+
session.destroy();
18441844
}, this.keepaliveTimeoutMs);
18451845
keepaliveTimeout.unref?.();
18461846
};

packages/grpc-js/test/test-end-to-end.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import * as assert from 'assert';
1919
import * as path from 'path';
2020
import { loadProtoFile } from './common';
21-
import { Metadata, Server, ServerDuplexStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, experimental, sendUnaryData } from '../src';
21+
import { Metadata, Server, ServerCredentials, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, credentials, experimental, sendUnaryData } from '../src';
2222
import { ServiceClient } from '../src/make-client';
2323

2424
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
@@ -36,6 +36,15 @@ const echoServiceImplementation = {
3636
call.end();
3737
});
3838
},
39+
echoClientStream(call: ServerReadableStream<any, any>, callback: sendUnaryData<any>) {
40+
const messages: any[] = [];
41+
call.on('data', (message: any) => {
42+
messages.push(message);
43+
});
44+
call.on('end', () => {
45+
callback(null, { value: messages.map(m => m.value).join(','), value2: messages.length });
46+
});
47+
},
3948
};
4049

4150
describe('Client should successfully communicate with server', () => {
@@ -77,4 +86,20 @@ describe('Client should successfully communicate with server', () => {
7786
});
7887
});
7988
}).timeout(5000);
89+
90+
it('Client streaming with one message should work', done => {
91+
server = new Server();
92+
server.addService(EchoService.service, echoServiceImplementation);
93+
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => {
94+
assert.ifError(error);
95+
client = new EchoService(`localhost:${port}`, credentials.createInsecure());
96+
const call = client.echoClientStream((error: ServiceError, response: any) => {
97+
assert.ifError(error);
98+
assert.deepStrictEqual(response, { value: 'test value', value2: 1 });
99+
done();
100+
});
101+
call.write({ value: 'test value', value2: 42 });
102+
call.end();
103+
});
104+
});
80105
});

packages/grpc-tools/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "grpc-tools",
3-
"version": "1.13.0",
3+
"version": "1.13.1",
44
"author": "Google Inc.",
55
"description": "Tools for developing with gRPC on Node.js",
66
"homepage": "https://grpc.io/",
@@ -24,7 +24,7 @@
2424
"prepublishOnly": "git submodule update --init --recursive && node copy_well_known_protos.js"
2525
},
2626
"dependencies": {
27-
"@mapbox/node-pre-gyp": "^1.0.5"
27+
"@mapbox/node-pre-gyp": "^2.0.0"
2828
},
2929
"binary": {
3030
"module_name": "grpc_tools",

0 commit comments

Comments
 (0)