Commits
- fd7fce9 create classfiles (e-buerger, May 4, 2022)
- dad4372 adapting code for passing precommit (e-buerger, May 4, 2022)
- f59a516 adapt code for passing precommit (e-buerger, May 4, 2022)
- a19747c adapt code for passing precommit (e-buerger, May 4, 2022)
- 7588dde implement a few functions (e-buerger, May 5, 2022)
- 56af3aa change type of db columns (e-buerger, May 5, 2022)
- 566dbe1 satisfy pre-commit (e-buerger, May 5, 2022)
- 37ae4a4 implement exec method of PythonJob (e-buerger, May 9, 2022)
- 5529827 suppres bandit warnigns and add comments (e-buerger, May 9, 2022)
- 5f17ab3 remove __init__.py from dao directory (e-buerger, May 9, 2022)
- cd6f426 remove context manager (e-buerger, May 9, 2022)
- 792ce11 fix session error in job_dao.py (e-buerger, May 9, 2022)
- 4eeafcf change db commands (e-buerger, May 9, 2022)
- af7d7a9 change db commands (e-buerger, May 9, 2022)
- a1d9e61 change db commands (e-buerger, May 9, 2022)
- 1993d54 fix import errors by using absolute path and an __init__ file (e-buerger, May 10, 2022)
- ae53782 remove content from exec method (e-buerger, May 10, 2022)
- 211f348 add license headers (e-buerger, May 10, 2022)
- 46f313e add license headers (e-buerger, May 10, 2022)
- c5253c3 implement PyExecSession (e-buerger, May 13, 2022)
- 97c902a implement unit tests for job_dao (e-buerger, May 16, 2022)
- 6491949 satisfy pipeline (e-buerger, May 16, 2022)
- c344999 satisfy pipeline (e-buerger, May 16, 2022)
- d93859c add license header (e-buerger, May 16, 2022)
- c862e18 update README (e-buerger, May 17, 2022)
- 37b711c update doc strings; change job_dao functions for better unit tests (e-buerger, May 17, 2022)
- bf7255d change engine type (e-buerger, May 17, 2022)
- f671e43 draft of OOP structure for python-centric jobs (e-buerger, May 4, 2022)
- c286f37 refactor (KerstenBreuer, May 19, 2022)
- 5ab918d resolve merge conflicts (e-buerger, May 20, 2022)
- 9370930 restructure modules (e-buerger, May 23, 2022)
- e5e559b remove unneccassery class (e-buerger, May 23, 2022)
- 86b72dd satisfy pipeline (e-buerger, May 23, 2022)
- 9856610 update doc strings (e-buerger, May 23, 2022)
164 changes: 28 additions & 136 deletions README.md
@@ -1,143 +1,35 @@



# Microservice Repository Template

This repo is a template for creating a new microservice.

The directories, files, and their structure herein are recommendations
from the GHGA Dev Team.

## Naming Conventions
The GitHub repository name contains only lowercase letters, numbers, and hyphens "-",
e.g.: `my-microservice`

The Python package (and thus the source repository) contains underscores "_"
instead of hyphens, e.g.: `exec_manager`

The command-line script that is used to run the service, the docker repository
(published to docker hub), and the helm chart (not part of this repository) use the
same pattern as the repository name, e.g.: `my-microservice`
## Adapt to your service
This is just a template and needs some adaptation to your specific use case.

Please search for **"please adapt"** comments. They indicate all locations
that need modification. Once the adaptations are in place, please remove these
comments.

The following should serve as a template for the final repo's README;
please adapt it accordingly (e.g. replace all occurrences of `my-microservice` or `exec_manager` with the final package name, and don't forget to adapt the links):

---

**\# please adapt the links of the following badges:**
![tests](https://github.com/ghga-de/my-microservice/actions/workflows/unit_and_int_tests.yaml/badge.svg)
[![Coverage Status](https://coveralls.io/repos/github/ghga-de/my-microservice/badge.svg?branch=main)](https://coveralls.io/github/ghga-de/my-microservice?branch=main)
# My-Microservice

A description explaining the use case of this service.

## Documentation:

Extensive documentation can be found [here](...) (coming soon).

## Quick Start
### Installation
We recommend using the provided Docker container.

A pre-built version is available at [docker hub](https://hub.docker.com/repository/docker/ghga/my-microservice):
```bash
# Please feel free to choose the version as needed:
docker pull ghga/my-microservice:<version>
```

Or you can build the container yourself from the [`./Dockerfile`](./Dockerfile):
```bash
# Execute in the repo's root dir:
# (Please feel free to adapt the name/tag.)
docker build -t ghga/my-microservice:<version> .
```

For production-ready deployment, we recommend using Kubernetes; however,
for simple use cases, you can run the service using Docker
on a single server:
```bash
# The entrypoint is preconfigured:
docker run -p 8080:8080 ghga/my-microservice:<version>
```

If you prefer not to use containers, you may install the service from source:
```bash
# Execute in the repo's root dir:
pip install .

# to run the service:
my-microservice
```

### Configuration:
The [`./example-config.yaml`](./example-config.yaml) gives an overview of the available configuration options.
Please adapt it and choose one of the following options for injecting it into the service:
- specify the path to it via the `exec_manager_CONFIG_YAML` env variable
- rename it to `.exec_manager.yaml` and place it into one of the following locations:
  - the current working directory where you execute the service (on unix: `./.exec_manager.yaml`)
  - your home directory (on unix: `~/.exec_manager.yaml`)

The config yaml will be automatically parsed by the service.

**Important: If you are using containers, the locations refer to paths within the container.**

All parameters mentioned in the [`./example-config.yaml`](./example-config.yaml)
can also be set using environment variables or file secrets.

For naming the environment variables, just prefix the parameter name with `exec_manager_`,
e.g. for the `host` parameter set an environment variable named `exec_manager_HOST`
(you may use upper or lower case; however, it is standard to define all env
variables in upper case).

To use file secrets, please refer to the
[corresponding section](https://pydantic-docs.helpmanual.io/usage/settings/#secret-support)
of the pydantic documentation.


## Development
For setting up the development environment, we rely on the
[devcontainer feature](https://code.visualstudio.com/docs/remote/containers) of vscode
in combination with Docker Compose.

To use it, you have to have Docker Compose as well as vscode with its "Remote - Containers" extension (`ms-vscode-remote.remote-containers`) installed.
Then open this repository in vscode and run the command
`Remote-Containers: Reopen in Container` from the vscode "Command Palette".

This will give you a full-fledged, pre-configured development environment including:
- infrastructural dependencies of the service (databases, etc.)
- all relevant vscode extensions pre-installed
- pre-configured linting and auto-formatting
- a pre-configured debugger
- automatic license-header insertion

Moreover, inside the devcontainer, there are two convenience commands available
(please type them in the integrated terminal of vscode):
- `dev_install` - installs the service with all development dependencies,
installs pre-commit, and applies any migration scripts to the test database
(please run it if you are starting the devcontainer for the first time
or if you added any python dependencies to the [`./setup.cfg`](./setup.cfg))
- `dev_launcher` - starts the service with the development config yaml
(located in the `./.devcontainer/` dir)

If you prefer not to use vscode, you could get a similar setup (without the editor specific features)
by running the following commands:
```bash
# Execute in the repo's root dir:
cd ./.devcontainer

# build and run the environment with docker-compose
docker-compose up

# attach to the main container:
# (you can open multiple shell sessions like this)
docker exec -it devcontainer_app_1 /bin/bash
```
# Execution Manager for WorkflUX

The execution manager manages the execution of jobs that will be run with workflUX. There will be three ways to execute a workflow: via Python, Bash, or WES.

## Execution Profiles
Currently, only the Python exec profile exists; in the future, there will be a Bash exec profile and a WES exec profile as well. An execution consists of four steps: prepare, exec, eval, and finalize. Only the exec step is required; the others are optional. A sketch of how the steps might fit together follows after this list.
- __prepare:__
This step is executed before the actual workflow execution. It can, for example, load required Python or conda environments.
- __exec:__
This step executes the actual workflow and is the only required step. At the end of this step, the status of the job should be updated depending on the exit code of the job execution.
- __eval:__
This step can evaluate the success of the workflow execution. However, the exit code from the exec step should be used to set the new status (FAILED or SUCCEEDED) of the job.
- __finalize:__
This step is executed at the end of the whole job execution. It can be used for cleaning up temporary files.
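
As an illustration, the four steps and `max_retries` (explained below) could interact as in the following sketch; the names and control flow here are hypothetical, not the actual manager internals:

```python
# Hypothetical sketch of how an exec profile's four steps might be driven.
# `job` and `max_retries` are illustrative; the real API may differ.
def run_job(job, max_retries: int) -> None:
    for attempt in range(max_retries + 1):
        try:
            job.prepare()   # optional: e.g. load Python or conda environments
            job.exec()      # required: runs the workflow, updates the status
            job.eval()      # optional: evaluate the success of the execution
            job.finalize()  # optional: e.g. clean up temporary files
            return
        except Exception:
            if attempt == max_retries:
                raise  # all retries exhausted; give up
```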


### Python
For the Python exec profile, you have to implement the exec method of the PythonJob class. To do so, create a new Python file containing a class that inherits from PythonJob, and implement at least the exec method.
After that, you have to create a YAML file that looks like the one below:
```yaml
EXEC_PROFILES:
  NAMEOFEXECPROFILE:
    type: python
    max_retries: 2 # please adapt this number
    py_module: ./python_script_with_implemented_methods.py
    py_class: ClassOfImplementedMethods
```
`max_retries` gives the maximum number of retries when the execution (consisting of the four steps) fails.
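
For example, the `ClassOfImplementedMethods` referenced in the YAML above might look like the following sketch (the import path of `PythonJob` and the exact signature of `exec` are assumptions; please check the actual base class):

```python
# Hypothetical example; import path and method signature are assumptions.
from exec_manager.python_job import PythonJob  # adjust to the real module


class ClassOfImplementedMethods(PythonJob):
    """Runs the workflow and updates the job status."""

    def exec(self) -> None:
        # Execute the actual workflow here and update the job status
        # (FAILED or SUCCEEDED) depending on the exit code.
        ...
```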

## License
This repository is free to use and modify according to the [Apache 2.0 License](./LICENSE).
2 changes: 1 addition & 1 deletion exec_manager/__init__.py
@@ -13,6 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Short description of package""" # Please adapt to package
"""backend""" # Please adapt to package
Contributor:

Suggested change
"""backend""" # Please adapt to package
"""
A package managing execution of jobs in a way that is agnostic to
- the workflow execution environment
- the language used to describe the workflow
"""


__version__ = "0.1.0"
14 changes: 14 additions & 0 deletions exec_manager/dao/__init__.py
@@ -0,0 +1,14 @@
# Copyright 2021 - 2022 Universität Tübingen, DKFZ and EMBL
# for the German Human Genome-Phenome Archive (GHGA)
Contributor:

The copyright still needs to be adapted.

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
26 changes: 11 additions & 15 deletions exec_manager/dao/db_models.py
@@ -15,26 +15,22 @@

"""Defines all database specific ORM models"""

from sqlalchemy import JSON, Boolean, Column, Integer, String

from sqlalchemy import JSON, Column, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm.decl_api import DeclarativeMeta
Contributor:

You can use the UUID type from SQLAlchemy:

Suggested change
from sqlalchemy.orm.decl_api import DeclarativeMeta
import uuid
from sqlalchemy.orm.decl_api import DeclarativeMeta
from sqlalchemy.dialects.postgresql import UUID


Base: DeclarativeMeta = declarative_base()
metadata = Base.metadata


class ExampleObjectA(Base):
    """An example object stored in the DB"""

    __tablename__ = "visas"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    some_json_details = Column(JSON, nullable=False)


class ExampleObjectB(Base):
    """Another example object stored in the DB"""

    __tablename__ = "table_b"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    active = Column(Boolean, nullable=False)


class DBJob(Base):
    """A job object stored in the DB"""

    __tablename__ = "job"

    job_id = Column(String, primary_key=True)
Contributor:

And then here:

Suggested change
    job_id = Column(String, primary_key=True)
    job_id = Column(UUID(as_uuid=True), default=uuid.uuid4, primary_key=True)

This way, the UUID is assigned automatically.

    job_status = Column(String, nullable=False)
    exec_profile = Column(JSON, nullable=False)
    workflow = Column(JSON, nullable=False)
    inputs = Column(JSON, nullable=False)
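
Putting the reviewer's two suggestions together, the model might then look like this sketch (note that this `UUID` column type is specific to the PostgreSQL dialect):

```python
import uuid

from sqlalchemy import JSON, Column, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm.decl_api import DeclarativeMeta

Base: DeclarativeMeta = declarative_base()


class DBJob(Base):
    """A job object stored in the DB"""

    __tablename__ = "job"

    # The UUID is assigned automatically by the default generator.
    job_id = Column(UUID(as_uuid=True), default=uuid.uuid4, primary_key=True)
    job_status = Column(String, nullable=False)
    exec_profile = Column(JSON, nullable=False)
    workflow = Column(JSON, nullable=False)
    inputs = Column(JSON, nullable=False)
```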
153 changes: 153 additions & 0 deletions exec_manager/dao/job_dao.py
@@ -0,0 +1,153 @@
# Copyright 2021 - 2022 Universität Tübingen, DKFZ and EMBL
Contributor:

A DAO is ideally an abstract class or an interface such as:

```python
from abc import ABC, abstractmethod


class JobDAO(ABC):

    @abstractmethod
    def create(self, exec_profile, workflow, inputs) -> str:  # further params elided
        ...

    @abstractmethod
    def get(self, job_id) -> Job:
        ...
```

And a specific implementation, e.g. for SQL:

```python
class SqlJobDAO(JobDAO):

    def __init__(self, db_url: str):
        self._engine = create_engine(db_url)
        ...

    def create(self, exec_profile, workflow, inputs) -> str:
        # an actual sql query here
        ...

    def get(self, job_id) -> Job:
        # another sql query here
        ...
```

# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""class for job dao"""

import json
from uuid import UUID, uuid4

from sqlalchemy import create_engine, insert, select, update
from sqlalchemy.engine import Engine

from exec_manager.dao.db_models import DBJob, metadata
from exec_manager.exec_profile import ExecProfile
from exec_manager.exec_profile_type import ExecProfileType
from exec_manager.job import Job
from exec_manager.job_status_type import JobStatusType
from exec_manager.wf_lang_type import WfLangType

DB_ENGINE = create_engine("sqlite+pysqlite://")
metadata.create_all(DB_ENGINE)


def create_job_dao(
    job_status: JobStatusType,
    exec_profile: ExecProfile,
    workflow: dict,
    inputs: dict,
    db_engine: Engine = DB_ENGINE,
) -> UUID:
    """
    Inserts a job into the database.

    Parameters
    ----------
    job_status: JobStatusType
        current status of the job; initially it is JobStatusType.NOTSTARTED
    exec_profile: ExecProfile
        exec profile of this job
    workflow: dict
        the job's workflow
    inputs: dict
        the input parameters of the job
    db_engine: Engine
        db engine where the connection will be established
        (default is sqlite with pysqlite)

    Returns
    -------
    UUID
    """
    job_id = generate_job_id()
    job_id_str = str(job_id)
Contributor:

See the suggestions in the db_model. There I added a default generator for the id, so you no longer need to handle it here.

    job_status_str = job_status.value
    exec_profile_json = json.dumps(
        {
            "exec_profile_type": exec_profile.exec_profile_type.value,
            "wf_lang": exec_profile.wf_lang.value,
        }
    )
Contributor:

If the ExecProfile is based on a pydantic model, you can simply write:

Suggested change
    exec_profile_json = json.dumps(
        {
            "exec_profile_type": exec_profile.exec_profile_type.value,
            "wf_lang": exec_profile.wf_lang.value,
        }
    )
    exec_profile_dict = exec_profile.to_dict()

This returns a dict, not JSON. But the conversion to JSON can also be left to SQLAlchemy.

    inputs_json = json.dumps(inputs)
    with db_engine.connect() as connection:
        connection.execute(
            insert(DBJob.__table__).values(
                (job_id_str, job_status_str, exec_profile_json, workflow, inputs_json)
            )
        )
    return job_id


def get_job(job_id: UUID, db_engine: Engine = DB_ENGINE) -> Job:
    """
    Returns a job by its job id.

    Parameters
    ----------
    job_id: UUID
        id of the job
    db_engine: Engine
        db engine where the connection will be established
        (default is sqlite with pysqlite)

    Returns
    -------
    Job
    """
    with db_engine.connect() as connection:
        cursor = connection.execute(
            select([DBJob.job_id, DBJob.job_status, DBJob.exec_profile]).where(
                DBJob.job_id == str(job_id)
            )
        )
        result = cursor.fetchall()
        job_status = JobStatusType(result[0][1])
        exec_profile = json.loads(result[0][2])
        exec_profile = ExecProfile(
            ExecProfileType(exec_profile["exec_profile_type"]),
            WfLangType(exec_profile["wf_lang"]),
        )
        return Job(job_id, job_status, exec_profile)


def update_job_status(
    job_id: UUID, new_job_status: JobStatusType, db_engine: Engine = DB_ENGINE
) -> None:
    """
    Updates a job's status by its job id.

    Parameters
    ----------
    job_id: UUID
        id of the job
    new_job_status: JobStatusType
        new status of the job; cannot be JobStatusType.NOTSTARTED
    db_engine: Engine
        db engine where the connection will be established
        (default is sqlite with pysqlite)

    Returns
    -------
    None
    """
    with db_engine.connect() as connection:
        connection.execute(
            update(DBJob.__table__)
            .where(DBJob.job_id == str(job_id))
            .values(job_status=new_job_status.value)
        )


def generate_job_id() -> UUID:
    """
    Generates a unique job id.

    Returns
    -------
    UUID
    """
    job_id = uuid4()
    # while get_job(job_id) is not None:
    #     job_id = uuid4()
    return job_id
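
A brief usage sketch of these functions (illustrative only; enum members other than `JobStatusType.NOTSTARTED`, e.g. `ExecProfileType.PYTHON` or `WfLangType.CWL`, are assumptions based on the imports above):

```python
# Illustrative usage; enum member names may need adjusting to the
# actual definitions in exec_manager.
exec_profile = ExecProfile(ExecProfileType.PYTHON, WfLangType.CWL)
job_id = create_job_dao(
    JobStatusType.NOTSTARTED, exec_profile, workflow={}, inputs={}
)
update_job_status(job_id, JobStatusType.SUCCEEDED)
job = get_job(job_id)
```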