Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Knowledge base refresh pipeline with AWS Step Functions and Amazon S3 Vectors

This pattern deploys an AWS Step Functions workflow that automates the ingestion of new documents into an Amazon S3 Vectors knowledge base so AI agents always answer from up-to-date information. When new documents land in S3, the workflow fans out via Distributed Map to generate vector embeddings using Amazon Bedrock and store them with `PutVectors` in parallel. After ingestion, `QueryVectors` validates that the new content is searchable, and a Choice state either confirms success or rolls back by deleting the newly added vectors if validation fails.

Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/sfn-s3vectors-rag-refresh-cdk](https://serverlessland.com/patterns/sfn-s3vectors-rag-refresh-cdk)

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI v2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) (latest available version) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html) (version 2.221.0 or later) installed and configured
* [Node.js 22.x](https://nodejs.org/) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:

```bash
git clone https://github.com/aws-samples/serverless-patterns
```

1. Change directory to the pattern directory:

```bash
cd sfn-s3vectors-rag-refresh-cdk
```

1. Install the project dependencies:

```bash
npm install
```

1. Install the Lambda dependencies:

```bash
cd lambda && npm install && cd ..
```

1. Deploy the CDK stack:

```bash
cdk deploy
```

Note: Deploy to your default AWS region. Please refer to the [AWS capabilities explorer](https://builder.aws.com/build/capabilities/explore) for feature availability in your desired region.

1. Note the outputs from the CDK deployment process. These contain the resource names used for testing.

## How it works

This pattern creates a single stack with the following resources:

1. **S3 Document Bucket** — Stores the source documents to be ingested. Upload files to the `documents/` prefix.

2. **S3 Vectors Bucket & Index** — An S3 Vectors vector bucket with a `knowledge-base` index configured for 1024-dimensional cosine similarity (matching Amazon Titan Text Embeddings V2 output).

3. **Step Functions State Machine** — Orchestrates the full ingestion pipeline:
- **Distributed Map** fans out over every object under `s3://<bucket>/documents/`, processing up to 40 documents concurrently
- For each document, the **EmbedAndStore** Lambda reads the file, calls Amazon Bedrock Titan Text Embeddings V2 to generate a 1024-dimensional vector, and writes it to the S3 Vectors index via `PutVectors` with the source file path as metadata
- **ValidateIngestion** Lambda fetches the Distributed Map result manifest from S3, collects all vector keys from the SUCCEEDED results, generates a probe embedding, and calls `QueryVectors` to confirm at least one newly ingested vector is returned
- A **Choice** state checks the validation result: on success the workflow completes; on failure the **RollbackVectors** Lambda calls `DeleteVectors` to remove all newly added vectors, then the workflow fails

## Architecture

![State Machine](state-machine.png)

## Testing

After deployment, upload sample documents and start the workflow.

### Upload test documents

```bash
BUCKET=$(aws cloudformation describe-stacks \
--stack-name RagRefreshStack \
--query "Stacks[0].Outputs[?OutputKey=='DocumentBucketName'].OutputValue" \
--output text)

echo "Amazon S3 Vectors is a new vector storage capability." > /tmp/doc1.txt
echo "Step Functions Distributed Map enables parallel processing at scale." > /tmp/doc2.txt

aws s3 cp /tmp/doc1.txt s3://$BUCKET/documents/doc1.txt
aws s3 cp /tmp/doc2.txt s3://$BUCKET/documents/doc2.txt
```

### Start the workflow

```bash
STATE_MACHINE_ARN=$(aws cloudformation describe-stacks \
--stack-name RagRefreshStack \
--query "Stacks[0].Outputs[?OutputKey=='StateMachineArn'].OutputValue" \
--output text)

aws stepfunctions start-execution \
--state-machine-arn $STATE_MACHINE_ARN
```

### Monitor execution

```bash
aws stepfunctions list-executions \
--state-machine-arn $STATE_MACHINE_ARN \
--max-results 1
```

### Expected result

The workflow should complete successfully. In the Step Functions console you'll see:
1. Distributed Map processed both documents in parallel
2. Each document was embedded and stored as a vector
3. Validation confirmed the vectors are queryable
4. The workflow reached the `IngestionSucceeded` state

## Cleanup

1. Delete the stack:

```bash
cdk destroy
```

1. Confirm the stack has been deleted:

```bash
aws cloudformation list-stacks --stack-status-filter DELETE_COMPLETE
```

----
Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
13 changes: 13 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/bin/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { RagRefreshStack } from '../lib/rag-refresh-stack';

const app = new cdk.App();

new RagRefreshStack(app, 'RagRefreshStack', {
env: {
account: process.env.CDK_DEFAULT_ACCOUNT,
region: process.env.AWS_REGION,
},
});
8 changes: 8 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/cdk.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"app": "npx ts-node --prefer-ts-exts bin/app.ts",
"context": {
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
"@aws-cdk/core:checkSecretUsage": true,
"@aws-cdk/core:target-partitions": ["aws", "aws-cn"]
}
}
68 changes: 68 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"title": "Knowledge base refresh pipeline with AWS Step Functions & Amazon S3 Vectors",
"description": "Automate ingestion of new documents into an Amazon S3 Vectors knowledge base using AWS Step Functions Distributed Map with validation",
"language": "TypeScript",
"level": "300",
"framework": "AWS CDK",
"introBox": {
"headline": "How it works",
"text": [
"When new documents land in an S3 bucket, a Step Functions workflow fans out via Distributed Map to process each document in parallel.",
"For each document, a Lambda function reads the content, generates vector embeddings using Amazon Bedrock, and stores them with PutVectors in the S3 Vectors vector bucket.",
"After ingestion completes, a validation step uses QueryVectors to confirm the new content is searchable. A Choice state either confirms success or rolls back by deleting the newly added vectors if validation fails."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sfn-s3vectors-rag-refresh-cdk",
"templateURL": "serverless-patterns/sfn-s3vectors-rag-refresh-cdk",
"projectFolder": "sfn-s3vectors-rag-refresh-cdk",
"templateFile": "lib/rag-refresh-stack.ts"
}
},
"resources": {
"bullets": [
{
"text": "Amazon S3 Vectors documentation",
"link": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/vectors.html"
},
{
"text": "AWS Step Functions Distributed Map",
"link": "https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-distributed.html"
},
{
"text": "Amazon Bedrock Embeddings",
"link": "https://docs.aws.amazon.com/bedrock/latest/userguide/embeddings.html"
},
{
"text": "AWS CDK Developer Guide",
"link": "https://docs.aws.amazon.com/cdk/latest/guide/"
}
]
},
"deploy": {
"text": [
"npm install",
"cd lambda && npm install && cd ..",
"cdk deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>cdk destroy</code>."
]
},
"authors": [
{
"name": "Ben Freiberg",
"image": "https://serverlessland.com/assets/images/resources/contributors/ben-freiberg.jpg",
"bio": "Ben is a Senior Solutions Architect at Amazon Web Services (AWS) based in Frankfurt, Germany.",
"linkedin": "benfreiberg"
}
]
}
51 changes: 51 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/lambda/embed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';
import { S3VectorsClient, PutVectorsCommand } from '@aws-sdk/client-s3vectors';

const s3 = new S3Client();
const bedrock = new BedrockRuntimeClient();
const s3vectors = new S3VectorsClient();

const DOCUMENT_BUCKET = process.env.DOCUMENT_BUCKET!;
const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;
const MODEL_ID = 'amazon.titan-embed-text-v2:0';

interface S3ItemEvent {
Key: string;
}

export async function handler(event: S3ItemEvent) {
const { Key } = event;

// 1. Read document from S3
const getObj = await s3.send(new GetObjectCommand({
Bucket: DOCUMENT_BUCKET,
Key,
}));
const text = await getObj.Body!.transformToString('utf-8');

// 2. Generate embedding via Bedrock Titan Embeddings V2
const invokeResp = await bedrock.send(new InvokeModelCommand({
modelId: MODEL_ID,
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({ inputText: text, dimensions: 1024, normalize: true }),
}));
const embeddingResult = JSON.parse(new TextDecoder().decode(invokeResp.body));
const embedding: number[] = embeddingResult.embedding;

// 3. Store vector with PutVectors
const vectorKey = Key.replace(/\//g, '_');
await s3vectors.send(new PutVectorsCommand({
vectorBucketName: VECTOR_BUCKET_NAME,
indexName: VECTOR_INDEX_NAME,
vectors: [{
key: vectorKey,
data: { float32: embedding },
metadata: { source: Key },
}],
}));

return { vectorKey, documentKey: Key };
}
10 changes: 10 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/lambda/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"name": "rag-refresh-lambdas",
"version": "1.0.0",
"main": "index.js",
"dependencies": {
"@aws-sdk/client-s3": "^3.700.0",
"@aws-sdk/client-bedrock-runtime": "^3.700.0",
"@aws-sdk/client-s3vectors": "^3.700.0"
}
}
27 changes: 27 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/lambda/rollback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { S3VectorsClient, DeleteVectorsCommand } from '@aws-sdk/client-s3vectors';

const s3vectors = new S3VectorsClient();

const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;

interface RollbackEvent {
vectorKeys: string[];
}

export async function handler(event: RollbackEvent) {
const { vectorKeys } = event;

// DeleteVectors accepts up to 500 keys per call
const BATCH_SIZE = 500;
for (let i = 0; i < vectorKeys.length; i += BATCH_SIZE) {
const batch = vectorKeys.slice(i, i + BATCH_SIZE);
await s3vectors.send(new DeleteVectorsCommand({
vectorBucketName: VECTOR_BUCKET_NAME,
indexName: VECTOR_INDEX_NAME,
keys: batch,
}));
}

return { deleted: vectorKeys.length };
}
17 changes: 17 additions & 0 deletions sfn-s3vectors-rag-refresh-cdk/lambda/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "commonjs",
"moduleResolution": "node",
"lib": ["es2022"],
"outDir": ".",
"rootDir": ".",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true
},
"include": ["*.ts"],
"exclude": ["node_modules"]
}
Loading