-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathwiki_to_kafka.py
36 lines (29 loc) · 1.16 KB
/
wiki_to_kafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import json
import sseclient
import datetime
import requests
from confluent_kafka import Producer
def with_requests(url, headers):
    """Open a streaming HTTP GET to *url* and return the response object.

    The request is made with ``stream=True`` so the body is not downloaded
    up front; the caller reads it incrementally.

    Args:
        url: Address to request.
        headers: Mapping of HTTP headers to send with the request.

    Returns:
        Whatever ``requests.get`` returns for the streaming request.
    """
    request_options = {'stream': True, 'headers': headers}
    return requests.get(url, **request_options)
def acked(err, msg):
    """Report a failed delivery; stay silent when delivery succeeded.

    Passed as the ``callback`` argument of ``producer.produce`` in this
    script.

    Args:
        err: ``None`` on success, otherwise an error object exposing
            ``str()``.
        msg: The message the report is about; its ``value()`` is printed.
    """
    if err is None:
        # Nothing to report for a successful delivery.
        return
    report = f"Failed to deliver message: {msg.value()}: {err.str()}"
    print(report)
def json_serializer(obj):
    """Fallback encoder for values ``json.dumps`` cannot serialize natively.

    Intended for use as the ``default=`` argument of ``json.dumps``.

    Args:
        obj: The value the JSON encoder could not handle.

    Returns:
        The ISO 8601 string for ``datetime.datetime`` and ``datetime.date``
        instances.

    Raises:
        TypeError: If *obj* is of any other type.
    """
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    # Raise a real exception: raising a bare string is itself an error in
    # Python 3 and would hide this message behind an unrelated one.
    raise TypeError(f"Type {type(obj)} not serializable")
# Server-sent-events endpoint to read from; the Accept header asks for the
# text/event-stream representation.
url = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {'Accept': 'text/event-stream'}
response = with_requests(url, headers)
client = sseclient.SSEClient(response)

producer = Producer({'bootstrap.servers': 'localhost:9092'})

events_processed = 0
try:
    for event in client.events():
        # Round-trip the event through json so the value written to Kafka is
        # UTF-8 encoded JSON with non-ASCII characters left unescaped.
        stream = json.loads(event.data)
        payload = json.dumps(stream, default=json_serializer, ensure_ascii=False).encode('utf-8')
        producer.produce(topic='wiki-events', key=str(stream['meta']['id']),
                         value=payload, callback=acked)
        # Serve pending delivery callbacks without blocking, so failures are
        # reported promptly instead of only at the next flush.
        producer.poll(0)

        events_processed += 1
        if events_processed % 100 == 0:
            print(f"{str(datetime.datetime.now())} Flushing after {events_processed} events")
            producer.flush()
finally:
    # Runs when the stream ends, an error occurs, or the script is
    # interrupted: push out whatever was queued since the last periodic
    # flush. The wait is bounded so shutdown cannot hang indefinitely.
    undelivered = producer.flush(10)
    if undelivered:
        print(f"{undelivered} message(s) still undelivered at shutdown")
    # Release the streaming HTTP connection.
    response.close()