Skip to content

Defining watchers and checkers examples

Example: watcher modifier that creates intervals that record maximal speed

OSC2 code: watcher modifier that creates intervals that record maximal speed
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"


# Define custom watcher data that records the maximum speed during
# an interval
struct speed_above_data inherits any_watcher_data:
    max_speed: speed
    record(max_speed, unit: kph)

# Define a watcher type that create intervals
# when the speed of a vehicle exceeds some speed limit,
# and records the maximal exceeding speed
watcher modifier speed_above(speed_above_data):
    v: vehicle
    limit: speed

    on @top.w_clk:
        if(data == null):
            if(v.state.speed > limit):
                start_interval()
        else:
            if(v.state.speed <= limit):
                end_interval()
    on @i_clock:
        if(data.max_speed < v.state.speed):
            data.max_speed = v.state.speed

global modifier vehicle.speed_watchers:
    # Instantiate the watcher for all the vehicle actors
    watcher speed_above_w is speed_above(actor, 30kph)


extend top.main:
    on @sut.car.speed_watchers.speed_above_w.i_clock:
        logger.log_info("$(sut.car.state.speed)")
    on @sut.car.speed_watchers.speed_above_w.i_end:
        logger.log_info("MAX: $(sut.car.speed_watchers.speed_above_w.data.max_speed)")

    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)

Example: watcher declaration with custom data type

watcher declaration with custom data type
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"


# Watcher type that uses default data type (any_watcher_data)
watcher modifier speed_above:
    v: vehicle
    limit: speed

    on @top.w_clk:
        if(data == null):
            if(v.state.speed > limit):
                start_interval()
        else:
            if(v.state.speed <= limit):
                end_interval()

# Define custom watcher data
struct speed_above_data inherits any_watcher_data:
    max_speed: speed

global modifier vehicle.speed_watchers:
    # Assign custom data type at declaration
    watcher speed_above_w(speed_above_data) is speed_above(v: actor, limit: 30kph)

    # Add additional sampling logic for the custom data attributes
    on @speed_above_w.i_clock:
        if (speed_above_w.data.max_speed < actor.state.speed):
            speed_above_w.data.max_speed = actor.state.speed


scenario sut.increase_then_decrease_speed:
    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)

extend top.main:
    do sut.increase_then_decrease_speed()

Example: using the passive_w watcher operator

using the passive_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M75_FTX_highway_long_single_road.xodr"


scenario sut.drive_side_by_side:
    npc: npc_vehicle

    do parallel():
        sut.car.drive()
        npc.drive() with:
            lane(side_of: sut.car)
            position(distance: [15..20]m, behind: sut.car, at: start)
            position(distance: [15..20]m, ahead_of: sut.car, at: end)


struct lon_overlap_data inherits any_watcher_data:
    var min_lat_dist: length = 1km
    cover(min_lat_dist, unit: m, range: [0..5], every: 1)

extend sut.drive_side_by_side:

    watcher lon_overlap(lon_overlap_data) is passive_w()

    on @top.w_clk:
        var lon_dist := sut.car.road_distance(npc, closest, closest, lon, lane)

        if lon_dist == 0m and lon_overlap.data == null: 
            lon_overlap.start_interval()
        if lon_overlap.data != null:
            var abs_lat_dist := math.abs(sut.car.road_distance(npc, closest, closest, lat, lane))

            if abs_lat_dist < lon_overlap.data.min_lat_dist:
                lon_overlap.data.min_lat_dist = abs_lat_dist
            if lon_dist != 0m: 
                lon_overlap.end_interval()


extend top.main:
    do drive_side_by_side: sut.drive_side_by_side()

Example: using the while_w watcher operator

using the while_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s


global modifier vehicle.state_watchers:
    watcher speed_above_w is while_w(actor.state.speed > 30kph)


extend top.main:
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()

Example: using the not_w watcher operator

using the not_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s


global modifier vehicle.state_watchers:
    watcher speed_above_w is while_w(actor.state.speed > 30kph)
    watcher speed_below_w is not_w(speed_above_w)


extend top.main:
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()

Example: using the and_w watcher operator

using the and_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s


global modifier vehicle.state_watchers:
    watcher speed_above_w is while_w(actor.state.speed > 30kph)
    watcher driving_close_to_left_lane_boundary_w is while_w(actor.state.msp_pos.lane_position.lat_offset > 20cm)
    watcher speed_above_and_close_to_left_w is and_w(speed_above_w, driving_close_to_left_lane_boundary_w)


extend top.main:
    do parallel(overlap: equal):
        sut_speed_behavior: serial:
            sut.car.drive(duration: 6second) with:
                speed(25kph, at: start)
                speed(35kph, at: end)
            sut.car.drive(duration: 6second) with:
                speed(25kph, at: end)
            sut.car.drive(duration: 6second) with:
                speed(35kph, at: end)
        sut_lateral_movement: serial:
            sut.car.drive(duration: 9second) with:
                lateral(0m, line: left, at: start)
                lateral(0m, line: right, at: end)
            sut.car.drive(duration: 9second) with:
                lateral(0m, line: left, at: end)

Example: using the or_w watcher operator

using the or_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s


global modifier vehicle.state_watchers:
    watcher driving_close_to_left_lane_boundary_w is while_w(actor.state.msp_pos.lane_position.lat_offset > 20cm)
    watcher driving_close_to_right_lane_boundary_w is while_w(actor.state.msp_pos.lane_position.lat_offset < -20cm)
    watcher driving_close_to_lane_edge_w is or_w(driving_close_to_left_lane_boundary_w, driving_close_to_right_lane_boundary_w)


extend top.main:
    do parallel(overlap: equal):
        sut_speed_behavior: serial:
            sut.car.drive() with:
                speed(30kph)
        sut_lateral_movement: serial:
            sut.car.drive(duration: 6second) with:
                lateral(0m, line: left, at: start)
                lateral(0m, line: right, at: end)
            sut.car.drive(duration: 6second) with:
                lateral(0m, line: left, at: end)
            sut.car.drive(duration: 6second) with:
                lateral(0m, line: right, at: end)

Example: using the between_w watcher operator

using the between_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second


global modifier vehicle.vehicle_watchers:
    # Emit an interval between switch lane start and switch lane end
    watcher switch_lane_w is between_w(
            x: @actor.switch_lane_start, y: @actor.switch_lane_end)

    # Emit zero-time intervals for switch lane start & end
    watcher upon_switch_lane_start is upon_w(ev: @actor.switch_lane_start)
    watcher upon_switch_lane_end is upon_w(ev: @actor.switch_lane_end)


scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
            speed(50kph, at: start)
        # Swicth to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Continue driving on lane 1
        sut.car.drive() with:
            keep_lane()
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()

Example: using the between_w watcher operator with custom data

using the between_w watcher operator with custom data
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second


enum switch_lane_ending: [ended, aborted]

struct switch_lane_watcher_data inherits any_watcher_data:
    ending_type: switch_lane_ending
    record(ending_type)

global modifier vehicle.vehicle_watchers:
    # Emit the same event, but with different data, for switch lane end
    # and switch lane abort
    event switch_lane_end_or_abort(ending_type: switch_lane_ending)

    on @actor.switch_lane_end:
        emit switch_lane_end_or_abort(ended)

    on @actor.switch_lane_abort:
        emit switch_lane_end_or_abort(aborted)

    # Emit an interval between switch lane start and switch lane end or abort
    watcher switch_lane_w(switch_lane_watcher_data) is between_w(
            x: @actor.switch_lane_start, y: @switch_lane_end_or_abort) with:
        it.data.ending_type = switch_lane_end_or_abort.event_data().ending_type

    on @switch_lane_w.i_end:
        call logger.log_info("Switch lane interval ended. Ending type: $(switch_lane_w.data.ending_type)")


scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
            speed(50kph, at: start)
        # Swicth to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Continue driving on lane 1
        sut.car.drive() with:
            keep_lane()
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()

Example: between_w with back-to-back events

between_w with back-to-back events
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second


global modifier vehicle.function_watchers:
    # Emitted in the simulation step when the turn signal is turned on
    event turn_signal_on
    # Emitted in the simulation step when the ADAS function enters 'engaged' state
    event function_engaged

    watcher function_w is between_w(
            x: @turn_signal_on, y: @function_engaged)

    watcher upon_turn_signal_on is upon_w(ev: @turn_signal_on)
    watcher upon_function_engaged is upon_w(ev: @function_engaged)


scenario sut.function_mock:
    do parallel(overlap: equal):
        ego_behavior: sut.car.drive()
        function_behavior: serial:
            # Emit both events at the same time
            wait elapsed(1s)
            emit sut.car.function_watchers.turn_signal_on
            emit sut.car.function_watchers.function_engaged
            wait elapsed(1s)
            # Emit turn_signal_on, then turn_signal_on and function_engaged
            # at the same time, then function_engaged
            emit sut.car.function_watchers.turn_signal_on
            wait elapsed(1s)
            emit sut.car.function_watchers.turn_signal_on
            emit sut.car.function_watchers.function_engaged
            wait elapsed(1s)
            emit sut.car.function_watchers.function_engaged
            wait elapsed(1s)

extend top.main:
    do sut.function_mock()

Example: using the above_w watcher operator

using the above_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s


global modifier vehicle.state_watchers:
    watcher speed_above_w is above_w(sample_type: speed, sample_expression: actor.state.speed, threshold: 30kph, tolerance: 2kph)


extend top.main:
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()

Example: using the below_w watcher operator

using the below_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s


global modifier vehicle.state_watchers:
    watcher speed_below_w is below_w(sample_type: speed, sample_expression: actor.state.speed, threshold: 30kph, tolerance: 3kph)


extend top.main:
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(35kph, at: start)
            speed(25kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(35kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()

Example: using the upon_w watcher operator

using the upon_w watcher operator
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second


global modifier vehicle.vehicle_watchers:
    watcher switch_lane_start_w is upon_w(ev: @actor.switch_lane_start)


scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
        # Swicth to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()

Example: using the upon_w watcher operator with custom data

using the upon_w watcher operator with custom data
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_dummy_config.osc"
import "$FTX/env/basic/msp/open_drive.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set test_drain_time=0second
    set min_test_time=0second


struct switch_lane_watcher_data inherits any_watcher_data:
    event_data: switch_lane_data

global modifier vehicle.vehicle_watchers:
    watcher switch_lane_start_w(switch_lane_watcher_data) is upon_w(ev: @actor.switch_lane_start) with:
        it.data.event_data = actor.switch_lane_start.event_data().data

    on @switch_lane_start_w.i_end:
        logger.log_info("$(actor) started switching to lane: " + \ 
            "$(switch_lane_start_w.data.event_data.to_lane_pos.lane)")


scenario sut.switch_lanes:
    do serial:
        # Start driving at lane 1
        sut.car.drive() with:
            lane(1, at: start)
            duration(3s, run_mode: best_effort)
        # Swicth to lane 2
        sut.car.drive() with:
            lane(2, at: end)
            duration([2..5]s, run_mode: best_effort)
        # Switch to lane 1
        sut.car.drive() with:
            lane(1, at: end)
            duration([2..5]s, run_mode: best_effort)

extend top.main:
    do sut.switch_lanes()

Example: checker that creates issues when speed limit exceeded

OSC2 code: checker that creates issues when speed limit exceeded
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"


extend issue_kind: [too_fast] #define a new issue kind

# Define a watcher type that create intervals
# when the speed of a vehicle exceeds some speed limit
watcher modifier speed_above:
    v: vehicle
    limit: speed
    var max_speed: speed

    on @top.w_clk:
        if(data == null):
            if(v.state.speed > limit):
                start_interval()
        else:
            if(v.state.speed <= limit):
                end_interval()
    on @i_clock:
        if(max_speed < v.state.speed):
            max_speed = v.state.speed

global modifier sut_vehicle.speed_checkers:
    checker check_speed_above is speed_above(actor, 30kph) with:
        it.sut_error(kind: too_fast,
            details: "Vehicle $(it.v) exceeded maximal speed $(it.limit) reaching up to $(it.max_speed)")


extend top.main:
    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)

Example: using set_issue to modify checker issue severity

OSC2 code: using set_issue to modify checker issue severity
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

extend issue_kind: [too_fast] #define a new issue kind

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

# Define a watcher type that create intervals
# when the speed of a vehicle exceeds some speed limit
watcher modifier speed_above:
    v: vehicle
    limit: speed
    var max_speed: speed

    on @top.w_clk:
        if(data == null):
            if(v.state.speed > limit):
                start_interval()
        else:
            if(v.state.speed <= limit):
                end_interval()
    on @i_clock:
        if(max_speed < v.state.speed):
            max_speed = v.state.speed

global modifier vehicle.speed_checkers:
    checker check_speed_above is speed_above(actor, 30kph) with:
        it.sut_error(kind: too_fast, 
            details: "Vehicle $(it.v) exceeded maximal speed $(it.limit) reaching up to $(it.max_speed)")

extend top.main:
    do serial:
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: start)
            speed([35kph..40kph], at: end)
        sut.car.drive(duration: [5sec..10sec]) with:
            speed([20kph..25kph], at: end)


extend top.main:
    set_issue(target_checker: sut.car.speed_checkers.check_speed_above, severity: info)

Example: using set_issue to modify checker issue severity and category

OSC2 code: using set_issue to modify checker issue severity and category
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s

struct speed_above_data inherits any_watcher_data:
    max_speed: speed

watcher modifier speed_above(speed_above_data):
    v: vehicle
    limit: speed

    on @top.w_clk:
        if(data == null):
            if(v.state.speed > limit):
                start_interval()
        else:
            if(v.state.speed <= limit):
                end_interval()

    on @i_clock:
        if (data.max_speed < v.state.speed):
            data.max_speed = v.state.speed

extend issue_kind: [too_fast]


global modifier vehicle.speed_checkers:
    checker check_speed_above is speed_above(actor, 30kph) with:
        it.issue(kind: too_fast, severity: error_continue, 
            category: other,
            details: "Vehicle $(it.v) exceeded maximal speed $(it.limit) " + \
                "reaching up to $(it.data.max_speed)")

extend sut_vehicle.speed_checkers:
    set_issue(target_checker: speed_checkers.check_speed_above, severity: error, category: sut)



extend top.main:
    npc: vehicle
    do parallel(overlap: equal):
        sut_behavior: serial:
            sut.car.drive(duration: 10second) with:
                speed(25kph)
            sut.car.drive(duration: 5second) with:
                speed(35kph, at: end)
            sut.car.drive(duration: 5second) with:
                speed(25kph, at: end)
            sut.car.drive(duration: 10second) with:
                keep_speed()
        npc_behavior: serial:
            npc.drive(duration: 5second) with:
                speed(25kph, at: start)
                speed(35kph, at: end)
            npc.drive(duration: 5second) with:
                speed(25kph, at: end)
            npc.drive() with:
                keep_speed()

Example: using the condition parameter of set_issue

OSC2 code: using the condition parameter of set_issue
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"
    set implicits_kind = none
    set test_drain_time = 0s


extend issue_kind: [speed_above]

global modifier vehicle.state_checkers:
    checker check_speed_above is above_w(
        sample_type: speed, sample_expression: actor.state.speed, threshold: 30kph, tolerance: 2kph)

    on @check_speed_above.i_end:
        check_speed_above.other_error(kind: speed_above, 
            details: "Speed of actor $(actor) was above 30kph")

    set_issue(target_checker: check_speed_above, condition: actor.is_dut, severity: warning)


extend top.main:
    do serial:
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: start)
            speed(35kph, at: end)
        sut.car.drive(duration: 5second) with:
            speed(25kph, at: end)
        sut.car.drive(duration: 10second) with:
            keep_speed()

Example: defining a checker for longitudinal acceleration while LSS is engaged

OSC2 code: Defining a checker for longitudinal acceleration while LSS is engaged
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

extend sut_vehicle:
    # Mock LSS interface
    var lss_is_engaged_flag := false
    def lss_is_engaged() -> bool is expression lss_is_engaged_flag

    var lss_is_warning_flag := false
    def lss_is_warning() -> bool is expression lss_is_warning_flag


extend issue_kind: [lss_lon_acceleration]

struct inhibition_on_longitudinal_acceleration_data inherits any_watcher_data:
    max_acceleration: acceleration

modifier sut_vehicle.lss_lon_acceleration:
    LSS_MAX_LON_ACC: acceleration with: keep(default it == 2mpsps)

    @required
    lss_is_engaged_or_warning: any_watcher_behaviour

    issue_severity: issue_severity with:
        keep(default it == error)

    watcher lon_acceleration_above_threshold is while_w(
        actor.state != null and actor.state.local_acceleration.lon > LSS_MAX_LON_ACC)

    checker check_lss_lon_acceleration(inhibition_on_longitudinal_acceleration_data) is and_w(
            lss_is_engaged_or_warning, lon_acceleration_above_threshold) with:
        it.sut_issue(
            kind: lss_lon_acceleration, severity: issue_severity,
            details: "Longitudinal acceleration of SUT vehicle ($(actor)) was higher than maximum " + \
                "allowed value while LSS is engaged ($(LSS_MAX_LON_ACC)). Maximum recorded " + \
                "value of longitudinal acceleration: $(it.data.max_acceleration)")

    # Sample maximum acceleration during the interval
    on @check_lss_lon_acceleration.i_clock:
        if (check_lss_lon_acceleration.data.max_acceleration < actor.state.local_acceleration.lon):
            check_lss_lon_acceleration.data.max_acceleration = actor.state.local_acceleration.lon

global modifier sut_vehicle.lss_checks:
    watcher lss_is_engaged is while_w(actor.lss_is_engaged())
    watcher lss_is_warning is while_w(actor.lss_is_warning())
    watcher lss_is_engaged_or_warning is or_w(lss_is_engaged, lss_is_warning)

    lss_lon_acceleration: lss_lon_acceleration(lss_is_engaged_or_warning: lss_is_engaged_or_warning)


scenario sut.three_change_lanes:
    do serial:
        first_change_lane: sut.car.drive() with:
            speed([30..40]kph, at: start)
            duration([4..5]s)
            change_lane()
        first_drive_straight: sut.car.drive() with:
            duration([3..5]s)
        second_change_lane: sut.car.drive() with:
            duration([4..5]s)
            change_lane()
        second_drive_straight: sut.car.drive() with:
            keep_lane()
            duration([3..5]s)
        third_change_lane: sut.car.drive() with:
            duration([4..5]s)
            change_lane()


extend top.main:
    do a: sut.three_change_lanes()

    # Mock LSS behavior: whenever SUT is changing lanes,
    # sut.car.lss_is_engaged() or sut.car.lss_is_warning() or return true

    var first_change_lane := false
    on @sut.car.switch_lane_start:
        if (first_change_lane == false):
            sut.car.lss_is_warning_flag = true
            first_change_lane = true
        else:
            sut.car.lss_is_engaged_flag = true
    on @sut.car.switch_lane_end:
        sut.car.lss_is_engaged_flag = false
        sut.car.lss_is_warning_flag = false

    # Set the longitudinal acceleration threshold to a small value
    keep(sut.car.lss_checks.lss_lon_acceleration.LSS_MAX_LON_ACC == 0.1mpsps)

    # Change severity of checker issues to error_continue
    set_issue(target_checker: sut.car.lss_checks.lss_lon_acceleration.check_lss_lon_acceleration, severity: error_continue)

Example: defining a checker for BSM alarms

OSC2 code: Defining a checker for BSM alarms
import "$FTX/config/sim/sumo_default.osc"
import "$FTX/env/basic/exe_platforms/sumo_dsp/config/sumo_dsp_config.osc"
import "$FTX/packages/adas/BSM/scenarios/common/bsm_config/bsm_config.osc"

watcher modifier bsm_zone_monitor:
    sut_veh: sut_vehicle
    side: av_side
    len: length
    wdth: length

    # Return true whenever there is an object in the left / right BSM zone
    def is_obj_in_zone()-> bool is:
        var ref_point: distance_reference = side == left ? left_center : right_center
        var sign := side == left ? 1 : -1

        result = false
        for obj in traffic.physical_objects:
            if obj != sut_veh:
                var lon_dist := sut_veh.local_distance(obj, ref_point, closest_compound, lon)
                var lat_dist := sut_veh.local_distance(obj, ref_point, closest_compound, lat)

                if lon_dist in [-len..0m] and lat_dist * sign in [0m..wdth]:
                    result = true

    on @top.w_clk:
        var obj_in_zone := is_obj_in_zone()
        if data == null and obj_in_zone:
            start_interval()
        elif data != null and not obj_in_zone:
            end_interval()


extend issue_kind: [bsm_check]

# Wrapper modifier for all the checking logic related to a BSM zone (left or right)
modifier sut_vehicle.bsm_zone_checks:
    side: av_side with:
        keep(default it == left)
    len: length with:
        keep(default it == 10m)
    wdth: length with:
        keep(default it == 2.5m)

    # Returns true if the turn signal towards 'side' is active
    def is_turn_signal_active() -> bool is:
        var m_turn_side: turn_signal_state = (side == left) ? left_on : right_on
        return actor.state.vehicle_indicators.turn_signal_state == m_turn_side

    # Returns true if the BSM alert that corresponds to 'side' is active
    def is_bsm_alert_active() -> bool is expression \
        (side == left) ? (actor.state.vehicle_indicators.bsm_left_side_alert == true) : \
            (actor.state.vehicle_indicators.bsm_right_side_alert == true)


    watcher zone_monitor is bsm_zone_monitor(sut_veh: actor, side: side, len: len, wdth: wdth)

    var turn_signal_active := sample(is_turn_signal_active(), @top.clk)
    watcher turn_indication is while_w(turn_signal_active)

    watcher should_issue_bsm_alert is and_w(zone_monitor, turn_indication)

    var bsm_alert_active := sample(is_bsm_alert_active(), @top.clk)
    watcher bsm_alert_off is while_w(not bsm_alert_active)

    checker check_bsm_alert is and_w(should_issue_bsm_alert, bsm_alert_off)

    on @check_bsm_alert.i_end if (check_bsm_alert.data.start_time != check_bsm_alert.data.end_time):
        check_bsm_alert.sut_warning(bsm_check, 
            "BSM alert is off when turn signal is on while there are objects in the $(side) BSM zone")

global modifier sut_vehicle.bsm_checks:
    left_bsm_checks: bsm_zone_checks(side: left, len: 10m, wdth: 2.5m)
    right_bsm_checks: bsm_zone_checks(side: right, len: 10m, wdth: 2.5m)


struct constant_speed_shape inherits any_position_shape:
    target_speed: speed with:
        keep(default it == 60kph)

    def duration() -> time is only:
        return 25s

    def compute() is only:
        lon = target_speed * t
        lat = 0meter

extend top.main:
    left_car: vehicle
    right_car: vehicle

    left_shape: constant_speed_shape
    right_shape: constant_speed_shape

    r: odr_road with: keep(it.odr_id == 0 and it.sub_id == -1)

    do serial():
        sut.car.request_BSM_mode(state: active)
        parallel(start_to_start: 0s):
            serial():
                wait (sut.car.local_distance(left_car, left_center, closest_compound, lon) > -8m)
                sut.car.turn_signal(turn_signal_state: left_on)
                wait elapsed(3s)
                sut.car.turn_signal(turn_signal_state: off)
                wait elapsed(2s)
                sut.car.turn_signal(turn_signal_state: right_on)
            parallel(overlap: full, duration: 25s):
                sut.car.drive() with:
                    speed(50kph, at: start)
                    keep_speed()
                    lane(2)
                    along(r, start_offset: 200m, at: start)
                left_car.drive() with:
                    speed(60kph, at: start)
                    lane(1)
                    position(30m, behind: sut.car, at: start, run_mode: best_effort)
                    shape(quantity: position, request: both, shape_object: left_shape)
                right_car.drive() with:
                    speed(60kph, at: start)
                    lane(3)
                    position(40m, behind: sut.car, at: start, run_mode: best_effort)
                    shape(quantity: position, request: both, shape_object: right_shape)


# ---

extend test_config:
    set implicits_kind = none
    set test_drain_time = 0s
    set map = "$FTX/packages/maps/straight_long_road.xodr"