-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhashtag_fetcher_bot.py
63 lines (50 loc) · 1.88 KB
/
hashtag_fetcher_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import logging
from telegram import Update, ForceReply
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
import tweepy
# Twitter API credentials
consumer_key = "..."
consumer_secret = "..."
access_token = "..."
access_token_secret = "..."
# Telegram Bot Token
telegram_token = '...'
# Setup Twitter API
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO
)
logger = logging.getLogger(__name__)
def start(update: Update, context: CallbackContext) -> None:
"""Send a message when the command /start is issued."""
user = update.effective_user
update.message.reply_markdown_v2(
fr'Hi {user.mention_markdown_v2()}\!',
reply_markup=ForceReply(selective=True),
)
def get_tweets(update: Update, context: CallbackContext) -> None:
"""Get tweets by hashtag."""
hashtag = update.message.text
tweets = api.search(q=hashtag, count=5, tweet_mode="extended") # Searching for recent tweets
for tweet in tweets:
update.message.reply_text(f'Tweet from {tweet.user.screen_name}:\n{tweet.full_text}')
def main() -> None:
"""Start the bot."""
# Create the Updater and pass it your bot's token.
updater = Updater(token=telegram_token)
# Get the dispatcher to register handlers
dispatcher = updater.dispatcher
# on different commands - answer in Telegram
dispatcher.add_handler(CommandHandler("start", start))
dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, get_tweets))
# Start the Bot
updater.start_polling()
# Run the bot until you press Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
if __name__ == '__main__':
main()