-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaltBot_main.py
1142 lines (942 loc) · 53.4 KB
/
altBot_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import argparse
import logging
import os
import re
import time
from datetime import datetime, timedelta
from logging.handlers import TimedRotatingFileHandler
from typing import List, Optional, Set, Union, Tuple
import tweepy
from bot_messages import AUTO_DM_NO_ALT_TEXT, AUTO_REPLY_NO_DM_NO_ALT_TEXT, \
SINGLE_USER_NO_IMAGES_FOUND_REPORT, SINGLE_USER_REPORT, AUTO_REPLY_NO_IMAGES_FOUND, SINGLE_USER_WITH_ALT_TEXT_QUERY,\
HEADER_REPORT, FOOTER_REPORT, SINGLE_USER_NO_ALT_TEXT_QUERY, SINGLE_USER_REPORT_FIRST_PLACE, \
SINGLE_USER_REPORT_SECOND_PLACE, SINGLE_USER_REPORT_THIRD_PLACE, HEADER_REPORT_PERIODIC_FRIENDS, \
HEADER_REPORT_PERIODIC_FOLLOWERS, FOOTER_REPORT_PERIODIC, ALL_ALT_TEXT_USER_PROVIDED, HEADER_ALT_TEXT_USER_PROVIDED, \
SUMMARY_REPORT, UNAVAILABLE_TWEET
from data_access_layer.data_access import DBAccess
try:
from settings_prod import CONSUMER_KEY, CONSUMER_SECRET, KEY, SECRET
except Exception as e:
print('settings_prod not found; running just with settings')
from settings import CONSUMER_KEY, CONSUMER_SECRET, KEY, SECRET
from settings import ACCEPT_DM_TWEET_ID, LOG_LEVEL, LOG_FILENAME, LAST_N_TWEETS, DB_FILE, ALT_BOT_NAME, \
MAX_RECONNECTION_ATTEMPTS, MAX_MENTIONS_TO_PROCESS, MAINTEINER_NAME, MAINTAEINER_ID, LAST_N_MENTIONS,\
MAX_DAYS_TO_REFRESH_TWEETS, LAST_N_TWEETS_MAX, MAX_CHARS_IN_TWEET
class AltBot:
def __init__(self, live: bool = True):
    """
    Build the bot: prepare the Twitter credentials, open the local DB, connect the API client
    and load the bot's own user.

    :param live: when True, tweets/favs and DMs are really sent; when False those actions are
        only logged (useful for development)
    """
    # OAuth handler built from the keys imported from the settings modules
    oauth_handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    oauth_handler.set_access_token(KEY, SECRET)
    self.auth = oauth_handler
    self.live = live
    self.processed_tweets = set()  # type: Set[str]
    self.db = DBAccess(DB_FILE)
    # the two attributes below are filled in by connect_api() and load_alt_bot_user()
    self.api = None  # type: tweepy.API
    self.alt_bot_user = None  # type: tweepy.models.User
    self.connect_api()
    self.load_alt_bot_user()
# region: Twitter API interaction
def connect_api(self) -> None:
    """
    Create the tweepy API client, retrying up to MAX_RECONNECTION_ATTEMPTS times.

    Can also be called again later to re-create the client if the connection was closed.

    :return: None; self.api is set on success
    :raises Exception: when every attempt failed
    """
    failed_attempts = 0
    connected = False
    while not connected and failed_attempts < MAX_RECONNECTION_ATTEMPTS:
        try:
            self.api = tweepy.API(self.auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
            connected = True
        except Exception as err:
            logging.warning(f'[{failed_attempts}/{MAX_RECONNECTION_ATTEMPTS}] Can not connect: {err}')
            failed_attempts += 1
    if not connected:
        # the loop ended without a successful attempt
        msg = f'[{failed_attempts}/{MAX_RECONNECTION_ATTEMPTS}] Can not connect.'
        logging.error(msg)
        raise Exception(msg)
    logging.info('Connected to Tweeter API')
def get_retweeters(self, tweet_id: int, kindly_sleep: float = 15) -> Set[int]:
    """
    Collect the ids of the users who retweeted the tweet with id=tweet_id.

    :param tweet_id: id of the tweet whose retweeters are wanted
    :param kindly_sleep: minimum number of seconds spent on each page, to avoid overloading the
        API (original author's note: 15 requests in 15 minutes)
    :return: set of user ids who retweeted the tweet
    """
    retweeters = set()  # type: Set[int]
    logging.info(f'Reading users who RTed this tweet: {tweet_id}')
    pages = tweepy.Cursor(self.api.retweeters, id=tweet_id, count=500).pages()
    for page in pages:
        started_at = time.time()
        retweeters.update(page)
        # wait out whatever is left of kindly_sleep for this page to avoid being banned
        remaining = kindly_sleep - (time.time() - started_at)
        time.sleep(max(remaining, 0))
    logging.info(f'{len(retweeters)} RTed this tweet: {tweet_id}')
    return retweeters
def get_tweet(self, tweet_id: str):
    """
    Fetch a single tweet from the API.

    The request asks for extended text, entities and the images' alt_text.

    :param tweet_id: id of the tweet to be read
    :return: the tweet, as returned by tweepy
    """
    return self.api.get_status(
        tweet_id,
        include_ext_alt_text=True,
        include_entities=True,
        tweet_mode="extended",
    )
def load_alt_bot_user(self):
    """
    Verify the credentials and keep the result in self.alt_bot_user.

    :return: None
    :raises Exception: when verify_credentials returns a falsy value
    """
    bot_user = self.api.verify_credentials()
    self.alt_bot_user = bot_user
    if not bot_user:
        raise Exception('Can not connect: verify credentials')
    logging.info('Credentials are ok.')
def get_followers_from_api(self, screen_name: str, kindly_sleep: float = 60) -> Set[Tuple[str, int]]:
    """
    Read the whole followers list of the screen_name user, page by page.

    :param screen_name: user whose followers are wanted
    :param kindly_sleep: minimum number of seconds spent on each page, to avoid overloading the
        API (original author's note: 15 requests in 15 minutes)
    :return: set of pairs (screen_name, id), one per follower
    """
    followers = set()  # type: Set[Tuple[str, int]]
    pages = tweepy.Cursor(self.api.followers, screen_name=screen_name, count=500).pages()
    for page in pages:
        started_at = time.time()
        followers.update((user.screen_name, user.id) for user in page)
        # wait out whatever is left of kindly_sleep for this page to avoid being banned
        remaining = kindly_sleep - (time.time() - started_at)
        time.sleep(max(remaining, 0))
    return followers
def get_allowed_to_dm_from_api(self) -> Set[int]:
    """
    Read the ids of the users who retweeted the tweet ACCEPT_DM_TWEET_ID.

    The callers in this class treat those users as the ones who allowed the bot to DM them.

    :return: set of user ids
    """
    return self.get_retweeters(ACCEPT_DM_TWEET_ID)
def get_friends_from_api(self, screen_name: str, kindly_sleep: float = 60) -> Set[Tuple[str, int]]:
    """
    Read the accounts followed by the screen_name user (i.e. its friends), page by page.

    :param screen_name: user whose friends are wanted
    :param kindly_sleep: minimum number of seconds spent on each page, to avoid overloading the
        API (original author's note: 15 requests in 15 minutes)
    :return: set of pairs (screen_name, id), one per friend
    """
    friends = set()  # type: Set[Tuple[str, int]]
    pages = tweepy.Cursor(self.api.friends, screen_name=screen_name, count=500).pages()
    for page in pages:
        started_at = time.time()
        friends.update((user.screen_name, user.id) for user in page)
        # wait out whatever is left of kindly_sleep for this page to avoid being banned
        remaining = kindly_sleep - (time.time() - started_at)
        time.sleep(max(remaining, 0))
    return friends
def get_last_tweets_for_account(self, screen_name: str, n_tweets: int, include_rts: bool = False) -> List[str]:
    """
    Get the ids of the last n_tweets tweets written by @screen_name.

    :param screen_name: name of the account whose tweets are wanted
    :param n_tweets: max number of tweets to extract, 0 <= n_tweets <= 200 (the original author
        notes that 200 is the maximum count allowed)
    :param include_rts: whether to include retweets or not
    :return: list of tweet ids as strings; empty when the timeline can not be read
    """
    try:
        timeline = self.api.user_timeline(
            screen_name=screen_name,
            count=n_tweets,
            include_rts=include_rts,
            # extended mode keeps full_text; otherwise the text is truncated
            tweet_mode='extended',
        )
    except tweepy.error.TweepError as tpe:
        logging.error(f'can not extract tweets for {screen_name}: {tpe}')
        return []
    return [status.id_str for status in timeline]
def fav_tweet(self, tweet_id: str) -> None:
    """
    Fav (like) the tweet with id tweet_id; API errors are logged, not raised.

    When self.live is False nothing is sent and the action is only logged.

    :param tweet_id: id of the tweet to be faved
    :return: None
    """
    live = self.live
    if live:
        try:
            self.api.create_favorite(tweet_id)
        except tweepy.error.TweepError as api_error:
            logging.error(f'Can not fav tweet {tweet_id}: {api_error}')
    logging.debug(f'[live={live}] - fav {tweet_id}')
def reply(self, reply_to: str, msg: str, tweet_id: str) -> None:
    """
    Tweet msg, prefixed with @reply_to, as a reply to the tweet tweet_id.

    API errors are logged, not raised. When self.live is False nothing is sent and the
    message is only logged.

    :param reply_to: screen name of the user being replied to
    :param msg: message to tweet
    :param tweet_id: id of the tweet to reply to
    :return: None
    """
    full_message = f'@{reply_to} {msg}'
    live = self.live
    if live:
        try:
            self.api.update_status(status=full_message, in_reply_to_status_id=tweet_id)
        except tweepy.error.TweepError as api_error:
            target_url = self.get_tweet_url(reply_to, tweet_id)
            logging.error(f'Can not send tweet to {reply_to} in reply to {target_url}: {api_error}')
    debug_line = f'[live={live}] - reply tweet to {tweet_id} in {len(full_message)} chars: [{full_message}]'
    # keep the log entry on a single line
    logging.debug(debug_line.replace("\n", ";"))
def reply_thread(self, reply_to: str, thread_message: List[str], tweet_id: str) -> None:
    """
    Tweet a list of messages as a thread hanging from the tweet tweet_id.

    The messages are first merged with collapse_text_in_tweets. Each tweet that is sent
    becomes the parent of the next one. When a tweet fails the error is logged and the next
    message is attempted against the same parent.

    :param reply_to: screen name of the user being replied to (only used for logging)
    :param thread_message: list of messages to tweet as a thread
    :param tweet_id: id of the tweet to reply to
    :return: None
    """
    logging.debug(f'Collapse thread with {len(thread_message)} messages...')
    collapsed = self.collapse_text_in_tweets(thread_message)
    logging.debug(f'Collapsed thread now contains {len(collapsed)} messages...')
    parent_id = tweet_id
    for msg in collapsed:
        if self.live:
            try:
                posted = self.api.update_status(
                    status=msg,
                    in_reply_to_status_id=parent_id,
                    auto_populate_reply_metadata=True,
                )
                # the next message hangs from the tweet just written
                parent_id = posted.id
            except tweepy.error.TweepError as api_error:
                target_url = self.get_tweet_url(reply_to, parent_id)
                logging.error(f'Can not send tweet to {reply_to} in reply to {target_url}: {api_error}')
        debug_line = f'[live={self.live}] - reply tweet to {parent_id} in {len(msg)} chars: [{msg}]'
        logging.debug(debug_line.replace("\n", ";"))
def write_tweet(self, message: str) -> None:
    """
    Publish message as a new tweet (not a reply); API errors are logged, not raised.

    When self.live is False nothing is sent and the message is only logged.

    :param message: message to tweet
    :return: None
    """
    live = self.live
    if live:
        try:
            self.api.update_status(status=message)
        except tweepy.error.TweepError as api_error:
            logging.error(f'Can not send tweet {message}: {api_error}')
    debug_line = f'[live={live}] - tweet [{message}] ({len(message)} chars)'
    logging.debug(debug_line.replace("\n", ";"))
def direct_message(self, recipient_name: str, recipient_id: int, msg: str) -> int:
    """
    Send a direct message with the text msg to the user recipient_id.

    When self.live is False nothing is sent and the message is only logged.

    :param recipient_name: screen name of the recipient, just for logging
    :param recipient_id: id of the user who receives the DM
    :param msg: message to be sent (original author's note: less than 10k chars)
    :return:
        0 if everything went ok;
        1 if the API answered with error code 349
        -1 for any other API error
    """
    try:
        if self.live:
            self.api.send_direct_message(recipient_id, msg)
        debug_line = f'[live={self.live}] - send Direct Message to {recipient_id}: [[{msg}]]'
        logging.debug(debug_line.replace("\n", ";"))
    except tweepy.error.TweepError as api_error:
        if api_error.api_code == 349:
            # original author's note: we do not follow the user or DMs are closed or we're blocked
            logging.info(f'Can not send message to {recipient_name}: {api_error}')
            return 1
        logging.error(f'Unknown: Can not send message to {recipient_name}: {api_error}')
        return -1
    return 0
def follow_user(self, screen_name: str) -> None:
    """
    Make the bot follow @screen_name; API errors are logged, not raised.

    When self.live is False nothing is sent and the action is only logged.

    :param screen_name: name of the user to be followed by the bot
    :return: None
    """
    live = self.live
    try:
        if live:
            self.api.create_friendship(screen_name)
    except tweepy.error.TweepError as api_error:
        logging.error(f'Can not follow user {screen_name}: {api_error}')
    else:
        logging.debug(f'[live={live}] - Now following {screen_name}')
def get_mentions(self, since_id) -> List[tweepy.models.Status]:
    """
    Get the mentions of the bot that are newer than since_id.

    An API error is logged and whatever was collected so far is returned.

    :param since_id: id of the oldest mention already known
    :return: list of tweets that mention the bot
    """
    # original author's note: 75 request/15 min
    collected = []
    try:
        cursor = tweepy.Cursor(self.api.mentions_timeline, since_id=since_id, count=LAST_N_MENTIONS)
        for page in cursor.pages():
            collected.extend(page)
    except tweepy.error.TweepError as api_error:
        logging.error(f'Can not load mentions: an error occurred: {api_error}')
    return collected
# endregion
# region: main logic
@staticmethod
def split_text_in_tweets(text: str, max_chars: Optional[int] = None) -> List[str]:
    """
    Split text on single spaces into chunks that each fit in one tweet.

    Words are packed greedily, in order. A word longer than the limit (e.g. alt text written
    without spaces, or a long URL) is cut into pieces of at most the limit instead of making the
    function fail, and no empty chunk is emitted when the very first word already fills a tweet.

    :param text: text to be split
    :param max_chars: max length of each chunk; defaults to MAX_CHARS_IN_TWEET
    :return: list of strings, each of them no longer than the limit; [''] for an empty text
    :raises ValueError: when the limit is lower than 1
    """
    limit = MAX_CHARS_IN_TWEET if max_chars is None else max_chars
    if limit < 1:
        raise ValueError(f'max_chars must be >= 1, got {limit}')

    # cut over-long words up front so every word fits in a chunk on its own
    words = []  # type: List[str]
    for word in text.split(' '):
        if len(word) <= limit:
            words.append(word)
        else:
            words.extend(word[k:k + limit] for k in range(0, len(word), limit))

    result = []
    n = 0  # running size of the chunk being built
    j = 0  # index of the first word of the chunk being built
    for i, word in enumerate(words):
        n += len(word) + 1  # +1 to consider spaces
        if n >= limit:
            # close the current chunk before this word; skip it when it holds no words
            if i > j:
                result.append(' '.join(words[j:i]))
            j = i
            n = len(word)
    result.append(' '.join(words[j:]))
    return result
@staticmethod
def collapse_text_in_tweets(tweets: List[str], max_chars: Optional[int] = None) -> List[str]:
    """
    Merge consecutive messages, joined with a newline, into as few tweets as possible.

    Messages are packed greedily, in order, and empty results are dropped.

    :param tweets: list of messages to be joined; each one is expected to fit in a tweet already
    :param max_chars: max length of each resulting tweet; defaults to MAX_CHARS_IN_TWEET
    :return: list of strings, each of them no longer than the limit
    """
    limit = MAX_CHARS_IN_TWEET if max_chars is None else max_chars
    result = []
    n = 0  # running size of the group being built
    j = 0  # index of the first message of the group being built
    for i, tweet in enumerate(tweets):
        n += len(tweet) + 1  # +1 to consider '\n'
        if n >= limit:
            # close the current group before this message and start a new one with it
            result.append('\n'.join(tweets[j:i]))
            j = i
            n = len(tweet)
    result.append('\n'.join(tweets[j:]))
    # remove empty results; happens when the first message already fills a tweet
    result = [x for x in result if len(x) > 0]
    # sanity checks kept from the original implementation
    assert len(result) <= len(tweets)
    assert all([len(m) <= limit for m in result])
    return result
def update_followers_if_needed(self, needed: bool) -> None:
    """
    Refresh the local list of followers when forced or when its size differs from the real one.

    :param needed: when True, refresh the local list even if both counts match
    :return: None
    """
    local_count = self.db.count_followers()
    real_count = self.alt_bot_user.followers_count
    logging.info(f'Locally have {local_count} followers currently they are {real_count}. '
                 f'Needed = {needed}')
    if local_count == real_count and not needed:
        # nothing changed and nobody forced the refresh
        return

    known = self.db.get_followers()
    logging.info('Updating local followers...')
    current = self.get_followers_from_api(ALT_BOT_NAME)
    gained = current - known
    lost = known - current
    gained_names = "; ".join([entry[0] for entry in gained])
    lost_names = "; ".join([entry[0] for entry in lost])
    logging.info(f'New followers: {gained_names}')
    logging.info(f'Lost followers: {lost_names}')
    logging.info(f'New followers: {len(gained)} Lost followers: {len(lost)} '
                 f'Win followers: {len(gained) - len(lost)}')
    self.db.update_followers(gained, lost)
def update_allowed_to_dm_if_needed(self, needed: bool) -> None:
    """
    Refresh the local list of users allowed to be DMed when forced or when its size differs
    from the retweet count of the tweet ACCEPT_DM_TWEET_ID.

    :param needed: when True, refresh the local list even if both counts match
    :return: None
    """
    local_count = self.db.count_allowed_to_dm()
    real_count = self.get_tweet(ACCEPT_DM_TWEET_ID).retweet_count
    logging.info(f'Locally have {local_count} allowed_to_dm currently they are {real_count}. '
                 f'Needed = {needed}')
    if local_count == real_count and not needed:
        # nothing changed and nobody forced the refresh
        return

    known = self.db.get_allowed_to_dm()
    logging.info('Updating local allowed...')
    current = self.get_allowed_to_dm_from_api()
    gained = current - known
    lost = known - current
    logging.info(f'New allowed: {len(gained)} Lost allowed: {len(lost)} '
                 f'Win allowed: {len(gained) - len(lost)}')
    self.db.update_allowed_to_dm(gained, lost)
def update_friends_if_needed(self, needed: bool) -> None:
    """
    Refresh the local list of friends when forced or when its size differs from the real one.

    :param needed: when True, refresh the local list even if both counts match
    :return: None
    """
    local_count = self.db.count_friends()
    real_count = self.alt_bot_user.friends_count
    logging.info(f'Locally have {local_count} friends currently they are {real_count}. '
                 f'Needed = {needed}')
    if local_count == real_count and not needed:
        # nothing changed and nobody forced the refresh
        return

    known = self.db.get_friends()
    logging.info('Updating local friends...')
    current = self.get_friends_from_api(ALT_BOT_NAME)
    gained = current - known
    lost = known - current
    gained_names = "; ".join([entry[0] for entry in gained])
    lost_names = "; ".join([entry[0] for entry in lost])
    logging.info(f'New friends: {gained_names}')
    logging.info(f'Lost friends: {lost_names}')
    logging.info(f'New friends: {len(gained)} Lost friends: {len(lost)} '
                 f'Win friends: {len(gained) - len(lost)}')
    self.db.update_friends(gained, lost)
@staticmethod
def get_tweet_url(user_screen_name: str, tweet_id: str) -> str:
    """
    Build the public url of a tweet.

    :param user_screen_name: screen name of the user who wrote the tweet
    :param tweet_id: id of the tweet
    :return: public url for the tweet
    """
    return 'https://twitter.com/{}/status/{}'.format(user_screen_name, tweet_id)
def get_alt_text(self, tweet_id: str) -> Union[List[Union[str, None]], int, None]:
    """
    Get the alt_text of every photo attached to the tweet tweet_id.

    :param tweet_id: str identifying a tweet
    :return: -1 if the tweet can't be read;
        None if the tweet has no extended_entities or its media list is empty;
        otherwise a list with one element per media entry of type 'photo', holding that entry's
        'ext_alt_text' value (the original author notes it is None when no alt_text was given
        and that a tweet may hold up to 4 images)
    """
    try:
        tweet = self.get_tweet(tweet_id)
    except tweepy.TweepError as e:
        logging.info(f'Can not read tweet {tweet_id}. Exception thrown {e}')
        return -1

    if not hasattr(tweet, 'extended_entities'):
        # This is a tweet without images or multimedia
        logging.debug(f'Tweet {tweet_id} does not contain extended_entities.')
        return None

    if len(tweet.extended_entities['media']) == 0:
        # This is a tweet without media, not sure if this can happen
        logging.debug(f'Tweet {tweet_id} contains extended_entities but not media.')
        return None

    alt_texts = []
    for media in tweet.extended_entities['media']:
        # only photos are reported; other media types are skipped
        if media['type'] == 'photo':
            alt_texts.append(media['ext_alt_text'])
    logging.debug(f'Tweet {tweet_id} contains extended_entities and media: {alt_texts}.')
    return alt_texts
@staticmethod
def compute_alt_text_score(alt_texts: List[Union[str, None]]) -> float:
    """
    Compute the share of images that actually carry an alt_text.

    Entries that are None or an empty string count as missing.

    :param alt_texts: non-empty list of alt_texts
    :return: score in [0,1], rounded to 2 decimals
    """
    provided = sum(1 for alt_text in alt_texts if alt_text)
    return round(provided / len(alt_texts), 2)
def process_account(self, screen_name: str, user_id: int, follower: bool, allowed_to_be_dmed: bool,
                    n_tweets: int) -> None:
    """
    Check the last n_tweets tweets of an account that were not processed before:
        - If all images in the tweet contain alt_text, the tweet is faved
        - If some image lacks alt_text, a DM is sent when the account is a follower who allowed
          DMs; otherwise the case is only logged
        - Tweets that can not be read or have no images are just marked as processed

    Any exception raised while handling one tweet is logged and the next tweet is processed.

    :param screen_name: account to be processed
    :param user_id: id of the account, used to send DMs and stored with the results
    :param follower: whether or not the screen_name account is a follower
    :param allowed_to_be_dmed: whether or not the bot is allowed to contact the user via DM
    :param n_tweets: number of tweets to consider
    :return: None
    """
    for tweet_id in self.get_last_tweets_for_account(screen_name, n_tweets):
        try:
            if self.db.tweet_was_processed(tweet_id):
                # already handled in a previous run
                continue
            tweet_url = self.get_tweet_url(screen_name, tweet_id)
            logging.info(f'Processing tweet {tweet_url}')
            alt_texts = self.get_alt_text(tweet_id)
            if alt_texts == -1:
                # the tweet could not be read
                logging.debug(f'This tweet can not be read by us: {tweet_url}')
                self.db.save_processed_tweet(tweet_id)
                continue
            if not alt_texts:
                # None or empty list: the tweet does not contain images
                logging.debug(f'This tweet is not interesting for us: {tweet_url}')
                self.db.save_processed_tweet(tweet_id)
                continue

            alt_text_score = self.compute_alt_text_score(alt_texts)
            if alt_text_score == 1:
                # every image has alt_text, let's like it
                logging.debug(f'All images in tweet contain alt texts: {tweet_url}')
                self.fav_tweet(tweet_id)
            elif follower and allowed_to_be_dmed:
                # a follower who allowed to be DMed by the bot: write a DM
                logging.debug(f'Some images ({alt_text_score*100} %) in tweet does not contain alt texts: '
                              f'{tweet_url} | DM the user, this is a follower')
                self.direct_message(screen_name, user_id, AUTO_DM_NO_ALT_TEXT.format(tweet_url))
            else:
                # not a follower, or not allowed to be DMed by the bot: just log it
                logging.debug(f'Some images ({alt_text_score*100} %) in tweet does not contain alt texts: '
                              f'{tweet_url} | '
                              f'IGNORED: follower: {follower} allowed_to_be_DMed: {allowed_to_be_dmed}')

            # one user_alt_text_<n> keyword per image, numbered from 1
            alt_text_columns = {}
            for position, alt_text in enumerate(alt_texts, start=1):
                alt_text_columns[f'user_alt_text_{position}'] = alt_text
            self.db.save_processed_tweet(tweet_id)
            self.db.save_processed_tweet_with_with_alt_text_info(screen_name, user_id, tweet_id, len(alt_texts),
                                                                 alt_text_score, **alt_text_columns)
        except Exception as e:
            logging.error(f'Exception: {e} while processing tweet '
                          f'https://twitter.com/{screen_name}/status/{tweet_id}', exc_info=True)
def process_followers(self, followers: Set[Tuple[str, int]], users_accepted: Set[int]) -> None:
    """
    Run self.process_account, as a follower, on every account of the followers set.

    An exception raised for one follower is logged and the next follower is processed.

    :param followers: set of (screen_name, id) followers to be processed
    :param users_accepted: set of user ids who accepted to receive DMs
    :return: None
    """
    total = len(followers)
    for position, (screen_name, user_id) in enumerate(followers):
        logging.info(f'[{position}/{total}] Processing follower @{screen_name}...')
        try:
            self.process_account(
                screen_name,
                user_id,
                follower=True,
                n_tweets=LAST_N_TWEETS,
                allowed_to_be_dmed=user_id in users_accepted,
            )
        except Exception as err:
            logging.error(f'Error while processing follower: {screen_name}:\n{err}')
def process_friends(self, friends: Set[Tuple[str, int]], followers: Set[Tuple[str, int]]) -> None:
    """
    Run self.process_account, as a non-follower, on every friend that is not also a follower.

    Friends whose id appears in the followers set are skipped. An exception raised for one
    friend is logged and the next friend is processed.

    :param friends: set of (screen_name, id) friends
    :param followers: set of (screen_name, id) followers
    :return: None
    """
    followers_ids = {f[1] for f in followers}  # type: Set[int]
    n_friends = len(friends)
    for i, (friend_screen_name, friend_id) in enumerate(friends):
        if friend_id in followers_ids:
            # this friend is also a follower, we can skip it
            continue
        logging.info(f'[{i}/{n_friends}] Processing friend @{friend_screen_name}...')
        try:
            self.process_account(friend_screen_name, friend_id, follower=False, n_tweets=LAST_N_TWEETS,
                                 allowed_to_be_dmed=False)
        except Exception as e:
            # the message used to say "follower" here, which pointed log readers at the wrong use case
            logging.error(f'Error while processing friend: {friend_screen_name}:\n{e}')
def process_tweets_in_reply_to_other_tweet(self, mentions: List[tweepy.models.Status]):
    """
    Process every mention whose text names only the bot (besides the users being replied to).

    Other mentions are skipped and logged together with their url.

    :param mentions: tweets that mention the bot
    :return: None
    """
    for mention in mentions:
        # need to check that only the bot is mentioned here; otherwise ignore it
        only_bot_named = self.check_text_only_mention_bot(mention.text, self.alt_bot_user.screen_name)
        if not only_bot_named:
            logging.debug(f'skipping mention since not only the bot was named: {mention.text}')
            logging.debug(self.get_tweet_url(mention.author.screen_name, mention.id))
            continue
        logging.debug('Processing mention since only the bot was named')
        self.process_mention_in_reply_to_tweet(mention)
def process_mention_in_reply_to_tweet(self, tweet) -> None:
    """
    Answer a mention of the bot that was written as a reply to another tweet.

    The tweet being replied to (the "target") is checked for images with or without alt_text
    and the author of the mention gets the answer:
        - target already processed: the answer is built from the data stored in the DB; when
          the DB holds a positive score but no alt_text, the texts are downloaded again and
          stored
        - target not processed yet: it is read from the API, answered, faved when every image
          has alt_text, and its results are stored. When some image lacks alt_text and the
          target's author is a follower who allowed DMs, the author also gets a DM

    :param tweet: tweet that mentions the bot in reply to another tweet
    :return: None
    """
    tweet_to_process_screen_name = tweet.in_reply_to_screen_name  # type: str
    tweet_to_process_user_id = tweet.in_reply_to_user_id  # type: int
    tweet_to_process_tweet_id = str(tweet.in_reply_to_status_id)  # type: str
    tweet_to_process_url = self.get_tweet_url(tweet_to_process_screen_name, tweet_to_process_tweet_id)
    tweet_to_reply_screen_name = tweet.author.screen_name  # type: str
    tweet_to_reply_id = tweet.id_str  # type: str

    if self.db.tweet_was_processed(tweet_to_process_tweet_id):
        logging.debug(f'This twit was already processed: {tweet_to_process_url} ; lets check on DB')
        # we already processed this tweet; take results from DB
        alt_text_info = self.db.get_alt_text_info_from_tweet(tweet_to_process_tweet_id)
        if alt_text_info is None:
            # the tweet does not contain an image
            logging.debug('Tweet being reply was already processed and does not contain images')
            self.reply(tweet_to_reply_screen_name,
                       AUTO_REPLY_NO_IMAGES_FOUND.format(tweet_to_process_screen_name), tweet_to_reply_id)
            return

        alt_text_score = alt_text_info['alt_score']  # type: float
        user_alt_texts = alt_text_info['user_alt_text']
        if alt_text_score > 0 and all(txt is None for txt in user_alt_texts):
            # the tweet has images with alt_text but the DB does not hold the texts: download them
            user_alt_texts = self.get_alt_text(tweet_to_process_tweet_id)
            if isinstance(user_alt_texts, list):
                update_params = {f'user_alt_text_{i}': txt for i, txt in enumerate(user_alt_texts, start=1)}
                self.db.update_user_alt_text_info(tweet_to_process_tweet_id, **update_params)
            else:
                # -1 (tweet can not be read) or None (no media found): nothing can be recovered
                user_alt_texts = -1

        if user_alt_texts == -1:
            # Only happens for old tweets stored without their alt text which are no longer
            # available when we try to recover it
            self.reply_thread(tweet_to_reply_screen_name,
                              [UNAVAILABLE_TWEET.format(screen_name=tweet_to_process_screen_name)],
                              tweet_to_reply_id)
            return

        if alt_text_score > 0:
            alt_text_messages = self._alt_text_thread_messages(tweet_to_process_screen_name, user_alt_texts)
        else:
            # no alt_texts were provided
            alt_text_messages = []
        if alt_text_score < 1:
            logging.debug('Tweet being reply was already processed and NOT all images contain alt_text')
            summary = SINGLE_USER_NO_ALT_TEXT_QUERY.format(tweet_to_process_screen_name)
        else:
            logging.debug('Tweet being reply was already processed and ALL images contain alt_text')
            summary = SINGLE_USER_WITH_ALT_TEXT_QUERY.format(tweet_to_process_screen_name)
        self.reply_thread(tweet_to_reply_screen_name, [summary] + alt_text_messages, tweet_to_reply_id)
        return

    # tweet is not in our DB; we need to get it from the API and process accordingly
    alt_texts = self.get_alt_text(tweet_to_process_tweet_id)
    if alt_texts == -1:
        # can not download the tweet
        logging.debug(f'This tweet is not interesting for us, we can not read it {tweet_to_process_url}')
        self.db.save_processed_tweet(tweet_to_process_tweet_id)
        self.reply(tweet_to_reply_screen_name,
                   UNAVAILABLE_TWEET.format(screen_name=tweet_to_process_screen_name), tweet_to_reply_id)
        return
    if not alt_texts:
        # None or empty list: the tweet does not contain images
        logging.debug(f'This tweet is not interesting for us: {tweet_to_process_url}')
        self.db.save_processed_tweet(tweet_to_process_tweet_id)
        self.reply(tweet_to_reply_screen_name,
                   AUTO_REPLY_NO_IMAGES_FOUND.format(tweet_to_process_screen_name), tweet_to_reply_id)
        return

    alt_text_score = self.compute_alt_text_score(alt_texts)
    alt_text_messages = self._alt_text_thread_messages(tweet_to_process_screen_name, alt_texts)
    if alt_text_score == 1:
        # all of the images contain alt_text, let's like it
        logging.debug(f'All images in tweet contain alt texts: {tweet_to_process_url}')
        self.fav_tweet(tweet_to_process_tweet_id)
        summary = SINGLE_USER_WITH_ALT_TEXT_QUERY.format(tweet_to_process_screen_name)
        self.reply_thread(tweet_to_reply_screen_name, [summary] + alt_text_messages, tweet_to_reply_id)
    else:
        # some images without alt_text; reply the tweet with the proper message
        logging.debug(f'Some images ({alt_text_score * 100} %) in tweet does not contain '
                      f'alt texts: {tweet_to_process_url}')
        summary = SINGLE_USER_NO_ALT_TEXT_QUERY.format(tweet_to_process_screen_name)
        self.reply_thread(tweet_to_reply_screen_name, [summary] + alt_text_messages, tweet_to_reply_id)
        # also write to the author if needed
        if self.db.is_allowed_to_dm(tweet_to_process_user_id) and self.db.is_follower(
                tweet_to_process_user_id):
            logging.debug('the user is a follower with DMs allowed, so, need to write DM to user')
            self.direct_message(tweet_to_process_screen_name, tweet_to_process_user_id,
                                AUTO_REPLY_NO_DM_NO_ALT_TEXT.format(tweet_to_process_url))

    # one user_alt_text_<n> keyword per image, numbered from 1
    user_alt_texts_params = {f'user_alt_text_{idx}': text for idx, text in enumerate(alt_texts, start=1)}
    # save the processed tweet with its images data
    self.db.save_processed_tweet_with_with_alt_text_info(tweet_to_process_screen_name,
                                                         tweet_to_process_user_id,
                                                         tweet_to_process_tweet_id,
                                                         len(alt_texts), alt_text_score,
                                                         **user_alt_texts_params)
    # Mark the tweet as processed if needed. Original author's note: the tweet may be already
    # processed; this happens when user A tweets an image, user B (bot's follower or friend)
    # replies to A's tweet, the watch use case runs and processes B's reply, and then the
    # mentions use case runs and B's reply must be processed again since now A's tweet is checked.
    # NOTE(review): do_not_fail=True presumably makes a duplicate save a no-op - confirm in DBAccess
    self.db.save_processed_tweet(tweet_to_process_tweet_id, do_not_fail=True)

def _alt_text_thread_messages(self, screen_name: str, alt_texts: List[Optional[str]]) -> List[str]:
    """
    Build the messages that quote the alt_texts a user provided: a header plus, per image with
    alt_text, its template filled in and split into tweet-sized pieces.

    :param screen_name: screen name of the author of the images
    :param alt_texts: one entry per image; None entries (no alt_text) are skipped
    :return: list of messages, header first
    """
    messages = [HEADER_ALT_TEXT_USER_PROVIDED.format(screen_name=screen_name)]
    for template, text in zip(ALL_ALT_TEXT_USER_PROVIDED, alt_texts):
        if text is None:
            continue
        # alt text may contain up to 1000 chars, so each one may need several tweets
        messages.extend(self.split_text_in_tweets(template.format(alt_text=text)))
    return messages
@staticmethod
def check_text_only_mention_users(text: str) -> bool:
    """
    Tell whether a tweet text is made of nothing but user mentions.

    Every ``@handle`` (1 to 15 letters, digits or underscores, matched case-insensitively) is dropped
    first; whitespace and the separators ``. : , ; -`` are dropped next. The text qualifies when no
    character survives both passes, hence an empty text qualifies too.
    :param text: tweet text
    :return: True iff text only contains mentions, whitespace and the separators listed above
    """
    mention_pattern = re.compile(r'@[a-z\d_]{1,15}', re.IGNORECASE)
    filler_pattern = re.compile(r'[\s.:,;-]*')
    # strip the handles first, then whatever filler was sitting between them
    leftovers = filler_pattern.sub('', mention_pattern.sub('', text))
    return not leftovers
@staticmethod
def check_text_only_mention_bot(text: str, bot_screen_name: str) -> bool:
    """
    Check if text only contains a mention to @bot_screen_name.

    The bot mention may be preceded, from the very start of the text, by any number of other mentions,
    each followed by a single space (the users being replied to). Once that leading run is removed,
    only whitespace and the separators ``. : , ; -`` may remain. Matching is case-insensitive.
    :param text: tweet text
    :param bot_screen_name: bot screen name, without the leading @
    :return: True iff tweet only contains @bot_screen_name mentioned
    """
    # the screen name is escaped so it is always matched literally and never read as regex syntax
    leading_mentions = r'^(@[a-z\d_]{1,15} )*' + '@' + re.escape(bot_screen_name)
    # remove named users in text (all users being replied to and the bot)
    result = re.sub(leading_mentions, '', text, flags=re.IGNORECASE)
    # remove empty chars and some punctuation before returning
    result = re.sub(r'[\s.:,;-]+', '', result)
    return not result
def process_mentioned_users_in_tweet(self, tweet: tweepy.models.Status) -> None:
    """
    Reply to an original tweet mentioning the bot with a report about the other mentioned accounts.

    tweet is an original tweet which mentions the bot. The other accounts mentioned in it are visited
    in order (the loop stops as soon as the report holds MAX_MENTIONS_TO_PROCESS entries); accounts
    without recent enough data in the DB are processed first. If at least one entry was produced, the
    tweet is replied with a small report on the usage of alt_text. The tweet is finally saved as
    processed.
    :param tweet: tweet whose mentions are going to be processed
    :return: None
    """
    bot_name = self.alt_bot_user.screen_name.lower()
    # every mentioned account but the bot itself
    other_mentions = [mention for mention in tweet.entities['user_mentions']
                      if mention['screen_name'].lower() != bot_name]
    total = min(MAX_MENTIONS_TO_PROCESS, len(other_mentions))
    tweet_url = self.get_tweet_url(tweet.author.screen_name, tweet.id)

    entries = []
    for position, mention in enumerate(other_mentions, start=1):
        screen_name = mention['screen_name']
        logging.debug(f"[{position}/{total}] processing mentioned user: @{screen_name} ({tweet_url})")
        user_id = mention['id']

        # refresh the account when the DB knows nothing about it or what it knows is too old
        last_date = self.db.get_last_tweet_with_info_date(user_id)
        is_stale = last_date is None or (datetime.now() - last_date).days > MAX_DAYS_TO_REFRESH_TWEETS
        if is_stale:
            follower = self.db.is_follower(user_id)
            allowed = self.db.is_allowed_to_dm(user_id)
            logging.debug(f"Processing @{screen_name} account since most recent tweet is from {last_date}")
            # process_account is called with the follower/allowed flags; per the original note it
            # sends the user a DM if needed
            self.process_account(screen_name, user_id, follower, allowed, LAST_N_TWEETS_MAX)

        score, n_images = self.db.get_percentage_of_alt_text_usage(user_id)
        logging.debug(f"@{screen_name}: score is {score} in {n_images}")
        # a negative score is still possible: the user did not post any image recently
        entry = (SINGLE_USER_NO_IMAGES_FOUND_REPORT.format(screen_name=screen_name)
                 if score < 0
                 else SINGLE_USER_REPORT.format(screen_name=screen_name, score=score, n_images=n_images))
        entries.append(entry)

        if len(entries) == MAX_MENTIONS_TO_PROCESS:
            logging.info(f'[{position}/{total}] Stop processing mentioned users since '
                         f'{MAX_MENTIONS_TO_PROCESS} already processed')
            break

    if entries:
        # entries can be empty, for instance, when nobody but the bot is mentioned
        logging.debug('reply with report for mentioned accounts')
        # wrap the entries between header and footer and send them as a single message
        report = '\n'.join([HEADER_REPORT, *entries, FOOTER_REPORT])
        self.reply(msg=report, reply_to=tweet.author.screen_name, tweet_id=tweet.id_str)

    # Save the tweet as processed if needed; do_not_fail=True because it may be already processed:
    #   user A (bot's follower or friend) tweets mentioning some accounts,
    #   the watch use case is run; A's tweet is processed,
    #   the mentions use case is run; A's tweet must be processed again since now the accounts
    #   mentioned in it are the ones being checked.
    self.db.save_processed_tweet(str(tweet.id), do_not_fail=True)
def process_original_tweets_mentioning_bot(self, tweets: List[tweepy.models.Status]):
    """
    Process all original tweets that mention the bot.

    Tweets written by the bot itself are skipped. For a tweet whose text only mentions accounts (i.e.
    no more text than this), a report is given for the mentioned accounts; any other tweet is skipped
    and its URL is logged.
    :param tweets: list of original tweets to be processed
    :return:
    """
    bot_name = self.alt_bot_user.screen_name.lower()
    for tweet in tweets:
        if tweet.author.screen_name.lower() == bot_name:
            logging.debug(f'Skip processing this mention since was written by the bot.')
            continue

        # NOTE(review): only the text is inspected here; media attached to the tweet is not checked
        if not self.check_text_only_mention_users(tweet.text):
            logging.debug(f'Skip processing mention: Not only users are mentioned in this tweet: {tweet.text}')
            # NOTE(review): URL assumed to be logged for skipped tweets only - confirm
            logging.debug(self.get_tweet_url(tweet.author.screen_name, tweet.id))
            continue

        logging.debug(f'Process mention; Only users are mentioned in this tweet: {tweet.text}')
        self.process_mentioned_users_in_tweet(tweet)
# endregion
# region: use cases
def process_mentions(self) -> None:
    """
    Process last mentions to the bot.

    The mentions returned by self.get_mentions(last_mention_id) are split in two groups: original
    tweets (in_reply_to_status_id is None) and tweets replying to some other tweet. Each group is
    handed to its own use case and the highest tweet id seen is stored as the new last mention id.
    :return: None
    """
    last_mention_id = self.db.get_last_mention_id()
    mention_tweets = self.get_mentions(last_mention_id)

    tweets_in_reply_to_other_mentioning_bot = []
    original_tweets_mentioning_bot = []
    next_last_mention_id = last_mention_id
    for tweet in mention_tweets:
        if tweet.in_reply_to_status_id is None:
            # this is an original tweet; need to process the mentioned accounts
            original_tweets_mentioning_bot.append(tweet)
        else:
            # this tweet is in reply to some other tweet; need to check this previous tweet
            tweets_in_reply_to_other_mentioning_bot.append(tweet)
        # NOTE(review): guards against a None stored id (presumably when no mention was saved yet -
        # confirm); comparing an int against None would raise TypeError
        if next_last_mention_id is None or tweet.id > next_last_mention_id:
            next_last_mention_id = tweet.id

    logging.info('[USE CASE] Processing original tweets mentioning the bot')
    self.process_original_tweets_mentioning_bot(original_tweets_mentioning_bot)
    logging.info('[USE CASE] Processing tweets that mention the bot AND reply to other tweets')
    self.process_tweets_in_reply_to_other_tweet(tweets_in_reply_to_other_mentioning_bot)
    # stored only after both use cases ran, so a failure above leaves the previous id in place
    self.db.update_last_mention_id(next_last_mention_id)
def watch_for_alt_text_usage_in_followers(self) -> None:
    """
    Process all followers of AltBotUY to check for alt_text usage (the work itself is delegated to
    self.process_followers):
    - If all images in tweet contain alt_text, then it is faved
    - If some images in tweet do not contain alt_text, then DM for followers who accepted to be DMed or ignore
    - Otherwise ignore it
    Processed tweets are saved for reports.
    A summary with the number of followers and the share of them allowed to be DMed is logged at the end.
    :return: None
    """
    allowed_to_be_dmed = self.db.get_allowed_to_dm()
    followers = self.db.get_followers()
    self.process_followers(followers, allowed_to_be_dmed)

    n_followers = len(followers)
    n_allowed = len(allowed_to_be_dmed)
    # with no followers at all the share is reported as 0 instead of dividing by zero
    allowed_percentage = n_allowed / n_followers * 100 if n_followers else 0.0
    # '.2f' renders two decimals; a bare '.2' means two significant digits (100.0 -> '1e+02')
    logging.info(f'{n_followers} followers were processed, {n_allowed} allowed to DM '
                 f'({allowed_percentage:.2f} %)')
def watch_for_alt_text_usage_in_friends(self) -> None:
    """
    Check the alt_text usage of every friend of AltBotUY (the work itself is delegated to
    self.process_friends, which also receives the followers):
    - If all images in tweet contain alt_text, then it is faved
    - Otherwise ignore it.
    Processed tweets are saved for reports. The number of friends is logged at the end.
    :return: None
    """
    db = self.db
    # followers are read before friends, both straight from the DB
    current_followers = db.get_followers()
    current_friends = db.get_friends()
    self.process_friends(current_friends, current_followers)
    logging.info(f'{len(current_friends)} friends were processed.')
def send_message_to_all_followers(self, msg: str) -> None:
    """
    Send a DM to every follower.

    When msg is the path of an existing file, the text to send is read from that file (decoded as
    UTF-8); otherwise msg itself is the text. A DM counts as sent when self.direct_message returns 0;
    the number of messages sent is logged at the end.
    :param msg: string message to the followers or path to the file containing the message
    :return: None
    """
    if os.path.isfile(msg):
        logging.info(f'Reading message from file {msg}')
        # explicit encoding: the platform default may not be UTF-8 and would garble non-ASCII text
        with open(msg, 'r', encoding='utf-8') as f:
            msg = f.read()
        logging.info(f'Read message: {msg}')

    followers = self.db.get_followers()
    msg_sent = 0
    for follower_screen_name, follower_id in followers:
        if self.direct_message(follower_screen_name, follower_id, msg) == 0:
            msg_sent += 1
        else:
            logging.info(f'Can not write DM to {follower_screen_name}')
    logging.info(f'{msg_sent}/{len(followers)} messages sent')