Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/state-manager/CachedAppDataManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ class CachedAppDataManager {
globalOffset,
targetGroupSize,
senderGroupSize,
allNodes.length
allNodes.length,
`factSendCorrespondingCachedAppData `+ txId
)

const correspondingNodes: P2PTypes.NodeListTypes.Node[] = []
Expand Down
33 changes: 11 additions & 22 deletions src/state-manager/TransactionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4471,7 +4471,8 @@ class TransactionQueue {
queueEntry.correspondingGlobalOffset,
targetGroupSize,
senderGroupSize,
queueEntry.transactionGroup.length
queueEntry.transactionGroup.length,
'factTellCorrespondingNodes ' + queueEntry.logID
)
// check if we should avoid our index in the corresponding nodes
if (Context.config.stateManager.avoidOurIndexInFactTell && correspondingIndices.includes(ourIndexInTxGroup)) {
Expand Down Expand Up @@ -4500,7 +4501,8 @@ class TransactionQueue {
queueEntry.correspondingGlobalOffset,
targetGroupSize,
senderGroupSize,
queueEntry.transactionGroup.length
queueEntry.transactionGroup.length,
`factTellCorrespondingNodes ` + queueEntry.logID
)
if (logFlags.debug)
this.mainLogger.debug(
Expand Down Expand Up @@ -4691,9 +4693,10 @@ class TransactionQueue {
// check if it is a FACT sender
const receivingNodeIndex = queueEntry.ourTXGroupIndex // we are the receiver
const senderNodeIndex = queueEntry.transactionGroup.findIndex((node) => node.id === senderNodeId)
let wrappedSenderNodeIndex = null
// CHANGE: Determine the correct sender index upfront
let effectiveSenderIndex = senderNodeIndex
if (queueEntry.isSenderWrappedTxGroup[senderNodeId] != null) {
wrappedSenderNodeIndex = queueEntry.isSenderWrappedTxGroup[senderNodeId]
effectiveSenderIndex = queueEntry.isSenderWrappedTxGroup[senderNodeId]
}
const receiverGroupSize = queueEntry.executionNodeIdSorted.length
const senderGroupSize = receiverGroupSize
Expand All @@ -4705,7 +4708,7 @@ class TransactionQueue {

let isValidFactSender = verifyCorrespondingSender(
receivingNodeIndex,
senderNodeIndex,
effectiveSenderIndex,
queueEntry.correspondingGlobalOffset,
receiverGroupSize,
senderGroupSize,
Expand All @@ -4715,21 +4718,7 @@ class TransactionQueue {
false,
`tellSender ${queueEntry.logID}`
)
if (isValidFactSender === false && wrappedSenderNodeIndex != null && wrappedSenderNodeIndex >= 0) {
// try again with wrapped sender index
isValidFactSender = verifyCorrespondingSender(
receivingNodeIndex,
wrappedSenderNodeIndex,
queueEntry.correspondingGlobalOffset,
receiverGroupSize,
senderGroupSize,
targetIndices.startIndex,
targetIndices.endIndex,
queueEntry.transactionGroup.length,
false,
`tellSenderWrapped ${queueEntry.logID}`
)
}

// it maybe a FACT sender but sender does not cover the account
if (senderHasAddress === false) {
this.mainLogger.error(
Expand All @@ -4745,7 +4734,7 @@ class TransactionQueue {
// it is neither a FACT corresponding node nor an exe neighbour node
if (isValidFactSender === false) {
this.mainLogger.error(
`factValidateCorrespondingTellSender: logId: ${queueEntry.logID} sender is neither a valid sender nor a neighbour node isValidSender: ${isValidFactSender}`
`factValidateCorrespondingTellSender: logId: ${queueEntry.logID} sender is neither a valid sender nor a neighbour node isValidSender: ${effectiveSenderIndex}`
)
nestedCountersInstance.countEvent(
'stateManager',
Expand Down Expand Up @@ -5086,7 +5075,7 @@ class TransactionQueue {
targetGroupSize,
senderGroupSize,
queueEntry.transactionGroup.length,
queueEntry.logID
`factTellCorrespondingNodesFinalData ` + queueEntry.logID
)

for (const key of keysToShare) {
Expand Down
174 changes: 78 additions & 96 deletions src/utils/fastAggregatedCorrespondingTell.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//get the target nodes for a given corresponding sender
import { Utils } from '@shardeum-foundation/lib-types'
import { logFlags } from '../logger'

import { nestedCountersInstance } from './nestedCounters'
//this only has to be computed once time no matter how many facts are being shared
export function getCorrespondingNodes(
ourIndex: number,
Expand All @@ -14,82 +15,38 @@ export function getCorrespondingNodes(
): number[] {
if (logFlags.verbose) {
console.log(
`getCorrespondingNodes ${note} ${ourIndex} ${startTargetIndex} ${endTargetIndex} ${globalOffset} ${receiverGroupSize} ${sendGroupSize} ${transactionGroupSize}`
`getCorrespondingNodes ${note} ourIndex:${ourIndex} startTarget:${startTargetIndex} endTarget:${endTargetIndex} globalOffset:${globalOffset} receiverGroupSize:${receiverGroupSize} sendGroupSize:${sendGroupSize} transactionGroupSize:${transactionGroupSize}`
)
}
let wrappedIndex: number
let targetNumber: number
let found = false

let unWrappedEndIndex = -1
// handle case where receiver group is split (wraps around)
if (startTargetIndex > endTargetIndex) {
unWrappedEndIndex = endTargetIndex
endTargetIndex = endTargetIndex + transactionGroupSize
}
//wrap our index to the send group size
ourIndex = ourIndex % sendGroupSize

//find our initial staring index into the receiver group (wrappedIndex)
for (let i = startTargetIndex; i < endTargetIndex; i++) {
wrappedIndex = i
if (i >= transactionGroupSize) {
wrappedIndex = i - transactionGroupSize
}
targetNumber = (i + globalOffset) % receiverGroupSize
if (targetNumber === ourIndex) {
found = true
break
}
}
if (!found) {
//return empty array
return []
}


const destinationNodes: number[] = []
//this loop is at most O(k) where k is receiverGroupSize / sendGroupSize
//effectively it is constant time it is required so that a smaller
//group can send to a larger group
while (targetNumber < receiverGroupSize) {
//send all payload to this node
const destinationNode = wrappedIndex

destinationNodes.push(destinationNode)
//console.log(`sender ${ourIndex} send all payload to node ${destinationNode} targetNumber ${targetNumber} `)

// //in-place verification check
// let sendingNodeIndex = ourIndex
// let receivingNodeIndex = destinationNode
// //extra step here, remove in production
// verifySender(receivingNodeIndex, sendingNodeIndex)

//this part is a bit tricky.
//we are incrementing two indexes that control our loop
//wrapped index will have various corrections so that it can
//wrap past the end of a split span, or wrap within the range
//of the receiver group
targetNumber += sendGroupSize
wrappedIndex += sendGroupSize

//wrap to front of transaction group
if (wrappedIndex >= transactionGroupSize) {
wrappedIndex = wrappedIndex - transactionGroupSize
const normalizedSenderIndex = ourIndex % sendGroupSize

// Calculate logical position of first receiver
let logicalPosition = 0
let currentIndex = startTargetIndex

// Iterate through receiver group
for (let count = 0; count < receiverGroupSize; count++) {
// Calculate which sender this logical position maps to
const mappedSenderIndex = ((logicalPosition + globalOffset) % receiverGroupSize) % sendGroupSize

if (mappedSenderIndex === normalizedSenderIndex) {
destinationNodes.push(currentIndex)
}
//wrap to front of receiver group
if (wrappedIndex >= endTargetIndex) {
wrappedIndex = wrappedIndex - receiverGroupSize
}
//special case to stay in bounds when we have a split index and
//the unWrappedEndIndex is smaller than the start index.
//i.e. startTargetIndex = 45, endTargetIndex = 5 for a 50 node group
if (unWrappedEndIndex != -1 && wrappedIndex >= unWrappedEndIndex) {
const howFarPastUnWrapped = wrappedIndex - unWrappedEndIndex
wrappedIndex = startTargetIndex + howFarPastUnWrapped

// Move to next receiver
logicalPosition++
currentIndex++

// Handle wrap-around using modular arithmetic
if (currentIndex === transactionGroupSize) {
currentIndex = 0
}
}

if (logFlags.verbose) {
console.log(`note: ${note} destinationNodes ${destinationNodes}`)
console.log(`getCorrespondingNodes ${note} destinationNodes:${Utils.safeStringify(destinationNodes)}`)
}
return destinationNodes
}
Expand All @@ -108,35 +65,60 @@ export function verifyCorrespondingSender(
): boolean {
if (logFlags.verbose) {
console.log(
`verifyCorrespondingSender ${note} ${receivingNodeIndex} ${sendingNodeIndex} ${globalOffset} ${receiverGroupSize} ${sendGroupSize} ${receiverStartIndex} ${receiverEndIndex} ${transactionGroupSize}`
`verifyCorrespondingSender ${note} receivingNode:${receivingNodeIndex} sendingNode:${sendingNodeIndex} globalOffset:${globalOffset} receiverGroupSize:${receiverGroupSize} sendGroupSize:${sendGroupSize} receiverStart:${receiverStartIndex} receiverEnd:${receiverEndIndex} transactionGroupSize:${transactionGroupSize}`
)
}
//note, in the gather case, we need to check the address range of the sender node also, to prove
//that it does cover the given account range
let unwrappedReceivingNodeIndex = receivingNodeIndex

// handle case where receiver group is split (wraps around)
if (receiverStartIndex > unwrappedReceivingNodeIndex) {
unwrappedReceivingNodeIndex = unwrappedReceivingNodeIndex + transactionGroupSize
}
let unwrappedSendingNodeIndex = sendingNodeIndex
if (shouldUnwrapSender) {
unwrappedSendingNodeIndex = sendingNodeIndex + transactionGroupSize
}

// use unwrappedReceivingNodeIndex to calculate the target index
const targetIndex = ((unwrappedReceivingNodeIndex + globalOffset) % receiverGroupSize) % sendGroupSize
const targetIndex2 = unwrappedSendingNodeIndex % sendGroupSize
if (targetIndex === targetIndex2) {
if (logFlags.verbose)

// Calculate logical position of receiver in its group
let logicalPosition: number

if (receiverStartIndex <= receiverEndIndex) {
// Non-wrapped case
if (receivingNodeIndex >= receiverStartIndex && receivingNodeIndex < receiverEndIndex) {
logicalPosition = receivingNodeIndex - receiverStartIndex
} else {
// Receiver not in group
console.log(
`note: ${note} verification passed ${targetIndex} === ${targetIndex2} ${unwrappedSendingNodeIndex}->${receivingNodeIndex}`
`verifyCorrespondingSender ${note} receiver not in group receivingNode:${receivingNodeIndex} receiverStart:${receiverStartIndex} receiverEnd:${receiverEndIndex}`
)
return true

return false
}
} else {
console.log(
`note: ${note} X verification failed ${targetIndex} !== ${targetIndex2} sender: ${unwrappedSendingNodeIndex} receiver: ${receivingNodeIndex}`
)
return false
// Wrapped case
if (receivingNodeIndex >= receiverStartIndex) {
logicalPosition = receivingNodeIndex - receiverStartIndex
} else if (receivingNodeIndex < receiverEndIndex) {
logicalPosition = (transactionGroupSize - receiverStartIndex) + receivingNodeIndex
} else {
// Receiver not in group
console.log(
`verifyCorrespondingSender ${note} receiver not in group (wrapped case) receivingNode:${receivingNodeIndex} receiverStart:${receiverStartIndex} receiverEnd:${receiverEndIndex}`
)

return false
}
}
}

// Calculate expected sender using pure math
const expectedSenderIndex = ((logicalPosition + globalOffset) % receiverGroupSize) % sendGroupSize
const actualSenderIndex = sendingNodeIndex % sendGroupSize

const result = expectedSenderIndex === actualSenderIndex

if (logFlags.verbose) {
if (result) {
console.log(
`verifyCorrespondingSender ${note} verification passed expectedSender:${expectedSenderIndex} === actualSender:${actualSenderIndex} sender:${sendingNodeIndex}->receiver:${receivingNodeIndex}`
)
}
} else {
console.log(
`verifyCorrespondingSender ${note} verification failed expectedSender:${expectedSenderIndex} !== actualSender:${actualSenderIndex} sender:${sendingNodeIndex} receiver:${receivingNodeIndex}`
)
if (nestedCountersInstance) nestedCountersInstance.countEvent('verifyCorrespondingSender', 'failed')
}


return result
}
Loading
Loading