Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "logstash/util/socket_peer"
require "logstash-input-tcp_jars"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/port_management_support'

require "socket"
require "openssl"
Expand Down Expand Up @@ -68,6 +69,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
# ecs_compatibility option, provided by Logstash core or the support adapter.
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)

include LogStash::PluginMixins::PortManagementSupport

config_name "tcp"

default :codec, "line"
Expand Down Expand Up @@ -177,15 +180,22 @@ def register
validate_ssl_config!

if server?
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
@port_reservation = port_management.reserve(addr: @host, port: @port) do |reserved_addr, reserved_port|
# we create the loop for the *requested* host addr, because the *reserved* addr
# may be reported overly-broad (e.g, ipv4-only `0.0.0.0` can become ipv6-tolerating `::` depending on arch)
@loop = InputLoop.new(@id, @host, reserved_port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
end
Comment on lines 183 to 187
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to use blocks here since we're not wrapping behavior:

Suggested change
@port_reservation = port_management.reserve(addr: @host, port: @port) do |reserved_addr, reserved_port|
@loop = InputLoop.new(@id, reserved_addr, reserved_port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
end
port_management.reserve(port: @port) # if this succeeds, there is a reservation for the port
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)

Also we should set the reservation scope for the port alone. Not sure if it's worth differentiating the addr, we can be conservative here and allow the port to be reserved regardless of the addr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're not wrapping behavior

We are.

PortManagementSupport::Reservation#initialize releases the reservation if an exception is raised by the block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also we should set the reservation scope for the port alone. Not sure if it's worth differentiating the addr, we can be conservative here and allow the port to be reserved regardless of the addr.

In order to spawn the server that effectively holds the reservation, we need to know the addr to bind to, so really we are reserving an addr:port pair, not just the port.

end
end

def run(output_queue)
@output_queue = output_queue
if server?
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enabled => @ssl_enabled)
@loop.run
@port_reservation.convert do |_, reserved_port|
@logger.info("Starting tcp input listener", :address => "#{@host}:#{reserved_port}", :ssl_enabled => @ssl_enabled)
@loop.start
end
@loop.wait_until_closed
else
run_client()
end
Expand Down
1 change: 1 addition & 0 deletions logstash-input-tcp.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Gem::Specification.new do |s|
# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.2'
s.add_runtime_dependency 'logstash-mixin-port_management_support', '~>1.0'

s.add_runtime_dependency 'logstash-core', '>= 8.1.0'

Expand Down
38 changes: 27 additions & 11 deletions spec/inputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,27 @@
#Cabin::Channel.get(LogStash).level = :debug
describe LogStash::Inputs::Tcp, :ecs_compatibility_support do

def get_port
begin
# Start high to better avoid common services
port = rand(10000..65535)
s = TCPServer.new("127.0.0.1", port)
s.close
return port
rescue Errno::EADDRINUSE
retry
end
##
# yield the block with a port that is available
# @return [Integer]: a port that is available
def find_available_port(host)
with_bound_port(host: host, &:itself)
end

##
# Yields block with a port that is unavailable
# @yieldparam port [Integer]
# @yieldreturn [Object]
# @return [Object]
def with_bound_port(host:"::", port:0, &block)
server = TCPServer.new(host, port)

return yield(server.local_address.ip_port)
ensure
server.close
end

let(:port) { get_port }
let(:port) { find_available_port("127.0.0.1") }

context "codec (PR #1372)" do
it "switches from plain to line" do
Expand Down Expand Up @@ -373,6 +381,14 @@ def get_port
expect { subject.register }.to_not raise_error
end

context "when the port is unavailable" do
it 'raises a helpful exception' do
with_bound_port(host: "127.0.0.1", port: port) do |unavailable_port|
expect { subject.register }.to raise_error(Errno::EADDRINUSE)
end
end
end

context "when using ssl" do
let(:config) do
{
Expand Down
28 changes: 24 additions & 4 deletions src/main/java/org/logstash/tcp/InputLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Plain TCP Server Implementation.
*/
public final class InputLoop implements Runnable, Closeable {
public final class InputLoop implements Closeable {

// historically this class was passing around the plugin's logger
private static final Logger logger = LogManager.getLogger("logstash.inputs.tcp");
Expand All @@ -46,6 +46,11 @@ public final class InputLoop implements Runnable, Closeable {
*/
private final ServerBootstrap serverBootstrap;

/**
* The channel after starting
*/
private volatile Channel channel;

/**
* SSL configuration.
*/
Expand Down Expand Up @@ -82,15 +87,30 @@ public InputLoop(final String id, final String host, final int port, final Decod
.childHandler(new InputLoop.InputHandler(decoder, sslContext));
}

@Override
public void run() {
public synchronized void start() {
if (channel != null) {
throw new IllegalStateException("Already started");
}
try {
serverBootstrap.bind(host, port).sync().channel().closeFuture().sync();
channel = serverBootstrap.bind(host, port).sync().channel();
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}

public void waitUntilClosed() {
synchronized (this) {
if (channel == null) {
throw new IllegalStateException("Not started");
}
}
try {
channel.closeFuture().sync();
}catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}

@Override
public void close() {
try {
Expand Down