Skip to content

Commit 884cde7

Browse files
committed
fix: better peek handling for fastx records
1 parent 7ebdf34 commit 884cde7

File tree

4 files changed

+86
-72
lines changed

4 files changed

+86
-72
lines changed

ishlib/fastx_search_runner.mojo

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,20 @@ struct FastxSearchRunner[M: Matcher]:
2121
fn run_search[
2222
W: MovableWriter
2323
](mut self, mut writer: BufferedWriter[W]) raises:
24-
# Simple thing first?
24+
# Peek only the first file to determine fastq or not, assume not-binary
25+
var peek = peek_file[record_type = RecordType.FASTX](
26+
self.settings.files[0]
27+
)
2528
for file in self.settings.files:
2629
var f = file[] # force copy
27-
var peek = peek_file[record_type = RecordType.FASTX](f)
28-
if peek.is_binary:
29-
if self.settings.verbose:
30-
Logger.warn("Skipping binary file:", file[])
31-
continue
3230
Logger.debug("Processing", f)
33-
self.run_search_on_file(f, writer)
31+
if peek.is_fastq:
32+
self.run_search_on_file[is_fastq=True](f, writer)
33+
else:
34+
self.run_search_on_file[is_fastq=False](f, writer)
3435

3536
fn run_search_on_file[
36-
W: MovableWriter
37+
W: MovableWriter, *, is_fastq: Bool = False
3738
](mut self, file: Path, mut writer: BufferedWriter[W]) raises:
3839
var reader = FastxReader[read_comment=False](
3940
BufferedReader(GZFile(String(file), "r"))
@@ -74,7 +75,8 @@ struct FastxSearchRunner[M: Matcher]:
7475
writer.write_bytes(reader.seq.as_span())
7576

7677
# Handle FASTQ Case
77-
if len(reader.qual.as_span()) > 0:
78+
@parameter
79+
if is_fastq:
7880
writer.write("\n+\n")
7981
writer.write_bytes(reader.qual.as_span())
8082
writer.write("\n")

ishlib/matcher/striped_semi_global_matcher.mojo

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,28 @@ struct StripedSemiGlobalMatcher(GpuMatcher):
223223
ends_free.target_start,
224224
ends_free.target_end,
225225
)
226+
elif matrix_kind == MatrixKind.ACTGN0:
227+
gpu_align_coarse[
228+
MatrixKind.ACTGN0, max_query_length, max_target_length
229+
](
230+
query,
231+
ref_buffer,
232+
target_ends,
233+
score_result_buffer,
234+
query_end_result_buffer,
235+
ref_end_result_buffer,
236+
basic_matrix_values,
237+
basic_matrix_len,
238+
query_len,
239+
target_ends_len,
240+
thread_count,
241+
gap_open,
242+
gap_extend,
243+
ends_free.query_start,
244+
ends_free.query_end,
245+
ends_free.target_start,
246+
ends_free.target_end,
247+
)
226248
elif matrix_kind == MatrixKind.BLOSUM62:
227249
gpu_align_coarse[
228250
MatrixKind.BLOSUM62, max_query_length, max_target_length

ishlib/parallel_fastx_search_runner.mojo

Lines changed: 39 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ from ishlib.matcher import (
3030
WhereComputed,
3131
)
3232
from ishlib import ByteSpanWriter, RecordType
33-
from ishlib.peek_file import peek_file
33+
from ishlib.peek_file import peek_file, PeekFindings
3434
from ishlib.matcher.alignment.striped_utils import AlignmentResult
3535
from ishlib.vendor.kseq import BufferedReader, FastxReader
3636
from ishlib.vendor.zlib import GZFile
@@ -64,15 +64,12 @@ struct ParallelFastxSearchRunner[M: Matcher]:
6464
fn run_search[
6565
W: MovableWriter
6666
](mut self, mut writer: BufferedWriter[W]) raises:
67-
# Simple thing first?
67+
# Peek only the first file to determine fastq or not, assume not-binary
68+
var peek = peek_file[record_type = RecordType.FASTX](
69+
self.settings.files[0]
70+
)
6871
for file in self.settings.files:
6972
var f = file[] # force copy
70-
var peek = peek_file[record_type = RecordType.FASTX](f)
71-
Logger.debug("Suggested length:", peek.suggested_max_length)
72-
if peek.is_binary:
73-
if self.settings.verbose:
74-
Logger.warn("Skipping binary file:", file[])
75-
continue
7673
Logger.debug("Processing", f)
7774
if peek.is_fastq:
7875
self.run_search_on_file[is_fastq=True](f, writer)
@@ -204,51 +201,39 @@ struct GpuParallelFastxSearchRunner[
204201
W: MovableWriter
205202
](mut self, mut writer: BufferedWriter[W]) raises:
206203
# Peek the first file to get the suggested size, then use that for all of them.
207-
# Still peek each for binary
208-
209-
# First non-binary file
204+
# Assume non-binary
210205
var files = self.settings.files
211206
var first_peek = peek_file[
212207
record_type = RecordType.FASTX, check_record_size=True
213208
](files[0])
214-
if first_peek.is_binary:
215-
for i in range(1, len(files)):
216-
first_peek = peek_file[
217-
record_type = RecordType.FASTX, check_record_size=True
218-
](files[i])
219209

220210
Logger.debug("Suggested length of:", first_peek.suggested_max_length)
221211

222212
# Create ctxs
223-
@parameter
224-
@always_inline
225-
fn choose_max_target_length(suggested_max_length: Int) raises:
226-
alias MAX_TARGET_LENGTHS = List(128, 256, 512, 1024, 2048, 4096)
213+
alias MAX_TARGET_LENGTHS = List(128, 256, 512, 1024, 2048, 4096)
227214

228-
@parameter
229-
for i in range(0, len(MAX_TARGET_LENGTHS)):
230-
alias max_target_length = MAX_TARGET_LENGTHS[i]
231-
if suggested_max_length <= max_target_length:
232-
var ctxs = self.create_ctxs[
233-
max_query_length, max_target_length
234-
]()
235-
self.search_files[
236-
W,
237-
max_query_length=max_query_length,
238-
max_target_length=max_target_length,
239-
](files, ctxs, writer)
240-
return
241-
else:
242-
Logger.warn(
243-
"Longer line lengths than supported, more work will"
244-
" be sent to CPU, consider running with max-gpus set to 0."
245-
)
246-
var ctxs = self.create_ctxs[max_query_length, 4096]()
215+
@parameter
216+
for i in range(0, len(MAX_TARGET_LENGTHS)):
217+
alias max_target_length = MAX_TARGET_LENGTHS[i]
218+
if first_peek.suggested_max_length <= max_target_length:
219+
var ctxs = self.create_ctxs[
220+
max_query_length, max_target_length
221+
]()
247222
self.search_files[
248-
W, max_query_length=max_query_length, max_target_length=4096
249-
](files, ctxs, writer)
250-
251-
choose_max_target_length(first_peek.suggested_max_length)
223+
W,
224+
max_query_length=max_query_length,
225+
max_target_length=max_target_length,
226+
](files, ctxs, writer, first_peek)
227+
return
228+
else:
229+
Logger.warn(
230+
"Longer line lengths than supported, more work will"
231+
" be sent to CPU, consider running with max-gpus set to 0."
232+
)
233+
var ctxs = self.create_ctxs[max_query_length, 4096]()
234+
self.search_files[
235+
W, max_query_length=max_query_length, max_target_length=4096
236+
](files, ctxs, writer, first_peek)
252237

253238
fn create_ctxs[
254239
max_query_length: UInt = 200, max_target_length: UInt = 1024
@@ -281,27 +266,21 @@ struct GpuParallelFastxSearchRunner[
281266
]
282267
],
283268
mut writer: BufferedWriter[W],
269+
read peek: PeekFindings,
284270
) raises:
285271
for i in range(0, len(paths)):
286272
var f = paths[i] # force copy
287273
Logger.debug("Processing", f)
288-
if i > 0:
289-
# Can skip the first peek since we've already checked it.
290-
var peek = peek_file[
291-
record_type = RecordType.FASTX, check_record_size=False
292-
](f)
293-
if peek.is_binary:
294-
if self.settings.verbose:
295-
Logger.warn("Skipping binary file:", paths[i])
296-
continue
297-
if peek.is_fastq:
298-
self.run_search_on_file[
299-
W, max_query_length, max_target_length, is_fastq=True
300-
](f, ctxs, writer)
301-
else:
302-
self.run_search_on_file[
303-
W, max_query_length, max_target_length, is_fastq=False
304-
](f, ctxs, writer)
274+
# Assume that since the user specified FASTX, there are no binary files
275+
# Assume that all files are the same record type
276+
if peek.is_fastq:
277+
self.run_search_on_file[
278+
W, max_query_length, max_target_length, is_fastq=True
279+
](f, ctxs, writer)
280+
else:
281+
self.run_search_on_file[
282+
W, max_query_length, max_target_length, is_fastq=False
283+
](f, ctxs, writer)
305284

306285
fn run_search_on_file[
307286
W: MovableWriter,

tests/test_searcher.mojo

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from pathlib import Path
22
from testing import assert_equal
33
from tempfile import TemporaryDirectory
4-
from utils import StringSlice
54

65
from ExtraMojo.io.buffered import BufferedWriter
76

@@ -166,7 +165,13 @@ ACTGACTGACGACGACGACTAATAGNNNNACTGANNNATCATCTAGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG
166165
var writer = BufferedWriter(open(output, "w"))
167166
do_search(setting[], writer^)
168167
var found = output.read_text()
169-
assert_equal(found.upper(), expected.upper())
168+
assert_equal(
169+
found.upper(),
170+
expected.upper(),
171+
String("max_gpus {}, num_threads {}, algo {}").format(
172+
setting[].max_gpus, setting[].threads, setting[].match_algo
173+
),
174+
)
170175

171176

172177
def test_fasta_blosum62_search():
@@ -216,7 +221,13 @@ MAFSAEDVLKEYDRRRRMEALLLSLYYPNDRKLLDYKEWSPPRVQVECPKAPVEWNNPPSEKGLIVGHFSGIKYKGEKAQ
216221
for setting in settings:
217222
var writer = BufferedWriter(open(output, "w"))
218223
do_search(setting[], writer^)
219-
assert_equal(output.read_text().upper(), expected.upper())
224+
assert_equal(
225+
output.read_text().upper(),
226+
expected.upper(),
227+
String("max_gpus {}, num_threads {}, algo {}").format(
228+
setting[].max_gpus, setting[].threads, setting[].match_algo
229+
),
230+
)
220231

221232

222233
def main():

0 commit comments

Comments (0)