-
Notifications
You must be signed in to change notification settings - Fork 0
/
7) producer.py
33 lines (27 loc) · 859 Bytes
/
7) producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#Tashfeen Abbasi
#Laiba Mazhar
#Rafia Khan
#Apache-Kafka-and-Frequent-Item-sets
from kafka import KafkaProducer
import json
import time
# Preprocess a single record before it is published
def preprocess(record):
    """Return the value to publish for ``record``.

    This is a pass-through hook: the argument is handed back unchanged
    (the very same object, not a copy). It exists so that per-record
    transformations can be added in one place without touching the
    streaming loop.

    Args:
        record: A decoded record; any value is accepted.

    Returns:
        The same ``record`` object that was passed in.
    """
    return record
#Stream Data
def streaming(file_path, topic_name, *, bootstrap_servers='localhost:9092',
              delay=0.1):
    """Publish every JSON record in a line-delimited file to a Kafka topic.

    The file is read line by line; each non-blank line is parsed with
    ``json.loads``, passed through ``preprocess`` and sent to ``topic_name``
    as UTF-8 encoded JSON. The function pauses for ``delay`` seconds after
    each record.

    Args:
        file_path: Path of the input file holding one JSON document per line.
        topic_name: Kafka topic the records are sent to.
        bootstrap_servers: Broker address(es) given to ``KafkaProducer``.
            Defaults to the local broker on port 9092.
        delay: Seconds to sleep after each record; must not be negative.

    Raises:
        OSError: If ``file_path`` cannot be opened (for example
            ``FileNotFoundError``).
        json.JSONDecodeError: If a non-blank line is not valid JSON.

    Errors raised by the Kafka client itself are not caught here and
    propagate to the caller.
    """
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default text encoding.
        with open(file_path, 'r', encoding='utf-8') as infile:
            for line in infile:
                line = line.strip()
                if not line:
                    # A blank line (typically a trailing newline at EOF) is
                    # not a record and would make json.loads raise.
                    continue
                record = json.loads(line)
                producer.send(topic_name, value=preprocess(record))
                # Pace the stream: one record per `delay` seconds.
                time.sleep(delay)
        # Wait until every buffered message has been handed to the broker.
        producer.flush()
    finally:
        # Release the client even when opening the file or parsing a line
        # fails part-way through.
        producer.close()
#Entry point
if __name__ == "__main__":
    # NOTE: this input path is specific to one machine; change it to the
    # file that should be streamed before running the script elsewhere.
    file_path = '/home/laibu/preprocessed_data.json'
    topic_name = 'assignment'
    streaming(file_path, topic_name)