Added a unit test for multithreaded alignments

This commit is contained in:
2025-02-06 18:01:50 +00:00
parent 85946eb110
commit fe999f1cab
3 changed files with 29 additions and 4 deletions

View File

@@ -7,7 +7,6 @@ from queue import Queue
from autobigs.engine.structures.alignment import AlignmentStats, PairwiseAlignment
class AsyncBiopythonPairwiseAlignmentEngine(AbstractContextManager):
def __enter__(self):
self._thread_pool = ThreadPoolExecutor(self._max_threads, thread_name_prefix="async-pairwise-alignment")
@@ -52,8 +51,8 @@ class AsyncBiopythonPairwiseAlignmentEngine(AbstractContextManager):
async def next_completed(self) -> Union[tuple[PairwiseAlignment, dict[str, Any]], None]:
if self._work_complete.empty() and len(self._work_left):
return None
future_now = await asyncio.wrap_future(self._work_complete.get())
return future_now
completed_alignment = await asyncio.wrap_future(self._work_complete.get())
return completed_alignment
def __exit__(self, exc_type, exc_value, traceback):
self.shutdown()