Python 3: Multiprocessing API calls with exit condition


I'm trying to write an application that works through a list of database entries, makes an API call for each of them, and returns a value. If one value of the API's JSON response is true for 5 calls, I want to have a list of those 5 calls. Since the database holds a couple of thousand entries, I want to implement this with multiprocessing. I'm a beginner with parallelisation, though, and I can't seem to get a grasp of how it works and how to set an exit condition. Here's what I've got:

    from multiprocessing.dummy import Pool
    import requests

    def get_api_response(apikey, result, subscription_id):
        r = requests.get("https://api.example.com/" + subscription_id)
        if r.json()['subscribed'] == True:
            result.append(r.json())
            return result

    def pass_args(args):
        foo = get_api_response(*args)
        if foo:
            return foo

    def check_response_amount(result):
        if len(result) >= 5:
            pool.terminate()

    # One entry looks like this: {"id": 1, "name": "smith", "subscription_id": 123}
    db_entries = get_db_entries()
    apikey = 'abcd1234'
    result = []
    request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
    pool = Pool(5)
    pool_result = pool.map_async(pass_args, request_tuples, callback=check_response_amount)
    pool_result.wait()
    pool.close()
    pool.join()

The application checks every database entry and returns every API response that has subscribed == True, without running through the callback. I tried applying the answer to this question (Python Multiprocessing exit on condition), but couldn't get it to work. Can anybody help me?

When you use map_async, the callback won't be executed until every work item in the iterable has completed. If you want the callback to execute for every item in request_tuples, rather than only after all of them are done, you need to use apply_async inside a for loop instead:

    results = []
    for item in request_tuples:
        results.append(pool.apply_async(get_api_response, args=item, callback=check_response_amount))

    for result in results:
        result.wait()
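To make the difference in callback timing concrete, here is a minimal sketch that counts how often each kind of callback fires. The `square` function is a hypothetical stand-in for the API call, and `count_callbacks` is an illustrative helper that is not part of the original code.

```python
from multiprocessing.dummy import Pool  # thread-backed Pool, as in the question


def square(x):
    # Hypothetical stand-in for the API call.
    return x * x


def count_callbacks():
    map_calls = []    # one entry per map_async callback invocation
    apply_calls = []  # one entry per apply_async callback invocation
    items = list(range(10))

    pool = Pool(4)

    # map_async: the callback receives the full list of results, once.
    pool.map_async(square, items, callback=map_calls.append).wait()

    # apply_async: each task gets its own callback with a single result.
    pending = [
        pool.apply_async(square, args=(x,), callback=apply_calls.append)
        for x in items
    ]
    for p in pending:
        p.wait()

    pool.close()
    pool.join()
    return map_calls, apply_calls


if __name__ == "__main__":
    map_calls, apply_calls = count_callbacks()
    print(len(map_calls), len(apply_calls))
```

With ten items, the map_async callback runs a single time with the whole result list, while the apply_async callback runs ten times, once per finished task. That per-task callback is what makes an early exit check possible.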

Additionally, calling pool.terminate isn't going to work the way you want; the items you've already submitted to the pool are going to hang forever once you call it, which will make your script hang, since you're waiting on them to finish before exiting. You can work around this by just waiting on the pool to join, rather than waiting on any individual task to finish.

    import time
    from multiprocessing.dummy import Pool

    def get_api_response(apikey, result, subscription_id):
        url = "https://api.example.com/" + str(subscription_id)
        time.sleep(2)  # simulate a slow API call
        result.append(url)
        return result

    def pass_args(args):
        foo = get_api_response(*args)
        if foo:
            return foo

    def check_response_amount(result):
        # Runs once per finished task; stop the pool once enough results are in.
        if result and len(result) >= 5:
            print("done %s" % result)
            pool.terminate()

    def get_db_entries():
        return [{'subscription_id': i} for i in range(100)]

    # One entry looks like this: {"id": 1, "name": "smith", "subscription_id": 123}
    db_entries = get_db_entries()
    apikey = 'abcd1234'
    result = []
    request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
    pool = Pool(2)
    results = []
    for item in request_tuples:
        results.append(pool.apply_async(get_api_response, item, callback=check_response_amount))
    # Wait on the pool itself rather than on individual tasks.
    pool.close()
    pool.join()
    print("done")

Output:

    in here
    in here
    in here
    in here
    in here
    ... (a bunch more of this)...
    in here
    in here
    done ['https://api.example.com/1', 'https://api.example.com/0', 'https://api.example.com/2', 'https://api.example.com/3', 'https://api.example.com/4', 'https://api.example.com/5']
    done

Note that the result list can end up being a little bigger than you want, since the terminate call won't actually stop in-progress tasks.
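If the list must never exceed the limit, one option is to avoid terminate altogether and let the workers cooperate. The sketch below is an alternative technique rather than the approach above: it swaps pool.terminate() for a threading.Lock plus a threading.Event. The names `fake_api_call` and `collect_matches` are hypothetical, and the "even ids are subscribed" rule is an assumption that stands in for the real API response.

```python
import threading
from multiprocessing.dummy import Pool

LIMIT = 5  # number of matches wanted, as in the question


def fake_api_call(subscription_id):
    # Hypothetical stand-in for requests.get(...).json(); even ids "subscribe".
    return {
        "subscription_id": subscription_id,
        "subscribed": subscription_id % 2 == 0,
    }


def collect_matches(subscription_ids, limit=LIMIT, workers=4):
    matches = []
    lock = threading.Lock()
    stop = threading.Event()

    def worker(subscription_id):
        if stop.is_set():
            return  # limit already reached: skip the call entirely
        response = fake_api_call(subscription_id)
        if not response["subscribed"]:
            return
        with lock:
            # Re-check under the lock so the list never exceeds the limit.
            if len(matches) < limit:
                matches.append(response)
            if len(matches) >= limit:
                stop.set()

    pool = Pool(workers)
    for subscription_id in subscription_ids:
        pool.apply_async(worker, args=(subscription_id,))
    pool.close()
    pool.join()  # remaining queued tasks return immediately once stop is set
    return matches


if __name__ == "__main__":
    print(len(collect_matches(range(100))))
```

Calls that are already in flight when the limit is reached still finish, but their results are discarded under the lock, so the list is capped at the limit. The trade-off is that every queued task is still picked up by a worker, even though it returns right away.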

