Python 3: Multiprocessing API calls with exit condition
I'm trying to write an application that works through a list of database entries, making an API call for each one and returning the value. If one value of the API's JSON response is True for 5 calls, I want to have a list of those 5 calls. Since the database has a couple of thousand entries, I want to realise this with multiprocessing. But I'm a beginner at parallelisation, and it seems I can't grasp how it works and how to set the exit condition. Here's what I've got:
from multiprocessing.dummy import Pool
import requests

def get_api_response(apikey, result, subscription_id):
    r = requests.get("https://api.example.com/" + subscription_id)
    if r.json()['subscribed'] == True:
        result.append(r.json())
    return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    if len(result) >= 5:
        pool.terminate()

# one entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]

pool = Pool(5)
pool_result = pool.map_async(pass_args, request_tuples, callback=check_response_amount)
pool_result.wait()
pool.close()
pool.join()
The application checks every database entry and returns every API response that has subscribed == True, without ever running through the callback. I tried applying the answer to this question (Python multiprocessing exit on condition), but I couldn't get it to work. Can anyone help me?
When you use map_async, the callback won't be executed until every work item in the iterable has completed. If you want the callback to execute for every item in request_tuples, rather than only after all of them are done, you need to use apply_async inside a loop instead:
results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, args=item, callback=check_response_amount))

for result in results:
    result.wait()
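(As an aside, and not part of the original question: if you don't need a callback at all, a minimal alternative sketch is to pull results with imap_unordered, which yields them as workers finish, so the main thread can simply stop iterating once it has enough hits. pass_args and request_tuples are the names from the question above.)

pool = Pool(5)
hits = []
for res in pool.imap_unordered(pass_args, request_tuples):
    if res:                # pass_args returns the result list, or None
        hits.append(res)
    if len(hits) >= 5:     # enough positive responses, stop consuming
        break
pool.terminate()           # abandon work items that haven't started yet
pool.join()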
Additionally, calling pool.terminate isn't going to work the way you want; the items you've submitted to the pool are going to hang forever once you call it, which will make your script hang, since you're waiting on them to finish before exiting. You can work around this by waiting on the pool to join, rather than waiting on each individual task to finish.
import time
from multiprocessing.dummy import Pool

def get_api_response(apikey, result, subscription_id):
    url = ("https://api.example.com/" + str(subscription_id))
    time.sleep(2)
    result.append(url)
    return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    print("in here")
    if result and len(result) >= 5:
        print("done %s" % result)
        pool.terminate()

def get_db_entries():
    return [{'subscription_id': i} for i in range(100)]

# one entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]

pool = Pool(2)
results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, item, callback=check_response_amount))
pool.close()
pool.join()
print("done")
Output:
in here
in here
in here
in here
in here
... (a bunch more of this) ...
in here
in here
done ['https://api.example.com/1', 'https://api.example.com/0', 'https://api.example.com/2', 'https://api.example.com/3', 'https://api.example.com/4', 'https://api.example.com/5']
done
Note that the result list can end up being a little bigger than you want, since the terminate call won't stop in-progress tasks.
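If you need exactly five, the simplest fix is to slice after the join (result = result[:5]). A slightly more involved sketch (my addition, not from the original answer; the lock and the TARGET name are made up, and this works because multiprocessing.dummy uses threads that share memory) caps the list at append time:

import threading

TARGET = 5
result_lock = threading.Lock()   # workers are threads here, so a plain Lock works

def append_capped(result, item):
    # Append only while under the cap; report whether the cap is reached.
    with result_lock:
        if len(result) < TARGET:
            result.append(item)
        return len(result) >= TARGET

get_api_response would then call append_capped(result, url) instead of result.append(url).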