Frequently requested design patterns
This page is a collection of sorts, dedicated to showcasing design patterns that we are often asked about in our support group.
- Requirements
- How to handle updates in several handlers
- How do I enforce users joining a specific channel before using my bot?
- How do I send a message to all users of the bot?
- How do I deal with a media group?
Knowing how to make bots with PTB is enough. That means you should be familiar with Python and with PTB. If you haven't worked on anything with PTB, then please check Introduction to the API.
At some point while developing their bots, most of us face the following question:
How do I handle an update before other handlers?
The following sections will give you an idea of how to tackle this problem, based on frequent scenarios where it arises.
PTB comes with a powerful handler known as TypeHandler. You can understand it as a generic handler. You can use it to handle any class put through the Application's update queue. For example, Type Handlers are used in bots to handle "updates" from Github or other external services.
To add any handler, we use Application.add_handler. Apart from the handler itself, it takes an optional argument called group. We can understand groups as numbers which indicate the priority of handlers. A lower group means a higher priority. An update can be processed by (at most) one handler in each group.
Stopping handlers in higher groups from processing an update is achieved using ApplicationHandlerStop. When this exception is raised, the Application is asked to stop sending the update to handlers in higher groups. Depending on your use case, you may not need to raise it. But it is useful if you want to enable flood handling or limit who can use the bot.
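To make the interplay of groups and the stop exception more tangible, here is a toy model in plain Python. It is not PTB's actual implementation: `StopDispatch`, `dispatch` and the three callbacks are hypothetical names made up for this illustration, and the updates are plain dicts instead of telegram.Update objects.

```python
# Toy model of group-based dispatching; NOT how PTB is implemented internally.
class StopDispatch(Exception):
    """Hypothetical stand-in for telegram.ext.ApplicationHandlerStop."""


def dispatch(update, handlers_by_group):
    """Run at most one matching handler per group, lowest group number first.

    handlers_by_group maps a group number to a list of (check, callback) pairs.
    Returns the names of the callbacks that ran, in order.
    """
    ran = []
    for group in sorted(handlers_by_group):
        for check, callback in handlers_by_group[group]:
            if check(update):
                ran.append(callback.__name__)
                try:
                    callback(update)
                except StopDispatch:
                    return ran  # handlers in higher groups never see the update
                break  # only the first matching handler of a group runs
    return ran


def gatekeeper(update):
    if update.get("user_id") != 42:
        raise StopDispatch


def log_update(update):
    pass


def echo(update):
    pass


handlers = {
    0: [(lambda u: "text" in u, echo)],
    -1: [(lambda u: True, gatekeeper), (lambda u: True, log_update)],
}
```

With this setup, an update from user 42 passes through `gatekeeper` (group -1) and then `echo` (group 0). An update from anyone else stops at `gatekeeper`. `log_update` never runs, because `gatekeeper` already matched in the same group.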
That's it. With these three knowledge nuggets, we can solve the question given in the introduction.
Before working on the problems, we will provide you with a template of code that you can use. All you need to do to follow this guide is change the internals of your callback and group as required by the problem you face.
from telegram import Update
from telegram.ext import Application, ApplicationHandlerStop, CallbackContext, TypeHandler


async def callback(update: Update, context: CallbackContext):
    """Handle the update"""
    await do_something_with_this_update(update, context)
    raise ApplicationHandlerStop  # Only if you DON'T want other handlers to handle this update


application = Application.builder().token(TOKEN).build()
handler = TypeHandler(Update, callback)  # Making a handler for the type Update
application.add_handler(handler, -1)  # Default is 0, so we are giving it a number below 0
# Add other handlers and start your bot.
The code above should be self-explanatory, provided you read the previous section along with the respective documentation. We made a handler for telegram.Update and added it to a lower group.
In case you don't want to stop other handlers from processing the update, then you should modify your callback to not raise the exception.
This is a generic use case often used for analytics purposes. For example, if you need to add every user who uses your bot to a database, you can use this method. Simply put, this sort of approach is used to keep track of every update.
async def callback(update: Update, context: CallbackContext):
    add_message_to_my_analytics(update.effective_message)
    add_user_to_my_database(update.effective_user)
Note the difference in this example compared to the previous one. Here we don't raise ApplicationHandlerStop. This type of handler is known as a shallow handler or silent handler. Such handlers handle the update and also allow it to be handled by other common handlers like CommandHandler or MessageHandler. In other words, they don't block the other handlers.
Now let us solve the specific use cases. All you need to do is modify your callback as required. 😉
To restrict your bot to a set of users or if you don't want it to be available for a specific group of people, you can use a callback similar to the following. Remember, the process is the same if you want to enable/disable the bot for groups or channels.
SPECIAL_USERS = [127376448, 172380183, 1827979793]  # Allowed users


async def callback(update: Update, context: CallbackContext):
    # Everyone who is not explicitly allowed gets a notice and is stopped here
    if update.effective_user.id not in SPECIAL_USERS:
        await update.effective_message.reply_text("Hey! You are not allowed to use me!")
        raise ApplicationHandlerStop
Here, it should be noted that this approach blocks your bot entirely for a set of users. If all you need is to block a specific functionality, like a special command or privilege, then it would be wise to use filters.Chat or filters.User instead.
Don't forget that you can also use decorators or a simple if-else check.
If you want a more streamlined style of managing permissions (like superuser, admin, users) then ptbcontrib/roles is worth checking out.
The exact definition of a rate limit depends on your point of view. Typically, you should keep a record of the user's previous usage and warn them when they cross a limit. Here, for demonstration, we use a method that restricts the usage of the bot for 5 minutes.
from time import time

MAX_USAGE = 5


async def callback(update: Update, context: CallbackContext):
    count = context.user_data.get("usageCount", 0)
    restrict_since = context.user_data.get("restrictSince", 0)

    if restrict_since:
        if (time() - restrict_since) >= 60 * 5:  # 5 minutes
            del context.user_data["restrictSince"]
            del context.user_data["usageCount"]
            await update.effective_message.reply_text("I have unrestricted you. Please behave well.")
        else:
            await update.effective_message.reply_text("Back off! Wait for your restriction to expire...")
            raise ApplicationHandlerStop
    else:
        if count == MAX_USAGE:
            context.user_data["restrictSince"] = time()
            await update.effective_message.reply_text("Stop flooding! Don't bother me for 5 minutes...")
            raise ApplicationHandlerStop
        else:
            context.user_data["usageCount"] = count + 1
The approach we used is dead lazy. We keep a count of updates from the user and when it reaches the maximum limit, we note the time. We then stop handling the updates of that user for 5 minutes. Your effective flood limit strategy and punishment may vary. But the logic remains the same.
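As one example of a different strategy, the following sketch swaps the simple counter for a sliding window: it only counts the updates of the last `window` seconds. The function name `allow_request`, the key `recent_updates` and the parameters are assumptions made for this illustration. `user_data` can be any per-user dict such as context.user_data.

```python
from collections import deque
from time import time


def allow_request(user_data, limit=5, window=60.0, now=None):
    """Return True if the user stays within `limit` updates per `window` seconds.

    `now` can be injected for testing; by default the current time is used.
    """
    now = time() if now is None else now
    stamps = user_data.setdefault("recent_updates", deque())
    # Forget timestamps that have left the window.
    while stamps and now - stamps[0] >= window:
        stamps.popleft()
    if len(stamps) >= limit:
        return False  # too many recent updates, do not count this one
    stamps.append(now)
    return True
```

Inside the callback you would raise ApplicationHandlerStop whenever `allow_request(context.user_data)` returns False. Unlike the counter above, this variant needs no explicit unrestricting step, since old timestamps simply expire.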
We have seen how TypeHandler can be used to give a fluent experience without messing up our code-base. You should now be able to solve complex use cases based on the given examples. But please note that TypeHandler is not the only option.
If you feel like this approach is too much trouble, you can use Python's built-in decorators.
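A minimal sketch of the decorator variant could look like this. The names `restricted`, `ALLOWED_USERS` and `secret` are hypothetical, and the SimpleNamespace objects at the end are only stand-ins that mimic update.effective_user.id so the snippet can be tried without a running bot.

```python
import asyncio
from functools import wraps
from types import SimpleNamespace

ALLOWED_USERS = {127376448, 172380183}  # hypothetical user IDs


def restricted(func):
    """Only run the wrapped callback for users listed in ALLOWED_USERS."""

    @wraps(func)
    async def wrapper(update, context, *args, **kwargs):
        user = update.effective_user
        if user is None or user.id not in ALLOWED_USERS:
            return None  # silently ignore everyone else
        return await func(update, context, *args, **kwargs)

    return wrapper


@restricted
async def secret(update, context):
    return f"Welcome, {update.effective_user.id}!"


# Stand-in objects for a quick check outside of a running bot.
allowed = SimpleNamespace(effective_user=SimpleNamespace(id=127376448))
stranger = SimpleNamespace(effective_user=SimpleNamespace(id=1))
```

The trade-off compared to TypeHandler is that you have to decorate every callback you want to protect, but you can also protect only some of them.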
After sending an (invite) link to the channel to the user, you can use Bot.get_chat_member to check if the user is in that channel.
Note that:
- the bot needs to be admin in that channel
- the user must have started the bot for this approach to work. If you try to run get_chat_member for a user that has not started the bot, the bot cannot find the user in a chat, even if the user is a member of it.
Otherwise, depending on whether the user is in the channel, has joined and left again, has been banned, ... (there are multiple situations possible), the method may
- raise an exception. In this case the error message will probably be helpful
- return a ChatMember instance. In that case make sure to check the ChatMember.status attribute
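Interpreting the status could be sketched as follows. The helper name `is_channel_member` is made up for this example. The status strings are the ones defined by the Bot API, and the `is_member` flag only matters for restricted users.

```python
def is_channel_member(status, is_member=None):
    """Interpret the value of ChatMember.status.

    `is_member` is only meaningful for the "restricted" status, where
    Telegram reports separately whether the user is currently in the chat.
    """
    if status in ("creator", "administrator", "member"):
        return True
    if status == "restricted":
        return bool(is_member)
    return False  # "left", "kicked" or anything unexpected


# Possible usage inside an async callback (CHANNEL_ID is a placeholder):
#   member = await context.bot.get_chat_member(CHANNEL_ID, update.effective_user.id)
#   joined = is_channel_member(member.status, getattr(member, "is_member", None))
```

Remember to wrap the get_chat_member call in a try-except block, since it may raise instead of returning.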
Since API 5.1 (PTB v13.4+) you can alternatively use the ChatMember updates to keep track of users in channels. See chatmemberbot.py for an example.
If the user has not yet joined the channel, you can ignore incoming updates from that user or reply to them with a corresponding warning. A convenient way to do that is by using TypeHandler. Read this section to learn how to do it.
Let's first point out an easy alternative solution: Instead of sending the messages directly through your bot, you can instead set up a channel to publish the announcements. You can link your users to the channel in a welcome message.
If that doesn't work for you, here we go:
To send a message to all users, you of course need the IDs of all the users. You'll have to keep track of those yourself. The most reliable way to do that is via the my_chat_member updates. See chatmemberbot.py for an example of how to use them.
If you didn't keep track of your users from the beginning, you may have a chance to get the IDs anyway, if you're using persistence. Please have a look at this issue in that case.
Even if you have all the IDs, you can't know if a user has blocked your bot in the meantime. Therefore, you should make sure to wrap your send request in a try-except clause checking for telegram.error.Forbidden errors.
Finally, note that Telegram imposes some limits that restrict you to sending about 30 messages per second. If you have a huge user base and try to notify them all at once, you will get flooding errors. To prevent that, try spreading the messages over a longer time range. A simple way to achieve that is to leverage the JobQueue.
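The two points above (skipping users that blocked the bot and spreading out the messages) can be combined in a small helper. This is only a sketch: it uses a plain asyncio.sleep between batches instead of the JobQueue, and the name `broadcast` and its parameters are assumptions. For `send` you would pass something like bot.send_message. For `blocked_errors` you would pass the error class raised for blocked bots (telegram.error.Forbidden in PTB v20).

```python
import asyncio


async def broadcast(send, chat_ids, text, blocked_errors=(Exception,), per_second=25):
    """Send `text` to every chat ID, pausing one second between batches.

    `send` is an async callable accepting chat_id and text keyword arguments.
    Returns the IDs for which sending failed with one of `blocked_errors`.
    """
    failed = []
    ids = list(chat_ids)
    for start in range(0, len(ids), per_second):
        for chat_id in ids[start:start + per_second]:
            try:
                await send(chat_id=chat_id, text=text)
            except blocked_errors:
                failed.append(chat_id)  # e.g. the user blocked the bot
        if start + per_second < len(ids):
            await asyncio.sleep(1)  # stay below roughly 30 messages per second
    return failed
```

The returned list can be used to remove users who are no longer reachable from your database.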
The basic problem behind this question is simple. For the end user, it looks like one message, consisting of several media, is sent to the receiver. For the bot API/bot developer, this is not the case however: Every media is sent as one unique message, only linked via the unique Message.media_group_id attribute. So you need some way of determining when to start and to end collecting the media messages.
This basic problem has two basic approaches for handling it, without requiring a more elaborate setup involving databases.
This approach has the upside of looking seamless to the user. The downside is that there is a (low) possibility that one part of the media group is missed.
The idea behind this approach is to start a timer (and an array with the message object/id in it) when receiving the first media_group message. Until the timer runs out, every incoming message with the same media_group_id will be added to the array. Once the timer runs out, the media group is considered done and can be dealt with according to your situation.
There is a possibility that a part of the media group is received after the timer ran out. This could be because the network was too slow to deliver the updates to you (more likely with a webhook setup since long polling can receive multiple updates at once) or your server took too long to deal with the update (or a combination of both). What should happen in this case needs to be determined by you.
- Real life code example TBD
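Until a real-life example is available, here is a minimal sketch of the timer idea using only asyncio. The class `MediaGroupCollector` and its parameters are hypothetical. In a bot you would call `add(message.media_group_id, message)` from an async handler callback, so that an event loop is already running.

```python
import asyncio


class MediaGroupCollector:
    """Collect messages that share a media_group_id until a timer expires."""

    def __init__(self, on_complete, timeout=1.0):
        self.on_complete = on_complete  # async callable(group_id, messages)
        self.timeout = timeout          # seconds to wait after the first part
        self._groups = {}
        self._tasks = {}

    def add(self, group_id, message):
        """Store one part; must be called while an event loop is running."""
        if group_id not in self._groups:
            self._groups[group_id] = []
            # The first part of a group starts the timer.
            self._tasks[group_id] = asyncio.create_task(self._finish(group_id))
        self._groups[group_id].append(message)

    async def _finish(self, group_id):
        await asyncio.sleep(self.timeout)
        messages = self._groups.pop(group_id)
        del self._tasks[group_id]
        await self.on_complete(group_id, messages)
```

Note that in this sketch a part arriving after the timer ran out starts a new collection with the same ID, so `on_complete` is called a second time with the late parts. Whether that is acceptable depends on your use case.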
This approach has two upsides: You don't force users to use media groups (so they can e.g. send more media than fit in one group) and you will not miss a media. The downside is that it requires manual interaction from users, and we all know how special users can be :)
The idea behind this approach is to start an upload segment in your code, either in a ConversationHandler or with a CommandHandler. Then ask the user to send all media. Once they send the first one (or the first media group, this doesn't matter in your code), you store the information you need in an array. Then you ask them to either send more or send a command like /finish or a specific text message that you intercept with a MessageHandler and a regex filter. The point behind this is to have the user finish the addition of media on their own terms. Once they have triggered the second handler, you can consider the array finished.
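The bookkeeping behind this approach can be sketched with three small helpers that operate on a per-user dict such as context.user_data. The function names and the key `pending_media` are assumptions for this illustration. You would call them from your own handlers, e.g. `start_upload` from a CommandHandler, `add_media` from a MessageHandler for media and `finish_upload` from the /finish handler.

```python
UPLOAD_KEY = "pending_media"  # hypothetical key inside the per-user dict


def start_upload(user_data):
    """Begin an upload segment, e.g. from a /upload command handler."""
    user_data[UPLOAD_KEY] = []


def add_media(user_data, file_id):
    """Store one media item; returns False if no upload segment is open."""
    if UPLOAD_KEY not in user_data:
        return False
    user_data[UPLOAD_KEY].append(file_id)
    return True


def finish_upload(user_data):
    """Close the segment, e.g. from a /finish handler, and return the items."""
    return user_data.pop(UPLOAD_KEY, [])
```

Since every message of a media group arrives as its own update, `add_media` is simply called once per message, regardless of whether the user sent single media or whole groups.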
- Wiki of
python-telegram-bot
© Copyright 2015-2025 – Licensed by Creative Commons