Skip to content

Commit

Permalink
Add connect message validation (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
Qizot committed Aug 8, 2023
1 parent 7426ea3 commit e742ebe
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de
```elixir
def deps do
[
{:membrane_rtmp_plugin, "~> 0.14.1"}
{:membrane_rtmp_plugin, "~> 0.15.0"}
]
end
```
Expand Down
44 changes: 26 additions & 18 deletions lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,37 @@ defmodule Membrane.RTMP.MessageHandler do
{:cont, %{state | message_parser: parser}}
end

defp do_handle_client_message(%Messages.Connect{}, _header, state) do
chunk_size = state.message_parser.chunk_size
@validation_stage :connect
defp do_handle_client_message(%Messages.Connect{} = msg, _header, state) do
case MessageValidator.validate_connect(state.validator, msg) do
{:ok, _msg} = result ->
chunk_size = state.message_parser.chunk_size

[
%Messages.WindowAcknowledgement{size: @windows_acknowledgment_size},
%Messages.SetPeerBandwidth{size: @peer_bandwidth_size},
# stream begin type
%Messages.UserControl{event_type: 0x00, data: <<0, 0, 0, 0>>},
# by default the ffmpeg server uses 128 chunk size
%Messages.SetChunkSize{chunk_size: chunk_size}
]
|> Enum.each(&send_rtmp_payload(&1, state.socket, chunk_size))
[
%Messages.WindowAcknowledgement{size: @windows_acknowledgment_size},
%Messages.SetPeerBandwidth{size: @peer_bandwidth_size},
# stream begin type
%Messages.UserControl{event_type: 0x00, data: <<0, 0, 0, 0>>},
# by default the ffmpeg server uses 128 chunk size
%Messages.SetChunkSize{chunk_size: chunk_size}
]
|> Enum.each(&send_rtmp_payload(&1, state.socket, chunk_size))

{[tx_id], message_parser} = MessageParser.generate_tx_ids(state.message_parser, 1)
{[tx_id], message_parser} = MessageParser.generate_tx_ids(state.message_parser, 1)

tx_id
|> Responses.connection_success()
|> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)
tx_id
|> Responses.connection_success()
|> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)

Responses.on_bw_done()
|> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)

Responses.on_bw_done()
|> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3)
{:cont,
validation_action(%{state | message_parser: message_parser}, @validation_stage, result)}

{:cont, %{state | message_parser: message_parser}}
{:error, _reason} = error ->
{:halt, {:error, :stream_validation, validation_action(state, @validation_stage, error)}}
end
end

# According to ffmpeg's documentation, this command should make the server release channel for a media stream
Expand Down
6 changes: 6 additions & 0 deletions lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ defprotocol Membrane.RTMP.MessageValidator do

@type validation_result_t :: {:ok, term()} | {:error, reason :: any()}

@doc """
Validates the `t:Membrane.RTMP.Messages.Connect.t/0` message.
"""
@spec validate_connect(t(), Messages.Connect.t()) :: validation_result_t()
def validate_connect(impl, message)

@doc """
Validates the `t:Membrane.RTMP.Messages.ReleaseStream.t/0` message.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ defmodule Membrane.RTMP.MessageValidator.Default do
end

defimpl Membrane.RTMP.MessageValidator, for: Membrane.RTMP.MessageValidator.Default do
@impl true
def validate_connect(_impl, _message), do: {:ok, "connect success"}

@impl true
def validate_release_stream(_impl, _message), do: {:ok, "release stream success"}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule Membrane.RTMP.Messages.Connect do
@moduledoc false
@moduledoc """
Defines the RTMP `connect` command.
"""

@behaviour Membrane.RTMP.Message

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTMP.Mixfile do
use Mix.Project

@version "0.14.1"
@version "0.15.0"
@github_url "https://github.com/membraneframework/membrane_rtmp_plugin"

def project do
Expand Down
3 changes: 3 additions & 0 deletions test/support/test_validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ end
defimpl Membrane.RTMP.MessageValidator, for: Support.TestValidator do
alias Membrane.RTMP.Messages

@impl true
def validate_connect(_impl, _message), do: {:ok, "connect success"}

@impl true
def validate_publish(%Support.TestValidator{stream_key: nil}, _message),
do: {:ok, "stream allowed"}
Expand Down

0 comments on commit e742ebe

Please sign in to comment.