From edd0944bc8a3bcaff7d62a6c029df8b931197611 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Mon, 18 Mar 2024 21:22:43 +0000 Subject: [PATCH] Queue removal --- python/ChimpInsert/main.py | 21 +++--------- python/ChimpInsert/requirements.txt | 1 - python/Integ-Master/auth0_reader.py | 53 ++++++++++++++++++----------- python/Integ-Master/main.py | 4 --- 4 files changed, 37 insertions(+), 42 deletions(-) diff --git a/python/ChimpInsert/main.py b/python/ChimpInsert/main.py index 1a8c16e..7109f0c 100644 --- a/python/ChimpInsert/main.py +++ b/python/ChimpInsert/main.py @@ -44,16 +44,6 @@ def __init__(self, audience_id, config, slack_api_url, logger): self.token_header = {'content-type': 'application/json',} self.logger = logger - self.queue = asyncio.Queue() - self.processing_task = asyncio.create_task(self.process_queue()) - - async def process_queue(self): - while True: - info = await self.queue.get() - await self.insert_info(info) - self.queue.task_done() - await asyncio.sleep(TASK_DELAY) - async def post_slackMSG(self, text): try: response = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{text}"})) @@ -142,7 +132,6 @@ def prepare_member_info(self, email, fname, lname): return member_info async def insert_info(self, info): - try: email, fname, lname = info.split(" ") except ValueError: @@ -198,7 +187,7 @@ async def insert_info(self, info): async def run(context, input): - try: + try: slack_api_url = context.config['slack_hook_url'] audience_id = context.config['audience_id'] config = {"api_key": context.config['mailchimp_api'], "server": context.config['mailchimp_server']} @@ -206,10 +195,8 @@ async def run(context, input): raise Exception(f"ChimpInsert: Config not loaded: {error}") inserter = ChimpInsert(audience_id, config, slack_api_url, context.logger) - - + async for item in input: - await inserter.queue.put(item) - - inserter.processing_task.cancel() + await inserter.insert_info(item) + diff --git a/python/ChimpInsert/requirements.txt b/python/ChimpInsert/requirements.txt index 7e320b5..7140ab4 100644 --- a/python/ChimpInsert/requirements.txt +++ b/python/ChimpInsert/requirements.txt @@ -1,4 +1,3 @@ scramjet-framework-py requests -stripe mailchimp-marketing diff --git a/python/Integ-Master/auth0_reader.py b/python/Integ-Master/auth0_reader.py index 2f6c298..326cf3e 100644 --- a/python/Integ-Master/auth0_reader.py +++ b/python/Integ-Master/auth0_reader.py @@ -46,7 +46,35 @@ async def refresh_token(self): self.logger.error("Auth0: Refreshing the token has failed!") self.logger.error(f"Auth0: {response.text}") return TokenRefreshResult.FAILURE.value, None - + + + async def process_users(self, last_user, users, buffer_users): + for result, has_more in self.lookahead(users): + if not buffer_users.contains(result["email"]): + self.logger.info(f"Auth0: New user registered in auth0: {result['email']}") + self.stream.write( + result["email"] + " " + result["nickname"] + " auth0-user" + ) + buffer_users.append(result["email"]) + if not has_more: + last_user = result["email"] + return last_user + + async def process_verified(self, last_verified, verified, buffer_verified): + for result, has_more in self.lookahead(verified): + if not buffer_verified.contains(result["email"]): + self.logger.info(f"Auth0: User allowed the newsletter: {result['email']}") + self.stream.write( + result["email"] + " " + result["nickname"] + " auth0-newsletter" + ) + self.logger.info(f"Auth0: User allowed the newsletter: {result['email']} wpiiiiisane") + + buffer_verified.append(result["email"]) + if not has_more: + last_verified = result["email"] + return last_verified + + async def get_auth(self): code, token = await self.refresh_token() @@ -82,27 +110,12 @@ async def get_auth(self): verified = verified.json() users = users.json() - if users[-1]["email"] != last_user: - for result, has_more in self.lookahead(users): - if not buffer_users.contains(result["email"]): - self.logger.info(f"Auth0: New user registered in auth0: {result['email']}") - self.stream.write( - result["email"] + " " + result["nickname"] + " auth0-user" - ) - buffer_users.append(result["email"]) - if not has_more: - last_user = result["email"] + last_user = await self.process_users(last_user, users, buffer_users) if verified[-1]["email"] != last_verified: - for result, has_more in self.lookahead(verified): - if not buffer_verified.contains(result["email"]): - self.logger.info(f"Auth0: User allowed the newsletter: {result['email']}") - self.stream.write( - result["email"] + " " + result["nickname"] + " auth0-newsletter" - ) - buffer_verified.append(result["email"]) - if not has_more: - last_verified = result["email"] + last_verified = await self.process_verified(last_verified, verified, buffer_verified) + + await asyncio.sleep(WAIT_TIME_ON_USER) diff --git a/python/Integ-Master/main.py b/python/Integ-Master/main.py index 8af25ce..0bcfa16 100644 --- a/python/Integ-Master/main.py +++ b/python/Integ-Master/main.py @@ -1,7 +1,6 @@ from auth0_reader import Auth0 from stripe_reader import Stripe -import stripe import json import asyncio from scramjet.streams import Stream @@ -13,8 +12,6 @@ 'contentType': 'text/plain' } - - class Master: def __init__(self, auth0_client, stripe_client, stream) -> None: self.auth0_client = auth0_client @@ -45,7 +42,6 @@ async def run(context, input): auth0_client = Auth0(run.verified_url, run.users_url , run.api_url, run.data, stream, context.logger) stripe_client = Stripe (run.stripe_api, stream, context.logger) - asyncio.gather( Master(auth0_client, stripe_client, stream).read(), return_exceptions=True,