Skip to content

Commit 645e6be

Browse files
committed
Merge branch 'main' of github.com:wfcommons/WfCommons
2 parents 288d070 + 5eb906e commit 645e6be

File tree

1 file changed

+20
-3
lines changed

1 file changed

+20
-3
lines changed

wfcommons/wfbench/translator/airflow.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ def __init__(self,
3636
"""Create an object of the translator."""
3737
super().__init__(workflow, logger)
3838

39+
self.sanitized_names = {}
40+
self.seq_num = 0
3941
self.script = f"""
4042
from __future__ import annotations
4143
@@ -73,7 +75,7 @@ def translate(self, output_folder: pathlib.Path, name: Optional[str] = None) ->
7375

7476
for task in self.tasks.values():
7577
self.script += f"""
76-
{task.task_id} = BashOperator(
78+
{self._sanitize_varname(task.task_id)} = BashOperator(
7779
task_id="{task.task_id}",
7880
depends_on_past=False,
7981
bash_command='{self.task_commands[task.task_id]}',
@@ -82,10 +84,11 @@ def translate(self, output_folder: pathlib.Path, name: Optional[str] = None) ->
8284
)
8385
"""
8486
for task in self.tasks.values():
85-
parents = ", ".join(self.task_parents[task.task_id])
87+
# Comma-separated list of the task's parents
88+
parents = ", ".join(map(self._sanitize_varname, self.task_parents[task.task_id]))
8689
if parents:
8790
self.script += f"""
88-
[{parents}] >> {task.task_id}
91+
[{parents}] >> {self._sanitize_varname(task.task_id)}
8992
"""
9093
# write benchmark files
9194
output_folder.mkdir(parents=True)
@@ -99,6 +102,20 @@ def translate(self, output_folder: pathlib.Path, name: Optional[str] = None) ->
99102
# Create the README file
100103
self._write_readme_file(output_folder)
101104

105+
def _sanitize_varname(self, name: str) -> str:
106+
"""
107+
Sanitizes string into a valid variable name.
108+
109+
:param name: The name to sanitize.
110+
:type name: str
111+
"""
112+
if name not in self.sanitized_names:
113+
sanitized_name = '_' + re.sub(r'[^\w]', '_', name) + str(self.seq_num)
114+
self.seq_num += 1
115+
self.sanitized_names[name] = sanitized_name
116+
117+
return self.sanitized_names[name]
118+
102119
def _prep_commands(self, output_folder: pathlib.Path) -> None:
103120
"""
104121
Prepares the bash_command strings for the BashOperators.

0 commit comments

Comments
 (0)