Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Planner node #6

Merged
merged 13 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.vscode

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
128 changes: 128 additions & 0 deletions ff_path_planning/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Freeflyer Path Planning

# Introduction

Please read both the Quickstart and Caveats.

## Quickstart

To run the path planning node
```bash
$ ros2 run ff_path_planning path_planning
```

To run the example client call
```bash
$ ros2 run ff_path_planning examples:service_call
```

## Caveats

* this node depends on [https://github.com/StanfordASL/pmpc](https://github.com/StanfordASL/pmpc) so that package must be correctly installed
* the `pmpc` installation can take up to 20 min!
* the `pmpc` depends on Julia, so:
* installation might take up 10 - 30 min
* first 1-2 runs (`ros2 run ...`) might take 1 - 5 min
* subsequent runs should run much faster
* *note*: the path planning module supports hot-reloading of dynamics and cost, no need to restart it

---

# Path Planning Service

This node defines the following service
```
string dynamics
string cost "default_cost"
float64[] x0
float64 t0
float64 dt 0.25
int64 horizon 20
int64 xdim -1
int64 udim -1
int64 max_it 20
string params_json
---
float64[] times
float64[] states
float64[] controls
float64[] feedback
```

---

# Defining Custom Dynamics and Costs

The path planning node **automatically discovers** new dynamics and costs and
supports hot-reloading of those when you edit the code. Start the path planning
node once and develop as much as you want!

Both the dynamics and cost function accept an optional `params` argument, a
JSON-serializable object (e.g., a dictionary of values), which can be sent with
every request via a JSON-string `params_json`.

## Dynamics

Place a custom Python file in `./ff_path_planning/dynamics`. The name of the file (minus `.py`) is the string with which you can reference your dynamics.

*Example: `./ff_path_planning/dynamics/example_dynamics.py` becomes `dynamics = example_dynamics`.*

The file
* must define a `f_fx_fu_fn(x, u, params=None)` function, which **batch** evaluates the
* `xp`, the next state, `shape = [N, xdim]`
* `fx`, the state linearization, `shape = [N, xdim, xdim]`
* `fu`, the control linearization, `shape = [N, xdim, udim]`
* *optionally* can define `xdim` or `XDIM` global variable
* *optionally* can define `udim` or `UDIM` global variable

Argument `params` refers to the optional problem parameters provided every service call is `params_json`.

```python
# ./ff_path_planning/dynamics/example_dynamics.py
XDIM, UDIM = 2, 1

def f_fx_fu_fn(x, u, params=None):
...
```

## Cost

Place a custom Python file in `./ff_path_planning/costs`. The name of the file (minus `.py`) is the string with which you can reference your cost.

*Example: `./ff_path_planning/costs/example_cost.py` becomes `cost = example_cost`.*

The file
* must define a `cost(N=None, xdim=None, udim=None, params=None)` function that defines cost parameters as per [`pmpc`](https://github.com/StanfordASL/pmpc) and returns a dictionary

```python
# ./ff_path_planning/costs/example_costs.py
def cost(N=None, xdim=None, udim=None, params=None):
...
return dict(...)
```

---

# Getting Started

## How to start developing?

Copy and modify `./ff_path_planning/examples/path_planning.py`

Take a look at `//freeflyer2/freeflyer/launch/sim_path_plnner.launch.py` which can be launched via

```bash
$ # launch path planning node separately to gain cleaner output
$ ros2 run ff_path_planning path_planning
$ # then, launch the simulation example
$ ros2 launch freeflyer sim_path_planner.launch.py
```

## Examples

* a full path planning example: `./ff_path_planning/examples/path_planning.py`
* run via `ros2 run ff_path_planning examples:path_planning`
* calling the path planning service: `./ff_path_planning/examples/service_call.py`
* run via `ros2 run ff_path_planning examples:service_call`
* setting a static goal for the path-following controller: `./ff_path_planning/examples/setting_goal.py`
* run via `ros2 run ff_path_planning examples:setting_goal`
4 changes: 4 additions & 0 deletions ff_path_planning/ff_path_planning/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .path_planning import main

if __name__ == "__main__":
main()
Empty file.
6 changes: 6 additions & 0 deletions ff_path_planning/ff_path_planning/costs/default_cost.py
rdyro marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pmpc import Problem

def cost(N: int, xdim: int, udim: int, params = None):
# let's just use the default cost builder from pmpc
p = Problem(N=N, xdim=xdim, udim=udim)
return dict(Q=p.Q, R=p.R, X_ref=p.X_ref, U_ref=p.U_ref)
15 changes: 15 additions & 0 deletions ff_path_planning/ff_path_planning/costs/my_cost.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pmpc import Problem
import numpy as np

def cost(N: int, xdim: int, udim: int, params = None):
# let's just use the default cost builder from pmpc
p = Problem(N=N, xdim=xdim, udim=udim)
if "Q" in params:
p.Q = np.array(params["Q"])
if "R" in params:
p.R = np.array(params["R"])
if "X_ref" in params:
p.X_ref = np.array(params["X_ref"])
if "U_ref" in params:
p.U_ref = np.array(params["U_ref"])
return dict(Q=p.Q, R=p.R, X_ref=p.X_ref, U_ref=p.U_ref)
Empty file.
14 changes: 14 additions & 0 deletions ff_path_planning/ff_path_planning/dynamics/single_integrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import numpy as np

A, B = np.array([[1.0]]), np.array([[1.0]])
XDIM, UDIM = A.shape[-1], B.shape[-1]


def f_fx_fu_fn(x, u, params=None):
params = dict() if params is None else params
rdyro marked this conversation as resolved.
Show resolved Hide resolved
Bp = B * params["dt"] if "dt" in params else B
assert x.shape[-1] == 1 and u.shape[-1] == 1
xp = (A @ x[..., None])[..., 0] + (Bp @ u[..., None])[..., 0]
fx = np.tile(A, x.shape[:-1] + (1, 1))
fu = np.tile(Bp, x.shape[:-1] + (1, 1))
return xp, fx, fu
14 changes: 14 additions & 0 deletions ff_path_planning/ff_path_planning/dynamics/single_integrator_2D.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import numpy as np

A, B = np.eye(2), np.eye(2)
XDIM, UDIM = A.shape[-1], B.shape[-1]


def f_fx_fu_fn(x, u, params=None):
params = dict() if params is None else params
rdyro marked this conversation as resolved.
Show resolved Hide resolved
Bp = B * params["dt"] if "dt" in params else B
assert x.shape[-1] == XDIM and u.shape[-1] == UDIM
xp = (A @ x[..., None])[..., 0] + (Bp @ u[..., None])[..., 0]
fx = np.tile(A, x.shape[:-1] + (1, 1))
fu = np.tile(Bp, x.shape[:-1] + (1, 1))
return xp, fx, fu
Empty file.
165 changes: 165 additions & 0 deletions ff_path_planning/ff_path_planning/examples/path_planning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#!/usr/bin/env python3

import json

import rclpy
from rclpy.time import Time, Duration
from rclpy.node import Node
from ff_srvs.srv import PathPlan
from ff_msgs.msg import FreeFlyerStateStamped, FreeFlyerState, Twist2D, Pose2D as FF_Pose2D
from geometry_msgs.msg import PoseStamped
from nav_msgs.msg import Path

import numpy as np
from scipy.interpolate import interp1d

####################################################################################################


def to_ros_array(x: np.ndarray):
return x.reshape(-1).tolist()


####################################################################################################


class NavigationNode(Node):
def __init__(self):
"""A path planning service node based on the `pmpc` library."""
super().__init__("simple_goal")
self.ctrl_state_pub = self.create_publisher(FreeFlyerStateStamped, "ctrl/state", 10)
self.state_sub = self.create_subscription(
FreeFlyerStateStamped, "gt/state", self.state_cb, 10
)
self.state, self.goal_pose = None, None
self.path_plan_client = self.create_client(PathPlan, "/path_planning")
while not self.path_plan_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("service not available, waiting again...")

self.marker_pub = self.create_publisher(Path, "path_marker", 10)

self.computing_plan, self.plan = False, None
self.plan_futures = []
self.create_timer(1.0, self.recompute_plan_cb)
self.create_timer(1.0, self.publish_goal_cb)

self.get_logger().info("Navigation node is up")

pub = self.create_publisher(FreeFlyerStateStamped, "state_init", 10)
msg = FreeFlyerStateStamped()
msg.header.stamp = self.get_clock().now().to_msg()
msg.state = FreeFlyerState(twist=Twist2D(), pose=FF_Pose2D())
self.get_logger().info("Publishing initial state")
pub.publish(msg)
self.create_timer(1e-2, self.loop)

def show_path(self) -> None:
"""Publish the path as a path marker."""
if self.plan is None:
return
X = self.plan["X"]
marker = Path()
marker.header.frame_id = "map"
marker.header.stamp = self.get_clock().now().to_msg()
marker.poses = []
t0 = self.plan["t"][0]
for i in range(X.shape[0]):
pose = PoseStamped()
pose.pose.position.x, pose.pose.position.y, pose.pose.position.z = X[i, 0], X[i, 1], 0.5
pose.header.frame_id = marker.header.frame_id

time_now = Time.from_msg(marker.header.stamp)
dt = Duration(seconds=self.plan["t"][i] - t0)
pose.header.stamp = (time_now + dt).to_msg()

marker.poses.append(pose)
self.marker_pub.publish(marker)

def loop(self) -> None:
"""Pull plans if they are available."""

# attempt to parse returned planning service calls
last_future, incomplete_futures = None, []
for i in range(len(self.plan_futures)):
last_future = self.plan_futures[i]
if not last_future.done():
incomplete_futures.append(last_future)
self.plan_futures = incomplete_futures
if last_future is None:
return

# check if the service call is not None
result = last_future.result()
if result is None:
return

# parse the plan from the service call
ts = np.array(result.times)
N = len(ts) - 1
X = np.array(result.states).reshape((N + 1, -1))
U = np.array(result.controls).reshape((N, -1))
L = np.array(result.feedback).reshape((N, X.shape[-1], U.shape[-1]))
self.plan = dict(t=ts, X=X, U=U, L=L)
self.show_path()

def make_goal_pose(self, x: float, y: float) -> FreeFlyerStateStamped:
"""Create a goal pose from the current state, with the given x and y coordinates."""
th = self.state.state.pose.theta
goal_pose = FreeFlyerStateStamped()
goal_pose.header.stamp = self.get_clock().now().to_msg()
goal_pose.state = FreeFlyerState(pose=FF_Pose2D(x=x, y=y, theta=th), twist=Twist2D())
return goal_pose

def state_cb(self, state) -> None:
"""Read the state as it becomes available."""
self.state = state

def get_current_time(self) -> float:
secs, nsecs = self.get_clock().now().seconds_nanoseconds()
return secs + nsecs / 1e9

def publish_goal_cb(self) -> None:
"""Publish the goal pose based on the interpolation of the plan."""
if self.plan is None:
self.get_logger().info("No plan available yet.")
return
current_time = self.get_current_time()
if current_time < self.plan["t"][0] or current_time > self.plan["t"][-1]:
self.get_logger().info("Current time outside of the plan time horizon.")
current_time = np.clip(current_time, self.plan["t"][0], self.plan["t"][-1])
x_goal = interp1d(self.plan["t"], self.plan["X"], axis=0)(current_time)
goal_pose = self.make_goal_pose(x_goal[0], x_goal[1])
self.ctrl_state_pub.publish(goal_pose)
self.get_logger().info("Published goal pose")

def recompute_plan_cb(self) -> None:
request = PathPlan.Request(horizon=20)

request.dynamics = "single_integrator_2D"
request.t0 = self.get_current_time()
x, y = self.state.state.pose.x, self.state.state.pose.y
request.x0 = to_ros_array(np.array([x, y]))

# we'll provide extra arguments to the cost function, which accepts params encoded as a JSON
request.cost = "my_cost"
X_ref = np.tile(np.array([2.5, 2]), (request.horizon, 1)).tolist()
R = np.tile(np.eye(2) * 1e2, (request.horizon, 1, 1)).tolist()
request.params_json = json.dumps(dict(X_ref=X_ref, R=R))

# send a request to the path planner
self.plan_futures.append(self.path_plan_client.call_async(request))


####################################################################################################


def main():
# spin the node up
rclpy.init()
navigation_node = NavigationNode()
rclpy.spin(navigation_node)
rclpy.shutdown()


if __name__ == "__main__":
main()
Loading