Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ group :development, :test do
gem 'yard'
gem 'rubocop-git'
gem 'aws-sdk-sns'
gem 'concurrent-ruby'
end
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PATH
event_source (0.6.0)
addressable (>= 2.8.0)
bunny (>= 2.14)
concurrent-ruby
deep_merge (~> 1.2.0)
dry-configurable (~> 0.12)
dry-events (~> 0.3)
Expand Down Expand Up @@ -345,6 +346,7 @@ PLATFORMS

DEPENDENCIES
aws-sdk-sns
concurrent-ruby
database_cleaner
event_source!
faker
Expand Down
1 change: 1 addition & 0 deletions event_source.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'oj', '~> 3.11'
spec.add_dependency 'ox', '~> 2.14'
spec.add_dependency 'typhoeus', '~> 1.4.0'
spec.add_dependency 'concurrent-ruby'

# TODO: Change to development dependency
spec.add_development_dependency 'database_cleaner'
Expand Down
2 changes: 2 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
require 'event_source/channel'
require 'event_source/queue'
require 'event_source/worker'
require 'event_source/worker_pool'
require 'event_source/publish_operation'
require 'event_source/subscribe_operation'
require 'event_source/message'
Expand All @@ -45,6 +46,7 @@
require 'event_source/operations/build_message_options'
require 'event_source/operations/build_message'
require 'event_source/boot_registry'
require 'event_source/paginator'

# Event source provides ability to compose, publish and subscribe to events
module EventSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require "event_source/protocols/http/contracts/subscribe_operation_bindings_contract"
require "event_source/protocols/amqp/contracts/subscribe_operation_binding_contract"
require "event_source/protocols/sns/contracts/subscribe_operation_binding_contract"

module EventSource
module AsyncApi
Expand All @@ -11,6 +12,7 @@ class SubscribeOperationBindingsContract < Contract
params do
optional(:http).hash
optional(:amqp).hash
optional(:sns).hash
end

rule(:http) do
Expand All @@ -24,6 +26,20 @@ class SubscribeOperationBindingsContract < Contract
end
end
end

rule(:sns) do
if key? && value
validation_result = ::EventSource::Protocols::Sns::Contracts::SubscribeOperationBindingsContract.new.call(value)
if validation_result&.failure?
key.failure(
text: 'invalid operation bindings',
error: validation_result.errors.to_h
)
else
values.data.merge({ sns: validation_result.values })
end
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/event_source/async_api/publish_bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require "event_source/protocols/http/publish_bindings"
require "event_source/protocols/amqp/types"
require "event_source/protocols/amqp/publish_bindings"
require "event_source/protocols/sns/publish_bindings"
require "event_source/protocols/sns/operation_bindings"

module EventSource
module AsyncApi
Expand Down
2 changes: 2 additions & 0 deletions lib/event_source/async_api/subscribe_bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
require "event_source/protocols/http/types"
require "event_source/protocols/http/subscribe_bindings"
require "event_source/protocols/amqp/subscribe_bindings"
require "event_source/protocols/sns/operation_bindings"

module EventSource
module AsyncApi
class SubscribeBindings < Dry::Struct
transform_keys(&:to_sym)
attribute :http, ::EventSource::Protocols::Http::SubscribeBindings.meta(omittable: true)
attribute :amqp, Types::Hash.meta(omittable: true)
attribute :sns, ::EventSource::Protocols::Sns::SubscribeBindings.meta(omittable: true)
end
end
end
2 changes: 2 additions & 0 deletions lib/event_source/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def find_connection(params)
connection.publish_operation_exists?(params[:publish_operation_name])
end
else
# raise params[:subscribe_operation_name].inspect if params[:protocol] == :sns
# raise connections.first.subscribe_operations.keys.inspect
connections.detect do |connection|
connection.subscribe_operation_exists?(
params[:subscribe_operation_name]
Expand Down
10 changes: 10 additions & 0 deletions lib/event_source/loggers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

require 'logging'
require_relative "loggers/inherited_logger"

module EventSource
# Logger implementations for EventSource, providing sensible defaults.
module Loggers
end
end
22 changes: 22 additions & 0 deletions lib/event_source/loggers/inherited_logger.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require 'logging'

module EventSource
module Loggers
# A logger which, no matter the name, inherits from the EventSource primary logger
class InheritedLogger < ::Logging::Logger
# Inherit from the logger for EventSource
# rubocop:disable Lint/MissingSuper
def initialize(name)
case name
when String
raise(ArgumentError, "logger must have a name") if name.empty?
else raise(ArgumentError, "logger name must be a String") end

_setup(name, :parent => ::Logging.logger["EventSource"])
end
# rubocop:enable Lint/MissingSuper
end
end
end
4 changes: 4 additions & 0 deletions lib/event_source/logging.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# frozen_string_literal: true

require_relative 'loggers'
require 'logging'

module EventSource
# Module for logging
module Logging
def inherited_logger(name)
::EventSource::Loggers::InheritedLogger[name]
end

def logger
logger_instance = ::Logging.logger['EventSource']
Expand Down
26 changes: 26 additions & 0 deletions lib/event_source/paginator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module EventSource
# Abstract paginator for protocols which support it.
class Paginator
attr_reader :state, :continuation_data

def initialize(more, state, continue_data)
@more = more
@state = state
@continuation_data = continue_data
end

def more?
@more
end

def self.continue(state, continue_data)
self.new(true, state, continue_data)
end

def self.finished
self.new(false, nil, nil)
end
end
end
6 changes: 5 additions & 1 deletion lib/event_source/protocols/amqp_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def self.scheme_patterns
def self.proxy_klass
EventSource::Protocols::Amqp::BunnyConnectionProxy
end

def self.subscribe_operation_name_for(app_name, subscriber_key)
"on_#{app_name}.#{subscriber_key}"
end
end
end
end
end
10 changes: 0 additions & 10 deletions lib/event_source/protocols/http/faraday_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@ def subscribe(subscriber_klass, _options)
@subject.register_action(subscriber_klass, unique_key)
end

def consumer_proxy_for(operation_bindings)
FaradayConsumerProxy.new(
@subject.channel,
@subject,
'',
operation_bindings[:no_ack],
operation_bindings[:exclusive]
)
end

def respond_to_missing?(name, include_private); end

# Forward all missing method calls to the EventSource::Queue instance
Expand Down
4 changes: 4 additions & 0 deletions lib/event_source/protocols/http_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def self.scheme_patterns
def self.proxy_klass
EventSource::Protocols::Http::FaradayConnectionProxy
end

def self.subscribe_operation_name_for(_app_name, subscriber_key)
"/on#{subscriber_key}"
end
end
end
end
13 changes: 13 additions & 0 deletions lib/event_source/protocols/registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@ class Registry

def initialize
@registry = []
@connection_modules = []
end

def register_protocol(connection_module)
@registry << [connection_module.scheme_patterns, connection_module.proxy_klass]
@connection_modules << [connection_module.scheme_patterns, connection_module]
end

def subscribe_operation_name_for(protocol, app_name, subscriber_key)
match = @connection_modules.detect do |r_item|
r_item[0].any? { |ri| ri == protocol}
end
match ? match[1].subscribe_operation_name_for(app_name, subscriber_key) : nil
end

def protocol_klass_for(protocol)
Expand All @@ -39,6 +48,10 @@ def self.register_protocol(connection_module)
def self.protocol_klass_for(protocol)
instance.protocol_klass_for(protocol.to_sym)
end

def self.subscribe_operation_name_for(protocol, app_name, subscriber_key)
instance.subscribe_operation_name_for(protocol.to_sym, app_name, subscriber_key)
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ module Contracts
class PublishOperationBindingsContract < Dry::Validation::Contract
params do
# SNS suppports a number of protocols for publishing and
# consumption. Right now we only support SMS (text) publishing.
# consumption. Right now we only support SMS (text) publishing and
# invocations of the SDK to manage information about resources.
# The `consumers` settings, available under the AsyncAPI binding
# definition for an SNS publish operation, determine what actual
# transport protocol will be used to deliver the message.
optional(:consumers).array(:hash) do
required(:protocol).filled(:string, :included_in? => ["sms"])
optional(:x_sdk_action).maybe(:string, :included_in? => ["ListPhoneNumbersOptedOut"])
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module EventSource
module Protocols
module Sns
module Contracts
# Validate parameters for AsyncAPI SNS subcribe bindings.
class SubscribeOperationBindingsContract < Dry::Validation::Contract
params do
# SNS suppports a number of protocols for publishing and
# consumption. Right now we only support SMS (text) subscription
# to SDK events.
# The `consumers` settings, available under the AsyncAPI binding
# definition for an SNS subscribe operation, determine what actual
# transport protocol will be used to deliver the message.
optional(:consumers).array(:hash) do
required(:protocol).filled(:string, :included_in? => ["sms"])
required(:x_sdk_action).maybe(:string, :included_in? => ["ListPhoneNumbersOptedOut"])
optional(:x_paginate).maybe(:bool)
end
end
end
end
end
end
end
40 changes: 40 additions & 0 deletions lib/event_source/protocols/sns/operation_bindings.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

module EventSource
module Protocols
module Sns
# Describe the Consumer settings under a publish binding.
class ConsumerConfiguration < Dry::Struct
transform_keys(&:to_sym)
attribute :protocol, Types::String.meta(omittable: false)
# If this is an SDK action - indicate the action invoked.
attribute :x_sdk_action, Types::String.meta(omittable: true)

def sms?
protocol.to_s.downcase == "sms"
end

def sdk_invocation?
!x_sdk_action.blank?
end
end

# Describe the Consumer settings under a subscriber binding.
class SubscriberConsumerConfiguration < ConsumerConfiguration
attribute :x_paginate, Types::Bool.meta(omittable: true)
end

# Describes publish operation bindings.
class PublishBindings < Dry::Struct
transform_keys(&:to_sym)
attribute :consumers, Types::Array.of(ConsumerConfiguration).meta(omittable: true)
end

# Describes subscriber operation bindings.
class SubscribeBindings < Dry::Struct
transform_keys(&:to_sym)
attribute :consumers, Types::Array.of(SubscriberConsumerConfiguration).meta(omittable: true)
end
end
end
end
23 changes: 0 additions & 23 deletions lib/event_source/protocols/sns/publish_bindings.rb

This file was deleted.

13 changes: 11 additions & 2 deletions lib/event_source/protocols/sns/sns_channel_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ def initialize(client, channel_name, channel_item)
@client = client
@channel_name = channel_name
@channel_item = channel_item
@worker_proxy = SnsSdkWorkerProxy.new(client, channel_item)
end

def add_publish_operation(async_api_channel_item)
SnsPublisherProxy.new(@client, async_api_channel_item)
if async_api_channel_item.publish.bindings.sns.consumers.any?(&:sdk_invocation?)
SnsSdkPublisherProxy.new(@worker_proxy, async_api_channel_item)
else
SnsPublisherProxy.new(@client, async_api_channel_item)
end
end

def add_subscribe_operation(async_api_channel_item)
SnsSdkQueueProxy.new(@worker_proxy, async_api_channel_item)
end
end
end
end
end
end
Loading