From da55a041c739f05c63a21378db67b0a0c704e9b0 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 5 Mar 2026 17:58:25 +0100 Subject: [PATCH] feat: removed JobDB JobParameters table --- .../Systems/WorkloadManagement/index.rst | 4 +- .../Agent/JobCleaningAgent.py | 9 +-- .../Agent/test/Test_Agent_JobCleaningAgent.py | 8 ++- .../WorkloadManagementSystem/DB/JobDB.py | 65 ------------------- .../WorkloadManagementSystem/DB/JobDB.sql | 10 --- .../Utilities/JobParameters.py | 24 ++----- 6 files changed, 16 insertions(+), 104 deletions(-) diff --git a/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst b/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst index 7ec3701e700..58778dc911a 100644 --- a/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst +++ b/docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst @@ -100,5 +100,5 @@ It is based on layered architecture and is based on DIRAC architecture: SandboxMetadataDB class is a front-end to the metadata for sandboxes. * JobParametersDB - JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters. - It is used in most of the WMS components and is based on Elastic/OpenSearch. + JobParametersDB class is a front-end to the OpenSearch based index providing Job Parameters. + It is used in most of the WMS components and is based on OpenSearch. diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index d8287c07dd4..cd4714872a3 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -1,4 +1,4 @@ -""" The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle. +"""The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle. This agent will take care of: - removing all jobs that are in status JobStatus.DELETED @@ -22,6 +22,7 @@ than 0. """ + import datetime import os @@ -40,7 +41,7 @@ from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE -from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB class JobCleaningAgent(AgentModule): @@ -298,8 +299,8 @@ def deleteJobOversizedSandbox(self, jobIDList): failed = {} successful = {} - jobIDs = [int(jobID) for jobID in jobIDList] - result = getJobParameters(jobIDs, "OutputSandboxLFN") + jobIDList = [int(jobID) for jobID in jobIDList] + result = JobParametersDB().getJobParameters(jobIDList, ["OutputSandboxLFN"]) if not result["OK"]: return result osLFNDict = result["Value"] diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py index ae098c9f756..ff7ba1d2660 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py @@ -1,5 +1,5 @@ -""" Test class for Job Cleaning Agent -""" +"""Test class for Job Cleaning Agent""" + from unittest.mock import MagicMock import pytest @@ -128,13 +128,15 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__") mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getJobAttributes", return_value=S_OK("")) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) mocker.patch( "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"]) ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params) + mockJobParamsDB = mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobParametersDB") + mockJobParamsDB.return_value.getJobParameters.return_value = params jobCleaningAgent = JobCleaningAgent() jobCleaningAgent.log = gLogger diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 829bda4a2c0..ee7af1b0e21 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -110,56 +110,6 @@ def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=N "Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp ) - ############################################################################# - def getJobParameters(self, jobID, paramList=None): - """Get Job Parameters defined for jobID. - Returns a dictionary with the Job Parameters. - If parameterList is empty - all the parameters are returned. - """ - jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID - - resultDict = {} - if paramList: - if isinstance(paramList, str): - paramList = paramList.split(",") - paramNameList = [] - for pn in paramList: - ret = self._escapeString(pn) - if not ret["OK"]: - return ret - paramNameList.append(ret["Value"]) - cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format( - ",".join(str(int(j)) for j in jobIDList), - ",".join(paramNameList), - ) - result = self._query(cmd) - if result["OK"]: - if result["Value"]: - for res_jobID, res_name, res_value in result["Value"]: - try: - res_value = res_value.decode(errors="replace") # account for use of BLOBs - except AttributeError: - pass - resultDict.setdefault(int(res_jobID), {})[res_name] = res_value - - return S_OK(resultDict) # there's a slim chance that this is an empty dictionary - else: - return S_ERROR("JobDB.getJobParameters: failed to retrieve parameters") - - else: - result = self.getFields("JobParameters", ["JobID", "Name", "Value"], {"JobID": jobID}) - if not result["OK"]: - return result - - for res_jobID, res_name, res_value in result["Value"]: - try: - res_value = res_value.decode(errors="replace") # account for use of BLOBs - except AttributeError: - pass - resultDict.setdefault(int(res_jobID), {})[res_name] = res_value - - return S_OK(resultDict) # there's a slim chance that this is an empty dictionary - ############################################################################# def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1): """Get Attic Job Parameters defined for a job with jobID. @@ -274,16 +224,6 @@ def getJobAttribute(self, jobID, attribute): return result return S_OK(result["Value"].get(attribute)) - ############################################################################# - @deprecated("Use JobParametersDB instead") - def getJobParameter(self, jobID, parameter): - """Get the given parameter of a job specified by its jobID""" - - result = self.getJobParameters(jobID, [parameter]) - if not result["OK"]: - return result - return S_OK(result.get("Value", {}).get(int(jobID), {}).get(parameter)) - ############################################################################# def getJobOptParameter(self, jobID, parameter): """Get optimizer parameters for the given job.""" @@ -1023,7 +963,6 @@ def removeJobFromDB(self, jobIDs): for table in [ "InputData", - "JobParameters", "AtticJobParameters", "HeartBeatLoggingInfo", "OptimizerParameters", @@ -1101,10 +1040,6 @@ def rescheduleJob(self, jobID): return ret e_jobID = ret["Value"] - res = self._update(f"DELETE FROM JobParameters WHERE JobID={e_jobID}") - if not res["OK"]: - return res - # Delete optimizer parameters if not self._update(f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID}")["OK"]: return S_ERROR("JobDB.removeJobOptParameter: operation failed.") diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql index 1bc7ec83b58..01c638c7415 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql @@ -79,16 +79,6 @@ CREATE TABLE `InputData` ( FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS `JobParameters`; -CREATE TABLE `JobParameters` ( - `JobID` INT(11) UNSIGNED NOT NULL, - `Name` VARCHAR(100) NOT NULL, - `Value` TEXT NOT NULL, - PRIMARY KEY (`JobID`,`Name`), - FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - -- ------------------------------------------------------------------------------ DROP TABLE IF EXISTS `OptimizerParameters`; CREATE TABLE `OptimizerParameters` ( diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py index 92b95f3a51d..3d472733f14 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py @@ -154,10 +154,8 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di :rtype: dict """ from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB - from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB elasticJobParametersDB = JobParametersDB() - jobDB = JobDB() if vo: # a user is connecting, with a proxy res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName) @@ -165,8 +163,10 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di return res parameters = res["Value"] else: # a service is connecting, no proxy, e.g. StalledJobAgent + from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB + q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})" - res = jobDB._query(q) + res = JobDB()._query(q) if not res["OK"]: return res if not res["Value"]: @@ -184,23 +184,7 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di if not res["OK"]: return res parameters.update(res["Value"]) - - # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends - res = jobDB.getJobParameters(jobIDs, parName) - if not res["OK"]: - return res - parametersM = res["Value"] - - # and now combine - final = dict(parametersM) - # if job in JobDB, update with parameters from ES if any - for jobID in final: - final[jobID].update(parameters.get(jobID, {})) - # if job in ES and not in JobDB, take ES - for jobID in parameters: - if jobID not in final: - final[jobID] = parameters[jobID] - return S_OK(final) + return S_OK(parameters) def getAvailableRAM(siteName=None, gridCE=None, queue=None):