Skip to content

Commit

Permalink
Queue removal
Browse files Browse the repository at this point in the history
  • Loading branch information
gzukowski committed Mar 18, 2024
1 parent de422e2 commit edd0944
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 42 deletions.
21 changes: 4 additions & 17 deletions python/ChimpInsert/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,6 @@ def __init__(self, audience_id, config, slack_api_url, logger):
self.token_header = {'content-type': 'application/json',}
self.logger = logger

self.queue = asyncio.Queue()
self.processing_task = asyncio.create_task(self.process_queue())

async def process_queue(self):
while True:
info = await self.queue.get()
await self.insert_info(info)
self.queue.task_done()
await asyncio.sleep(TASK_DELAY)

async def post_slackMSG(self, text):
try:
response = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{text}"}))
Expand Down Expand Up @@ -142,7 +132,6 @@ def prepare_member_info(self, email, fname, lname):
return member_info

async def insert_info(self, info):

try:
email, fname, lname = info.split(" ")
except ValueError:
Expand Down Expand Up @@ -198,18 +187,16 @@ async def insert_info(self, info):


async def run(context, input):
try:
try:
slack_api_url = context.config['slack_hook_url']
audience_id = context.config['audience_id']
config = {"api_key": context.config['mailchimp_api'], "server": context.config['mailchimp_server']}
except Exception as error:
raise Exception(f"ChimpInsert: Config not loaded: {error}")

inserter = ChimpInsert(audience_id, config, slack_api_url, context.logger)



async for item in input:
await inserter.queue.put(item)

inserter.processing_task.cancel()
await inserter.insert_info(item)


1 change: 0 additions & 1 deletion python/ChimpInsert/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
scramjet-framework-py
requests
stripe
mailchimp-marketing
53 changes: 33 additions & 20 deletions python/Integ-Master/auth0_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,35 @@ async def refresh_token(self):
self.logger.error("Auth0: Refreshing the token has failed!")
self.logger.error(f"Auth0: {response.text}")
return TokenRefreshResult.FAILURE.value, None



async def process_users(self, last_user, users, buffer_users):
for result, has_more in self.lookahead(users):
if not buffer_users.contains(result["email"]):
self.logger.info(f"Auth0: New user registered in auth0: {result['email']}")
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-user"
)
buffer_users.append(result["email"])
if not has_more:
last_user = result["email"]
return last_user

async def process_verified(self, last_verified, verified, buffer_verified):
for result, has_more in self.lookahead(verified):
if not buffer_verified.contains(result["email"]):
self.logger.info(f"Auth0: User allowed the newsletter: {result['email']}")
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-newsletter"
)
self.logger.info(f"Auth0: User allowed the newsletter: {result['email']} wpiiiiisane")

buffer_verified.append(result["email"])
if not has_more:
last_verified = result["email"]
return last_verified



async def get_auth(self):
code, token = await self.refresh_token()
Expand Down Expand Up @@ -82,27 +110,12 @@ async def get_auth(self):
verified = verified.json()
users = users.json()


if users[-1]["email"] != last_user:
for result, has_more in self.lookahead(users):
if not buffer_users.contains(result["email"]):
self.logger.info(f"Auth0: New user registered in auth0: {result['email']}")
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-user"
)
buffer_users.append(result["email"])
if not has_more:
last_user = result["email"]
last_user = await self.process_users(last_user, users, buffer_users)

if verified[-1]["email"] != last_verified:
for result, has_more in self.lookahead(verified):
if not buffer_verified.contains(result["email"]):
self.logger.info(f"Auth0: User allowed the newsletter: {result['email']}")
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-newsletter"
)
buffer_verified.append(result["email"])
if not has_more:
last_verified = result["email"]
last_verified = await self.process_verified(last_verified, verified, buffer_verified)


await asyncio.sleep(WAIT_TIME_ON_USER)

4 changes: 0 additions & 4 deletions python/Integ-Master/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

from auth0_reader import Auth0
from stripe_reader import Stripe
import stripe
import json
import asyncio
from scramjet.streams import Stream
Expand All @@ -13,8 +12,6 @@
'contentType': 'text/plain'
}



class Master:
def __init__(self, auth0_client, stripe_client, stream) -> None:
self.auth0_client = auth0_client
Expand Down Expand Up @@ -45,7 +42,6 @@ async def run(context, input):
auth0_client = Auth0(run.verified_url, run.users_url , run.api_url, run.data, stream, context.logger)
stripe_client = Stripe (run.stripe_api, stream, context.logger)


asyncio.gather(
Master(auth0_client, stripe_client, stream).read(),
return_exceptions=True,
Expand Down

0 comments on commit edd0944

Please sign in to comment.