forked from andrewsosa/firebase-python-streaming
-
Notifications
You must be signed in to change notification settings - Fork 0
/
firebase_streaming.py
173 lines (133 loc) · 4.81 KB
/
firebase_streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# adapted from firebase/EventSource-Examples/python/chat.py by Shariq Hashme
from sseclient import SSEClient
import requests
import json
import threading
import socket
import ast
def json_to_dict(response):
    """Return a plain-Python copy of a JSON-serialisable value.

    The value is serialised with ``json.dumps`` and parsed back with
    ``json.loads``.  Parsing with the JSON parser (rather than evaluating the
    JSON text as a Python literal) is required because JSON's ``true``,
    ``false`` and ``null`` are not Python literals.

    Args:
        response: Any value ``json.dumps`` can serialise (dict, list, str,
            number, bool or None).

    Returns:
        The round-tripped value: strings come back unchanged, tuples become
        lists and non-string dict keys become strings.

    Raises:
        TypeError: If ``response`` is not JSON-serialisable.
    """
    return json.loads(json.dumps(response))
class ClosableSSEClient(SSEClient):
    """SSEClient variant whose stream can be shut down via ``close()``.

    After ``close()`` has been called, any further ``_connect()`` attempt
    raises ``StopIteration`` instead of opening a new connection.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``SSEClient`` with connecting enabled."""
        # Set before the base initialiser runs so the flag already exists if
        # the base class calls _connect() during construction.
        self.should_connect = True
        super(ClosableSSEClient, self).__init__(*args, **kwargs)

    def _connect(self):
        """Connect through the base class unless ``close()`` was requested.

        Raises:
            StopIteration: If ``close()`` has already been called.
        """
        if self.should_connect:
            super(ClosableSSEClient, self)._connect()
        else:
            raise StopIteration()

    def _underlying_socket(self):
        """Return the socket behind the current response, or None.

        NOTE(review): this walks private attributes of the HTTP stack and may
        need adjusting for other library versions.  The socket is expected at
        ``resp.raw._fp.fp._sock`` on Python 2 and at
        ``resp.raw._fp.fp.raw._sock`` on Python 3.
        """
        try:
            reader = self.resp.raw._fp.fp
        except AttributeError:
            # No response yet, or the response has already been released.
            return None
        sock = getattr(reader, '_sock', None)
        if sock is None:
            sock = getattr(getattr(reader, 'raw', None), '_sock', None)
        return sock

    def close(self):
        """Stop reconnecting and shut down the underlying socket, if found."""
        self.should_connect = False
        self.retry = 0
        sock = self._underlying_socket()
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # Best effort: the peer may already have dropped the connection.
            pass
        sock.close()
class RemoteThread(threading.Thread):
    """Thread that reads a streaming URL and mirrors updates onto ``parent``.

    For every event that is not a keep-alive, the ``admin_id``, ``notif`` and
    ``timestamp`` attributes of ``parent`` are updated where applicable, and
    then ``function(parent)`` is called.
    """

    # Attributes of ``parent`` kept in sync with same-named top-level children.
    _TRACKED_FIELDS = ('admin_id', 'notif', 'timestamp')

    def __init__(self, parent, URL, function):
        """Store the stream settings.

        Args:
            parent: Object whose tracked attributes are updated.
            URL: Streaming endpoint handed to ``ClosableSSEClient``.
            function: Callback invoked as ``function(parent)`` after each
                event that is not a keep-alive.
        """
        self.function = function
        self.URL = URL
        self.parent = parent
        super(RemoteThread, self).__init__()

    def _apply_put(self, payload):
        """Apply one decoded ``put`` payload to ``parent``.

        Args:
            payload: Mapping with ``'path'`` and ``'data'`` entries.

        Returns:
            True if the path was the root or one of the tracked children and
            the parent was updated, False otherwise.
        """
        path = payload['path']
        data = payload['data']
        if path == "/":
            # A root put replaces everything below the root, so tracked
            # children missing from the new value (or a null / non-object
            # value) are reset to None instead of raising in the thread.
            children = data if isinstance(data, dict) else {}
            for field in self._TRACKED_FIELDS:
                setattr(self.parent, field, children.get(field))
            return True
        if path.startswith("/") and path[1:] in self._TRACKED_FIELDS:
            setattr(self.parent, path[1:], data)
            return True
        return False

    def run(self):
        """Consume the event stream until it ends or the socket is closed."""
        try:
            self.sse = ClosableSSEClient(self.URL)
            for msg in self.sse:
                payload = json.loads(msg.data)
                if payload is None:  # keep-alives
                    continue
                handled = msg.event == 'put' and self._apply_put(payload)
                if not handled:
                    # Print the raw event text as received.
                    print("DEBUG event: " + msg.event)
                    print("DEBUG data: " + msg.data)
                self.function(self.parent)
        except socket.error:
            pass  # this can happen when we close the stream
        except KeyboardInterrupt:
            self.close()

    def close(self):
        """Close the stream if ``run()`` has already created it."""
        if hasattr(self, 'sse'):
            self.sse.close()
def firebaseURL(URL):
    """Normalise a Firebase reference into an ``https://...json`` REST URL.

    Two input forms are accepted:

    * Shorthand (no ``.firebaseio.com`` in the text): ``"app"`` or
      ``"app/some/path"``, optionally ending in ``/`` or ``.json``.  The part
      before the first ``/`` becomes the host prefix and the rest the path.
    * Full host form (contains ``.firebaseio.com``): the scheme is forced to
      ``https://`` and a ``.json`` suffix is appended unless ``.json`` already
      occurs somewhere in the URL.

    Args:
        URL: Shorthand reference or full URL, as a string.

    Returns:
        The normalised URL string.

    >>> firebaseURL("app/users")
    'https://app.firebaseio.com/users.json'
    >>> firebaseURL("app/")
    'https://app.firebaseio.com/.json'
    """
    if '.firebaseio.com' not in URL.lower():
        if URL.endswith('.json'):
            URL = URL[:-5]
        if URL.endswith('/'):
            URL = URL[:-1]
        # partition() yields an empty path when no '/' remains (for example
        # after stripping the trailing slash of "app/"), so this cannot
        # raise IndexError the way split('/', 1)[1] does.
        app, _, path = URL.partition('/')
        return 'https://' + app + '.firebaseio.com/' + path + '.json'

    if 'http://' in URL:
        URL = URL.replace('http://', 'https://')
    if 'https://' not in URL:
        URL = 'https://' + URL
    if '.json' not in URL.lower():
        if URL.endswith('/'):
            URL = URL + '.json'
        else:
            URL = URL + '/.json'
    return URL
class EventListener:
    """Holds the latest streamed values and owns the thread that updates them.

    The ``admin_id``, ``notif`` and ``timestamp`` attributes start out as
    empty dicts.  The background ``RemoteThread`` is created here with this
    listener as its ``parent``.
    """

    def __init__(self, URL, function):
        """Create (but do not start) the streaming thread.

        Args:
            URL: Firebase reference; normalised with ``firebaseURL``.
            function: Callback handed on to ``RemoteThread``.
        """
        self.admin_id = {}
        self.notif = {}
        self.timestamp = {}
        self.cache = {}
        stream_url = firebaseURL(URL)
        self.remote_thread = RemoteThread(self, stream_url, function)

    def start(self):
        """Start the background streaming thread."""
        self.remote_thread.start()

    def stop(self):
        """Close the stream, then block until the thread has finished."""
        worker = self.remote_thread
        worker.close()
        worker.join()

    def wait(self):
        """Block until the streaming thread finishes."""
        self.remote_thread.join()
class FirebaseException(Exception):
    """Error type used by this module for failed Firebase requests."""
class Firebase():
    """Minimal REST handle for one Firebase location.

    ``name`` is the reference exactly as given by the caller; ``URL`` is the
    same reference normalised by ``firebaseURL``.
    """

    def __init__(self, name):
        """Remember ``name`` and pre-compute its REST URL."""
        self.name = name
        self.URL = firebaseURL(name)

    def child(self, child):
        """Return a ``Firebase`` for the location ``child`` below this one.

        Exactly one ``/`` is placed between the parent reference and the
        child, so this works whether or not ``name`` ends with a slash.  A
        trailing ``.json`` on the parent reference is dropped first so it
        cannot end up in the middle of the child path.
        """
        base = self.name
        if base.endswith('.json'):
            base = base[:-5]
        return Firebase(base.rstrip('/') + '/' + child.strip('/') + '/')

    def _checked(self, response):
        """Return ``response`` if its status is 200, else raise.

        Raises:
            FirebaseException: Carrying the response body text.
        """
        if response.status_code != 200:
            raise FirebaseException(response.text)
        return response

    def put(self, msg):
        """Replace the value at this location with ``msg`` (sent as JSON).

        Raises:
            FirebaseException: If the response status is not 200.
        """
        self._checked(requests.put(firebaseURL(self.URL), data=json.dumps(msg)))

    def patch(self, msg):
        """Send ``msg`` (as JSON) in a PATCH request to this location.

        Raises:
            FirebaseException: If the response status is not 200.
        """
        self._checked(requests.patch(firebaseURL(self.URL), data=json.dumps(msg)))

    def get(self):
        """Fetch this location and return the decoded JSON value.

        Raises:
            FirebaseException: If the response status is not 200.
        """
        response = self._checked(requests.get(firebaseURL(self.URL)))
        # Decode the response body; the Response object itself is not
        # JSON-serialisable and cannot be passed through json.dumps.
        return response.json()

    def listener(self, callback=None):
        """Return an ``EventListener`` for this location (not yet started).

        Args:
            callback: Called with the listener after each streamed event;
                defaults to printing the listener.
        """
        def handle(response):
            print(response)

        return EventListener(self.name, callback or handle)

    def __str__(self):
        """Return the reference as originally given."""
        return self.name