Defining watchers and checkers examples
Example: watcher modifier that creates intervals that record maximal speed
OSC2 code: watcher modifier that creates intervals that record maximal speed
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"
# Example script: a watcher type with custom interval data that tracks the
# maximum speed seen while a vehicle is above a speed limit.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

# Define custom watcher data that records the maximum speed during
# an interval
struct speed_above_data inherits any_watcher_data:
    max_speed: speed
    record(max_speed, unit: kph)

# Define a watcher type that creates intervals
# when the speed of a vehicle exceeds some speed limit,
# and records the maximal exceeding speed
watcher modifier speed_above(speed_above_data):
    v: vehicle
    limit: speed

    on @top.w_clk:
        if(data == null):
            # data is null here, i.e. no interval is open: start one once the limit is exceeded
            if(v.state.speed > limit):
                start_interval()
        else:
            # An interval is open: end it once the speed is back at or below the limit
            if(v.state.speed <= limit):
                end_interval()

    on @i_clock:
        # Keep the highest speed sampled on i_clock in the interval data
        if(data.max_speed < v.state.speed):
            data.max_speed = v.state.speed

global modifier vehicle.speed_watchers:
    # Instantiate the watcher for all the vehicle actors
    watcher speed_above_w is speed_above(actor, 30kph)

extend top.main:
    # Log the SUT speed on every i_clock of the SUT's watcher
    on @sut.car.speed_watchers.speed_above_w.i_clock:
        logger.log_info("$(sut.car.state.speed)")

    # Log the recorded maximum when the watcher emits i_end
    on @sut.car.speed_watchers.speed_above_w.i_end:
        logger.log_info("MAX: $(sut.car.speed_watchers.speed_above_w.data.max_speed)")

    # Accelerate from 20-25kph to 35-40kph, then slow back down to 20-25kph
    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)
Example: watcher declaration with custom data type
watcher declaration with custom data type
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: the watcher type uses the default data type; the custom
# data type is attached where the watcher is declared.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

# Watcher type that uses default data type (any_watcher_data)
watcher modifier speed_above:
    v: vehicle
    limit: speed

    on @top.w_clk:
        if(data == null):
            # data is null here, i.e. no interval is open: start one once the limit is exceeded
            if(v.state.speed > limit):
                start_interval()
        else:
            # An interval is open: end it once the speed is back at or below the limit
            if(v.state.speed <= limit):
                end_interval()

# Define custom watcher data
struct speed_above_data inherits any_watcher_data:
    max_speed: speed

global modifier vehicle.speed_watchers:
    # Assign custom data type at declaration
    watcher speed_above_w(speed_above_data) is speed_above(v: actor, limit: 30kph)

    # Add additional sampling logic for the custom data attributes
    on @speed_above_w.i_clock:
        if (speed_above_w.data.max_speed < actor.state.speed):
            speed_above_w.data.max_speed = actor.state.speed

# Accelerate from 20-25kph to 35-40kph, then slow back down to 20-25kph
scenario sut.increase_then_decrease_speed:
    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)

extend top.main:
    do sut.increase_then_decrease_speed()
Example: using the passive_w watcher operator
using the passive_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a passive_w watcher whose intervals are started and ended
# explicitly by the enclosing scenario.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M75_FTX_highway_long_single_road.xodr"

# The NPC starts 15-20m behind the SUT in a side lane and ends 15-20m ahead of it
scenario sut.drive_side_by_side:
    npc: npc_vehicle

    do parallel():
        sut.car.drive()
        npc.drive() with:
            lane(side_of: sut.car)
            position(distance: [15..20]m, behind: sut.car, at: start)
            position(distance: [15..20]m, ahead_of: sut.car, at: end)

# Interval data: smallest lateral distance seen during the interval
struct lon_overlap_data inherits any_watcher_data:
    # Starts at 1km so that the first sampled distance below it replaces it
    var min_lat_dist: length = 1km
    cover(min_lat_dist, unit: m, range: [0..5], every: 1)

extend sut.drive_side_by_side:
    watcher lon_overlap(lon_overlap_data) is passive_w()

    on @top.w_clk:
        var lon_dist := sut.car.road_distance(npc, closest, closest, lon, lane)
        # A longitudinal distance of 0m with no open interval starts an interval
        if lon_dist == 0m and lon_overlap.data == null:
            lon_overlap.start_interval()
        if lon_overlap.data != null:
            # While an interval is open, track the minimum absolute lateral distance
            var abs_lat_dist := math.abs(sut.car.road_distance(npc, closest, closest, lat, lane))
            if abs_lat_dist < lon_overlap.data.min_lat_dist:
                lon_overlap.data.min_lat_dist = abs_lat_dist
            # NOTE(review): nested under the open-interval check so that end_interval()
            # is only called while an interval exists — confirm against the original layout
            if lon_dist != 0m:
                lon_overlap.end_interval()

extend top.main:
    do drive_side_by_side: sut.drive_side_by_side()
Example: using the while_w watcher operator
using the while_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a watcher built with the while_w operator.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

global modifier vehicle.state_watchers:
    # while_w over the condition "actor speed is above 30kph"
    watcher speed_above_w is while_w(actor.state.speed > 30kph)

extend top.main:
    # 25kph -> 35kph, back to 25kph, then hold the speed for 10 seconds
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()
Example: using the not_w watcher operator
using the not_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a watcher built by applying not_w to another watcher.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

global modifier vehicle.state_watchers:
    watcher speed_above_w is while_w(actor.state.speed > 30kph)
    # not_w applied to speed_above_w
    watcher speed_below_w is not_w(speed_above_w)

extend top.main:
    # 25kph -> 35kph, back to 25kph, then hold the speed for 10 seconds
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()
Example: using the and_w watcher operator
using the and_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a watcher built by combining two watchers with and_w.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

global modifier vehicle.state_watchers:
    watcher speed_above_w is while_w(actor.state.speed > 30kph)
    watcher driving_close_to_left_lane_boundary_w is while_w(actor.state.msp_pos.lane_position.lat_offset > 20cm)
    # and_w over the two watchers above
    watcher speed_above_and_close_to_left_w is and_w(speed_above_w, driving_close_to_left_lane_boundary_w)

extend top.main:
    # Speed and lateral position are driven by two serial branches run in parallel
    do parallel(overlap: equal):
        sut_speed_behavior: serial:
            sut.car.drive(duration: 6second) with:
                speed(25kph, at: start)
                speed(35kph, at: end)
            sut.car.drive(duration: 6second) with:
                speed(25kph, at: end)
            sut.car.drive(duration: 6second) with:
                speed(35kph, at: end)
        sut_lateral_movement: serial:
            sut.car.drive(duration: 9second) with:
                lateral(0m, line: left, at: start)
                lateral(0m, line: right, at: end)
            sut.car.drive(duration: 9second) with:
                lateral(0m, line: left, at: end)
Example: using the or_w watcher operator
using the or_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a watcher built by combining two watchers with or_w.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

global modifier vehicle.state_watchers:
    # Lateral offset above 20cm / below -20cm
    watcher driving_close_to_left_lane_boundary_w is while_w(actor.state.msp_pos.lane_position.lat_offset > 20cm)
    watcher driving_close_to_right_lane_boundary_w is while_w(actor.state.msp_pos.lane_position.lat_offset < -20cm)
    # or_w over the two watchers above
    watcher driving_close_to_lane_edge_w is or_w(driving_close_to_left_lane_boundary_w, driving_close_to_right_lane_boundary_w)

extend top.main:
    # Speed and lateral position are driven by two serial branches run in parallel
    do parallel(overlap: equal):
        sut_speed_behavior: serial:
            sut.car.drive() with:
                speed(30kph)
        sut_lateral_movement: serial:
            sut.car.drive(duration: 6second) with:
                lateral(0m, line: left, at: start)
                lateral(0m, line: right, at: end)
            sut.car.drive(duration: 6second) with:
                lateral(0m, line: left, at: end)
            sut.car.drive(duration: 6second) with:
                lateral(0m, line: right, at: end)
Example: using the between_w watcher operator
using the between_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"
# Example script: between_w and upon_w watchers driven by the lane-switch events.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second

global modifier vehicle.vehicle_watchers:
    # Emit an interval between switch lane start and switch lane end
    watcher switch_lane_w is between_w(
        x: @actor.switch_lane_start, y: @actor.switch_lane_end)

    # Emit zero-time intervals for switch lane start & end
    watcher upon_switch_lane_start is upon_w(ev: @actor.switch_lane_start)
    watcher upon_switch_lane_end is upon_w(ev: @actor.switch_lane_end)

scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
            speed(50kph, at: start)
        # Switch to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Continue driving on lane 1
        sut.car.drive() with:
            keep_lane()
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()
Example: using the between_w watcher operator with custom data
using the between_w watcher operator with custom data
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"
# Example script: a between_w watcher whose interval data records whether the
# lane switch ended or was aborted.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second

enum switch_lane_ending: [ended, aborted]

# Interval data: how the lane switch finished
struct switch_lane_watcher_data inherits any_watcher_data:
    ending_type: switch_lane_ending
    record(ending_type)

global modifier vehicle.vehicle_watchers:
    # Emit the same event, but with different data, for switch lane end
    # and switch lane abort
    event switch_lane_end_or_abort(ending_type: switch_lane_ending)

    on @actor.switch_lane_end:
        emit switch_lane_end_or_abort(ended)

    on @actor.switch_lane_abort:
        emit switch_lane_end_or_abort(aborted)

    # Emit an interval between switch lane start and switch lane end or abort
    watcher switch_lane_w(switch_lane_watcher_data) is between_w(
            x: @actor.switch_lane_start, y: @switch_lane_end_or_abort) with:
        # Copy the ending type carried by the closing event into the interval data
        it.data.ending_type = switch_lane_end_or_abort.event_data().ending_type

    on @switch_lane_w.i_end:
        call logger.log_info("Switch lane interval ended. Ending type: $(switch_lane_w.data.ending_type)")

scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
            speed(50kph, at: start)
        # Switch to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Continue driving on lane 1
        sut.car.drive() with:
            keep_lane()
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()
Example: between_w with back-to-back events
between_w with back-to-back events
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"
# Example script: a between_w watcher exercised with start and end events that
# are emitted back to back (in the same step).
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second

global modifier vehicle.function_watchers:
    # Emitted in the simulation step when the turn signal is turned on
    event turn_signal_on
    # Emitted in the simulation step when the ADAS function enters 'engaged' state
    event function_engaged

    watcher function_w is between_w(
        x: @turn_signal_on, y: @function_engaged)

    watcher upon_turn_signal_on is upon_w(ev: @turn_signal_on)
    watcher upon_function_engaged is upon_w(ev: @function_engaged)

# The two events are emitted by the scenario itself while the SUT drives
scenario sut.function_mock:
    do parallel(overlap: equal):
        ego_behavior: sut.car.drive()
        function_behavior: serial:
            # Emit both events at the same time
            wait elapsed(1s)
            emit sut.car.function_watchers.turn_signal_on
            emit sut.car.function_watchers.function_engaged
            wait elapsed(1s)
            # Emit turn_signal_on, then turn_signal_on and function_engaged
            # at the same time, then function_engaged
            emit sut.car.function_watchers.turn_signal_on
            wait elapsed(1s)
            emit sut.car.function_watchers.turn_signal_on
            emit sut.car.function_watchers.function_engaged
            wait elapsed(1s)
            emit sut.car.function_watchers.function_engaged
            wait elapsed(1s)

extend top.main:
    do sut.function_mock()
Example: using the above_w watcher operator
using the above_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a watcher built with the above_w operator.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

global modifier vehicle.state_watchers:
    # above_w samples the actor speed against a 30kph threshold with a 2kph tolerance
    watcher speed_above_w is above_w(sample_type: speed, sample_expression: actor.state.speed, threshold: 30kph, tolerance: 2kph)

extend top.main:
    # 25kph -> 35kph, back to 25kph, then hold the speed for 10 seconds
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()
Example: using the below_w watcher operator
using the below_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a watcher built with the below_w operator.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

global modifier vehicle.state_watchers:
    # below_w samples the actor speed against a 30kph threshold with a 3kph tolerance
    watcher speed_below_w is below_w(sample_type: speed, sample_expression: actor.state.speed, threshold: 30kph, tolerance: 3kph)

extend top.main:
    # 35kph -> 25kph, back to 35kph, then hold the speed for 10 seconds
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(35kph, at: start)
            speed(25kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(35kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()
Example: using the upon_w watcher operator
using the upon_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"
# Example script: a watcher built with upon_w on the switch_lane_start event.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second

global modifier vehicle.vehicle_watchers:
    watcher switch_lane_start_w is upon_w(ev: @actor.switch_lane_start)

scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
        # Switch to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()
Example: using the upon_w watcher operator with custom data
using the upon_w watcher operator with custom data
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"
# Example script: an upon_w watcher that stores the triggering event's data in
# its custom interval data.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second

# Interval data: the data carried by the switch_lane_start event
struct switch_lane_watcher_data inherits any_watcher_data:
    event_data: switch_lane_data

global modifier vehicle.vehicle_watchers:
    watcher switch_lane_start_w(switch_lane_watcher_data) is upon_w(ev: @actor.switch_lane_start) with:
        # Copy the event's data into the interval data
        it.data.event_data = actor.switch_lane_start.event_data().data

    # Log the target lane when the watcher emits i_end
    on @switch_lane_start_w.i_end:
        logger.log_info("$(actor) started switching to lane: " + \
            "$(switch_lane_start_w.data.event_data.to_lane_pos.lane)")

scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
        # Switch to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()
Example: checker that creates issues when speed limit exceeded
OSC2 code: checker that creates issues when speed limit exceeded
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a checker that raises a SUT error when the speed limit is exceeded.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

extend issue_kind: [too_fast] # Define a new issue kind

# Define a watcher type that creates intervals
# when the speed of a vehicle exceeds some speed limit
watcher modifier speed_above:
    v: vehicle
    limit: speed
    # NOTE(review): max_speed is never reset in this block, so its value carries
    # over from one interval to the next — confirm this is intended
    var max_speed: speed

    on @top.w_clk:
        if(data == null):
            # data is null here, i.e. no interval is open: start one once the limit is exceeded
            if(v.state.speed > limit):
                start_interval()
        else:
            # An interval is open: end it once the speed is back at or below the limit
            if(v.state.speed <= limit):
                end_interval()

    on @i_clock:
        # Keep the highest speed sampled on i_clock
        if(max_speed < v.state.speed):
            max_speed = v.state.speed

global modifier sut_vehicle.speed_checkers:
    # Checker based on speed_above with a 30kph limit; its body reports a too_fast SUT error
    checker check_speed_above is speed_above(actor, 30kph) with:
        it.sut_error(kind: too_fast,
            details: "Vehicle $(it.v) exceeded maximal speed $(it.limit) reaching up to $(it.max_speed)")

extend top.main:
    # Accelerate from 20-25kph to 35-40kph, then slow back down to 20-25kph
    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)
Example: using set_issue to modify checker issue severity
OSC2 code: using set_issue to modify checker issue severity
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"
# Example script: the same speed checker, with its issue severity changed through set_issue.
extend issue_kind: [too_fast] # Define a new issue kind

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

# Define a watcher type that creates intervals
# when the speed of a vehicle exceeds some speed limit
watcher modifier speed_above:
    v: vehicle
    limit: speed
    # NOTE(review): max_speed is never reset in this block, so its value carries
    # over from one interval to the next — confirm this is intended
    var max_speed: speed

    on @top.w_clk:
        if(data == null):
            # data is null here, i.e. no interval is open: start one once the limit is exceeded
            if(v.state.speed > limit):
                start_interval()
        else:
            # An interval is open: end it once the speed is back at or below the limit
            if(v.state.speed <= limit):
                end_interval()

    on @i_clock:
        # Keep the highest speed sampled on i_clock
        if(max_speed < v.state.speed):
            max_speed = v.state.speed

global modifier vehicle.speed_checkers:
    # Checker based on speed_above with a 30kph limit; its body reports a too_fast SUT error
    checker check_speed_above is speed_above(actor, 30kph) with:
        it.sut_error(kind: too_fast,
            details: "Vehicle $(it.v) exceeded maximal speed $(it.limit) reaching up to $(it.max_speed)")

extend top.main:
    # Accelerate from 20-25kph to 35-40kph, then slow back down to 20-25kph
    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)

extend top.main:
    # Set the severity of the SUT car's check_speed_above issues to info
    set_issue(target_checker: sut.car.speed_checkers.check_speed_above, severity: info)
Example: using set_issue to modify checker issue severity and category
OSC2 code: using set_issue to modify checker issue severity and category
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: a speed checker declared for all vehicles, with severity and
# category overridden through set_issue in the sut_vehicle extension.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

# Interval data: highest speed sampled during the interval
struct speed_above_data inherits any_watcher_data:
    max_speed: speed

watcher modifier speed_above(speed_above_data):
    v: vehicle
    limit: speed

    on @top.w_clk:
        if(data == null):
            # data is null here, i.e. no interval is open: start one once the limit is exceeded
            if(v.state.speed > limit):
                start_interval()
        else:
            # An interval is open: end it once the speed is back at or below the limit
            if(v.state.speed <= limit):
                end_interval()

    on @i_clock:
        if (data.max_speed < v.state.speed):
            data.max_speed = v.state.speed

extend issue_kind: [too_fast]

global modifier vehicle.speed_checkers:
    # Reports a too_fast issue with severity error_continue and category other
    checker check_speed_above is speed_above(actor, 30kph) with:
        it.issue(kind: too_fast, severity: error_continue,
            category: other,
            details: "Vehicle $(it.v) exceeded maximal speed $(it.limit) " + \
                "reaching up to $(it.data.max_speed)")

extend sut_vehicle.speed_checkers:
    # Override: severity error, category sut
    # NOTE(review): the target path starts with speed_checkers although this is
    # already an extension of sut_vehicle.speed_checkers — confirm the path resolves
    set_issue(target_checker: speed_checkers.check_speed_above, severity: error, category: sut)

extend top.main:
    npc: vehicle

    # SUT and NPC each run their own speed profile in parallel
    do parallel(overlap: equal):
        sut_behavior: serial:
            sut.car.drive(duration: 10second) with:
                speed(25kph)
            sut.car.drive(duration: 5second) with:
                speed(35kph, at: end)
            sut.car.drive(duration: 5second) with:
                speed(25kph, at: end)
            sut.car.drive(duration: 10second) with:
                keep_speed()
        npc_behavior: serial:
            npc.drive(duration: 5second) with:
                speed(25kph, at: start)
                speed(35kph, at: end)
            npc.drive(duration: 5second) with:
                speed(25kph, at: end)
            npc.drive() with:
                keep_speed()
Example: using the condition parameter of set_issue
OSC2 code: using the condition parameter of set_issue
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"
# Example script: using the condition parameter of set_issue.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

extend issue_kind: [speed_above]

global modifier vehicle.state_checkers:
    checker check_speed_above is above_w(
        sample_type: speed, sample_expression: actor.state.speed, threshold: 30kph, tolerance: 2kph)

    # Report the issue when the checker emits i_end
    on @check_speed_above.i_end:
        check_speed_above.other_error(kind: speed_above,
            details: "Speed of actor $(actor) was above 30kph")

    # Set severity to warning, subject to the condition actor.is_dut
    set_issue(target_checker: check_speed_above, condition: actor.is_dut, severity: warning)

extend top.main:
    # 25kph -> 35kph, back to 25kph, then hold the speed for 10 seconds
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()
Example: defining a checker for longitudinal acceleration while LSS is engaged
OSC2 code: Defining a checker for longitudinal acceleration while LSS is engaged
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"
# Setup for the LSS example: map, a mock LSS interface on the SUT vehicle,
# the issue kind and the checker's interval data.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

extend sut_vehicle:
    # Mock LSS interface
    var lss_is_engaged_flag := false
    def lss_is_engaged() -> bool is expression lss_is_engaged_flag

    var lss_is_warning_flag := false
    def lss_is_warning() -> bool is expression lss_is_warning_flag

extend issue_kind: [lss_lon_acceleration]

# Interval data: maximum longitudinal acceleration sampled during the interval
struct inhibition_on_longitudinal_acceleration_data inherits any_watcher_data:
    max_acceleration: acceleration
# Checks the SUT's longitudinal acceleration against LSS_MAX_LON_ACC while the
# supplied lss_is_engaged_or_warning watcher is active.
modifier sut_vehicle.lss_lon_acceleration:
    # Acceleration threshold; defaults to 2mpsps
    LSS_MAX_LON_ACC: acceleration with: keep(default it == 2mpsps)

    # Watcher supplied by the instantiating code
    @required
    lss_is_engaged_or_warning: any_watcher_behaviour

    # Severity of the reported issue; defaults to error
    issue_severity: issue_severity with:
        keep(default it == error)

    watcher lon_acceleration_above_threshold is while_w(
        actor.state != null and actor.state.local_acceleration.lon > LSS_MAX_LON_ACC)

    # and_w over the supplied LSS watcher and the acceleration watcher
    checker check_lss_lon_acceleration(inhibition_on_longitudinal_acceleration_data) is and_w(
            lss_is_engaged_or_warning, lon_acceleration_above_threshold) with:
        it.sut_issue(
            kind: lss_lon_acceleration, severity: issue_severity,
            details: "Longitudinal acceleration of SUT vehicle ($(actor)) was higher than maximum " + \
                "allowed value while LSS is engaged ($(LSS_MAX_LON_ACC)). Maximum recorded " + \
                "value of longitudinal acceleration: $(it.data.max_acceleration)")

    # Sample maximum acceleration during the interval
    on @check_lss_lon_acceleration.i_clock:
        if (check_lss_lon_acceleration.data.max_acceleration < actor.state.local_acceleration.lon):
            check_lss_lon_acceleration.data.max_acceleration = actor.state.local_acceleration.lon
# Builds the LSS watchers from the SUT's mock interface and passes their
# or_w combination to the lss_lon_acceleration modifier.
global modifier sut_vehicle.lss_checks:
    watcher lss_is_engaged is while_w(actor.lss_is_engaged())
    watcher lss_is_warning is while_w(actor.lss_is_warning())
    watcher lss_is_engaged_or_warning is or_w(lss_is_engaged, lss_is_warning)

    lss_lon_acceleration: lss_lon_acceleration(lss_is_engaged_or_warning: lss_is_engaged_or_warning)
# Three lane changes by the SUT, separated by two straight drive phases.
scenario sut.three_change_lanes:
    do serial:
        first_change_lane: sut.car.drive() with:
            speed([30..40]kph, at: start)
            duration([4..5]s)
            change_lane()
        first_drive_straight: sut.car.drive() with:
            duration([3..5]s)
        second_change_lane: sut.car.drive() with:
            duration([4..5]s)
            change_lane()
        second_drive_straight: sut.car.drive() with:
            keep_lane()
            duration([3..5]s)
        third_change_lane: sut.car.drive() with:
            duration([4..5]s)
            change_lane()
# Runs the lane-change scenario, drives the mock LSS flags from the SUT's
# lane-switch events, and tunes the checker threshold and severity.
extend top.main:
    do a: sut.three_change_lanes()

    # Mock LSS behavior: whenever SUT is changing lanes,
    # sut.car.lss_is_engaged() or sut.car.lss_is_warning() returns true
    var first_change_lane := false

    on @sut.car.switch_lane_start:
        # The first lane change raises the warning flag; later ones raise the engaged flag
        if (first_change_lane == false):
            sut.car.lss_is_warning_flag = true
            first_change_lane = true
        else:
            sut.car.lss_is_engaged_flag = true

    on @sut.car.switch_lane_end:
        # Clear both flags when the lane switch ends
        sut.car.lss_is_engaged_flag = false
        sut.car.lss_is_warning_flag = false

    # Set the longitudinal acceleration threshold to a small value
    keep(sut.car.lss_checks.lss_lon_acceleration.LSS_MAX_LON_ACC == 0.1mpsps)

    # Change severity of checker issues to error_continue
    set_issue(target_checker: sut.car.lss_checks.lss_lon_acceleration.check_lss_lon_acceleration, severity: error_continue)
Example: defining a checker for BSM alarms
OSC2 code: Defining a checker for BSM alarms
import "$FTX/config/sim/sumo_default.osc"
import "$FTX/env/basic/exe_platforms/sumo_dsp/config/sumo_dsp_config.osc"
import "$FTX/packages/adas/BSM/scenarios/common/bsm_config/bsm_config.osc"
# Watcher type that keeps an interval open while at least one object other than
# sut_veh is inside the BSM zone on the given side.
watcher modifier bsm_zone_monitor:
    sut_veh: sut_vehicle
    side: av_side
    len: length
    wdth: length

    # Return true whenever there is an object in the left / right BSM zone
    def is_obj_in_zone()-> bool is:
        # Reference point on the monitored side of sut_veh
        var ref_point: distance_reference = side == left ? left_center : right_center
        # The lateral distance is multiplied by this sign before the range test below
        var sign := side == left ? 1 : -1
        result = false
        for obj in traffic.physical_objects:
            if obj != sut_veh:
                var lon_dist := sut_veh.local_distance(obj, ref_point, closest_compound, lon)
                var lat_dist := sut_veh.local_distance(obj, ref_point, closest_compound, lat)
                # In the zone: longitudinal distance in [-len..0m] and signed lateral distance in [0m..wdth]
                if lon_dist in [-len..0m] and lat_dist * sign in [0m..wdth]:
                    result = true

    on @top.w_clk:
        var obj_in_zone := is_obj_in_zone()
        # Start an interval when an object is in the zone and none is open;
        # end the open interval when no object is in the zone
        if data == null and obj_in_zone:
            start_interval()
        elif data != null and not obj_in_zone:
            end_interval()
extend issue_kind: [bsm_check]

# Wrapper modifier for all the checking logic related to a BSM zone (left or right)
modifier sut_vehicle.bsm_zone_checks:
    side: av_side with:
        keep(default it == left)
    len: length with:
        keep(default it == 10m)
    wdth: length with:
        keep(default it == 2.5m)

    # Returns true if the turn signal towards 'side' is active
    def is_turn_signal_active() -> bool is:
        var m_turn_side: turn_signal_state = (side == left) ? left_on : right_on
        return actor.state.vehicle_indicators.turn_signal_state == m_turn_side

    # Returns true if the BSM alert that corresponds to 'side' is active
    def is_bsm_alert_active() -> bool is expression \
        (side == left) ? (actor.state.vehicle_indicators.bsm_left_side_alert == true) : \
        (actor.state.vehicle_indicators.bsm_right_side_alert == true)

    # Object in the BSM zone on 'side'
    watcher zone_monitor is bsm_zone_monitor(sut_veh: actor, side: side, len: len, wdth: wdth)

    # Turn signal towards 'side', sampled on top.clk
    var turn_signal_active := sample(is_turn_signal_active(), @top.clk)
    watcher turn_indication is while_w(turn_signal_active)

    # and_w of zone_monitor and turn_indication
    watcher should_issue_bsm_alert is and_w(zone_monitor, turn_indication)

    # BSM alert for 'side', sampled on top.clk
    var bsm_alert_active := sample(is_bsm_alert_active(), @top.clk)
    watcher bsm_alert_off is while_w(not bsm_alert_active)

    # and_w of should_issue_bsm_alert and bsm_alert_off
    checker check_bsm_alert is and_w(should_issue_bsm_alert, bsm_alert_off)

    # Report only intervals whose start and end times differ
    on @check_bsm_alert.i_end if (check_bsm_alert.data.start_time != check_bsm_alert.data.end_time):
        check_bsm_alert.sut_warning(bsm_check,
            "BSM alert is off when turn signal is on while there are objects in the $(side) BSM zone")
# Instantiate the BSM zone checks once per side, with a 10m x 2.5m zone.
global modifier sut_vehicle.bsm_checks:
    left_bsm_checks: bsm_zone_checks(side: left, len: 10m, wdth: 2.5m)
    right_bsm_checks: bsm_zone_checks(side: right, len: 10m, wdth: 2.5m)
# Position shape that advances longitudinally at target_speed with no lateral offset.
struct constant_speed_shape inherits any_position_shape:
    # Defaults to 60kph
    target_speed: speed with:
        keep(default it == 60kph)

    # Fixed shape duration of 25s
    def duration() -> time is only:
        return 25s

    # lon grows linearly with t at target_speed; lat stays at 0
    def compute() is only:
        lon = target_speed * t
        lat = 0meter
# BSM test: the SUT drives in lane 2 at 50kph while one car in lane 1 and one
# in lane 3 start behind it at 60kph; meanwhile the SUT toggles its turn signals.
extend top.main:
    left_car: vehicle
    right_car: vehicle
    left_shape: constant_speed_shape
    right_shape: constant_speed_shape
    r: odr_road with: keep(it.odr_id == 0 and it.sub_id == -1)

    do serial():
        sut.car.request_BSM_mode(state: active)
        # NOTE(review): both members below are nested as branches of this parallel,
        # since a parallel with a single branch would serve no purpose — confirm
        parallel(start_to_start: 0s):
            # Turn-signal sequence: left for 3s, off for 2s, then right
            serial():
                wait (sut.car.local_distance(left_car, left_center, closest_compound, lon) > -8m)
                sut.car.turn_signal(turn_signal_state: left_on)
                wait elapsed(3s)
                sut.car.turn_signal(turn_signal_state: off)
                wait elapsed(2s)
                sut.car.turn_signal(turn_signal_state: right_on)
            # Driving phase for all three vehicles
            parallel(overlap: full, duration: 25s):
                sut.car.drive() with:
                    speed(50kph, at: start)
                    keep_speed()
                    lane(2)
                    along(r, start_offset: 200m, at: start)
                left_car.drive() with:
                    speed(60kph, at: start)
                    lane(1)
                    position(30m, behind: sut.car, at: start, run_mode: best_effort)
                    shape(quantity: position, request: both, shape_object: left_shape)
                right_car.drive() with:
                    speed(60kph, at: start)
                    lane(3)
                    position(40m, behind: sut.car, at: start, run_mode: best_effort)
                    shape(quantity: position, request: both, shape_object: right_shape)
# ---
# Test configuration for the BSM example
extend test_config:
    set implicits_kind = none
    set test_drain_time = 0s
    set map = "$FTX/packages/maps/straight_long_road.xodr"