Skip to content

Commit

Permalink
Create device_communication.py
Browse files Browse the repository at this point in the history
  • Loading branch information
KOSASIH committed May 11, 2024
1 parent 9cc7d57 commit d5cec43
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions models/device_drivers/device_communication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Import necessary libraries
import paho.mqtt.client as mqtt
import coapthon.client as coap
import requests

# Define a class for device communication
class DeviceCommunication:
def __init__(self, device_config):
# Initialize device communication protocol
self.protocol = device_config['protocol']
self.host = device_config['host']
self.port = device_config['port']
self.topic = device_config['topic']

def mqtt_publish(self, message):
# Publish a message using MQTT protocol
client = mqtt.Client()
client.connect(self.host, self.port, 60)
client.publish(self.topic, message)
client.disconnect()

def mqtt_subscribe(self, callback):
# Subscribe to a topic using MQTT protocol
def on_connect(client, userdata, flags, rc):
client.subscribe(self.topic)

def on_message(client, userdata, msg):
callback(msg.payload)

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(self.host, self.port, 60)
client.loop_forever()

def coap_get(self, resource):
# Get a resource using CoAP protocol
client = coap.Client()
client.uri_format = 'coap://{host}:{port}/{resource}'.format(host=self.host, port=self.port, resource=resource)
response = client.get()
return response.payload

def coap_put(self, resource, payload):
# Put a payload to a resource using CoAP protocol
client = coap.Client()
client.uri_format = 'coap://{host}:{port}/{resource}'.format(host=self.host, port=self.port, resource=resource)
response = client.put(payload)
return response.payload

def http_get(self, url):
# Get a URL using HTTP protocol
response = requests.get(url)
return response.text

def http_post(self, url, data):
# Post data to a URL using HTTP protocol
response = requests.post(url, data=data)
return response.text

0 comments on commit d5cec43

Please sign in to comment.