diff --git a/.github/environments/values.dev.yaml b/.github/environments/values.dev.yaml
index 2ce7a840..c34e317a 100644
--- a/.github/environments/values.dev.yaml
+++ b/.github/environments/values.dev.yaml
@@ -5,9 +5,9 @@ features:
   oidcAuth: true
 
 autoscaling:
-  enabled: false
+  enabled: true
 
 pdb:
-  enabled: false
+  enabled: true
 
 config:
   enabled: true
diff --git a/.github/environments/values.pr.yaml b/.github/environments/values.pr.yaml
index 299fe54c..b5234a3a 100644
--- a/.github/environments/values.pr.yaml
+++ b/.github/environments/values.pr.yaml
@@ -3,6 +3,11 @@ features:
   basicAuth: true
   oidcAuth: true
   defaultBucket: false
+
+autoscaling:
+  enabled: true
+pdb:
+  enabled: true
 #
 #
 # We don't deploy a postgrescluster for PR's
diff --git a/.github/environments/values.prod.yaml b/.github/environments/values.prod.yaml
index 323b9f69..b427af8f 100644
--- a/.github/environments/values.prod.yaml
+++ b/.github/environments/values.prod.yaml
@@ -48,9 +48,6 @@ postgres:
       requests:
         cpu: 256m
         memory: 512Mi
-      limits:
-        cpu: 512m
-        memory: 1024Mi
   sidecars:
     replicaCertCopy:
       resources:
diff --git a/.github/environments/values.test.yaml b/.github/environments/values.test.yaml
index 30a5ac50..68613c29 100644
--- a/.github/environments/values.test.yaml
+++ b/.github/environments/values.test.yaml
@@ -49,9 +49,6 @@ postgres:
       requests:
         cpu: 256m
         memory: 512Mi
-      limits:
-        cpu: 512m
-        memory: 1024Mi
   sidecars:
     replicaCertCopy:
       resources:
diff --git a/app/README.md b/app/README.md
index 32ed503b..c920ff1b 100644
--- a/app/README.md
+++ b/app/README.md
@@ -48,15 +48,17 @@ The following variables enable and enforce the use of Basic Authentication for r
 
 The following variables configure the use of a backend database to support user-based access control, tagging and other advanced features
 
-| Config Var | Env Var       | Default   | Notes                       |
-| ---------- | ------------- | --------- | --------------------------- |
-| `database` | `DB_DATABASE` | coms      | COMS database name          |
-| `host`     | `DB_HOST`     | localhost | Database conection hostname |
-| `username` | `DB_USERNAME` | app       | Database account username   |
-| `password` | `DB_PASSWORD` |           | Database account password   |
-| `port`     | `DB_PORT`     | 5432      | Database connection port    |
-| `poolMin`  | `DB_POOL_MIN` | 2         | avalable connections        |
-| `poolMax`  | `DB_POOL_MAX` | 10        | available connections       |
+| Config Var                 | Env Var                         | Default   | Notes                                                                |
+|----------------------------|---------------------------------|-----------|----------------------------------------------------------------------|
+| `acquireConnectionTimeout` | `DB_ACQUIRE_CONNECTION_TIMEOUT` | 90000     | Timeout length on acquiring a database connection (in milliseconds)   |
+| `database`                 | `DB_DATABASE`                   | coms      | COMS database name                                                    |
+| `host`                     | `DB_HOST`                       | localhost | Database connection hostname                                          |
+| `username`                 | `DB_USERNAME`                   | app       | Database account username                                             |
+| `password`                 | `DB_PASSWORD`                   |           | Database account password                                             |
+| `port`                     | `DB_PORT`                       | 5432      | Database connection port                                              |
+| `poolMin`                  | `DB_POOL_MIN`                   | 2         | Minimum number of connections in the database pool                    |
+| `poolMax`                  | `DB_POOL_MAX`                   | 10        | Maximum number of connections in the database pool                    |
+
 
 ### Keycloak Variables
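The table above is consumed through the `config` package: each `Config Var` lives under the `db` section of `default.json`, and `custom-environment-variables.json` (next diff) maps it to its `Env Var`. A minimal sketch of that lookup, assuming the standard node-config API and that `poolMin`/`poolMax` are also defined in `default.json`:

```js
// Minimal sketch of how node-config resolves the DB settings documented above.
// Each value comes from default.json unless the mapped environment variable is
// set, e.g. DB_ACQUIRE_CONNECTION_TIMEOUT=120000 overrides the 90000 default.
const config = require('config');

const dbSettings = {
  acquireConnectionTimeout: config.get('db.acquireConnectionTimeout'),
  poolMin: config.get('db.poolMin'),
  poolMax: config.get('db.poolMax')
};

// With no overrides: { acquireConnectionTimeout: 90000, poolMin: 2, poolMax: 10 }
console.log(dbSettings);
```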
"DB_PASSWORD", diff --git a/app/config/default.json b/app/config/default.json index 0f2ccbc9..2bf962a4 100644 --- a/app/config/default.json +++ b/app/config/default.json @@ -1,5 +1,6 @@ { "db": { + "acquireConnectionTimeout": 90000, "database": "coms", "host": "localhost", "port": "5432", diff --git a/app/knexfile.js b/app/knexfile.js index 09df756b..f1a4aa21 100644 --- a/app/knexfile.js +++ b/app/knexfile.js @@ -1,4 +1,5 @@ const config = require('config'); +const os = require('os'); const { format } = require('date-fns'); const log = require('./src/components/log')(module.filename); @@ -31,13 +32,15 @@ types.setTypeParser(1184, (value) => new Date(value).toISOString()); const logWrapper = (level, msg) => log.log(level, msg); module.exports = { + acquireConnectionTimeout: config.get('db.acquireConnectionTimeout'), client: 'pg', connection: { host: config.get('db.host'), user: config.get('db.username'), password: config.get('db.password'), database: config.get('db.database'), - port: config.get('db.port') + port: config.get('db.port'), + application_name: os.hostname() }, debug: ['silly', 'debug'].includes(config.get('server.logLevel')), log: { diff --git a/app/src/components/queueManager.js b/app/src/components/queueManager.js index 5d1975ab..07aed726 100644 --- a/app/src/components/queueManager.js +++ b/app/src/components/queueManager.js @@ -57,7 +57,10 @@ class QueueManager { checkQueue() { if (!this.isBusy && !this.toClose) { objectQueueService.queueSize().then(size => { - if (size > 0) this.processNextJob(); + if (size > 0) { + log.verbose(`There are ${size} jobs in the queue to process`, { function: 'checkQueue' }); + this.processNextJob(); + } }).catch((err) => { log.error(`Error encountered while checking queue: ${err.message}`, { function: 'checkQueue', error: err }); }); @@ -93,7 +96,7 @@ class QueueManager { this.isBusy = true; job = response[0]; - log.verbose(`Started processing job id ${job.id}`, { function: 'processNextJob', job: job }); + log.info(`Started processing job id ${job.id}`, { function: 'processNextJob', job: job }); const objectId = await syncService.syncJob(job.path, job.bucketId, job.full, job.createdBy); diff --git a/app/src/components/utils.js b/app/src/components/utils.js index 324f2bbc..f74b216e 100644 --- a/app/src/components/utils.js +++ b/app/src/components/utils.js @@ -115,7 +115,15 @@ const utils = { return data; } catch (err) { log.error(err.message, { function: 'getBucket' }); - throw new Problem(404, { detail: err.message }); + if (err.name === 'NotFoundError') { + throw new Problem(404, { detail: `bucketId ${bucketId} not found` }); + } + else if (err.name == 'KnexTimeoutError') { + throw new Problem(504, { detail: 'Database timeout' }); + } + else { + throw new Problem(500, { detail: err.message }); + } } }, diff --git a/app/src/controllers/sync.js b/app/src/controllers/sync.js index bf479780..e715e809 100644 --- a/app/src/controllers/sync.js +++ b/app/src/controllers/sync.js @@ -59,8 +59,14 @@ const controller = { // parent + child bucket records already in COMS db const dbChildBuckets = await bucketService.searchChildBuckets(parentBucket, false, userId); let dbBuckets = [parentBucket].concat(dbChildBuckets); + log.info(`Found ${dbBuckets.length} bucket records in COMS db for parent bucketId ${bucketId}`, + { function: 'syncBucketRecursive' }); + // 'folders' that exist below (and including) the parent 'folder' in S3 const s3Response = await storageService.listAllObjectVersions({ bucketId: bucketId, precisePath: false }); + log.info(`Found 
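The `getBucket` change above distinguishes database failure modes instead of reporting everything as a 404: Knex raises a `KnexTimeoutError` when acquiring a connection exceeds the new `acquireConnectionTimeout`, which now maps to a 504. A minimal sketch of the same mapping as a reusable helper (the `toProblem` name is hypothetical, assuming the `api-problem`-style `new Problem(status, { detail })` constructor seen above):

```js
// Hypothetical helper mirroring the getBucket error mapping above.
// Assumes an api-problem style constructor: new Problem(status, { detail }).
const Problem = require('api-problem');

function toProblem(err, bucketId) {
  if (err.name === 'NotFoundError') {
    // e.g. Objection's NotFoundError from .throwIfNotFound()
    return new Problem(404, { detail: `bucketId ${bucketId} not found` });
  }
  if (err.name === 'KnexTimeoutError') {
    // Thrown when acquiring a connection exceeds acquireConnectionTimeout
    return new Problem(504, { detail: 'Database timeout' });
  }
  return new Problem(500, { detail: err.message });
}

module.exports = toProblem;
```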
diff --git a/app/src/controllers/sync.js b/app/src/controllers/sync.js
index bf479780..e715e809 100644
--- a/app/src/controllers/sync.js
+++ b/app/src/controllers/sync.js
@@ -59,8 +59,14 @@ const controller = {
       // parent + child bucket records already in COMS db
       const dbChildBuckets = await bucketService.searchChildBuckets(parentBucket, false, userId);
       let dbBuckets = [parentBucket].concat(dbChildBuckets);
+      log.info(`Found ${dbBuckets.length} bucket records in COMS db for parent bucketId ${bucketId}`,
+        { function: 'syncBucketRecursive' });
+
       // 'folders' that exist below (and including) the parent 'folder' in S3
       const s3Response = await storageService.listAllObjectVersions({ bucketId: bucketId, precisePath: false });
+      log.info(`Found ${s3Response.Versions.length} object versions and ` +
+        `${s3Response.DeleteMarkers.length} delete markers in S3 for bucketId ${bucketId}`, { function: 'syncBucketRecursive' });
+
       const s3Keys = [...new Set([
         ...s3Response.DeleteMarkers.map(object => formatS3KeyForCompare(object.Key)),
         ...s3Response.Versions.map(object => formatS3KeyForCompare(object.Key)),
@@ -78,6 +84,7 @@ const controller = {
         userId,
         trx
       );
+      log.info(`${syncedBuckets.length} bucket records synced`, { function: 'syncBucketRecursive' });
 
       /**
        * Queue objects in all the folders for syncing
@@ -107,6 +114,8 @@ const controller = {
       const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);
 
       const s3Objects = await storageService.listAllObjectVersions({ bucketId: bucketId, filterLatest: true });
+      log.info(`Found ${s3Objects.Versions.length} object versions and ` +
+        `${s3Objects.DeleteMarkers.length} delete markers in S3 for bucketId ${bucketId}`, { function: 'syncBucketSingle' });
 
       const response = await utils.trxWrapper(async (trx) => {
         return this.queueObjectRecords([bucket], s3Objects, userId, trx);
@@ -139,26 +148,31 @@ const controller = {
         oldDbBuckets.map(dbBucket => bucketService.delete(dbBucket.bucketId, trx)
           .then(() => {
+            log.info(`Deleted bucketId ${dbBucket.bucketId} from COMS db as key ${dbBucket.key} not found in S3`,
+              { function: 'syncBucketRecords' });
             dbBuckets = dbBuckets.filter(b => b.bucketId !== dbBucket.bucketId);
           })
         )
       );
 
       // add current user's permissions to all buckets
-      await Promise.all(
-        dbBuckets.map(bucket => {
-          return bucketPermissionService.addPermissions(
-            bucket.bucketId,
-            currentUserParentBucketPerms.map(permCode => ({ userId, permCode })),
-            undefined,
-            trx
-          );
-        })
-      );
+      if (userId !== SYSTEM_USER) {
+        await Promise.all(
+          dbBuckets.map(bucket => {
+            return bucketPermissionService.addPermissions(
+              bucket.bucketId,
+              currentUserParentBucketPerms.map(permCode => ({ userId, permCode })),
+              undefined,
+              trx
+            );
+          })
+        );
+      }
 
       // Create buckets only found in S3 in COMS db
       const newS3Keys = s3Keys.filter(k => !dbBuckets.map(b => b.key).includes(k));
       await Promise.all(
         newS3Keys.map(s3Key => {
+          log.info(`Creating new bucket record in COMS db for S3 key ${s3Key}`, { function: 'syncBucketRecords' });
           const data = {
             bucketName: s3Key.substring(s3Key.lastIndexOf('/') + 1),
             accessKeyId: parentBucket.accessKeyId,
@@ -202,6 +216,9 @@ const controller = {
         bucketId: dbBuckets.map(b => b.bucketId)
       }, trx);
 
+      log.info(`Found ${dbObjects.data.length} object records in COMS db for ${dbBuckets.length} buckets`,
+        { function: 'queueObjectRecords' });
+
       /**
        * merge arrays of objects from COMS db and S3 to form an array of jobs with format:
        * [ { path: '/images/img3.jpg', bucketId: '123' }, { path: '/images/album1/img1.jpg', bucketId: '456' } ]
@@ -237,6 +254,7 @@ const controller = {
 
       // merge and remove duplicates
       const jobs = [...new Map(objects.map(o => [o.path, o])).values()];
+      log.info(`Prepared ${jobs.length} jobs to enqueue to object queue`, { function: 'queueObjectRecords' });
 
       // create jobs in COMS db object_queue for each object
       // update 'lastSyncRequestedDate' value in COMS db for each bucket
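The "merge and remove duplicates" step above relies on the `Map` constructor keeping one entry per key, with later entries overwriting earlier ones. A standalone illustration of that de-duplication idiom:

```js
// De-duplication by key via Map: for duplicate keys the last value wins,
// while the key keeps the position of its first insertion.
const objects = [
  { path: '/images/img3.jpg', bucketId: '123' },
  { path: '/images/album1/img1.jpg', bucketId: '456' },
  { path: '/images/img3.jpg', bucketId: '789' } // duplicate path
];

const jobs = [...new Map(objects.map(o => [o.path, o])).values()];
console.log(jobs);
// [ { path: '/images/img3.jpg', bucketId: '789' },
//   { path: '/images/album1/img1.jpg', bucketId: '456' } ]
```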
diff --git a/app/src/services/objectQueue.js b/app/src/services/objectQueue.js
index 8458ac3e..a14fcbe1 100644
--- a/app/src/services/objectQueue.js
+++ b/app/src/services/objectQueue.js
@@ -1,6 +1,14 @@
 const { NIL: SYSTEM_USER } = require('uuid');
 
 const { ObjectQueue } = require('../db/models');
+const log = require('../components/log')(module.filename);
+
+
+/**
+ * Max number of parameters in a prepared statement (this is a Postgres hard-coded limit).
+ * https://www.postgresql.org/docs/current/limits.html ("query parameters")
+ */
+const POSTGRES_QUERY_MAX_PARAM_LIMIT = 65535;
 
 /**
  * The Object Queue DB Service
@@ -47,6 +55,8 @@ const service = {
     let trx;
     try {
       trx = etrx ? etrx : await ObjectQueue.startTransaction();
+      log.info(`Enqueuing ${jobs.length} jobs to object queue ` +
+        `(full: ${full}, retries: ${retries}, createdBy: ${createdBy})`, { function: 'enqueue' });
 
       const jobsArray = jobs.map(job => ({
         bucketId: job.bucketId,
@@ -59,11 +69,25 @@ const service = {
       // Short circuit when nothing to add or there are missing paths
       if (!jobsArray.length || !jobsArray.every(job => !!job.path)) return Promise.resolve(0);
 
-      // Only insert jobs in if it does not already exist
-      const response = await ObjectQueue.query(trx).insert(jobsArray).onConflict().ignore();
+      const PARAMS_PER_JOB = 6; // 5 params in jobsArray, plus `createdAt` added by the `Timestamps` mixin
+      const MAX_JOBS_PER_BATCH = Math.floor(POSTGRES_QUERY_MAX_PARAM_LIMIT / PARAMS_PER_JOB);
+
+      let totalInserted = 0;
+
+      // Split the query into batches when the number of query params would exceed the Postgres hard limit
+      for (let i = 0; i < jobsArray.length; i += MAX_JOBS_PER_BATCH) {
+        const batch = jobsArray.slice(i, i + MAX_JOBS_PER_BATCH);
+
+        // Only insert jobs that do not already exist
+        const response = await ObjectQueue.query(trx).insert(batch).onConflict().ignore();
+
+        totalInserted += response.reduce((acc, job) => job?.id ? acc + 1 : acc, 0);
+        log.verbose(`Inserted ${response.length} jobs to object queue (batch starting at index ${i})`,
+          { function: 'enqueue' });
+      }
 
       if (!etrx) await trx.commit();
-      return Promise.resolve(response.reduce((acc, job) => job?.id ? acc + 1 : acc, 0));
+      return Promise.resolve(totalInserted);
     } catch (err) {
       if (!etrx && trx) await trx.rollback();
       throw err;
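The batch sizing above follows directly from parameter arithmetic: with 6 bound parameters per row and a 65535-parameter ceiling, at most floor(65535 / 6) = 10922 rows fit in a single INSERT. A standalone sketch of the same chunking, using a hypothetical `chunk` helper:

```js
// Hypothetical chunk helper matching the batching logic above:
// 65535 params max / 6 params per row => at most 10922 rows per INSERT.
const POSTGRES_QUERY_MAX_PARAM_LIMIT = 65535;
const PARAMS_PER_JOB = 6;
const MAX_JOBS_PER_BATCH = Math.floor(POSTGRES_QUERY_MAX_PARAM_LIMIT / PARAMS_PER_JOB); // 10922

function chunk(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// e.g. 25000 jobs split into batches of [10922, 10922, 3156]
console.log(chunk(new Array(25000).fill(0), MAX_JOBS_PER_BATCH).map(b => b.length));
```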
diff --git a/app/src/services/storage.js b/app/src/services/storage.js
index f00b8b6e..48a4b2e1 100644
--- a/app/src/services/storage.js
+++ b/app/src/services/storage.js
@@ -27,6 +27,9 @@ const utils = require('../components/utils');
 
 const DELIMITER = '/';
 
+// Cache AWS S3Clients in order to reuse S3 connections
+const s3ClientCache = new Map();
+
 // Get app configuration
 const defaultTempExpiresIn = parseInt(config.get('server.defaultTempExpiresIn'), 10);
 
@@ -51,7 +54,14 @@ const objectStorageService = {
       log.error('Unable to generate S3Client due to missing arguments', { function: '_getS3Client' });
     }
 
-    return new S3Client({
+    // Reuse the cached S3Client if one already exists for the given credentials
+    const cacheKey = JSON.stringify([accessKeyId, endpoint, region]);
+    if (s3ClientCache.has(cacheKey)) {
+      return s3ClientCache.get(cacheKey);
+    }
+
+    // If new, cache the S3Client before returning
+    const newClient = new S3Client({
       credentials: {
         accessKeyId: accessKeyId,
         secretAccessKey: secretAccessKey
@@ -59,8 +69,22 @@ const objectStorageService = {
       },
       endpoint: endpoint,
       forcePathStyle: true,
       logger: ['silly', 'debug'].includes(config.get('server.logLevel')) ? log : undefined,
+      retryMode: 'standard',
+      requestHandler: {
+        connectionTimeout: 5000,
+        requestTimeout: 60000,
+        httpsAgent: {
+          keepAlive: true,
+          keepAliveMsecs: 5000,
+          maxSockets: 15,
+          maxFreeSockets: 10,
+          timeout: 30000
+        }
+      },
       region: region
     });
+    s3ClientCache.set(cacheKey, newClient);
+    return newClient;
   },
diff --git a/app/src/services/sync.js b/app/src/services/sync.js
index 9ce37325..8a768a3c 100644
--- a/app/src/services/sync.js
+++ b/app/src/services/sync.js
@@ -98,11 +98,14 @@ const service = {
       // 1. Sync Object
       const object = await service.syncObject(path, bucketId, userId, trx)
         .then(obj => obj.object);
+      log.info(`Synchronized object at path ${path} in bucket ${bucketId}`,
+        { function: 'syncJob', objectId: object?.id });
 
       // 2. Sync Object Versions
       let versions = [];
       if (object) {
         versions = await service.syncVersions(object, userId, trx);
+        log.info(`Synchronized ${versions.length} versions for object id ${object.id}`, { function: 'syncJob' });
       }
 
       // 3. Sync Version Metadata & Tags
@@ -238,10 +241,10 @@ const service = {
     const comsObject = typeof object === 'object' ? object : await objectService.read(object, trx);
 
     // Check for COMS and S3 Version statuses
-    const [comsVersions, s3VersionsRaw] = await Promise.allSettled([
+    const [comsVersions, s3VersionsRaw] = await Promise.all([
       versionService.list(comsObject.id, trx),
       storageService.listAllObjectVersions({ filePath: comsObject.path, bucketId: comsObject.bucketId })
-    ]).then(settled => settled.map(promise => promise.value));
+    ]);
 
     // Combine S3 DeleteMarkers and Versions into one array
     const s3Versions = s3VersionsRaw.DeleteMarkers
@@ -407,8 +410,10 @@ const service = {
 
     // COMS Tags
     const comsTags = comsTagsForVersion[0]?.tagset ?? [];
+    // log.info(`Found ${comsTags.length} tags in COMS for version id ${comsVersion.id}`, { function: 'syncTags' });
 
     // S3 Tags
     const s3Tags = toLowerKeys(s3TagsForVersion?.TagSet ?? []);
+    // log.info(`Found ${s3Tags.length} tags in S3 for version id ${comsVersion.id}`, { function: 'syncTags' });
 
     /**
      * Add coms-id tag to latest version in S3 if not already present
      * NOTE: For a sync job the _deriveObjectId() function will have already added
@@ -501,8 +506,12 @@ const service = {
 
     // COMS Metadata
     const comsMetadata = comsMetadataForVersion[0]?.metadata ?? [];
+    // log.info(`Found ${comsMetadata.length} metadata entries in COMS for version id ${comsVersion.id}`,
+    //   { function: 'syncMetadata' });
 
     // S3 Metadata
     const s3Metadata = getKeyValue(s3ObjectHead?.Metadata ?? {});
+    // log.info(`Found ${s3Metadata.length} metadata entries in S3 for version id ${comsVersion.id}`,
+    //   { function: 'syncMetadata' });
 
     // Dissociate Metadata not in S3
     const oldMetadata = [];
diff --git a/charts/coms/values.yaml b/charts/coms/values.yaml
index 6e698280..c7191847 100644
--- a/charts/coms/values.yaml
+++ b/charts/coms/values.yaml
@@ -45,7 +45,7 @@ autoscaling:
   enabled: false
   minReplicas: 3
-  maxReplicas: 8
+  maxReplicas: 6
   targetCPUUtilizationPercentage: 80
 
 # pod disruption budget.
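One behavioural note on the `services/sync.js` change above: `Promise.allSettled` never rejects, so a failed `listAllObjectVersions` call previously surfaced as an `undefined` value (and a downstream crash), whereas `Promise.all` propagates the rejection to the caller immediately. A minimal illustration of the difference:

```js
// Promise.allSettled never rejects; a failure becomes { status: 'rejected' },
// so mapping over .value silently yields undefined for the failed slot.
const ok = Promise.resolve('versions');
const boom = () => Promise.reject(new Error('S3 unavailable'));

Promise.allSettled([ok, boom()])
  .then(settled => settled.map(p => p.value))
  .then(values => console.log(values)); // [ 'versions', undefined ]

// Promise.all propagates the first rejection to the caller instead.
Promise.all([ok, boom()])
  .catch(err => console.error(err.message)); // 'S3 unavailable'
```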