Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mynameborat committed Aug 29, 2023
1 parent d8640ab commit 85b2eed
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -865,9 +865,13 @@ <h1>Samza Configuration Reference</h1>

<tr>
<td class="property" id="task-callback-watermark-timeout-ms">task.callback.watermark.timeout.ms</td>
<td class="default">task.callback.timeout.ms</td>
<td class="default">task.callback.watermark.timeout.ms</td>
<td class="description">
It defines the upper bound on the time taken by the task to process a watermark message. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container. Default is <i>task.callback.timeout.ms</i>.
It defines the upper bound on the time taken by the task to process a watermark message.
When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
Default is <i>task.callback.timeout.ms</i>. <b>Note:</b> In event of draining state, it is recommended
to keep the <i>task.callback.drain.timeout.ms</i> to be same as <i>task.callback.watermark.timeout.ms</i>
in order to not terminate drain prematurely due to higher latency for watermark processing.
</td>
</tr>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ These are the basic properties for setting up a Samza application.
|job.systemstreampartition.<br>input.expansion.enabled|true|When enabled, this allows stateful jobs to expand or contract their partition count by a multiple of the previous count so that events from an input stream partition are processed on the same task as before. This will prevent erroneous results. This feature is disabled if the configuration is set to false or if the job is stateless. See [SEP-5](https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams) for more details.|
|job.security.manager.<br>factory|(none)|This is the factory class used to create the proper SecurityManager to handle security for Samza containers when running in a secure environment, such as Yarn with Kerberos eanbled. Samza ships with one security manager by default:<br><br>`org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory`<br>Supports Samza containers to run properly in a Kerberos enabled Yarn cluster. Each Samza container, once started, will create a SamzaContainerSecurityManager. SamzaContainerSecurityManager runs on its separate thread and update user's delegation tokens at the interval specified by yarn.token.renewal.interval.seconds. See Yarn Security for details.|
|task.callback.timeout.ms|-1(no timeout)|For an AsyncStreamTask, this defines the max allowed time for a processAsync callback to complete. For a StreamTask, this is the max allowed time for a process call to complete. When the timeout happens,the container is shutdown. Default is no timeout.|
|task.callback.watermark.timeout.ms|task.callback.timeout.ms|It defines the upper bound on the time taken by the task to process a watermark message. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.|
|task.callback.watermark.timeout.ms|task.callback.timeout.ms|It defines the upper bound on the time taken by the task to process a watermark message. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container. In event of draining, it is recommended to keep `task.callback.drain.timeout.ms` to be same as `task.callback.watermark.timeout.ms` in order to prevent drain from terminating prematurely due to higher latency for watermark processing.|
|task.chooser.class|`org.apache.samza.`<br>`system.chooser.`<br>`RoundRobinChooserFactory`|This property can be optionally set to override the default [message chooser](../container/streams.html#messagechooser), which determines the order in which messages from multiple input streams are processed. The value of this property is the fully-qualified name of a Java class that implements [MessageChooserFactory](../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html).|
|task.command.class|`org.apache.samza.job.`<br>`ShellCommandBuilder`|The fully-qualified name of the Java class which determines the command line and environment variables for a [container](../container/samza-container.html). It must be a subclass of [CommandBuilder](../api/javadocs/org/apache/samza/job/CommandBuilder.html). This defaults to task.command.class=`org.apache.samza.job.ShellCommandBuilder`.|
|task.drop.deserialization.errors|false|This property is to define how the system deals with deserialization failure situation. If set to true, the system will skip the error messages and keep running. If set to false, the system with throw exceptions and fail the container. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public String getAppRunnerClass() {
public ApplicationApiType getAppApiType() {
return ApplicationApiType.valueOf(get(APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name()).toUpperCase());
}

public boolean isHighLevelApiJob() {
return getAppApiType() == ApplicationApiType.HIGH_LEVEL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,9 @@ public TaskCallback createCallback() {
/*
* Timeout used in the task callback. The value is determined based on the following logic
* 1. If run loop is in draining mode and the envelope is drain, use drainCallbackTimeoutMs
* 2. If the envelope is watermark, use watermarkCallbackTimeoutMs regardless of the modes
* 2. If the envelope is watermark, use watermarkCallbackTimeoutMs regardless of the modes. Setting a lower
* watermark callback timeout during draining mode can cause drain to be unsuccessful prematurely and
* vice-versa.
* 3. Use callbackTimeoutMs otherwise
*/
long timeout = messageCallbackTimeoutMs;
Expand Down

0 comments on commit 85b2eed

Please sign in to comment.