
Commit 275155f

[pipeline] Batched pipeline and storage (#418)

* [Pipeline] Batched pipeline and storage
* [storage] add warning `TODO` for optimization

Co-authored-by: Sunnyhaze <[email protected]>
1 parent d567051 commit 275155f

File tree

4 files changed
+284 -2 lines changed

dataflow/pipeline/Pipeline.py

Lines changed: 76 additions & 0 deletions
@@ -531,3 +531,79 @@ def _compiled_forward(self, resume_step: int=0):
                     self.logger.debug(f"Detected LLM Serving {self.active_llm_serving} ref reduced to 0, cleaning up...")
                     self.active_llm_serving.cleanup()
                     self.active_llm_serving = None
+
+
+class BatchedPipelineABC(PipelineABC):
+    def __init__(self):
+        super().__init__()
+
+    def _compiled_forward(self, resume_step: int=0, batch_size: int|None=None, resume_from_last: bool=True):
+        """
+        resume_step (int): resume inference from this step
+        batch_size (int|None): if set, run the pipeline in batch mode with this batch size
+        resume_from_last (bool): if True, resume from the last successful step and batch
+        """
+        if resume_step > 0 and resume_from_last:
+            raise ValueError("Cannot set `resume_step` > 0 while `resume_from_last` is True.")
+
+        resume_batch = 0
+
+        if resume_from_last:
+            cache_path = os.path.join(self.op_nodes_list[1].storage.cache_path, "last_success_step.txt")
+            if not os.path.exists(cache_path):
+                resume_step = 0
+                resume_batch = 0
+                self.logger.info(f"No last success step cache found at {cache_path}, starting from step 0.")
+            else:
+                with open(cache_path, "r") as f:
+                    line = f.readline().strip()
+                    resume_step, resume_batch = map(int, line.split(","))
+                self.logger.info(f"Resuming from last success step {resume_step}, batch step {resume_batch}.")
+
+        # iterate over each op node and its `storage` status
+        for idx, op_node in enumerate(self.op_nodes_list):
+            # skip ops before the expected resume step
+            if idx - 1 < resume_step:  # minus one, since node 0 is the INPUT-DATA node
+                continue
+
+            self.logger.debug(f"Ready to run {op_node}, with serving={op_node.llm_serving}, active_llm_serving={self.active_llm_serving}")
+            if op_node.llm_serving is not None:
+                if self.active_llm_serving and self.active_llm_serving is not op_node.llm_serving:
+                    self.logger.debug(f"Detected active LLM Serving {self.active_llm_serving}, new serving {op_node.llm_serving}, cleaning up...")
+                    self.active_llm_serving.cleanup()
+                self.active_llm_serving = op_node.llm_serving
+
+            if op_node.op_obj is not None:
+                if batch_size is not None:
+                    storage = op_node.storage
+                    storage.batch_step = 0 if idx - 1 > resume_step else resume_batch
+                    if storage.batch_size != batch_size:
+                        self.logger.info(f"Overriding storage {storage}'s batch size from {storage.batch_size} to {batch_size} for this run.")
+                        storage.batch_size = batch_size
+                    storage.read()  # read once to set the record count
+                    record_count = storage.record_count
+
+                RUN_TIMES = 1 if batch_size is None else ((record_count - 1) // batch_size + 1) - storage.batch_step
+                if batch_size is not None:
+                    self.logger.info(f"Pipeline will run for {RUN_TIMES} iterations to cover {record_count} records with batch size {batch_size}.")
+                for _ in range(RUN_TIMES):
+                    op_node.op_obj.run(
+                        storage=op_node.storage,
+                        **op_node.kwargs
+                    )
+                    if batch_size is not None:
+                        op_node.storage.batch_step += 1
+                    if resume_from_last:
+                        resume_batch = op_node.storage.batch_step if batch_size is not None else 0
+                        with open(cache_path, "w") as f:
+                            f.write(f"{idx-1},{resume_batch}\n")
+                if resume_from_last:
+                    resume_batch = 0  # reset for the next op_node
+                    with open(cache_path, "w") as f:
+                        f.write(f"{idx},{resume_batch}\n")
+            if op_node.llm_serving is not None:
+                self.llm_serving_counter[self.active_llm_serving] -= 1
+                if self.llm_serving_counter[self.active_llm_serving] == 0:
+                    self.logger.debug(f"Detected LLM Serving {self.active_llm_serving} ref reduced to 0, cleaning up...")
+                    self.active_llm_serving.cleanup()
+                    self.active_llm_serving = None
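The `RUN_TIMES` expression above is a ceiling division, `ceil(record_count / batch_size)`, minus the batches already completed before a resume. A minimal standalone sketch of that arithmetic (the helper name `batches_remaining` is illustrative, not part of the commit):

    def batches_remaining(record_count: int, batch_size: int, batch_step: int) -> int:
        # total batches = ceil(record_count / batch_size), via integer arithmetic
        total_batches = (record_count - 1) // batch_size + 1
        # subtract batches already completed in an earlier (interrupted) run
        return total_batches - batch_step

    # 10 records in batches of 4 -> 3 batches (4 + 4 + 2)
    assert batches_remaining(record_count=10, batch_size=4, batch_step=0) == 3
    # resuming after one completed batch leaves 2 to run
    assert batches_remaining(record_count=10, batch_size=4, batch_step=1) == 2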

dataflow/pipeline/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -1,5 +1,6 @@
-from .Pipeline import PipelineABC
+from .Pipeline import PipelineABC, BatchedPipelineABC
 
 __all__ = [
     'PipelineABC',
+    'BatchedPipelineABC',
 ]

dataflow/utils/storage.py

Lines changed: 146 additions & 1 deletion
@@ -3,6 +3,7 @@
 import signal
 import tempfile
 import weakref
+
 from dataflow import get_logger
 import pandas as pd
 import json
@@ -811,4 +812,148 @@ def get_keys_from_dataframe(self) -> list[str]:
         Returns column names from the dataframe after reading from database.
         """
         dataframe = self.read(output_type="dataframe")
-        return dataframe.columns.tolist() if isinstance(dataframe, pd.DataFrame) else []
+        return dataframe.columns.tolist() if isinstance(dataframe, pd.DataFrame) else []
+
+
+class BatchedFileStorage(FileStorage):
+    """
+    Batched file storage that reads and writes data batch by batch.
+    """
+    def __init__(
+        self,
+        first_entry_file_name: str,
+        cache_path: str = "./cache",
+        file_name_prefix: str = "dataflow_cache_step",
+        cache_type: Literal["jsonl", "csv"] = "jsonl",
+        batch_size: int = 10000
+    ):
+        super().__init__(first_entry_file_name, cache_path, file_name_prefix, cache_type)
+        self.batch_size = batch_size
+        self.batch_step = 0
+        if cache_type not in ["jsonl", "csv"]:
+            raise ValueError(f"BatchedFileStorage only supports 'jsonl' and 'csv' cache types, got: {cache_type}")
+
+    def read(self, output_type: Literal["dataframe", "dict"]="dataframe") -> Any:
+        """
+        Read data from the current file managed by this storage.
+
+        Args:
+            output_type: Type that you want to read to, either "dataframe" or "dict".
+                Also supports remote datasets with a prefix:
+                - "hf:{dataset_name}{:config}{:split}" => HuggingFace dataset, e.g. "hf:openai/gsm8k:main:train"
+                - "ms:{dataset_name}{:split}" => ModelScope dataset, e.g. "ms:modelscope/gsm8k:train"
+
+        Returns:
+            Depending on output_type:
+            - "dataframe": pandas DataFrame
+            - "dict": list of dictionaries
+
+        Raises:
+            ValueError: For unsupported file types or output types
+        """
+        if self.operator_step == 0 and self.first_entry_file_name == "":
+            self.logger.info("first_entry_file_name is empty, returning empty dataframe")
+            empty_dataframe = pd.DataFrame()
+            return self._convert_output(empty_dataframe, output_type)
+
+        file_path = self._get_cache_file_path(self.operator_step)
+        self.logger.info(f"Reading data from {file_path} with type {output_type}")
+
+        if self.operator_step == 0:
+            source = self.first_entry_file_name
+            self.logger.info(f"Reading remote dataset from {source} with type {output_type}")
+            if source.startswith("hf:"):
+                from datasets import load_dataset
+                _, dataset_name, *parts = source.split(":")
+
+                if len(parts) == 1:
+                    config, split = None, parts[0]
+                elif len(parts) == 2:
+                    config, split = parts
+                else:
+                    config, split = None, "train"
+
+                dataset = (
+                    load_dataset(dataset_name, config, split=split)
+                    if config
+                    else load_dataset(dataset_name, split=split)
+                )
+                dataframe = dataset.to_pandas()
+                return self._convert_output(dataframe, output_type)
+
+            elif source.startswith("ms:"):
+                from modelscope import MsDataset
+                _, dataset_name, *split_parts = source.split(":")
+                split = split_parts[0] if split_parts else "train"
+
+                dataset = MsDataset.load(dataset_name, split=split)
+                dataframe = pd.DataFrame(dataset)
+                return self._convert_output(dataframe, output_type)
+
+            else:
+                local_cache = file_path.split(".")[-1]
+        else:
+            local_cache = self.cache_type
+        # TODO: the full-file load below may be a bottleneck for large files; consider optimizing later
+        dataframe = self._load_local_file(file_path, local_cache)
+        self.record_count = len(dataframe)
+        # slice out the current batch
+        dataframe = dataframe.iloc[
+            self.batch_step * self.batch_size : (self.batch_step + 1) * self.batch_size
+        ]
+        return self._convert_output(dataframe, output_type)
+
+    def write(self, data: Any) -> Any:
+        """
+        Write data to the current file managed by this storage.
+        data: Any, the data to write; it should be a DataFrame, List[dict], etc.
+        """
+        def clean_surrogates(obj):
+            """Recursively clean invalid Unicode surrogate characters from the data."""
+            if isinstance(obj, str):
+                # replace invalid Unicode surrogate characters (e.g. \udc00)
+                return obj.encode('utf-8', 'replace').decode('utf-8')
+            elif isinstance(obj, dict):
+                return {k: clean_surrogates(v) for k, v in obj.items()}
+            elif isinstance(obj, list):
+                return [clean_surrogates(item) for item in obj]
+            elif isinstance(obj, (int, float, bool)) or obj is None:
+                # numbers, booleans and None are returned as-is
+                return obj
+            else:
+                # other types (e.g. custom objects): try converting to a string
+                try:
+                    return clean_surrogates(str(obj))
+                except Exception:
+                    # if conversion fails, return the original object unchanged
+                    return obj
+
+        # convert the data to a DataFrame
+        if isinstance(data, list):
+            if len(data) > 0 and isinstance(data[0], dict):
+                # clean every dict in the list
+                cleaned_data = [clean_surrogates(item) for item in data]
+                dataframe = pd.DataFrame(cleaned_data)
+            else:
+                raise ValueError(f"Unsupported data type: {type(data[0])}")
+        elif isinstance(data, pd.DataFrame):
+            # clean every element of the DataFrame
+            dataframe = data.map(clean_surrogates)
+        else:
+            raise ValueError(f"Unsupported data type: {type(data)}")
+
+        file_path = self._get_cache_file_path(self.operator_step + 1)
+        os.makedirs(os.path.dirname(file_path), exist_ok=True)
+        self.logger.success(f"Writing data to {file_path} with type {self.cache_type}")
+        if self.cache_type == "jsonl":
+            with open(file_path, 'a', encoding='utf-8') as f:
+                dataframe.to_json(f, orient="records", lines=True, force_ascii=False)
+        elif self.cache_type == "csv":
+            if self.batch_step == 0:
+                dataframe.to_csv(file_path, index=False)
+            else:
+                dataframe.to_csv(file_path, index=False, header=False, mode='a')
+        else:
+            raise ValueError(f"Unsupported file type: {self.cache_type}, output file should end with .jsonl or .csv")
+
+        return file_path
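Taken together: `read()` loads the whole local file once, records its length in `self.record_count`, and returns only the `batch_step`-th slice, while `write()` appends batch by batch (jsonl always opens in append mode; csv writes its header only when `batch_step == 0`). A minimal usage sketch, assuming the storage's current step resolves to a local jsonl file (paths and sizes are illustrative, not from the commit):

    storage = BatchedFileStorage(
        first_entry_file_name="./pt_input.jsonl",
        cache_path="./cache",
        cache_type="jsonl",
        batch_size=1000,
    )
    storage.read()  # sets storage.record_count as a side effect
    total_batches = (storage.record_count - 1) // storage.batch_size + 1
    for step in range(total_batches):
        storage.batch_step = step
        batch = storage.read(output_type="dataframe")  # rows [step*1000, (step+1)*1000)
        # ... process the batch, then storage.write(batch) to append results ...

One caveat of the append design: because jsonl writes always append, re-running from batch 0 against an existing cache file would duplicate records unless the stale file is removed first.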

test/test_batched_pipeline.py

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+import re
+from dataflow.pipeline import BatchedPipelineABC
+from dataflow.operators.general_text import (
+    LLMLanguageFilter,
+)
+from dataflow.operators.text_pt import MetaSampleEvaluator
+from dataflow.operators.core_text import PromptedGenerator
+from dataflow.serving import APILLMServing_request
+from dataflow.utils.storage import BatchedFileStorage
+
+class AutoOPPipeline(BatchedPipelineABC):
+
+    def __init__(self):
+        super().__init__()
+        self.storage = BatchedFileStorage(
+            first_entry_file_name="./dataflow/example/GeneralTextPipeline/pt_input.jsonl",
+            cache_path="./cache_autoop",
+            file_name_prefix="dataflow_cache_auto_run",
+            cache_type="jsonl",
+            batch_size=2
+        )
+        self.llm_serving1 = APILLMServing_request(
+            api_url="http://123.129.219.111:3000/v1/chat/completions",
+            model_name="gpt-5-mini",
+            max_workers=100,
+        )
+        self.op1 = PromptedGenerator(
+            llm_serving=self.llm_serving1,
+            system_prompt="请将以下内容翻译成中文:",  # "Translate the following content into Chinese:"
+        )
+        self.op2 = PromptedGenerator(
+            llm_serving=self.llm_serving1,
+            system_prompt="请将以下内容翻译成韩文:",  # "Translate the following content into Korean:"
+        )
+        self.op3 = PromptedGenerator(
+            llm_serving=self.llm_serving1,
+            system_prompt="请将以下内容翻译成日语:"  # "Translate the following content into Japanese:"
+        )
+
+    def forward(self):
+        self.op1.run(
+            self.storage.step(),
+            input_key='raw_content',
+            output_key='content_cn1'
+        )
+        self.op2.run(
+            self.storage.step(),
+            input_key='raw_content',
+            output_key='content_cn2'
+        )
+        self.op3.run(
+            self.storage.step(),
+            input_key='raw_content',
+            output_key='content_cn3'
+        )
+
+if __name__ == "__main__":
+    pipeline = AutoOPPipeline()
+    pipeline.compile()
+    pipeline.forward(batch_size=2, resume_from_last=True)
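The `resume_from_last=True` path exercised here round-trips through the `last_success_step.txt` checkpoint that `BatchedPipelineABC._compiled_forward` writes into the storage's cache path: a single line of the form `"{step},{batch}"`. A minimal sketch of reading it back, mirroring the parsing in Pipeline.py above (the helper name `load_checkpoint` is illustrative, not part of the commit):

    import os

    def load_checkpoint(cache_path: str) -> tuple[int, int]:
        """Return (resume_step, resume_batch); (0, 0) when no checkpoint exists yet."""
        path = os.path.join(cache_path, "last_success_step.txt")
        if not os.path.exists(path):
            return 0, 0
        with open(path, "r") as f:
            step, batch = map(int, f.readline().strip().split(","))
        return step, batch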
