Skip to content

Commit

Permalink
chore: add acknowledgements
Browse files Browse the repository at this point in the history
The sender may include an event ID in order to request an acknowledgement from the receiver, and the receiver must respond with an ACK packet with the same event ID
  • Loading branch information
Baiguoshuai1 committed Mar 16, 2023
1 parent 341eb11 commit 5f4b5ea
Show file tree
Hide file tree
Showing 17 changed files with 498 additions and 275 deletions.
208 changes: 138 additions & 70 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,59 @@ import (
"github.com/Baiguoshuai1/shadiaosocketio/websocket"
"log"
"net/http"
"time"
)

type Channel struct {
Channel string `json:"channel"`
}

type Message struct {
Id int `json:"id"`
Channel string `json:"channel"`
Text string `json:"text"`
}

type Desc struct {
Text string `json:"text"`
}

func main() {
server := shadiaosocketio.NewServer(*websocket.GetDefaultWebsocketTransport())

server.On(shadiaosocketio.OnConnection, func(c *shadiaosocketio.Channel) {
log.Println("received client", c.Id())

c.Emit("message", Message{10, "main", "using emit"})
log.Println("[server] connected! id:", c.Id())
log.Println("[server]", c.LocalAddr().Network()+" "+c.LocalAddr().String()+
" --> "+c.RemoteAddr().Network()+" "+c.RemoteAddr().String())

c.Join("room")
c.BroadcastTo("room", "/admin", Message{10, "main", "using broadcast"})
server.BroadcastTo("room", "/admin", Message{1, "new members!"})
time.Sleep(100 * time.Millisecond)
c.BroadcastTo("room", "/admin", Message{2, "hello everyone!"})

_ = c.Emit("message", Message{10, "server channel"})

server.BroadcastTo("room", "/admin", Message{1, "boss", "hello everyone!"})
// return [][]byte
result, err := c.Ack("/ackFromServer", time.Second*5, "go", 3)
if err != nil {
log.Println("[server] ack cb err:", err)
return
}
log.Println("[server] ack cb:", result)
})
server.On(shadiaosocketio.OnDisconnection, func(c *shadiaosocketio.Channel, reason websocket.CloseError) {
log.Println("received disconnect", c.Id(), "code:", reason.Code, "text:", reason.Text)
log.Println("[server] received disconnect", c.Id(), "code:", reason.Code, "text:", reason.Text)
})

server.On("message", func(c *shadiaosocketio.Channel, arg1 string, arg2 Message, arg3 int, arg4 bool) {
log.Println("received arg1:", arg1, "arg2:", arg2, "arg3:", arg3, "arg4:", arg4)
log.Println("[server] received message:", "arg1:", arg1, "arg2:", arg2, "arg3:", arg3, "arg4:", arg4)
})

server.On("/admin", func(c *shadiaosocketio.Channel, channel Channel) (int, string) {
log.Println("client joined to", channel.Channel, "id:", c.Id())
return 1, c.Id() + " join success!"
// listen ack event
server.On("/ackFromClient", func(c *shadiaosocketio.Channel, msg Message, num int) (int, Desc, string) {
log.Println("[server] received ack:", msg, num)
return 1, Desc{Text: "resp"}, "server"
})

serveMux := http.NewServeMux()
serveMux.Handle("/socket.io/", server)

log.Println("Starting server...")
log.Println("[server] starting server...")
log.Panic(http.ListenAndServe(":2233", serveMux))
}
```
Expand All @@ -85,133 +96,190 @@ import (
"time"
)

type Channel struct {
Channel string `json:"channel"`
}

type Message struct {
Id int `json:"id"`
Channel string `json:"channel"`
Text string `json:"text"`
}

func sendJoin(c *shadiaosocketio.Client) {
type Desc struct {
Text string `json:"text"`
}

func sendAck(c *shadiaosocketio.Client) {
// return [][]byte
result, err := c.Ack("/admin", time.Second*5, Channel{"admin"})
result, err := c.Ack("/ackFromClient", time.Second*5, Message{Id: 3, Channel: "client channel"}, 4)
if err != nil {
log.Println("sendJoin cb err:", err)
log.Println("[client] ack cb err:", err)
} else {
if len(result.([]interface{})) == 0 {
res := result.([]interface{})

if c.BinaryMessage() {
log.Println("[client] ack cb:", res)
return
}

if len(result.([]interface{})) == 0 {
return
}
var outArg1 int
var outArg2 string
var outArg2 Desc
var outArg3 string

err := json.Unmarshal(result.([]interface{})[0].([]byte), &outArg1)
err := json.Unmarshal(res[0].([]byte), &outArg1)
if err != nil {
log.Println("sendJoin cb err:", err)
log.Println("[client] ack cb err:", err)
return
}
err = json.Unmarshal(result.([]interface{})[1].([]byte), &outArg2)
log.Println("[client] ack cb outArg1:", outArg1)

err = json.Unmarshal(res[1].([]byte), &outArg2)
if err != nil {
log.Println("sendJoin cb err:", err)
log.Println("[client] ack cb err:", err)
return
}
log.Println("[client] ack cb outArg2:", outArg2.Text)

log.Println("sendJoin cb:", outArg1, outArg2)
err = json.Unmarshal(res[2].([]byte), &outArg3)
if err != nil {
log.Println("[client] ack cb err:", err)
return
}
log.Println("[client] ack cb outArg3:", outArg3)
}
}

func sendMsg(c *shadiaosocketio.Client, args ...interface{}) {
func sendMessage(c *shadiaosocketio.Client, args ...interface{}) {
err := c.Emit("message", args...)
if err != nil {
panic(err)
}
}

func createClient() {
func createClient() *shadiaosocketio.Client {
c, err := shadiaosocketio.Dial(
shadiaosocketio.GetUrl("localhost", 2233, false),
*websocket.GetDefaultWebsocketTransport())
if err != nil {
panic(err)
}

err = c.On("message", func(h *shadiaosocketio.Channel, args Message) {
log.Println("--- Got chat message: ", args)
_ = c.On(shadiaosocketio.OnConnection, func(h *shadiaosocketio.Channel) {
log.Println("[client] connected! id:", h.Id())
log.Println("[client]", h.LocalAddr().Network()+" "+h.LocalAddr().String()+
" --> "+h.RemoteAddr().Network()+" "+h.RemoteAddr().String())
})
if err != nil {
panic(err)
}

err = c.On("/admin", func(h *shadiaosocketio.Channel, args Message) {
log.Println("--- Got admin message: ", args)
_ = c.On(shadiaosocketio.OnDisconnection, func(h *shadiaosocketio.Channel, reason websocket.CloseError) {
log.Println("[client] disconnected, code:", reason.Code, "text:", reason.Text)
})
if err != nil {
panic(err)
}

err = c.On(shadiaosocketio.OnDisconnection, func(h *shadiaosocketio.Channel, reason websocket.CloseError) {
log.Println("Disconnected, code:", reason.Code, "text:", reason.Text)
_ = c.On("message", func(h *shadiaosocketio.Channel, args Message) {
log.Println("[client] got chat message:", args)
})
if err != nil {
panic(err)
}

err = c.On(shadiaosocketio.OnConnection, func(h *shadiaosocketio.Channel) {
log.Println("Connected!", h.Id())
_ = c.On("/admin", func(h *shadiaosocketio.Channel, args Message) {
log.Println("[client] got admin message:", args)
})

time.Sleep(1 * time.Second)

sendMsg(c, "cool", &Message{
Id: 99,
Text: "second arg",
// sending ack response
_ = c.On("/ackFromServer", func(h *shadiaosocketio.Channel, arg1 string, arg2 int) (Message, int) {
log.Println("[client] got ack from server:", arg1, arg2)
time.Sleep(3 * time.Second)
return Message{
Id: 5,
Channel: "client",
}, 6
})
sendJoin(c)
if err != nil {
panic(err)
}

return c
}

func main() {
createClient()
c := createClient()

time.Sleep(1 * time.Second)
sendMessage(c, "client", &Message{
Id: 1,
Channel: "client channel",
}, 2)

time.Sleep(1 * time.Second)
sendAck(c)

select {}
}
```

### Javascript client for caller server
### JavaScript Client For Caller Server

```javascript
const io = require("socket.io-client")
const socket = io("ws://127.0.0.1:2233",{transports: ['websocket']})

// listen for messages
socket.on('message', function(msg) {
console.log('received msg:', msg);
console.log('[client] received msg:', msg);
});

// sending ack response
socket.on('/ackFromServer', function(a, b, f) {
console.log('[client] received ack:', a , b);
f({ id: 5, channel: 'js channel' }, 6)
});

socket.on('/admin', function(msg) {
console.log('received admin msg:', msg);
console.log('[client] received admin msg:', msg);
});

socket.on('connect', function () {
console.log('socket connected');
console.log('[client] socket connected');

socket.emit('message', "1", { id: 2, text: "js" }, 3);
socket.emit('message', "js", { id: 1, channel: "js" }, 2);

// ack
socket.emit('/ackFromClient', { id: 3, channel: "js ack" }, 4, (a, b, c) => {
console.log('[client] ack cb:', a, b, c)
});
});

socket.on('disconnect', function (e) {
console.log('socket disconnect', e);
console.log('[client] socket disconnect', e);
});

socket.on('connect_error', function (e) {
console.log('connect_error', e)
console.log('[client] connect_error', e)
});
```

### JavaScript Server
````javascript
const { Server } = require("socket.io");
const customParser = require('socket.io-msgpack-parser');

const io = new Server(2233, {
// parser: customParser
});

io.on("connection", (socket) => {
// ...
socket.on('message', function(arg1, arg2, arg3) {
console.log('[server] received message:', arg1, arg2, arg3)
});

// listen ack event
socket.on('/ackFromClient', function(arg1, arg2, func) {
console.log('[server] received ack:', arg1, arg2)
func(1, { text: 'resp' }, "server")
});

socket.emit('message', { id: 2, channel: 'server channel'});

// emit ack event
socket.emit('/ackFromServer', "go", 3, function(arg1, arg2) {
console.log('[server] ack cb:', arg1, arg2)
});
});

console.log("[server] starting server...")
````

## License

MIT
9 changes: 5 additions & 4 deletions caller.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package shadiaosocketio

import (
"encoding/json"
"errors"
"github.com/Baiguoshuai1/shadiaosocketio/utils"
"reflect"
)

Expand All @@ -18,7 +18,8 @@ var (
ErrorCallerMaxFiveValues = errors.New("f maximum number of values is 5")
)

/**
/*
*
Parses function passed by using reflection, and stores its representation
for further call on message or ack
*/
Expand Down Expand Up @@ -67,12 +68,12 @@ func (c *caller) callFunc(h *Channel, argsType int, args ...interface{}) []refle

var marshal []byte
if argsType == 0 {
marshal, _ = json.Marshal(args[i])
marshal, _ = utils.Json.Marshal(args[i])
} else {
marshal = args[i].([]byte)
}

err := json.Unmarshal(marshal, &data)
err := utils.Json.Unmarshal(marshal, &data)
if err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 5f4b5ea

Please sign in to comment.