Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/ldclient-rb/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ class DataSystemConfig
# The (optional) builder proc for FDv1-compatible fallback synchronizer
#
def initialize(initializers: nil, primary_synchronizer: nil, secondary_synchronizer: nil,
data_store_mode: LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil)
data_store_mode: LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil)
@initializers = initializers
@primary_synchronizer = primary_synchronizer
@secondary_synchronizer = secondary_synchronizer
Expand Down
6 changes: 3 additions & 3 deletions lib/ldclient-rb/data_system.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize
@primary_synchronizer = nil
@secondary_synchronizer = nil
@fdv1_fallback_synchronizer = nil
@data_store_mode = LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY
@data_store_mode = LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY
@data_store = nil
end

Expand Down Expand Up @@ -205,7 +205,7 @@ def self.custom
# @return [ConfigBuilder]
#
def self.daemon(store)
custom.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY)
custom.data_store(store, LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_ONLY)
end

#
Expand All @@ -219,7 +219,7 @@ def self.daemon(store)
# @return [ConfigBuilder]
#
def self.persistent_store(store)
default.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_WRITE)
default.data_store(store, LaunchDarkly::Interfaces::DataSystem::DataStoreMode::READ_WRITE)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ldclient-rb/impl/data_store/store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def get_data_store_status_provider
private def send_change_events(affected_items)
affected_items.each do |item|
if item[:kind] == FEATURES
@flag_change_broadcaster.broadcast(item[:key])
@flag_change_broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(item[:key]))
end
end
end
Expand Down
288 changes: 288 additions & 0 deletions lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
require 'concurrent/atomics'
require 'ldclient-rb/impl/data_system'
require 'ldclient-rb/interfaces/data_system'
require 'ldclient-rb/util'
require 'thread'

module LaunchDarkly
module Impl
module Integrations
module TestData
#
# Internal implementation of both Initializer and Synchronizer protocols for TestDataV2.
#
# This component bridges the test data management in TestDataV2 with the FDv2 protocol
# interfaces. Each instance implements both Initializer and Synchronizer protocols
# and receives change notifications for dynamic updates.
#
class TestDataSourceV2
include LaunchDarkly::Interfaces::DataSystem::Initializer
include LaunchDarkly::Interfaces::DataSystem::Synchronizer

# @api private
#
# @param test_data [LaunchDarkly::Integrations::TestDataV2] the test data instance
#
def initialize(test_data)
@test_data = test_data
@closed = false
@update_queue = Queue.new
@lock = Mutex.new

# Always register for change notifications
@test_data.add_instance(self)
end

#
# Return the name of this data source.
#
# @return [String]
#
def name
'TestDataV2'
end

#
# Implementation of the Initializer.fetch method.
#
# Returns the current test data as a Basis for initial data loading.
#
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
# @return [LaunchDarkly::Result] A Result containing either a Basis or an error message
#
def fetch(selector_store)
begin
@lock.synchronize do
if @closed
return LaunchDarkly::Result.fail('TestDataV2 source has been closed')
end

# Get all current flags and segments from test data
init_data = @test_data.make_init_data
version = @test_data.get_version

# Build a full transfer changeset
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)

# Add all flags to the changeset
init_data[:flags].each do |key, flag_data|
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
key,
flag_data[:version] || 1,
flag_data
)
end

# Add all segments to the changeset
init_data[:segments].each do |key, segment_data|
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
key,
segment_data[:version] || 1,
segment_data
)
end

# Create selector for this version
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
change_set = builder.finish(selector)

basis = LaunchDarkly::Interfaces::DataSystem::Basis.new(change_set: change_set, persist: false, environment_id: nil)

LaunchDarkly::Result.success(basis)
end
rescue => e
LaunchDarkly::Result.fail("Error fetching test data: #{e.message}", e)
end
end

#
# Implementation of the Synchronizer.sync method.
#
# Yields updates as test data changes occur.
#
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
# @yield [LaunchDarkly::Interfaces::DataSystem::Update] Yields Update objects as synchronization progresses
# @return [void]
#
def sync(selector_store)
# First yield initial data
initial_result = fetch(selector_store)
unless initial_result.success?
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
0,
initial_result.error,
Time.now
)
)
return
end

# Yield the initial successful state
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
change_set: initial_result.value.change_set
)

# Continue yielding updates as they arrive
until @closed
begin
# stop() will push nil to the queue to wake us up when shutting down
update = @update_queue.pop
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keelerm84 Ruby doesn't have a pop with timeout like we do in Python. The stop method will push nil into the queue to stop it from blocking so I'm not sure it is needed but I wanted to call it out.


# Handle nil sentinel for shutdown
break if update.nil?

# Yield the actual update
yield update
rescue => e
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
0,
"Error in test data synchronizer: #{e.message}",
Time.now
)
)
break
end
end
end

#
# Stop the data source and clean up resources
#
# @return [void]
#
def stop
@lock.synchronize do
return if @closed
@closed = true
end

@test_data.closed_instance(self)
# Signal shutdown to sync generator
@update_queue.push(nil)
end

#
# Called by TestDataV2 when a flag is updated.
#
# This method converts the flag update into an FDv2 changeset and
# queues it for delivery through the sync() generator.
#
# @param flag_data [Hash] the flag data
# @return [void]
#
def upsert_flag(flag_data)
@lock.synchronize do
return if @closed

begin
version = @test_data.get_version

# Build a changes transfer changeset
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)

# Add the updated flag
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
flag_data[:key],
flag_data[:version] || 1,
flag_data
)

# Create selector for this version
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
change_set = builder.finish(selector)

# Queue the update
update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
change_set: change_set
)

@update_queue.push(update)
rescue => e
# Queue an error update
error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
0,
"Error processing flag update: #{e.message}",
Time.now
)
)
@update_queue.push(error_update)
end
end
end

#
# Called by TestDataV2 when a segment is updated.
#
# This method converts the segment update into an FDv2 changeset and
# queues it for delivery through the sync() generator.
#
# @param segment_data [Hash] the segment data
# @return [void]
#
def upsert_segment(segment_data)
@lock.synchronize do
return if @closed

begin
version = @test_data.get_version

# Build a changes transfer changeset
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)

# Add the updated segment
builder.add_put(
LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
segment_data[:key],
segment_data[:version] || 1,
segment_data
)

# Create selector for this version
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
change_set = builder.finish(selector)

# Queue the update
update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
change_set: change_set
)

@update_queue.push(update)
rescue => e
# Queue an error update
error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
0,
"Error processing segment update: #{e.message}",
Time.now
)
)
@update_queue.push(error_update)
end
end
end
end
end
end
end
end

Loading