4 changes: 2 additions & 2 deletions .github/environments/values.dev.yaml
@@ -5,9 +5,9 @@ features:
oidcAuth: true

autoscaling:
enabled: false
enabled: true
pdb:
enabled: false
enabled: true

config:
enabled: true
5 changes: 5 additions & 0 deletions .github/environments/values.pr.yaml
@@ -3,6 +3,11 @@ features:
basicAuth: true
oidcAuth: true
defaultBucket: false

autoscaling:
enabled: true
pdb:
enabled: true
#
#
# We don't deploy a postgrescluster for PR's
3 changes: 0 additions & 3 deletions .github/environments/values.prod.yaml
@@ -48,9 +48,6 @@ postgres:
requests:
cpu: 256m
memory: 512Mi
limits:
cpu: 512m
memory: 1024Mi
sidecars:
replicaCertCopy:
resources:
3 changes: 0 additions & 3 deletions .github/environments/values.test.yaml
@@ -49,9 +49,6 @@ postgres:
requests:
cpu: 256m
memory: 512Mi
limits:
cpu: 512m
memory: 1024Mi
sidecars:
replicaCertCopy:
resources:
20 changes: 11 additions & 9 deletions app/README.md
@@ -48,15 +48,17 @@ The following variables enable and enforce the use of Basic Authentication for r

The following variables configure the use of a backend database to support user-based access control, tagging and other advanced features

| Config Var | Env Var | Default | Notes |
| ---------- | ------------- | --------- | --------------------------- |
| `database` | `DB_DATABASE` | coms | COMS database name |
| `host` | `DB_HOST` | localhost | Database conection hostname |
| `username` | `DB_USERNAME` | app | Database account username |
| `password` | `DB_PASSWORD` | | Database account password |
| `port` | `DB_PORT` | 5432 | Database connection port |
| `poolMin` | `DB_POOL_MIN` | 2 | avalable connections |
| `poolMax` | `DB_POOL_MAX` | 10 | available connections |
| Config Var | Env Var | Default | Notes |
|----------------------------|---------------------------------|-----------|---------------------------------------------------------------------|
| `acquireConnectionTimeout` | `DB_ACQUIRE_CONNECTION_TIMEOUT` | 90000 | Timeout length on acquiring a database connection (in milliseconds) |
| `database` | `DB_DATABASE` | coms | COMS database name |
| `host`                     | `DB_HOST`                       | localhost | Database connection hostname                                         |
| `username` | `DB_USERNAME` | app | Database account username |
| `password` | `DB_PASSWORD` | | Database account password |
| `port` | `DB_PORT` | 5432 | Database connection port |
| `poolMin`                  | `DB_POOL_MIN`                   | 2         | Minimum number of available connections in the pool                  |
| `poolMax`                  | `DB_POOL_MAX`                   | 10        | Maximum number of available connections in the pool                  |
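
As a rough illustration of how these values are consumed (a sketch only, not part of this diff; the connection and timeout keys mirror `app/knexfile.js`, while the `pool.min`/`pool.max` mapping is an assumption):

```js
// Sketch: reading the variables above through the `config` package, the way app/knexfile.js does.
const config = require('config');

module.exports = {
  client: 'pg',
  // Added in this PR: how long Knex waits to acquire a connection before raising a timeout error (ms)
  acquireConnectionTimeout: config.get('db.acquireConnectionTimeout'), // DB_ACQUIRE_CONNECTION_TIMEOUT
  connection: {
    host: config.get('db.host'),         // DB_HOST
    port: config.get('db.port'),         // DB_PORT
    user: config.get('db.username'),     // DB_USERNAME
    password: config.get('db.password'), // DB_PASSWORD
    database: config.get('db.database')  // DB_DATABASE
  },
  // Assumed mapping for the pool variables; adjust to the actual knexfile if it differs
  pool: {
    min: parseInt(config.get('db.poolMin'), 10), // DB_POOL_MIN
    max: parseInt(config.get('db.poolMax'), 10)  // DB_POOL_MAX
  }
};
```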


### Keycloak Variables

1 change: 1 addition & 0 deletions app/config/custom-environment-variables.json
@@ -6,6 +6,7 @@
"username": "BASICAUTH_USERNAME"
},
"db": {
"acquireConnectionTimeout": "DB_ACQUIRE_CONNECTION_TIMEOUT",
"database": "DB_DATABASE",
"host": "DB_HOST",
"password": "DB_PASSWORD",
1 change: 1 addition & 0 deletions app/config/default.json
@@ -1,5 +1,6 @@
{
"db": {
"acquireConnectionTimeout": 90000,
"database": "coms",
"host": "localhost",
"port": "5432",
5 changes: 4 additions & 1 deletion app/knexfile.js
@@ -1,4 +1,5 @@
const config = require('config');
const os = require('os');
const { format } = require('date-fns');

const log = require('./src/components/log')(module.filename);
@@ -31,13 +32,15 @@ types.setTypeParser(1184, (value) => new Date(value).toISOString());
const logWrapper = (level, msg) => log.log(level, msg);

module.exports = {
acquireConnectionTimeout: config.get('db.acquireConnectionTimeout'),
client: 'pg',
connection: {
host: config.get('db.host'),
user: config.get('db.username'),
password: config.get('db.password'),
database: config.get('db.database'),
port: config.get('db.port')
port: config.get('db.port'),
application_name: os.hostname()
},
debug: ['silly', 'debug'].includes(config.get('server.logLevel')),
log: {
7 changes: 5 additions & 2 deletions app/src/components/queueManager.js
@@ -57,7 +57,10 @@ class QueueManager {
checkQueue() {
if (!this.isBusy && !this.toClose) {
objectQueueService.queueSize().then(size => {
if (size > 0) this.processNextJob();
if (size > 0) {
log.verbose(`There are ${size} jobs in the queue to process`, { function: 'checkQueue' });
this.processNextJob();
}
}).catch((err) => {
log.error(`Error encountered while checking queue: ${err.message}`, { function: 'checkQueue', error: err });
});
@@ -93,7 +96,7 @@
this.isBusy = true;
job = response[0];

log.verbose(`Started processing job id ${job.id}`, { function: 'processNextJob', job: job });
log.info(`Started processing job id ${job.id}`, { function: 'processNextJob', job: job });

const objectId = await syncService.syncJob(job.path, job.bucketId, job.full, job.createdBy);

10 changes: 9 additions & 1 deletion app/src/components/utils.js
@@ -115,7 +115,15 @@ const utils = {
return data;
} catch (err) {
log.error(err.message, { function: 'getBucket' });
throw new Problem(404, { detail: err.message });
if (err.name === 'NotFoundError') {
throw new Problem(404, { detail: `bucketId ${bucketId} not found` });
}
else if (err.name == 'KnexTimeoutError') {
throw new Problem(504, { detail: 'Database timeout' });
}
else {
throw new Problem(500, { detail: err.message });
}
}
},

38 changes: 28 additions & 10 deletions app/src/controllers/sync.js
@@ -59,8 +59,14 @@ const controller = {
// parent + child bucket records already in COMS db
const dbChildBuckets = await bucketService.searchChildBuckets(parentBucket, false, userId);
let dbBuckets = [parentBucket].concat(dbChildBuckets);
log.info(`Found ${dbBuckets.length} bucket records in COMS db for parent bucketId ${bucketId}`,
{ function: 'syncBucketRecursive' });

// 'folders' that exist below (and including) the parent 'folder' in S3
const s3Response = await storageService.listAllObjectVersions({ bucketId: bucketId, precisePath: false });
log.info(`Found ${s3Response.Versions.length} object versions and ${s3Response.DeleteMarkers.length}
delete markers in S3 for bucketId ${bucketId}`, { function: 'syncBucketRecursive' });

const s3Keys = [...new Set([
...s3Response.DeleteMarkers.map(object => formatS3KeyForCompare(object.Key)),
...s3Response.Versions.map(object => formatS3KeyForCompare(object.Key)),
@@ -78,6 +84,7 @@
userId,
trx
);
log.info(`${syncedBuckets.length} buckets records synced`, { function: 'syncBucketRecursive' });

/**
* Queue objects in all the folders for syncing
@@ -107,6 +114,8 @@
const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);

const s3Objects = await storageService.listAllObjectVersions({ bucketId: bucketId, filterLatest: true });
log.info(`Found ${s3Objects.Versions.length} object versions and ${s3Objects.DeleteMarkers.length}
delete markers in S3 for bucketId ${bucketId}`, { function: 'syncBucketSingle' });

const response = await utils.trxWrapper(async (trx) => {
return this.queueObjectRecords([bucket], s3Objects, userId, trx);
@@ -139,26 +148,31 @@
oldDbBuckets.map(dbBucket =>
bucketService.delete(dbBucket.bucketId, trx)
.then(() => {
log.info(`Deleted bucketId ${dbBucket.bucketId} from COMS db as key ${dbBucket.key} not found in S3`,
{ function: 'syncBucketRecords' });
dbBuckets = dbBuckets.filter(b => b.bucketId !== dbBucket.bucketId);
})
)
);
// add current user's permissions to all buckets
await Promise.all(
dbBuckets.map(bucket => {
return bucketPermissionService.addPermissions(
bucket.bucketId,
currentUserParentBucketPerms.map(permCode => ({ userId, permCode })),
undefined,
trx
);
})
);
if (userId !== SYSTEM_USER) {
await Promise.all(
dbBuckets.map(bucket => {
return bucketPermissionService.addPermissions(
bucket.bucketId,
currentUserParentBucketPerms.map(permCode => ({ userId, permCode })),
undefined,
trx
);
})
);
}

// Create buckets only found in S3 in COMS db
const newS3Keys = s3Keys.filter(k => !dbBuckets.map(b => b.key).includes(k));
await Promise.all(
newS3Keys.map(s3Key => {
log.info(`Creating new bucket record in COMS db for S3 key ${s3Key}`, { function: 'syncBucketRecords' });
const data = {
bucketName: s3Key.substring(s3Key.lastIndexOf('/') + 1),
accessKeyId: parentBucket.accessKeyId,
@@ -202,6 +216,9 @@
bucketId: dbBuckets.map(b => b.bucketId)
}, trx);

log.info(`Found ${dbObjects.data.length} object records in COMS db for ${dbBuckets.length} buckets`,
{ function: 'queueObjectRecords' });

/**
* merge arrays of objects from COMS db and S3 to form an array of jobs with format:
* [ { path: '/images/img3.jpg', bucketId: '123' }, { path: '/images/album1/img1.jpg', bucketId: '456' } ]
@@ -237,6 +254,7 @@

// merge and remove duplicates
const jobs = [...new Map(objects.map(o => [o.path, o])).values()];
log.info(`Prepared ${jobs.length} jobs to enqueue to object queue`, { function: 'queueObjectRecords' });

// create jobs in COMS db object_queue for each object
// update 'lastSyncRequestedDate' value in COMS db for each bucket
30 changes: 27 additions & 3 deletions app/src/services/objectQueue.js
@@ -1,6 +1,14 @@
const { NIL: SYSTEM_USER } = require('uuid');

const { ObjectQueue } = require('../db/models');
const log = require('../components/log')(module.filename);


/**
* Max number of parameters in a prepared statement (this is a Postgres hard-coded limit).
* https://www.postgresql.org/docs/current/limits.html ("query parameters")
*/
const POSTGRES_QUERY_MAX_PARAM_LIMIT = 65535;

/**
* The Object Queue DB Service
@@ -47,6 +55,8 @@
let trx;
try {
trx = etrx ? etrx : await ObjectQueue.startTransaction();
log.info(`Enqueuing ${jobs.length} jobs to object queue (full: ${full},
retries: ${retries}, createdBy: ${createdBy})`, { function: 'enqueue' });

const jobsArray = jobs.map(job => ({
bucketId: job.bucketId,
@@ -59,11 +69,25 @@
// Short circuit when nothing to add or there are missing paths
if (!jobsArray.length || !jobsArray.every(job => !!job.path)) return Promise.resolve(0);

// Only insert jobs in if it does not already exist
const response = await ObjectQueue.query(trx).insert(jobsArray).onConflict().ignore();
const PARAMS_PER_JOB = 6; // 5 params in jobsArray, plus `createdAt` added by the `Timestamps` mixin
const MAX_JOBS_PER_BATCH = Math.floor(POSTGRES_QUERY_MAX_PARAM_LIMIT / PARAMS_PER_JOB);

let totalInserted = 0;

// Split query as necessary if number of query params exceed Postgres hard limit
for (let i = 0; i < jobsArray.length; i += MAX_JOBS_PER_BATCH) {
const batch = jobsArray.slice(i, i + MAX_JOBS_PER_BATCH);

// Only insert jobs in if it does not already exist
const response = await ObjectQueue.query(trx).insert(batch).onConflict().ignore();

totalInserted += response.reduce((acc, job) => job?.id ? acc + 1 : acc, 0);
log.verbose(`Inserted ${response.length} jobs to object queue (batch starting at index ${i})`,
{ function: 'enqueue' });
}

if (!etrx) await trx.commit();
return Promise.resolve(response.reduce((acc, job) => job?.id ? acc + 1 : acc, 0));
return Promise.resolve(totalInserted);
} catch (err) {
if (!etrx && trx) await trx.rollback();
throw err;
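For context on the batch size used above, a small standalone sketch of the arithmetic (the job count is hypothetical; the six-parameters-per-row figure comes from the inline comment in the diff):

```js
// Postgres allows at most 65535 bind parameters in a single prepared statement.
const POSTGRES_QUERY_MAX_PARAM_LIMIT = 65535;
const PARAMS_PER_JOB = 6; // bucketId, path, full, retries, createdBy, plus createdAt from the Timestamps mixin
const MAX_JOBS_PER_BATCH = Math.floor(POSTGRES_QUERY_MAX_PARAM_LIMIT / PARAMS_PER_JOB); // 10922

// Splitting a hypothetical 25000-job enqueue therefore produces three inserts: 10922 + 10922 + 3156 rows.
const jobs = Array.from({ length: 25000 }, (_, i) => ({ path: `/objects/${i}` }));
const batches = [];
for (let i = 0; i < jobs.length; i += MAX_JOBS_PER_BATCH) {
  batches.push(jobs.slice(i, i + MAX_JOBS_PER_BATCH));
}
console.log(batches.map(b => b.length)); // [ 10922, 10922, 3156 ]
```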
26 changes: 25 additions & 1 deletion app/src/services/storage.js
@@ -27,6 +27,9 @@ const utils = require('../components/utils');

const DELIMITER = '/';

// Cache AWS S3Clients in order to reuse S3 connections
const s3ClientCache = new Map();

// Get app configuration
const defaultTempExpiresIn = parseInt(config.get('server.defaultTempExpiresIn'), 10);

@@ -51,16 +54,37 @@ const objectStorageService = {
log.error('Unable to generate S3Client due to missing arguments', { function: '_getS3Client' });
}

return new S3Client({
// S3Client already exists for the given credentials
const cacheKey = JSON.stringify([accessKeyId, endpoint, region]);
if (s3ClientCache.has(cacheKey)) {
return s3ClientCache.get(cacheKey);
}

// If new, cache the S3Client before returning
const newClient = new S3Client({
credentials: {
accessKeyId: accessKeyId,
secretAccessKey: secretAccessKey
},
endpoint: endpoint,
forcePathStyle: true,
logger: ['silly', 'debug'].includes(config.get('server.logLevel')) ? log : undefined,
retryMode: 'standard',
requestHandler: {
connectionTimeout: 5000,
requestTimeout: 60000,
httpsAgent: {
keepAlive: true,
keepAliveMsecs: 5000,
maxSockets: 15,
maxFreeSockets: 10,
timeout: 30000
}
},
region: region
});
s3ClientCache.set(cacheKey, newClient);
return newClient;
},

/**
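To illustrate the reuse this cache enables (a standalone sketch rather than the service's actual call path; the argument and cache-key shapes mirror the diff above):

```js
// Sketch: memoize S3Client instances on the credential/endpoint/region triple so repeated
// calls for the same bucket configuration share one client (and its keep-alive sockets).
const { S3Client } = require('@aws-sdk/client-s3');

const s3ClientCache = new Map();

function getClient({ accessKeyId, secretAccessKey, endpoint, region }) {
  const cacheKey = JSON.stringify([accessKeyId, endpoint, region]);
  if (!s3ClientCache.has(cacheKey)) {
    s3ClientCache.set(cacheKey, new S3Client({
      credentials: { accessKeyId, secretAccessKey },
      endpoint,
      forcePathStyle: true,
      region
    }));
  }
  return s3ClientCache.get(cacheKey);
}

// Two buckets that share credentials, endpoint and region resolve to the same client instance.
const a = getClient({ accessKeyId: 'k', secretAccessKey: 's', endpoint: 'https://s3.example.com', region: 'us-east-1' });
const b = getClient({ accessKeyId: 'k', secretAccessKey: 's', endpoint: 'https://s3.example.com', region: 'us-east-1' });
console.log(a === b); // true
```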
13 changes: 11 additions & 2 deletions app/src/services/sync.js
@@ -98,11 +98,14 @@ const service = {
// 1. Sync Object
const object = await service.syncObject(path, bucketId, userId, trx)
.then(obj => obj.object);
log.info(`Synchronized object at path ${path} in bucket ${bucketId}`,
{ function: 'syncJob', objectId: object?.id });

// 2. Sync Object Versions
let versions = [];
if (object) {
versions = await service.syncVersions(object, userId, trx);
log.info(`Synchronized ${versions.length} versions for object id ${object.id}`, { function: 'syncJob' });
}

// 3. Sync Version Metadata & Tags
@@ -238,10 +241,10 @@ const service = {
const comsObject = typeof object === 'object' ? object : await objectService.read(object, trx);

// Check for COMS and S3 Version statuses
const [comsVersions, s3VersionsRaw] = await Promise.allSettled([
const [comsVersions, s3VersionsRaw] = await Promise.all([
versionService.list(comsObject.id, trx),
storageService.listAllObjectVersions({ filePath: comsObject.path, bucketId: comsObject.bucketId })
]).then(settled => settled.map(promise => promise.value));
]);

// Combine S3 DeleteMarkers and Versions into one array
const s3Versions = s3VersionsRaw.DeleteMarkers
@@ -407,8 +410,10 @@

// COMS Tags
const comsTags = comsTagsForVersion[0]?.tagset ?? [];
// log.info(`Found ${comsTags.length} tags in COMS for version id ${comsVersion.id}`, { function: 'syncTags' });
// S3 Tags
const s3Tags = toLowerKeys(s3TagsForVersion?.TagSet ?? []);
// log.info(`Found ${s3Tags.length} tags in S3 for version id ${comsVersion.id}`, { function: 'syncTags' });
/**
* Add coms-id tag to latest version in S3 if not already present
* NOTE: For a sync job the _deriveObjectId() function will have already added
@@ -501,8 +506,12 @@

// COMS Metadata
const comsMetadata = comsMetadataForVersion[0]?.metadata ?? [];
// log.info(`Found ${comsMetadata.length} metadata entries in COMS for version id ${comsVersion.id}`,
// { function: 'syncMetadata' });
// S3 Metadata
const s3Metadata = getKeyValue(s3ObjectHead?.Metadata ?? {});
// log.info(`Found ${s3Metadata.length} metadata entries in S3 for version id ${comsVersion.id}`,
// { function: 'syncMetadata' });

// Dissociate Metadata not in S3
const oldMetadata = [];