FDN-4338: add support for catalog item events #500
base: main
Conversation
📝 Walkthrough
Extends event processing to handle Inserted, Updated, Upserted, and Deleted events; adds an UpsertLike trait with Inserted/Updated types; implements processUpsertLike, pairUpEvents, and pairWithDeleted to group and merge upsert-like events with Deleted events; CapturedType now carries multiple discriminators and a custom JSON writer; tests added.
Sequence Diagram(s)
```mermaid
sequenceDiagram
  rect rgba(100,150,200,0.5)
    participant Source as Event Source
  end
  rect rgba(120,200,120,0.5)
    participant Controller as Controller
  end
  rect rgba(200,150,100,0.5)
    participant Pairer as pairUpEvents / pairWithDeleted
  end
  rect rgba(180,100,200,0.5)
    participant Serializer as capturedTypeWrites
  end
  Source->>Controller: emit events (inserted/updated/upserted/deleted)
  Controller->>Controller: processUpsertLike -> build UpsertLike events
  Controller->>Pairer: pairUpEvents(inserted, updated, upserted, deleted)
  Pairer->>Pairer: group upsert-like by type/id and merge discriminators
  Pairer->>Pairer: match grouped upsert-like with Deleted entries
  Pairer->>Serializer: produce CapturedType(s)
  Serializer->>Controller: JSON descriptor(s)
  Controller->>Source: saveDescriptor / emit to Kinesis
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes. Pre-merge checks: ✅ 3 of 3 passed.
gheine left a comment:
Can you produce the api and api-internal repo stream descriptors with and without this change and do a diff between the two, so that we can verify there are no other, unintended changes?
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@build.sbt`:
- Around lines 13-14: The coverage thresholds were lowered via coverageMinimumStmtTotal and coverageMinimumBranchTotal (48→40 and 51→40), a notable regression. Either add tests exercising the new event-processing logic to restore statement and branch coverage to their previous levels, document a brief justification in the PR/issue if the reduction is temporary, or open a follow-up ticket to reinstate the higher thresholds after migration, referencing coverageMinimumStmtTotal and coverageMinimumBranchTotal so reviewers can verify the intended remediation.
🧹 Nitpick comments (1)
src/main/scala/io/flow/stream/Controller.scala (1)
366-378: Backward-compatible serialization approach is sound. The custom `Writes[CapturedType]` correctly emits both the legacy `upsertedDiscriminator` field and the new `upsertedDiscriminators` array, enabling a gradual migration. Minor safety consideration: the `.head` call on line 374 relies on the implicit invariant that `upsertedDiscriminators` is never empty. While this is currently guaranteed by the `pairUpEvents` logic, consider adding a defensive check or using `headOption.getOrElse` with a fallback to make this invariant explicit.
🛡️ Optional: add a defensive check for an empty sequence
```diff
 implicit val w1: Writes[CapturedType] = (ct: CapturedType) =>
   Json.obj(
     "fieldName" -> ct.fieldName,
     "typeName" -> ct.typeName,
     "modelType" -> ct.modelType,
-    "upsertedDiscriminator" -> ct.upsertedDiscriminators.head,
+    "upsertedDiscriminator" -> ct.upsertedDiscriminators.headOption.getOrElse(""),
     "deletedDiscriminator" -> ct.deletedDiscriminator,
     "deletedHasModel" -> ct.deletedHasModel,
     "upsertedDiscriminators" -> ct.upsertedDiscriminators,
   )
```
Alternatively, consider using `cats.data.NonEmptyList` or `NonEmptySeq` for `upsertedDiscriminators` to enforce the non-empty invariant at the type level.
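The fallback can also be made loud rather than silent. A minimal stdlib-only sketch of the idea — `CapturedType` here is a simplified stand-in for the repo's class, and `legacyDiscriminator` is a hypothetical helper name:

```scala
// Simplified stand-in for the repo's CapturedType; only the field
// relevant to the non-empty invariant is modeled here.
final case class CapturedType(fieldName: String, upsertedDiscriminators: Seq[String])

object LegacyFieldDemo {
  // Emit the legacy single-discriminator value from headOption, failing
  // with a clear message instead of a bare NoSuchElementException.
  def legacyDiscriminator(ct: CapturedType): String =
    ct.upsertedDiscriminators.headOption.getOrElse(
      throw new IllegalStateException(s"expected upsertedDiscriminators non-empty for $ct")
    )

  def main(args: Array[String]): Unit = {
    println(legacyDiscriminator(CapturedType("item_event", Seq("item_inserted", "item_updated"))))
  }
}
```

The same invariant could instead be pushed into the type with `cats.data.NonEmptyList`, at the cost of touching every construction site.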
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
build.sbt
src/main/scala/io/flow/stream/Controller.scala
src/main/scala/io/flow/stream/EventType.scala
src/main/scala/io/flow/stream/KinesisStream.scala
🧰 Additional context used
🧬 Code graph analysis (2)
src/main/scala/io/flow/stream/EventType.scala (1)
  src/main/scala/io/flow/lint/linters/UpsertedDeletedEventModels.scala (2)
    Upserted (18-20), Deleted (22-24)
src/main/scala/io/flow/stream/Controller.scala (3)
  src/main/scala/io/flow/stream/EventType.scala (4)
    EventType (10-51), Inserted (17-24), Updated (26-33), Upserted (35-42)
  src/main/scala/io/flow/stream/EventUnionTypeMatcher.scala (2)
    EventUnionTypeMatcher (7-38), matchFieldToPayloadType (9-11)
  src/main/scala/io/flow/stream/KinesisStream.scala (1)
    CapturedType (16-23)
🔇 Additional comments (8)
src/main/scala/io/flow/stream/EventType.scala (1)
11-42: LGTM! Well-structured type hierarchy for upsert-like events. The `UpsertLike` sealed trait effectively captures the common structure shared by `Inserted`, `Updated`, and `Upserted` events, while correctly excluding `Deleted` (which has an optional payload). This design enables clean pattern matching and grouping logic in the controller.
src/main/scala/io/flow/stream/Controller.scala (6)
98-102: LGTM! Clean pattern matching for event type classification. The use of `collect` with partial functions effectively separates the four event types for subsequent pairing logic.
126-126: LGTM! Regex correctly extended for new event types. The pattern `"(.*)_(upserted|inserted|updated|deleted)_?(.*)"` properly captures all four event type variants.
163-168: LGTM! Clean delegation pattern for upsert-like event processing. Using `processUpsertLike` with case class constructors (`EventType.Inserted.apply`, etc.) effectively eliminates code duplication across the three event types.
192-210: LGTM! Well-factored helper for building UpsertLike events. The method correctly extracts payload information and constructs events using the provided builder function. The for-comprehension safely handles cases where payload fields or ID fields are not found.
313-329: LGTM! Grouping logic correctly merges related upsert events. The grouping by `(typeName, idField.name)` ensures that inserted and updated events for the same entity are paired with a single deleted event. Since `groupBy` never produces empty groups and `upserted.map(List(_))` creates single-element lists, the inner lists in `allGroups` are guaranteed non-empty.
331-361: LGTM! Pairing logic correctly handles upsert groups with deleted events. The recursive approach properly:
- Matches groups to deleted events by `typeName` and `idField.name`
- Prefers deleted events with payloads when available
- Logs unpaired events for debugging
- Removes consumed deleted events to prevent duplicate pairing
src/main/scala/io/flow/stream/KinesisStream.scala (1)
16-23: LGTM! Type change aligns with new multi-discriminator requirements. The change from `upsertedDiscriminator: String` to `upsertedDiscriminators: Seq[String]` correctly supports the new grouping logic where inserted and updated events are merged.
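The grouping behavior discussed above can be sketched in isolation. A minimal stdlib-only sketch — `Ev` is a hypothetical simplified event type standing in for the repo's `UpsertLike`, not its actual class:

```scala
// Inserted and updated events for the same (typeName, idField) collapse
// into one group whose discriminators are merged, mirroring CapturedType's
// new upsertedDiscriminators: Seq[String].
final case class Ev(typeName: String, idField: String, discriminator: String)

object GroupDemo {
  def merge(inserted: List[Ev], updated: List[Ev]): Map[String, List[String]] =
    (inserted ++ updated)
      .groupBy(e => (e.typeName, e.idField)) // one group per entity stream
      .values
      .map(g => g.head.typeName -> g.map(_.discriminator))
      .toMap

  def main(args: Array[String]): Unit = {
    val out = merge(
      List(Ev("item", "id", "item_inserted")),
      List(Ev("item", "id", "item_updated"), Ev("order", "id", "order_updated"))
    )
    println(out) // "item" maps to both discriminators, "order" to one
  }
}
```

Because `groupBy` never emits empty groups, `g.head` is safe here, which is the invariant the review comment relies on.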
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
```scala
val mergedInsertedUpdated: List[List[EventType.UpsertLike]] = (inserted ++ updated)
  .groupBy(u => (u.typeName, u.idField.name))
  .values
  .toList
```
Do we need to verify that the payload type is the same for insert/update pairs?
Added payload-type to the group-by key, and associated test case.
```scala
head.typeName,
head.payloadType,
head.discriminator,
group.map(_.discriminator),
```
Instead of changing the discriminator field from String to Seq[String], could we emit multiple CapturedTypes? Like:
```scala
group.map { event =>
  CapturedType(
    event.fieldName,
    event.typeName,
    event.payloadType,
    event.discriminator,
    d.discriminator,
    d.payloadType.isDefined,
  )
}
```
```scala
"deletedDiscriminator" -> ct.deletedDiscriminator,
"deletedHasModel" -> ct.deletedHasModel,
"upsertedDiscriminators" -> ct.upsertedDiscriminators,
)
```
this would not be needed if we don't change from "upsertedDiscriminator: String" to "upsertedDiscriminators: Seq[String]"
gheine left a comment:
See comments
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/main/scala/io/flow/stream/Controller.scala`:
- Line 373: Using ct.upsertedDiscriminators.head to build the "upsertedDiscriminator" map entry can throw NoSuchElementException if the sequence is empty. Switch to ct.upsertedDiscriminators.headOption and handle the empty case explicitly (e.g., getOrElse { throw new IllegalStateException(s"expected upsertedDiscriminators non-empty for $ct") }, or log and provide a safe fallback) so the code fails with a clear, controlled error instead of crashing.
- Around lines 319-323: When building mergedInsertedUpdated (the grouped List[List[EventType.UpsertLike]]), validate that all events in each group share the same payloadType before using head.payloadType to construct the CapturedType. Add a check over each group (e.g., assert, throw, or log if group.map(_.payloadType).distinct.size > 1) so inconsistent payload types are detected and handled, or document the invariant and explicitly enforce it.
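A minimal sketch of the suggested group validation, using a hypothetical simplified event type rather than the repo's EventType.UpsertLike:

```scala
// Each merged group must carry a single payloadType before group.head
// can be used to build a CapturedType; this check makes that explicit.
final case class Up(typeName: String, payloadType: String, discriminator: String)

object ValidateDemo {
  def validateGroup(group: List[Up]): Either[String, List[Up]] = {
    val payloads = group.map(_.payloadType).distinct
    if (payloads.size > 1) Left(s"inconsistent payload types: $payloads")
    else Right(group)
  }

  def main(args: Array[String]): Unit = {
    val ok  = List(Up("item", "item", "item_inserted"), Up("item", "item", "item_updated"))
    val bad = List(Up("item", "item", "item_inserted"), Up("item", "other", "item_updated"))
    println(validateGroup(ok))
    println(validateGroup(bad))
  }
}
```

The PR ultimately took the stronger route of adding payloadType to the group-by key, which prevents mixed groups from forming at all.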
🧹 Nitpick comments (2)
src/main/scala/io/flow/stream/Controller.scala (1)
336-337: Potential runtime error if `group` is empty. `group.head` will throw `NoSuchElementException` if `group` is empty. While the current code flow from `pairUpEvents` shouldn't produce empty groups (since `groupBy` filters them out and `upserted.map(List(_))` always creates single-element lists), this is a defensive coding consideration.
♻️ Defensive headOption pattern
```diff
 upsertGroups match {
   case group :: tail =>
-    val head = group.head
+    group.headOption.fold {
+      pairWithDeleted(tail, deleted)
+    } { head =>
       val candidates = deleted.filter(d => d.typeName == head.typeName && d.idField.name == head.idField.name)
       // ... rest of logic
+    }
```
src/test/scala/io/flow/stream/ControllerSpec.scala (1)
1-269: Consider adding a test for the empty `upsertedDiscriminators` edge case. Given the potential runtime issue identified in `capturedTypeWrites`, where `head` is called on `upsertedDiscriminators`, consider adding a test that validates the expected behavior (or documents that this case should never occur).
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/main/scala/io/flow/stream/Controller.scala
src/test/scala/io/flow/stream/ControllerSpec.scala
🧰 Additional context used
🧬 Code graph analysis (2)
src/main/scala/io/flow/stream/Controller.scala (3)
  src/main/scala/io/flow/stream/EventType.scala (4)
    EventType (10-51), Inserted (17-24), Updated (26-33), Upserted (35-42)
  src/main/scala/io/flow/stream/EventUnionTypeMatcher.scala (2)
    EventUnionTypeMatcher (7-38), matchFieldToPayloadType (9-11)
  src/main/scala/io/flow/stream/KinesisStream.scala (1)
    CapturedType (16-23)
src/test/scala/io/flow/stream/ControllerSpec.scala (3)
  src/main/scala/io/flow/stream/Controller.scala (3)
    stream (313-329), stream (331-361), Controller (12-394)
  src/main/scala/io/flow/stream/KinesisStream.scala (1)
    CapturedType (16-23)
  src/main/scala/io/flow/stream/EventType.scala (3)
    EventType (10-51), Inserted (17-24), Updated (26-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Jenkins
🔇 Additional comments (13)
src/main/scala/io/flow/stream/Controller.scala (5)
98-102: LGTM! The extraction of four event types (inserted, updated, upserted, deleted) and passing them to `pairUpEvents` is well-structured and aligns with the new `UpsertLike` hierarchy defined in EventType.scala.
126-126: LGTM! The regex extension to support `inserted|updated|upserted|deleted` patterns with `private[stream]` visibility for testing is appropriate.
163-168: LGTM! Clean delegation to `processUpsertLike` for each event type with the appropriate factory function. The pattern matching handles all four event types consistently.
192-210: LGTM! The `processUpsertLike` helper cleanly abstracts the common logic for constructing `Inserted`, `Updated`, and `Upserted` events. The use of a higher-order `build` function is idiomatic.
363-378: Backward compatibility approach is sound, but consider documenting the deprecation timeline. The custom `Writes[CapturedType]` correctly emits both `upsertedDiscriminator` (for backward compatibility) and `upsertedDiscriminators` (new format). The comment mentions removing the old form "once all clients updated" - consider adding a `@deprecated` annotation or a TODO with a target version/date for removal.
src/test/scala/io/flow/stream/ControllerSpec.scala (8)
10-20: LGTM! Clean helper methods for creating test fixtures. The `makeModel` and `makeIdField` functions provide good abstractions for test setup.
22-63: LGTM! Comprehensive JSON serialization tests that verify both backward compatibility (`upsertedDiscriminator`) and the new format (`upsertedDiscriminators`). Good coverage of single and multiple discriminator scenarios.
65-117: LGTM! Thorough pattern matching tests covering all event types, suffixes, compound type names, and negative cases. The test at lines 111-116 for invalid event types is a good edge case.
119-167: LGTM! Well-structured helper methods for creating event type instances. Each helper clearly constructs the required test data.
169-193: LGTM! Good tests for basic pairing scenarios. The test at lines 182-193 properly validates that inserted and updated events are merged with their discriminators combined.
195-222: Good test for keeping upserted separate from inserted/updated. This test validates the important behavior that `upserted` events remain separate and are not merged with `inserted`/`updated` events for the same type.
224-252: LGTM! Good coverage of multiple types, empty input handling, and skipping unmatched events.
254-267: LGTM! Important test verifying that deleted events with payload are preferred over those without. This aligns with the logic in `pairWithDeleted` at line 339 of Controller.scala.
I was able to do this, and only the 4 new streams (coming from DynamoDB tables) appear, while existing ones are unchanged (aside from having the new array of discriminators).
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/main/scala/io/flow/stream/Controller.scala`:
- Around lines 337-353: The current selection of a matching deleted event (head, candidates, candidate in pairWithDeleted) matches only on typeName/idField and then prefers any deleted event with payloadType defined. Update the selection to first look for a deleted event whose payloadType equals head.payloadType (a payload-aware match), then fall back to the existing .find(_.payloadType.isDefined).orElse(candidates.headOption) behavior, so upserts are paired with deleted events of the same payload when possible before using the generic fallback. Keep the rest of the CapturedType construction unchanged.
♻️ Duplicate comments (1)
src/main/scala/io/flow/stream/Controller.scala (1)
363-377: Guard against empty `upsertedDiscriminators` during serialization. Line 373 will throw if the sequence is empty; consider enforcing non-empty or using `headOption` with a clear failure.
🧹 Nitpick comments (1)
src/test/scala/io/flow/stream/ControllerSpec.scala (1)
119-292: Consider a payload-specific deleted pairing test. If you adopt payload-aware deleted matching, add a case where two deleted events exist for the same type but different payloads, and assert the matching payload is selected.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/main/scala/io/flow/stream/Controller.scala
src/test/scala/io/flow/stream/ControllerSpec.scala
🧰 Additional context used
🧬 Code graph analysis (1)
src/test/scala/io/flow/stream/ControllerSpec.scala (3)
  src/main/scala/io/flow/stream/Controller.scala (3)
    stream (313-329), stream (331-361), Controller (12-394)
  src/main/scala/io/flow/stream/KinesisStream.scala (1)
    CapturedType (16-23)
  src/main/scala/io/flow/stream/EventType.scala (3)
    EventType (10-51), Inserted (17-24), Updated (26-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Jenkins
🔇 Additional comments (7)
src/main/scala/io/flow/stream/Controller.scala (5)
98-102: Clean separation of event types before pairing.
126-126: Regex expansion looks good.
163-169: Upsert-like routing is clear and consistent.
192-209: Helper keeps upsert-like extraction DRY.
313-328: Payload-aware grouping aligns with the stated intent.
src/test/scala/io/flow/stream/ControllerSpec.scala (2)
22-62: Solid coverage for backward-compatible JSON shape.
65-116: Regex test coverage looks good.
Catalog item events have item inserted and updated events, while upserted is an interface but is not concretely used AFAIK. We could support stream descriptors with more than one event of "upserted" type, keeping the JSON backwards compatible and forcing Scala clients to use the new field (sequence of strings as opposed to just a string). Later, when all clients use the new code, we can clean up the JSON and only emit the sequence.
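The client-side half of that migration can be sketched as follows — a hypothetical helper, not code from this repo: read the new array field and fall back to the legacy single string while descriptors carry both.

```scala
object MigrationDemo {
  // Prefer the new upsertedDiscriminators array; fall back to the legacy
  // upsertedDiscriminator string during the transition window.
  def discriminators(newField: Option[Seq[String]], legacyField: Option[String]): Seq[String] =
    newField.getOrElse(legacyField.toSeq)

  def main(args: Array[String]): Unit = {
    println(discriminators(Some(Seq("item_inserted", "item_updated")), Some("item_inserted")))
    println(discriminators(None, Some("item_upserted"))) // old descriptor shape
  }
}
```

Once every consumer reads through a helper like this, the legacy field can be dropped from the JSON without a coordinated deploy.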
Summary by CodeRabbit
New Features
Tests