Python - Async Multiprocessing from a RabbitMQ Consumer


I have a Python program that acts as a consumer for RabbitMQ. Once it receives a job from the queue, I want the program to split the job up using multiprocessing, but I'm running into issues with the logistics of multiprocessing.

I've simplified the code for readability.

My RabbitMQ consumer functionality:

import json
import logging
import time

import pika

import process_manager

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue="jobreader", durable=True)
logging.info('Waiting for messages...')


def callback(ch, method, properties, body):
    job_info = json.loads(body)

    logging.info('Start time: ' + time.strftime("%H:%M:%S"))

    split_jobs = split_job(job_info)

    process_manager.run_processes(split_jobs)

    ch.basic_ack(delivery_tag=method.delivery_tag)

My multiprocessing functionality:

#!/usr/bin/python

import multiprocessing

import other_package


def worker_process(sub_job):
    other_package.run_job(sub_job)


def run_processes(jobs):
    processes = []
    for sub_job in jobs:
        p = multiprocessing.Process(target=worker_process, args=(sub_job,))
        processes.append(p)
        p.start()

Naturally, I can't use if __name__ == '__main__': here, because this code runs from within a function.
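For background, the guard matters because under the spawn start method (the default on Windows) each child process re-imports the main module, so any unguarded top-level spawning code would run again in every child. A minimal standalone sketch of the standard pattern, for illustration only:

import multiprocessing


def work(x):
    print(x * x)


if __name__ == '__main__':
    # The guard keeps this block from re-running when a spawned
    # child re-imports this module.
    p = multiprocessing.Process(target=work, args=(4,))
    p.start()
    p.join()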

I'm not sure if there is a workaround for multiprocessing here, or if I'm approaching this the wrong way. Any help is appreciated.

You can refactor the multiprocessing piece so that its state is initialized in the main script:

import process_manager
...

def callback(ch, method, properties, body):
    job_info = json.loads(body)
    logging.info('Start time: ' + time.strftime("%H:%M:%S"))
    split_jobs = split_job(job_info)
    manager.run_processes(split_jobs)
    ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == "__main__":
    manager = process_manager.get_manager()
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue="jobreader", durable=True)
    logging.info('Waiting for messages...')
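The snippet above ends after declaring the queue; the main block would normally go on to register the callback and enter the blocking consume loop. A minimal sketch of that final step, assuming the pika 1.x API (older pika versions take the callback as the first positional argument to basic_consume):

    channel.basic_consume(queue="jobreader", on_message_callback=callback)
    channel.start_consuming()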

Then process_manager looks like this:

import multiprocessing

import other_package


def worker_process(sub_job):
    other_package.run_job(sub_job)


_manager = None

def get_manager():  # note that you don't have to use a singleton here
    global _manager
    if not _manager:
        _manager = Manager()
    return _manager


class Manager(object):
    def __init__(self):
        self._pool = multiprocessing.Pool()

    def run_processes(self, jobs):
        self._pool.map_async(worker_process, jobs)

Note the use of a Pool instead of spawning a Process for every single job, because one process per job won't scale well.
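One thing to be aware of: map_async returns immediately, so in the callback above the message is acked before the sub-jobs have actually finished. If you want to block on, or check, completion, keep the AsyncResult that map_async returns. A sketch of that variant, assuming the Manager class above (the workers parameter is an illustrative addition, not part of the original):

class Manager(object):
    def __init__(self, workers=4):
        # Bound the pool size explicitly rather than defaulting to cpu_count().
        self._pool = multiprocessing.Pool(processes=workers)

    def run_processes(self, jobs):
        # map_async returns an AsyncResult right away; callers can use
        # result.wait() or result.get() to block until the batch finishes.
        return self._pool.map_async(worker_process, jobs)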

