Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ group :development, :test do
gem 'rubocop'
gem 'yard'
gem 'rubocop-git'
gem 'aws-sdk-sns'
end
20 changes: 19 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ PATH
faraday_middleware (~> 1.0)
logging (~> 2.3.0)
mime-types
nokogiri (= 1.15.7)
nokogiri (>= 1.13.0)
oj (~> 3.11)
ox (~> 2.14)
typhoeus (~> 1.4.0)
Expand Down Expand Up @@ -89,6 +89,21 @@ GEM
public_suffix (>= 2.0.2, < 5.0)
amq-protocol (2.3.2)
ast (2.4.2)
aws-eventstream (1.3.2)
aws-partitions (1.1098.0)
aws-sdk-core (3.223.0)
aws-eventstream (~> 1, >= 1.3.0)
aws-partitions (~> 1, >= 1.992.0)
aws-sigv4 (~> 1.9)
base64
jmespath (~> 1, >= 1.6.1)
logger
aws-sdk-sns (1.98.0)
aws-sdk-core (~> 3, >= 3.216.0)
aws-sigv4 (~> 1.5)
aws-sigv4 (1.11.0)
aws-eventstream (~> 1, >= 1.0.2)
base64 (0.2.0)
bson (4.12.1)
builder (3.2.4)
bunny (2.19.0)
Expand Down Expand Up @@ -179,7 +194,9 @@ GEM
i18n (1.8.10)
concurrent-ruby (~> 1.0)
ice_nine (0.11.2)
jmespath (1.6.2)
little-plugger (1.1.4)
logger (1.7.0)
logging (2.3.0)
little-plugger (~> 1.1)
multi_json (~> 1.14)
Expand Down Expand Up @@ -327,6 +344,7 @@ PLATFORMS
ruby

DEPENDENCIES
aws-sdk-sns
database_cleaner
event_source!
faker
Expand Down
1 change: 1 addition & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
require 'event_source/async_api/async_api'
require 'event_source/railtie' if defined?(Rails)
require 'event_source/configure'
require 'event_source/protocols'
require 'event_source/connection'
require 'event_source/connection_manager'
require 'event_source/channel'
Expand Down
1 change: 0 additions & 1 deletion lib/event_source/async_api/async_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ module EventSource
#
# See {Protocols} for the data communication protocols EventSource supports.
module AsyncApi
require 'event_source/uris/amqp_uri'
require_relative 'error'
require_relative 'types'
require_relative 'external_documentation'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require "event_source/protocols/http/contracts/publish_operation_bindings_contract"
require "event_source/protocols/amqp/contracts/publish_operation_binding_contract"
require "event_source/protocols/sns/contracts/publish_operation_binding_contract"

module EventSource
module AsyncApi
Expand All @@ -11,6 +12,21 @@ class PublishOperationBindingsContract < Contract
params do
optional(:http).hash
optional(:amqp).hash
optional(:sns).hash
end

rule(:sns) do
if key? && value
validation_result = ::EventSource::Protocols::Sns::Contracts::PublishOperationBindingsContract.new.call(value)
if validation_result&.failure?
key.failure(
text: 'invalid operation bindings',
error: validation_result.errors.to_h
)
else
values.data.merge({ sns: validation_result.values })
end
end
end

rule(:http) do
Expand Down
2 changes: 2 additions & 0 deletions lib/event_source/async_api/publish_bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "event_source/protocols/http/publish_bindings"
require "event_source/protocols/amqp/types"
require "event_source/protocols/amqp/publish_bindings"
require "event_source/protocols/sns/publish_bindings"

module EventSource
module AsyncApi
Expand All @@ -12,6 +13,7 @@ class PublishBindings < Dry::Struct
transform_keys(&:to_sym)
attribute :http, ::EventSource::Protocols::Http::PublishBindings.meta(omittable: true)
attribute :amqp, Types::Hash.meta(omittable: true)
attribute :sns, ::EventSource::Protocols::Sns::PublishBindings.meta(omittable: true)
attribute :x_amqp_exchange_to_exchanges, Types::Hash.meta(omittable: true)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/event_source/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def add_publish_operation(async_api_channel_item)
@channel_proxy.add_publish_operation(async_api_channel_item)
return false unless publish_proxy

@channel_proxy.create_exchange_to_exchange_bindings(publish_proxy) if @connection.protocol == :amqp
@channel_proxy.publish_proxy_created(publish_proxy)
operation_id = async_api_channel_item.publish.operationId

logger.info "Adding Publish Operation: #{operation_id}"
Expand Down
4 changes: 4 additions & 0 deletions lib/event_source/configure/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def initialize
def load_protocols
@protocols.each do |protocol|
require "event_source/protocols/#{protocol}_protocol"
protocol_module = EventSource::Protocols.const_get(protocol.camelize)
EventSource::Protocols::Registry.register_protocol(
protocol_module
)
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/event_source/configure/contracts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require_relative "contracts/client_certificate_settings_contract"
require_relative "contracts/soap_settings_contract"
require_relative "contracts/http_configuration_contract"
require_relative "contracts/sns_configuration_contract"

module EventSource
module Configure
Expand Down
17 changes: 17 additions & 0 deletions lib/event_source/configure/contracts/sns_configuration_contract.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module EventSource
module Configure
module Contracts
# Contract for SNS configuration.
class SnsConfigurationContract < Dry::Validation::Contract
params do
required(:url).value(:string)
required(:region).value(:string)
required(:access_key_id).value(:string)
required(:secret_access_key).value(:string)
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def validate_configurations(configs)
def validate_configuration(sc)
params_as_hash = sc.to_h
case sc
when ::EventSource::Configure::SnsConfiguration
result = ::EventSource::Configure::Contracts::SnsConfigurationContract.new.call(params_as_hash)
return Success(sc) if result.success?
Failure([sc, result.errors])
when ::EventSource::Configure::HttpConfiguration
result = ::EventSource::Configure::Contracts::HttpConfigurationContract.new.call(params_as_hash)
return Success(sc) if result.success?
Expand Down
14 changes: 14 additions & 0 deletions lib/event_source/configure/servers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ def to_h
end
end

SnsConfiguration = Struct.new(:protocol, :url, :call_location, :region, :access_key_id, :secret_access_key) do
def to_h
attribute_hash = super()
attribute_hash.compact
end
end

# Represents a server configuration.
class Servers
attr_reader :default_content_type, :configurations
Expand All @@ -84,6 +91,13 @@ def amqp
yield(amqp_conf)
@configurations.push(amqp_conf)
end

def sns
sns_conf = SnsConfiguration.new(:sns)
sns_conf.call_location = caller(1)
yield(sns_conf)
@configurations.push(sns_conf)
end
end
end
end
20 changes: 16 additions & 4 deletions lib/event_source/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ def subscribe_operation_exists?(name)

# Create and register a collection of new {EventSource::Channel} instances on this Connection
def add_channels(async_api_channels)
async_api_channels
.each do |async_api_channel_item|
async_api_channels.each do |async_api_channel_item|
add_channel(async_api_channel_item.id.to_sym, async_api_channel_item)
end
end
Expand All @@ -130,8 +129,7 @@ def add_channels(async_api_channels)
# type: number
# @return [EventSource::Channel]
def add_channel(channel_item_key, async_api_channel_item)
channel_proxy =
@connection_proxy.add_channel(channel_item_key, async_api_channel_item)
channel_proxy = build_protocol_channel_proxy(channel_item_key, async_api_channel_item)
@channels[channel_item_key] =
Channel.new(self, channel_proxy, async_api_channel_item)
end
Expand Down Expand Up @@ -164,5 +162,19 @@ def protocol_version
def client_version
@connection_proxy.client_version
end

protected

def build_protocol_channel_proxy(channel_item_key, async_api_channel_item)
@connection_proxy.add_channel(channel_item_key, async_api_channel_item)
rescue Error => e
raise(
::EventSource::Error::AddConnectionProxyChannelError.new(
async_api_channel_item,
@connection_proxy,
e
)
)
end
end
end
15 changes: 3 additions & 12 deletions lib/event_source/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,9 @@ def drop_connection(connection_uri)
# @param [Symbol] protocol the protocol name, `:http` or `:amqp`
# @return [Class] Protocol Specific Connection Proxy Class
def protocol_klass_for(protocol)
case protocol.to_sym
when :amqp, :amqps, 'amqp', 'amqps'
EventSource::Protocols::Amqp::BunnyConnectionProxy
when :http, :https, 'http', 'https'
EventSource::Protocols::Http::FaradayConnectionProxy
else
raise EventSource::Protocols::Amqp::Error::UnknownConnectionProtocolError,
"unknown protocol: #{protocol}"
end

# raise EventSource::AsyncApi::Error::UnknownConnectionProtocolError,
# "unknown protocol: #{protocol}"
klass = EventSource::Protocols::Registry.protocol_klass_for(protocol)
raise EventSource::Error::UnknownConnectionProtocolError, "unknown protocol: #{protocol}" unless klass
klass
end
end
end
Expand Down
44 changes: 44 additions & 0 deletions lib/event_source/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,49 @@ class Error < StandardError
MessageBuildError = Class.new(Error)
PayloadEncodeError = Class.new(Error)
PayloadDecodeError = Class.new(Error)
PublisherOperationNotFoundError = Class.new(Error)
UnknownConnectionProtocolError = Class.new(Error)

# An error occured loading a definition YAML.
class ServiceDefinitionLoadingError < StandardError
attr_reader :service_definition_path, :errors

def initialize(sd_path, errors)
@service_definition_path = sd_path
@errors = errors
super(craft_message)
end

def craft_message
<<~ERRORFMTSTRING
Failed to load service definition:
Definition File:
#{@service_definition_path}
Connection Proxy:
#{@errors.inspect}
ERRORFMTSTRING
end
end

# Adding a channel using a particular protocol proxy failed.
class AddConnectionProxyChannelError < Error
attr_reader :channel_information, :connection_proxy

def initialize(channel_info, conn_proxy, original = $ERROR_INFO)
@channel_information = channel_info
@connection_proxy = conn_proxy
super(craft_message, original)
end

def craft_message
<<~ERRORFMTSTRING
Failed to initialize channel:
Channel Item:
#{@channel_information.inspect}
Connection Proxy:
#{@connection_proxy.inspect}
ERRORFMTSTRING
end
end
end
end
11 changes: 11 additions & 0 deletions lib/event_source/protocols.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

require_relative "protocols/registry"
require_relative "protocols/base"

module EventSource
# Basic functionality common to all protocols.
# Specific protocols are loaded using during initialization.
module Protocols
end
end
4 changes: 4 additions & 0 deletions lib/event_source/protocols/amqp/bunny_channel_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ def add_subscribe_operation(_async_api_subscribe_operation)
add_queue
end

def publish_proxy_created(publish_proxy)
create_exchange_to_exchange_bindings(publish_proxy)
end

def create_exchange_to_exchange_bindings(exchange_proxy)
exchange_to_exchange_bindings&.each do |exchange_name, options|
options.deep_symbolize_keys!
Expand Down
1 change: 0 additions & 1 deletion lib/event_source/protocols/amqp/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class Error < StandardError
AuthenticationError = Class.new(Error)
ConnectionError = Class.new(Error)
DuplicateConnectionError = Class.new(Error)
UnknownConnectionProtocolError = Class.new(Error)
ChannelBindingContractError = Class.new(Error)
ExchangeNotFoundError = Class.new(Error)
QueueNotFoundError = Class.new(Error)
Expand Down
13 changes: 10 additions & 3 deletions lib/event_source/protocols/amqp_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
require 'bunny'
require 'uri'

require 'event_source/uris/amqp_uri'
require_relative 'amqp/error'
require_relative 'amqp/bunny_message_proxy'
require_relative 'amqp/bunny_exchange_proxy'
Expand All @@ -22,9 +21,17 @@
module EventSource
module Protocols
# Namespace for classes and modules that use AsyncAPI to manage message
# exchange using the AMQP protcol
# exchange using the AMQP protocol
# @since 0.4.0
module Amqp

def self.scheme_patterns
[:amqp, :amqps]
end

def self.proxy_klass
EventSource::Protocols::Amqp::BunnyConnectionProxy
end
end
end
end
end
13 changes: 13 additions & 0 deletions lib/event_source/protocols/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

require_relative "base/connection_proxy"
require_relative "base/channel_proxy"

module EventSource
module Protocols
# Namespace for shared and default protocol behaviour.
# @since 0.7.0
module Base
end
end
end
Loading