diff --git a/sfn-s3vectors-rag-refresh-cdk/README.md b/sfn-s3vectors-rag-refresh-cdk/README.md
new file mode 100644
index 000000000..9f5ea8938
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/README.md
@@ -0,0 +1,135 @@
+# Knowledge base refresh pipeline with AWS Step Functions and Amazon S3 Vectors
+
+This pattern deploys an AWS Step Functions workflow that automates the ingestion of new documents into an Amazon S3 Vectors knowledge base so AI agents always answer from up-to-date information. When new documents land in S3, the workflow fans out via Distributed Map to generate vector embeddings using Amazon Bedrock and store them with `PutVectors` in parallel. After ingestion, `QueryVectors` validates that the new content is searchable, and a Choice state either confirms success or rolls back by deleting the newly added vectors if validation fails.
+
+Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/sfn-s3vectors-rag-refresh-cdk](https://serverlessland.com/patterns/sfn-s3vectors-rag-refresh-cdk)
+
+Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
+
+## Requirements
+
+* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
+* [AWS CLI v2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) (latest available version) installed and configured
+* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
+* [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html) (version 2.221.0 or later) installed and configured
+* [Node.js 22.x](https://nodejs.org/) installed
+
+## Deployment Instructions
+
+1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
+
+    ```bash
+    git clone https://github.com/aws-samples/serverless-patterns
+    ```
+
+1. Change directory to the pattern directory:
+
+    ```bash
+    cd serverless-patterns/sfn-s3vectors-rag-refresh-cdk
+    ```
+
+1. Install the project dependencies:
+
+    ```bash
+    npm install
+    ```
+
+1. Install the Lambda dependencies:
+
+    ```bash
+    cd lambda && npm install && cd ..
+    ```
+
+1. Deploy the CDK stack:
+
+    ```bash
+    cdk deploy
+    ```
+
+    Note: Deploy to your default AWS region. Please refer to the [AWS capabilities explorer](https://builder.aws.com/build/capabilities/explore) for feature availability in your desired region.
+
+1. Note the outputs from the CDK deployment process. These contain the resource names used for testing.
+
+## How it works
+
+This pattern creates a single stack with the following resources:
+
+1. **S3 Document Bucket** — Stores the source documents to be ingested. Upload files to the `documents/` prefix.
+
+2. **S3 Vectors Bucket & Index** — An S3 Vectors vector bucket with a `knowledge-base` index configured for 1024-dimensional cosine similarity (matching Amazon Titan Text Embeddings V2 output).
+
+3. **Step Functions State Machine** — Orchestrates the full ingestion pipeline:
+   - **Distributed Map** fans out over every object under `s3:///documents/`, processing up to 40 documents concurrently
+   - For each document, the **EmbedAndStore** Lambda reads the file, calls Amazon Bedrock Titan Text Embeddings V2 to generate a 1024-dimensional vector, and writes it to the S3 Vectors index via `PutVectors` with the source file path as metadata
+   - **ValidateIngestion** Lambda fetches the Distributed Map result manifest from S3, collects all vector keys from the SUCCEEDED results, generates a probe embedding, and calls `QueryVectors` to confirm at least one newly ingested vector is returned
+   - A **Choice** state checks the validation result: on success the workflow completes; on failure the **RollbackVectors** Lambda calls `DeleteVectors` to remove all newly added vectors, then the workflow fails
+
+## Architecture
+
+![State Machine](state-machine.png)
+
+## Testing
+
+After deployment, upload sample documents and start the workflow.
+
+### Upload test documents
+
+```bash
+BUCKET=$(aws cloudformation describe-stacks \
+  --stack-name RagRefreshStack \
+  --query "Stacks[0].Outputs[?OutputKey=='DocumentBucketName'].OutputValue" \
+  --output text)
+
+echo "Amazon S3 Vectors is a new vector storage capability." > /tmp/doc1.txt
+echo "Step Functions Distributed Map enables parallel processing at scale." > /tmp/doc2.txt
+
+aws s3 cp /tmp/doc1.txt s3://$BUCKET/documents/doc1.txt
+aws s3 cp /tmp/doc2.txt s3://$BUCKET/documents/doc2.txt
+```
+
+### Start the workflow
+
+```bash
+STATE_MACHINE_ARN=$(aws cloudformation describe-stacks \
+  --stack-name RagRefreshStack \
+  --query "Stacks[0].Outputs[?OutputKey=='StateMachineArn'].OutputValue" \
+  --output text)
+
+aws stepfunctions start-execution \
+  --state-machine-arn $STATE_MACHINE_ARN
+```
+
+### Monitor execution
+
+```bash
+aws stepfunctions list-executions \
+  --state-machine-arn $STATE_MACHINE_ARN \
+  --max-results 1
+```
+
+### Expected result
+
+The workflow should complete successfully. In the Step Functions console you'll see:
+1. Distributed Map processed both documents in parallel
+2. Each document was embedded and stored as a vector
+3. Validation confirmed the vectors are queryable
+4. The workflow reached the `IngestionSucceeded` state
+
+## Cleanup
+
+1. Delete the stack:
+
+    ```bash
+    cdk destroy
+    ```
+
+1. Confirm the stack has been deleted:
+
+    ```bash
+    aws cloudformation list-stacks --stack-status-filter DELETE_COMPLETE
+    ```
+
+----
+Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+SPDX-License-Identifier: MIT-0
diff --git a/sfn-s3vectors-rag-refresh-cdk/bin/app.ts b/sfn-s3vectors-rag-refresh-cdk/bin/app.ts
new file mode 100644
index 000000000..8fde85eb9
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/bin/app.ts
@@ -0,0 +1,13 @@
+#!/usr/bin/env node
+import 'source-map-support/register';
+import * as cdk from 'aws-cdk-lib';
+import { RagRefreshStack } from '../lib/rag-refresh-stack';
+
+const app = new cdk.App();
+
+new RagRefreshStack(app, 'RagRefreshStack', {
+  env: {
+    account: process.env.CDK_DEFAULT_ACCOUNT,
+    region: process.env.CDK_DEFAULT_REGION,
+  },
+});
diff --git a/sfn-s3vectors-rag-refresh-cdk/cdk.json b/sfn-s3vectors-rag-refresh-cdk/cdk.json
new file mode 100644
index 000000000..e6475b158
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/cdk.json
@@ -0,0 +1,8 @@
+{
+  "app": "npx ts-node --prefer-ts-exts bin/app.ts",
+  "context": {
+    "@aws-cdk/aws-lambda:recognizeLayerVersion": true,
+    "@aws-cdk/core:checkSecretUsage": true,
+    "@aws-cdk/core:target-partitions": ["aws", "aws-cn"]
+  }
+}
diff --git a/sfn-s3vectors-rag-refresh-cdk/example-pattern.json b/sfn-s3vectors-rag-refresh-cdk/example-pattern.json
new file mode 100644
index 000000000..75b522bf0
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/example-pattern.json
@@ -0,0 +1,68 @@
+{
+  "title": "Knowledge base refresh pipeline with AWS Step Functions & Amazon S3 Vectors",
+  "description": "Automate ingestion of new documents into an Amazon S3 Vectors knowledge base using AWS Step Functions Distributed Map with validation",
+  "language": "TypeScript",
+  "level": "300",
+  "framework": "AWS CDK",
+  "introBox": {
+    "headline": "How it works",
+    "text": [
+      "When new documents land in an S3 bucket, a Step Functions workflow fans out via Distributed Map to process each document in parallel.",
+      "For each document, a Lambda function reads the content, generates vector embeddings using Amazon Bedrock, and stores them with PutVectors in the S3 Vectors vector bucket.",
+      "After ingestion completes, a validation step uses QueryVectors to confirm the new content is searchable. A Choice state either confirms success or rolls back by deleting the newly added vectors if validation fails."
+    ]
+  },
+  "gitHub": {
+    "template": {
+      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sfn-s3vectors-rag-refresh-cdk",
+      "templateURL": "serverless-patterns/sfn-s3vectors-rag-refresh-cdk",
+      "projectFolder": "sfn-s3vectors-rag-refresh-cdk",
+      "templateFile": "lib/rag-refresh-stack.ts"
+    }
+  },
+  "resources": {
+    "bullets": [
+      {
+        "text": "Amazon S3 Vectors documentation",
+        "link": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/vectors.html"
+      },
+      {
+        "text": "AWS Step Functions Distributed Map",
+        "link": "https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-distributed.html"
+      },
+      {
+        "text": "Amazon Bedrock Embeddings",
+        "link": "https://docs.aws.amazon.com/bedrock/latest/userguide/embeddings.html"
+      },
+      {
+        "text": "AWS CDK Developer Guide",
+        "link": "https://docs.aws.amazon.com/cdk/latest/guide/"
+      }
+    ]
+  },
+  "deploy": {
+    "text": [
+      "npm install",
+      "cd lambda && npm install && cd ..",
+      "cdk deploy"
+    ]
+  },
+  "testing": {
+    "text": [
+      "See the GitHub repo for detailed testing instructions."
+    ]
+  },
+  "cleanup": {
+    "text": [
+      "Delete the stack: cdk destroy."
+    ]
+  },
+  "authors": [
+    {
+      "name": "Ben Freiberg",
+      "image": "https://serverlessland.com/assets/images/resources/contributors/ben-freiberg.jpg",
+      "bio": "Ben is a Senior Solutions Architect at Amazon Web Services (AWS) based in Frankfurt, Germany.",
+      "linkedin": "benfreiberg"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/sfn-s3vectors-rag-refresh-cdk/lambda/embed.ts b/sfn-s3vectors-rag-refresh-cdk/lambda/embed.ts
new file mode 100644
index 000000000..2f437d66d
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/lambda/embed.ts
@@ -0,0 +1,51 @@
+import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
+import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';
+import { S3VectorsClient, PutVectorsCommand } from '@aws-sdk/client-s3vectors';
+
+const s3 = new S3Client();
+const bedrock = new BedrockRuntimeClient();
+const s3vectors = new S3VectorsClient();
+
+const DOCUMENT_BUCKET = process.env.DOCUMENT_BUCKET!;
+const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
+const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;
+const MODEL_ID = 'amazon.titan-embed-text-v2:0';
+
+interface S3ItemEvent {
+  Key: string;
+}
+
+export async function handler(event: S3ItemEvent) {
+  const { Key } = event;
+
+  // 1. Read document from S3
+  const getObj = await s3.send(new GetObjectCommand({
+    Bucket: DOCUMENT_BUCKET,
+    Key,
+  }));
+  const text = await getObj.Body!.transformToString('utf-8');
+
+  // 2. Generate embedding via Bedrock Titan Embeddings V2
+  const invokeResp = await bedrock.send(new InvokeModelCommand({
+    modelId: MODEL_ID,
+    contentType: 'application/json',
+    accept: 'application/json',
+    body: JSON.stringify({ inputText: text, dimensions: 1024, normalize: true }),
+  }));
+  const embeddingResult = JSON.parse(new TextDecoder().decode(invokeResp.body));
+  const embedding: number[] = embeddingResult.embedding;
+
+  // 3. Store vector with PutVectors
+  const vectorKey = Key.replace(/\//g, '_');
+  await s3vectors.send(new PutVectorsCommand({
+    vectorBucketName: VECTOR_BUCKET_NAME,
+    indexName: VECTOR_INDEX_NAME,
+    vectors: [{
+      key: vectorKey,
+      data: { float32: embedding },
+      metadata: { source: Key },
+    }],
+  }));
+
+  return { vectorKey, documentKey: Key };
+}
diff --git a/sfn-s3vectors-rag-refresh-cdk/lambda/package.json b/sfn-s3vectors-rag-refresh-cdk/lambda/package.json
new file mode 100644
index 000000000..3b5e3d647
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/lambda/package.json
@@ -0,0 +1,10 @@
+{
+  "name": "rag-refresh-lambdas",
+  "version": "1.0.0",
+  "main": "index.js",
+  "dependencies": {
+    "@aws-sdk/client-s3": "^3.700.0",
+    "@aws-sdk/client-bedrock-runtime": "^3.700.0",
+    "@aws-sdk/client-s3vectors": "^3.700.0"
+  }
+}
diff --git a/sfn-s3vectors-rag-refresh-cdk/lambda/rollback.ts b/sfn-s3vectors-rag-refresh-cdk/lambda/rollback.ts
new file mode 100644
index 000000000..b9e953c6e
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/lambda/rollback.ts
@@ -0,0 +1,27 @@
+import { S3VectorsClient, DeleteVectorsCommand } from '@aws-sdk/client-s3vectors';
+
+const s3vectors = new S3VectorsClient();
+
+const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
+const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;
+
+interface RollbackEvent {
+  vectorKeys: string[];
+}
+
+export async function handler(event: RollbackEvent) {
+  const { vectorKeys } = event;
+
+  // DeleteVectors accepts up to 500 keys per call
+  const BATCH_SIZE = 500;
+  for (let i = 0; i < vectorKeys.length; i += BATCH_SIZE) {
+    const batch = vectorKeys.slice(i, i + BATCH_SIZE);
+    await s3vectors.send(new DeleteVectorsCommand({
+      vectorBucketName: VECTOR_BUCKET_NAME,
+      indexName: VECTOR_INDEX_NAME,
+      keys: batch,
+    }));
+  }
+
+  return { deleted: vectorKeys.length };
+}
diff --git a/sfn-s3vectors-rag-refresh-cdk/lambda/tsconfig.json b/sfn-s3vectors-rag-refresh-cdk/lambda/tsconfig.json
new file mode 100644
index 000000000..8d85f87f2
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/lambda/tsconfig.json
@@ -0,0 +1,17 @@
+{
+  "compilerOptions": {
+    "target": "ES2022",
+    "module": "commonjs",
+    "moduleResolution": "node",
+    "lib": ["es2022"],
+    "outDir": ".",
+    "rootDir": ".",
+    "strict": true,
+    "esModuleInterop": true,
+    "skipLibCheck": true,
+    "forceConsistentCasingInFileNames": true,
+    "resolveJsonModule": true
+  },
+  "include": ["*.ts"],
+  "exclude": ["node_modules"]
+}
diff --git a/sfn-s3vectors-rag-refresh-cdk/lambda/validate.ts b/sfn-s3vectors-rag-refresh-cdk/lambda/validate.ts
new file mode 100644
index 000000000..77f45656b
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/lambda/validate.ts
@@ -0,0 +1,62 @@
+import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';
+import { S3VectorsClient, QueryVectorsCommand } from '@aws-sdk/client-s3vectors';
+import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
+
+const bedrock = new BedrockRuntimeClient();
+const s3vectors = new S3VectorsClient();
+const s3 = new S3Client();
+
+const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
+const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;
+const MODEL_ID = 'amazon.titan-embed-text-v2:0';
+
+interface ResultWriterEvent {
+  ResultWriterDetails: { Bucket: string; Key: string };
+}
+
+export async function handler(event: ResultWriterEvent) {
+  // Read the Distributed Map result manifest from S3
+  const { Bucket, Key } = event.ResultWriterDetails;
+  const manifestResp = await s3.send(new GetObjectCommand({ Bucket, Key }));
+  const manifest = JSON.parse(await manifestResp.Body!.transformToString());
+
+  // Read succeeded results file(s) to extract vector keys
+  const destBucket = manifest.DestinationBucket;
+  const vectorKeys: string[] = [];
+  for (const result of manifest.ResultFiles?.SUCCEEDED ?? []) {
+    const fileResp = await s3.send(new GetObjectCommand({ Bucket: destBucket, Key: result.Key }));
+    const results = JSON.parse(await fileResp.Body!.transformToString());
+    for (const r of results) {
+      const output = typeof r.Output === 'string' ? JSON.parse(r.Output) : r.Output;
+      if (output?.vectorKey) vectorKeys.push(output.vectorKey);
+    }
+  }
+
+  if (vectorKeys.length === 0) {
+    return { valid: false, vectorKeys };
+  }
+
+  // Generate a probe embedding and query the index
+  const probeText = 'vector storage and embeddings for AI applications';
+  const invokeResp = await bedrock.send(new InvokeModelCommand({
+    modelId: MODEL_ID,
+    contentType: 'application/json',
+    accept: 'application/json',
+    body: JSON.stringify({ inputText: probeText, dimensions: 1024, normalize: true }),
+  }));
+  const probeEmbedding: number[] = JSON.parse(new TextDecoder().decode(invokeResp.body)).embedding;
+
+  const queryResp = await s3vectors.send(new QueryVectorsCommand({
+    vectorBucketName: VECTOR_BUCKET_NAME,
+    indexName: VECTOR_INDEX_NAME,
+    queryVector: { float32: probeEmbedding },
+    topK: 5,
+    returnMetadata: true,
+  }));
+
+  // Validate: at least one newly ingested vector appears in results
+  const returnedKeys = new Set(queryResp.vectors?.map(v => v.key) ?? []);
+  const found = vectorKeys.some(k => returnedKeys.has(k));
+
+  return { valid: found, vectorKeys };
+}
diff --git a/sfn-s3vectors-rag-refresh-cdk/lib/rag-refresh-stack.ts b/sfn-s3vectors-rag-refresh-cdk/lib/rag-refresh-stack.ts
new file mode 100644
index 000000000..b3ade6c8b
--- /dev/null
+++ b/sfn-s3vectors-rag-refresh-cdk/lib/rag-refresh-stack.ts
@@ -0,0 +1,250 @@
+import * as cdk from 'aws-cdk-lib';
+import * as s3 from 'aws-cdk-lib/aws-s3';
+import * as s3vectors from 'aws-cdk-lib/aws-s3vectors';
+import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
+import * as lambda from 'aws-cdk-lib/aws-lambda';
+import * as iam from 'aws-cdk-lib/aws-iam';
+import * as logs from 'aws-cdk-lib/aws-logs';
+import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
+import * as path from 'path';
+import { Construct } from 'constructs';
+
+// Titan Text Embeddings V2 produces 1024-dimensional vectors by default
+const EMBEDDING_DIMENSION = 1024;
+
+export class RagRefreshStack extends cdk.Stack {
+  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
+    super(scope, id, props);
+
+    // --- S3 bucket for source documents ---
+    const documentBucket = new s3.Bucket(this, 'DocumentBucket', {
+      removalPolicy: cdk.RemovalPolicy.DESTROY,
+      autoDeleteObjects: true,
+    });
+
+    // --- S3 Vectors resources ---
+    const vectorBucket = new s3vectors.CfnVectorBucket(this, 'VectorBucket', {
+      vectorBucketName: `rag-vectors-${cdk.Aws.ACCOUNT_ID}-${cdk.Aws.REGION}`,
+    });
+
+    const vectorIndex = new s3vectors.CfnIndex(this, 'VectorIndex', {
+      vectorBucketName: vectorBucket.vectorBucketName!,
+      indexName: 'knowledge-base',
+      dataType: 'float32',
+      dimension: EMBEDDING_DIMENSION,
+      distanceMetric: 'cosine',
+    });
+    vectorIndex.addDependency(vectorBucket);
+
+    // --- Lambda: embed & store vectors ---
+    const embedFunction = new NodejsFunction(this, 'EmbedFunction', {
+      functionName: 'ragEmbedAndStore',
+      runtime: lambda.Runtime.NODEJS_22_X,
+      entry: path.join(__dirname, '../lambda/embed.ts'),
+      handler: 'handler',
+      timeout: cdk.Duration.minutes(5),
+      memorySize: 512,
+      environment: {
+        DOCUMENT_BUCKET: documentBucket.bucketName,
+        VECTOR_BUCKET_NAME: vectorBucket.vectorBucketName!,
+        VECTOR_INDEX_NAME: vectorIndex.indexName!,
+      },
+    });
+
+    documentBucket.grantRead(embedFunction);
+    embedFunction.addToRolePolicy(new iam.PolicyStatement({
+      actions: ['bedrock:InvokeModel'],
+      resources: [
+        'arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v2:0',
+      ],
+    }));
+    embedFunction.addToRolePolicy(new iam.PolicyStatement({
+      actions: ['s3vectors:PutVectors'],
+      resources: ['*'],
+    }));
+
+    // --- Lambda: validate vectors via QueryVectors ---
+    const validateFunction = new NodejsFunction(this, 'ValidateFunction', {
+      functionName: 'ragValidateVectors',
+      runtime: lambda.Runtime.NODEJS_22_X,
+      entry: path.join(__dirname, '../lambda/validate.ts'),
+      handler: 'handler',
+      timeout: cdk.Duration.minutes(1),
+      memorySize: 256,
+      environment: {
+        VECTOR_BUCKET_NAME: vectorBucket.vectorBucketName!,
+        VECTOR_INDEX_NAME: vectorIndex.indexName!,
+      },
+    });
+
+    documentBucket.grantRead(validateFunction);
+    validateFunction.addToRolePolicy(new iam.PolicyStatement({
+      actions: ['bedrock:InvokeModel'],
+      resources: [
+        'arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v2:0',
+      ],
+    }));
+    validateFunction.addToRolePolicy(new iam.PolicyStatement({
+      actions: ['s3vectors:QueryVectors', 's3vectors:GetVectors'],
+      resources: ['*'],
+    }));
+
+    // --- Lambda: rollback (delete vectors) ---
+    const rollbackFunction = new NodejsFunction(this, 'RollbackFunction', {
+      functionName: 'ragRollbackVectors',
+      runtime: lambda.Runtime.NODEJS_22_X,
+      entry: path.join(__dirname, '../lambda/rollback.ts'),
+      handler: 'handler',
+      timeout: cdk.Duration.minutes(2),
+      memorySize: 256,
+      environment: {
+        VECTOR_BUCKET_NAME: vectorBucket.vectorBucketName!,
+        VECTOR_INDEX_NAME: vectorIndex.indexName!,
+      },
+    });
+
+    rollbackFunction.addToRolePolicy(new iam.PolicyStatement({
+      actions: ['s3vectors:DeleteVectors'],
+      resources: ['*'],
+    }));
+
+    // --- Step Functions workflow (JSONata) ---
+
+    const definition = {
+      QueryLanguage: 'JSONata',
+      StartAt: 'ProcessDocuments',
+      States: {
+        ProcessDocuments: {
+          Type: 'Map',
+          ItemProcessor: {
+            ProcessorConfig: {
+              Mode: 'DISTRIBUTED',
+              ExecutionType: 'STANDARD',
+            },
+            StartAt: 'EmbedAndStore',
+            States: {
+              EmbedAndStore: {
+                Type: 'Task',
+                Resource: 'arn:aws:states:::lambda:invoke',
+                Arguments: {
+                  FunctionName: embedFunction.functionArn,
+                  Payload: '{% $states.input %}',
+                },
+                Output: {
+                  vectorKey: '{% $states.result.Payload.vectorKey %}',
+                  documentKey: '{% $states.result.Payload.documentKey %}',
+                },
+                End: true,
+              },
+            },
+          },
+          ItemReader: {
+            Resource: 'arn:aws:states:::s3:listObjectsV2',
+            Arguments: {
+              Bucket: documentBucket.bucketName,
+              Prefix: 'documents/',
+            },
+          },
+          MaxConcurrency: 40,
+          ResultWriter: {
+            Resource: 'arn:aws:states:::s3:putObject',
+            Arguments: {
+              Bucket: documentBucket.bucketName,
+              Prefix: 'results/',
+            },
+          },
+          Next: 'ValidateIngestion',
+        },
+        ValidateIngestion: {
+          Type: 'Task',
+          Resource: 'arn:aws:states:::lambda:invoke',
+          Arguments: {
+            FunctionName: validateFunction.functionArn,
+            Payload: '{% $states.input %}',
+          },
+          Output: {
+            valid: '{% $states.result.Payload.valid %}',
+            vectorKeys: '{% $states.result.Payload.vectorKeys %}',
+          },
+          Next: 'ValidationPassed',
+        },
+        ValidationPassed: {
+          Type: 'Choice',
+          Choices: [
+            {
+              Condition: '{% $states.input.valid %}',
+              Next: 'IngestionSucceeded',
+            },
+          ],
+          Default: 'RollbackVectors',
+        },
+        IngestionSucceeded: {
+          Type: 'Succeed',
+        },
+        RollbackVectors: {
+          Type: 'Task',
+          Resource: 'arn:aws:states:::lambda:invoke',
+          Arguments: {
+            FunctionName: rollbackFunction.functionArn,
+            Payload: {
+              vectorKeys: '{% $states.input.vectorKeys %}',
+            },
+          },
+          Next: 'IngestionFailed',
+        },
+        IngestionFailed: {
+          Type: 'Fail',
+          Cause: 'Validation failed — vectors rolled back',
}, + }, + }; + + const logGroup = new logs.LogGroup(this, 'StateMachineLogGroup', { + logGroupName: '/aws/stepfunctions/rag-refresh', + retention: logs.RetentionDays.ONE_WEEK, + removalPolicy: cdk.RemovalPolicy.DESTROY, + }); + + const stateMachine = new sfn.StateMachine(this, 'RagRefreshStateMachine', { + stateMachineName: 'rag-knowledge-base-refresh', + definitionBody: sfn.DefinitionBody.fromString(JSON.stringify(definition)), + timeout: cdk.Duration.hours(1), + logs: { + destination: logGroup, + level: sfn.LogLevel.ALL, + }, + }); + + // Grant the state machine permission to read the S3 bucket (for Distributed Map) + documentBucket.grantRead(stateMachine); + documentBucket.grantReadWrite(stateMachine); + + // Grant the state machine permission to invoke the Lambda functions + embedFunction.grantInvoke(stateMachine); + validateFunction.grantInvoke(stateMachine); + rollbackFunction.grantInvoke(stateMachine); + + // Distributed Map (STANDARD child executions) needs StartExecution + DescribeExecution + StopExecution on itself + // Use a constructed ARN to avoid circular dependency between the state machine and its role policy + stateMachine.addToRolePolicy(new iam.PolicyStatement({ + actions: ['states:StartExecution', 'states:DescribeExecution', 'states:StopExecution'], + resources: [`arn:aws:states:${this.region}:${this.account}:stateMachine:rag-knowledge-base-refresh`], + })); + + // --- Outputs --- + new cdk.CfnOutput(this, 'DocumentBucketName', { + value: documentBucket.bucketName, + description: 'Upload documents to s3:///documents/', + }); + + new cdk.CfnOutput(this, 'VectorBucketName', { + value: vectorBucket.vectorBucketName!, + description: 'S3 Vectors bucket name', + }); + + new cdk.CfnOutput(this, 'StateMachineArn', { + value: stateMachine.stateMachineArn, + description: 'Step Functions state machine ARN', + }); + } +} diff --git a/sfn-s3vectors-rag-refresh-cdk/package.json b/sfn-s3vectors-rag-refresh-cdk/package.json new file mode 100644 index 
000000000..b7b7629a6 --- /dev/null +++ b/sfn-s3vectors-rag-refresh-cdk/package.json @@ -0,0 +1,21 @@ +{ + "name": "sfn-s3vectors-rag-refresh-cdk", + "version": "1.0.0", + "description": "Step Functions RAG knowledge base refresh pipeline with S3 Vectors and Bedrock embeddings", + "scripts": { + "build": "tsc", + "cdk": "cdk", + "deploy": "cdk deploy", + "synth": "cdk synth" + }, + "devDependencies": { + "@types/node": "^20.0.0", + "aws-cdk": "^2.221.0", + "typescript": "^5.0.0" + }, + "dependencies": { + "aws-cdk-lib": "^2.221.0", + "constructs": "^10.0.0", + "source-map-support": "^0.5.21" + } +} diff --git a/sfn-s3vectors-rag-refresh-cdk/state-machine.png b/sfn-s3vectors-rag-refresh-cdk/state-machine.png new file mode 100644 index 000000000..23b38060b Binary files /dev/null and b/sfn-s3vectors-rag-refresh-cdk/state-machine.png differ diff --git a/sfn-s3vectors-rag-refresh-cdk/tsconfig.json b/sfn-s3vectors-rag-refresh-cdk/tsconfig.json new file mode 100644 index 000000000..9ed0a3f75 --- /dev/null +++ b/sfn-s3vectors-rag-refresh-cdk/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": ["es2020"], + "declaration": true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "typeRoots": ["./node_modules/@types"], + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true + }, + "exclude": ["node_modules", "cdk.out", "lambda"] +}