Python - Async Multiprocessing from a RabbitMQ Consumer
I have a Python program that acts as a consumer for RabbitMQ. Once it receives a job from the queue, I want the program to split the job up using multiprocessing, but I'm running into issues with the logistics of multiprocessing.
I've simplified the code for readability.
My RabbitMQ consumer functionality:
    import json
    import logging
    import time

    import pika

    import process_manager

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue="jobreader", durable=True)
    logging.info('Waiting for messages...')

    def callback(ch, method, properties, body):
        job_info = json.loads(body)
        logging.info('Start time: ' + time.strftime("%H:%M:%S"))
        # split the job into sub-jobs and fan them out to worker processes
        split_jobs = split_job(job_info)
        process_manager.runprocesses(split_jobs)
        ch.basic_ack(delivery_tag=method.delivery_tag)
My multiprocessing functionality:
    #!/usr/bin/python
    import multiprocessing

    import other_package

    def worker_process(sub_job):
        other_package.run_job(sub_job)

    def runprocesses(jobs):
        # spawn one process per sub-job
        processes = []
        for sub_job in jobs:
            p = multiprocessing.Process(target=worker_process, args=(sub_job,))
            processes.append(p)
            p.start()
Naturally, I can't use if __name__ == '__main__': because this code runs within a function.
I'm not sure if there is a workaround for multiprocessing here, or if I'm approaching this the wrong way. Any help would be appreciated.
You can refactor your multiprocessing piece to initialize its state in the main script:
    import process_manager

    ...

    def callback(ch, method, properties, body):
        job_info = json.loads(body)
        logging.info('Start time: ' + time.strftime("%H:%M:%S"))
        split_jobs = split_job(job_info)
        manager.runprocesses(split_jobs)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    if __name__ == "__main__":
        manager = process_manager.get_manager()
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.queue_declare(queue="jobreader", durable=True)
        logging.info('Waiting for messages...')
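Neither snippet actually starts consuming; I assume that part was trimmed for readability. For completeness, a rough sketch of the tail of the __main__ block, assuming pika's blocking API (basic_consume with the on_message_callback keyword is the pika 1.x signature; older 0.x versions order the arguments differently):

    if __name__ == "__main__":
        ...
        # register the callback and block, dispatching messages as they arrive
        channel.basic_consume(queue="jobreader", on_message_callback=callback)
        channel.start_consuming()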
Then process_manager looks something like this:
    import multiprocessing

    import other_package

    def worker_process(sub_job):
        other_package.run_job(sub_job)

    _manager = None

    def get_manager():
        # Note: you don't have to use a singleton here
        global _manager
        if not _manager:
            _manager = Manager()
        return _manager

    class Manager(object):
        def __init__(self):
            self._pool = multiprocessing.Pool()

        def runprocesses(self, jobs):
            self._pool.map_async(worker_process, jobs)
Note that I use a Pool instead of spawning a Process for every single job, because spawning that many processes won't scale well.
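One caveat: map_async returns immediately, so the basic_ack in the callback fires before the sub-jobs have finished. If you would rather acknowledge the message only after every sub-job completes (an assumption about your requirements, not something the code above needs), you can block on the AsyncResult:

        def runprocesses(self, jobs):
            # map_async returns an AsyncResult right away; calling get()
            # blocks until all sub-jobs finish and re-raises any
            # exception thrown in a worker, so you only ack on success
            result = self._pool.map_async(worker_process, jobs)
            result.get()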