Skip to content

Commit

Permalink
Fix bug w/ argument order in run.py phase_snps (#148)
Browse files Browse the repository at this point in the history
* Fix bug w/ argument order in run.py phase_snps

* using Worker for count_alleles to further debug issue 150 (#151)

Co-authored-by: Vineet Bansal <[email protected]>
  • Loading branch information
mmyers1 and vineetbansal authored Aug 1, 2022
1 parent 2f60bb6 commit 9832b01
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repos:
- id: flake8

- repo: https://github.com/grantjenks/blue.git
rev: v0.9.0
rev: v0.9.1
hooks:
- id: blue
args: [--line-length=120]
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "hatchet"
version = "1.0.1"
version = "1.0.2"
authors = [
{ name="Simone Zaccaria", email="[email protected]" },
{ name="Ben Raphael", email="[email protected]" },
Expand Down
2 changes: 1 addition & 1 deletion src/hatchet/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '1.0.1'
__version__ = '1.0.2'

import os.path
from importlib.resources import path
Expand Down
122 changes: 23 additions & 99 deletions src/hatchet/utils/count_alleles.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import os.path
import shlex
import subprocess as pr
from multiprocessing import Process, Queue, JoinableQueue, Lock, Value
from scipy.stats import beta
import tempfile

import hatchet.utils.ProgressBar as pb
import hatchet.utils.ArgParsing as ap
from hatchet.utils.Supporting import log, logArgs, error, close
from hatchet.utils.multiprocessing import Worker


def main(args=None):
Expand Down Expand Up @@ -172,84 +171,33 @@ def counting(
verbose,
outdir,
):
# Define a Lock and a shared value for log printing through ProgressBar
err_lock = Lock()
counter = Value('i', 0)
progress_bar = pb.ProgressBar(
total=len(samples) * len(chromosomes),
length=40,
lock=err_lock,
counter=counter,
verbose=verbose,
)

# Establish communication queues
tasks = JoinableQueue()
results = Queue()

# Enqueue jobs
jobs_count = 0
work = []
for bam in samples:
for chro in chromosomes:
tasks.put((bam[0], bam[1], chro))
jobs_count += 1

# Setting up the workers
workers = [
AlleleCounter(
tasks,
results,
progress_bar,
bcftools,
reference,
q,
Q,
mincov,
dp,
E,
snplist,
verbose,
outdir,
)
for i in range(min(num_workers, jobs_count))
]

# Add a poison pill for each worker
for i in range(len(workers)):
tasks.put(None)

# Start the workers
for w in workers:
w.start()
work.append((bam[0], bam[1], chro))

# Wait for all of the tasks to finish
tasks.join()

# Get the results
sorted_results = {}
for i in range(jobs_count):
res = results.get()
if len(res) > 0:
sorted_results[res[0][0], res[0][1]] = res

# Close Queues
tasks.close()
results.close()

# Ensure each worker terminates
for w in workers:
w.terminate()
w.join()
worker = AlleleCounter(
bcftools,
reference,
q,
Q,
mincov,
dp,
E,
snplist,
verbose,
outdir,
)

return sorted_results
results = worker.run(work=work, n_instances=num_workers)
results = {(v[0][0], v[0][1]): v for v in results}
return results


class AlleleCounter(Process):
class AlleleCounter(Worker):
def __init__(
self,
task_queue,
result_queue,
progress_bar,
bcftools,
reference,
q,
Expand All @@ -261,10 +209,6 @@ def __init__(
verbose,
outdir,
):
Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.progress_bar = progress_bar
self.bcftools = bcftools
self.reference = reference
self.q = q
Expand All @@ -276,30 +220,10 @@ def __init__(
self.verbose = verbose
self.outdir = outdir

def run(self):
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
self.task_queue.task_done()
break

self.progress_bar.progress(
advance=False,
msg='{} starts on {} for {}'.format(self.name, next_task[1], next_task[2]),
)
snps = self.countAlleles(
bamfile=next_task[0],
samplename=next_task[1],
chromosome=next_task[2],
)
self.progress_bar.progress(
advance=True,
msg='{} ends on {} for {}'.format(self.name, next_task[1], next_task[2]),
)
self.task_queue.task_done()
self.result_queue.put(snps)
return
def work(self, bamfile, samplename, chromosome):
# return 42
snps = self.countAlleles(bamfile=bamfile, samplename=samplename, chromosome=chromosome)
return snps

def countAlleles(self, bamfile, samplename, chromosome):
cmd_mpileup = '{} mpileup {} -Ou -f {} --skip-indels -a INFO/AD -q {} -Q {} -d {} -T {}'.format(
Expand Down
2 changes: 2 additions & 0 deletions src/hatchet/utils/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def run(self, work, n_instances=None, show_progress=True):
n_work = len(work)
if n_instances is None:
n_instances = min(cpu_count(), n_work)
else:
n_instances = min(n_instances, n_work)

if show_progress:
progress_bar = pb.ProgressBar(
Expand Down
2 changes: 1 addition & 1 deletion src/hatchet/utils/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ def main(args=None):
f'{output}/phase/',
'-L',
]
+ (['-N'] if config.genotype_snps.chr_notation else [])
+ glob.glob(f'{output}/snps/*.vcf.gz')
+ (['-N'] if config.genotype_snps.chr_notation else [])
+ extra_args
)

Expand Down

0 comments on commit 9832b01

Please sign in to comment.