Skip to content

Behavior monitoring examples

Example: behavior monitoring with cover()

OSC2 code: behavior monitoring with cover()
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)

# Scenario with a single vehicle and one speed field collected as a coverage item.
scenario sut.scenario1:
    car1: vehicle

    # speed1 is covered in kph over the range [10..130] with every: 10.
    speed1: speed
    cover(speed1,
        unit: kph,
        text: "Absolute speed of ego (in km/h)",
        range: [10..130],
        every: 10)

    do serial:
        car1.drive()

Example: equivalent cover definitions

OSC2 code: equivalent cover definitions - 'it' used for expression
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)

extend top.main:
    # Example 1: pass 'it' as 'expression'; 'expression' may be omitted
    # when it can be derived from the item name.
    current_speed: speed with:
        cover(current_speed,
            expression: it,
            unit: kph)
OSC2 code: equivalent cover definitions - expression omitted
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)

# Scenario declaring the coverage item without an explicit 'expression'.
scenario sut.scenario1:
    speed1: speed
    car1: vehicle

    # Example 2: 'expression' is left out because it can be derived from the item name
    current_speed: speed
    cover(current_speed,
        unit: kph)

    do serial:
        car1.drive()

Example: cover() with sample()

OSC2 code: cover() with sample()
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)
# Minimal scenario with one labeled phase ('slow') that the extension below samples on.
scenario sut.cut_in_and_slow:
    car1: vehicle

    do serial:
        slow: car1.drive()


# Adds a sampled variable and covers it: the value returned by
# map.abs_distance_between_positions() for the SUT car and car1 road positions
# is sampled on @slow.end and collected in centimeters over [0..6000], every 50.
extend sut.cut_in_and_slow:
    var rel_d_slow_end: length = sample(map.abs_distance_between_positions(
       sut.car.state.msp_pos.road_position,
       car1.state.msp_pos.road_position),  @slow.end) with:
           cover(rel_d_slow_end,
                 unit:centimeter,
                 text:"How far ahead is car1 relative to dut at slow end (in cm)",
                 range:[0..6000],
                 every:50)

Example: cross coverage

OSC2 code: cross coverage
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Cut-in scenario made of three serial phases (start_behind_dut, change_lane, slow).
# In every phase the SUT car and car1 drive in parallel.
scenario sut.cut_in_and_slow:
    car1: vehicle  # The "cut-in" car
    side: av_side  # The side from which car1 cuts in, left or right


    do s1: serial():
        # Phase 1: car1 starts 5..100 meters behind the SUT car.
        start_behind_dut: parallel(duration:[1..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                speed(speed: [30..200]kph)
                position(distance: [5..100]meter, behind: sut.car, at: start)

        # Phase 2: car1 starts in the lane on 'side' of the SUT car and ends
        # in the SUT car's lane, ahead of it.
        change_lane: parallel(duration:[1.5..3]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                l1: lane(side_of: sut.car, side: side, at: start)
                p1: position(time: [0.5..1]second, ahead_of: sut.car, at: start)
                speed([30..200]kph)
                l2: lane(same_as: sut.car, at: end)
                p2: position(time: [1.5..2]second, ahead_of: sut.car, at: end)

        # Phase 3: car1 keeps its lane with acceleration constrained between
        # its policy's min_acceleration and half of that value.
        slow: parallel(duration:[3..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                keep_lane()
                acceleration(acceleration: [car1.policy.min_acceleration..car1.policy.min_acceleration/2])

# Top-level behavior: only a 5-second SUT drive; cut_in_and_slow is not invoked here.
extend top.main:
    do sut.car.drive(duration: 5s)

# Adds three coverage items sampled at the start of change_lane and a cross
# coverage item combining them.
extend sut.cut_in_and_slow:

    # Named event used as the sampling event of every cover() below.
    event change_lane_start is @change_lane.start

    # Result of map.abs_distance_between_positions() for the SUT car and car1
    # road positions, sampled when change_lane starts; collected in centimeters.
    var rel_d_cls:= sample(map.abs_distance_between_positions(
       sut.car.state.msp_pos.road_position, car1.state.msp_pos.road_position), @change_lane.start) with:
        cover(rel_d_cls, text: "How far ahead is car1 relative to ego",
            event: change_lane_start,
            unit: centimeter,
            range: [0..1500],
            every: 50)

    # Speed of the SUT car when change_lane starts; collected in kph.
    var dut_v_cls:= sample(sut.car.state.speed, @change_lane.start) with:
        cover(dut_v_cls, text: "Absolute speed of dut at change_lane start (in km/h)",
            event: change_lane_start,
            unit: kph,
            range: [10..130],
            every: 10)

    # Speed of car1 when change_lane starts; collected in kph.
    # The description states km/h so that it agrees with the declared unit.
    var rel_v_cls:= sample(car1.state.speed, @change_lane.start) with:
        cover(rel_v_cls, text: "Absolute speed of car1 at change_lane start (in km/h)",
            event: change_lane_start,
            unit: kph,
            range: [-33..33],
            every: 1)


    # Cross coverage over the three items declared above.
    cover(cross_dist_vel, items: [rel_d_cls, dut_v_cls,rel_v_cls],
        event: change_lane_start,
        text: "Cross coverage of relative distance and absolute velocity")

Example: cover with a non-default sampling event

OSC2 code: cover with a non-default sampling event
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)

# Scenario whose coverage item is sampled on the scenario's start event.
scenario sut.scenario1:
    car1: vehicle

    do serial:
        car1.drive()

    # car1's speed is sampled on @start and covered in kph.
    var speed1 := sample(car1.state.speed, @start) with:
        cover(speed1,
            unit: kph)

Example: behavior monitoring with record()

OSC2 code: behavior monitoring with record()
import "$FTX/env/basic/exe_platforms/model_ssp/config/model_sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

# Cut-in scenario made of three serial phases (start_behind_dut, change_lane, slow).
# In every phase the SUT car and car1 drive in parallel.
scenario sut.cut_in_and_slow:
    car1: vehicle  # The "cut-in" car
    side: av_side  # The side from which car1 cuts in, left or right


    do serial():
        # Phase 1: car1 starts 5..100 meters behind the SUT car.
        start_behind_dut: parallel(duration:[1..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                speed(speed: [30..200]kph)
                position(distance: [5..100]meter, behind: sut.car, at: start)

        # Phase 2: car1 starts in the lane on 'side' of the SUT car and ends
        # in the SUT car's lane, ahead of it.
        change_lane: parallel(duration:[1.5..3]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                l1: lane(side_of: sut.car, side: side, at: start)
                p1: position(time: [0.5..1]second, ahead_of: sut.car, at: start)
                l2: lane(same_as: sut.car, at: end)
                p2: position(time: [1.5..2]second, ahead_of: sut.car, at: end)

        # Phase 3: car1 keeps its lane with acceleration constrained between
        # its policy's min_acceleration and half of that value.
        slow: parallel(duration:[3..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                keep_lane()
                acceleration(acceleration: [car1.policy.min_acceleration..car1.policy.min_acceleration/2])



# Top-level behavior: run the cut_in_and_slow scenario.
extend top.main:
    do sut.cut_in_and_slow()


# First extension: sample a time-to-collision value and record it.
extend sut.cut_in_and_slow:

    # Time-to-collision KPI, sampled when change_lane ends
    var ttc_at_end_of_change_lane := sample(
        sut.car.get_ttc_to_object(car1),
        @change_lane.end)

    # Record the KPIs into the cut_in_and_slow.end metric group
    record(ttc_at_end_of_change_lane,
        unit: s,
        text: "Time to collision of ego car to cut-in car at end of change_lane")


# Second extension: cover the SUT speed at the end of change_lane and
# record it together with the TTC value as a cross record.
extend sut.cut_in_and_slow:
    var dut_v_cle := sample(sut.car.state.speed, @change_lane.end) with:
        cover(dut_v_cle,
            text: "Speed of dut at change_lane end (in kph)",
            unit: kph,
            range: [10..200],
            every: 10)

    record(ttc_dut_vel,
        items: [ttc_at_end_of_change_lane, dut_v_cle],
        text: "Cross record of TTC and absolute SUT velocity")

Example: range

OSC2 code: range
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"


# Coverage item restricted to a range, without 'every'.
extend top.main:
    speed2: speed
    cover(speed2,
        unit: kph,
        range: [10..130])


# Coverage item with both 'range' and 'every'.
extend top.main:
    speed3: speed
    cover(speed3,
        unit: kph,
        range: [10..130],
        every: 10)

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)

# Coverage item with only a unit.
extend top.main:
    speed1: speed
    cover(speed1,
        unit: kph)

Example: every

OSC2 code: every
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Coverage item restricted to a range, without 'every'.
extend top.main:
    speed2: speed
    cover(speed2,
        unit: kph,
        range: [10..130])


# Coverage item with both 'range' and 'every'.
extend top.main:
    speed3: speed
    cover(speed3,
        unit: kph,
        range: [10..130],
        every: 10)


# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)

# Coverage item with only a unit.
extend top.main:
    speed1: speed
    cover(speed1,
        unit: kph)

Example: cover() with local event

OSC2 code: cover() with local event
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)


# Coverage item whose sampling event is a locally declared event.
extend top.main:
    speed1: speed

    # Local event bound to @top.clk
    event sim_clock is @top.clk

    cover(speed1,
        unit: kph,
        event: sim_clock,
        range: [10..130],
        every: 10)

Example: cover() with explanatory text

OSC2 code: cover() with explanatory text
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Declares the sampling event (bound to @top.clk here) and the top-level behavior.
extend top.main:
    event change_lane_start is @top.clk

    do sut.car.drive(duration: 5s)


# Coverage item carrying an explanatory 'text' description.
extend top.main:
    speed1: speed
    cover(speed1,
        unit: kph,
        event: change_lane_start,
        text: "Absolute speed of ego at change_lane start (in km/h)",
        range: [10..130],
        every: 10)

Example: cross items

OSC2 code: cross items
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Cut-in scenario made of three serial phases (start_behind_dut, change_lane, slow).
# In every phase the SUT car and car1 drive in parallel.
scenario sut.cut_in_and_slow:
    car1: vehicle  # The "cut-in" car
    side: av_side  # The side from which car1 cuts in, left or right


    do serial():
        # Phase 1: car1 starts 5..100 meters behind the SUT car.
        start_behind_dut: parallel(duration:[1..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                speed(speed: [30..200]kph)
                position(distance: [5..100]meter, behind: sut.car, at: start)

        # Phase 2: car1 starts in the lane on 'side' of the SUT car and ends
        # in the SUT car's lane, ahead of it.
        change_lane: parallel(duration:[1.5..3]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                l1: lane(side_of: sut.car, side: side, at: start)
                p1: position(time: [0.5..1]second, ahead_of: sut.car, at: start)
                speed([30..200]kph)
                l2: lane(same_as: sut.car, at: end)
                p2: position(time: [1.5..2]second, ahead_of: sut.car, at: end)

        # Phase 3: car1 keeps its lane with acceleration constrained between
        # its policy's min_acceleration and half of that value.
        slow: parallel(duration:[3..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                keep_lane()
                acceleration(acceleration: [car1.policy.min_acceleration..car1.policy.min_acceleration/2])

# Declares two coverage items, both sampled on the change_lane_start event.
extend sut.cut_in_and_slow:
    event change_lane_start is @change_lane.start

    # SUT car speed at change_lane start, collected in kph.
    var dut_v_cls := sample(sut.car.state.speed, @change_lane_start) with:
        cover(dut_v_cls,
            text: "Speed of dut at change_lane start (in kph)",
            unit: kph,
            range: [10..200],
            every: 10)

    # Result of map.abs_distance_between_positions() for the SUT car and car1
    # road positions at change_lane start, collected in centimeters.
    var rel_d_cls := sample(map.abs_distance_between_positions(
        sut.car.state.msp_pos.road_position,
        car1.state.msp_pos.road_position), @change_lane_start) with:
        cover(rel_d_cls,
            text: "How ahead is car1 relative to dut at change_lane start (in centimeter)",
            unit: centimeter,
            range: [0..5000],
            every: 50)

# Top-level behavior: only a 5-second SUT drive.
extend top.main:
    do sut.car.drive(duration: 5s)


# Cross coverage item built from the two items declared above.
extend sut.cut_in_and_slow:
    cover(cross_dist_vel,
        items: [rel_d_cls, dut_v_cls],
        text: "Cross coverage of relative distance and absolute velocity")

Example: bucket boundaries

OSC2 code: bucket boundaries
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)


# Coverage item with an explicit list passed as 'buckets'.
extend top.main:
    speed1: speed

    cover(speed1,
        unit: kph,
        buckets: [1, 2, 6.5, 10])

Example: target for bucket

OSC2 code: target for bucket
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Coverage item c5 on x, with buckets given as bucket(...) entries;
# the first and third entries also pass a target value.
extend top.main:
    x: uint
    keep(x in [0..50])

    cover(c5,
        expression: x,
        buckets: [
            bucket(values: [1..4], target: 5),
            bucket([4..8]),
            bucket([8..50], 2)])

    do sut.car.drive(duration: 5s)

Example: cover() with ignore

OSC2 code: cover() with ignore
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Cut-in scenario made of three serial phases (start_behind_dut, change_lane, slow).
# In every phase the SUT car and car1 drive in parallel.
scenario sut.cut_in_and_slow:
    car1: vehicle  # The "cut-in" car
    side: av_side  # The side from which car1 cuts in, left or right
    dut_lane: av_road_lane_type  # Field covered by the extension that follows


    do serial():
        # Phase 1: car1 starts 5..100 meters behind the SUT car.
        start_behind_dut: parallel(duration:[1..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                speed(speed: [30..200]kph)
                position(distance: [5..100]meter, behind: sut.car, at: start)

        # Phase 2: car1 starts in the lane on 'side' of the SUT car and ends
        # in the SUT car's lane, ahead of it.
        change_lane: parallel(duration:[1.5..3]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                l1: lane(side_of: sut.car, side: side, at: start)
                p1: position(time: [0.5..1]second, ahead_of: sut.car, at: start)
                speed([30..200]kph)
                l2: lane(same_as: sut.car, at: end)
                p2: position(time: [1.5..2]second, ahead_of: sut.car, at: end)

        # Phase 3: car1 keeps its lane with acceleration constrained between
        # its policy's min_acceleration and half of that value.
        slow: parallel(duration:[3..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                keep_lane()
                acceleration(acceleration: [car1.policy.min_acceleration..car1.policy.min_acceleration/2])

# Top-level behavior: only a 5-second SUT drive.
extend top.main:
    do sut.car.drive(duration: 5s)


# Covers dut_lane, passing 'dut_lane == center' as the 'ignore' condition.
extend sut.cut_in_and_slow:
    cover(dut_lane,
        text: "Relative dut lane within road (leftmost/center/rightmost)",
        ignore: (dut_lane == center))

Example: cover() with disable

OSC2 code: cover() with disable
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Cut-in scenario made of three serial phases (start_behind_dut, change_lane, slow).
# In every phase the SUT car and car1 drive in parallel.
scenario sut.cut_in_and_slow:
    car1: vehicle  # The "cut-in" car
    side: av_side  # The side from which car1 cuts in, left or right


    do serial():
        # Phase 1: car1 starts 5..100 meters behind the SUT car.
        start_behind_dut: parallel(duration:[1..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                speed(speed: [30..200]kph)
                position(distance: [5..100]meter, behind: sut.car, at: start)

        # Phase 2: car1 starts in the lane on 'side' of the SUT car and ends
        # in the SUT car's lane, ahead of it.
        change_lane: parallel(duration:[1.5..3]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                l1: lane(side_of: sut.car, side: side, at: start)
                p1: position(time: [0.5..1]second, ahead_of: sut.car, at: start)
                speed([30..200]kph)
                l2: lane(same_as: sut.car, at: end)
                p2: position(time: [1.5..2]second, ahead_of: sut.car, at: end)

        # Phase 3: car1 keeps its lane with acceleration constrained between
        # its policy's min_acceleration and half of that value.
        slow: parallel(duration:[3..5]second, overlap:equal):
            sut.car.drive()
            car1.drive() with:
                keep_lane()
                acceleration(acceleration: [car1.policy.min_acceleration..car1.policy.min_acceleration/2])

# Declares two coverage items sampled when change_lane starts.
extend sut.cut_in_and_slow:
    event change_lane_start is @change_lane.start

    # SUT car speed at change_lane start, collected in kph.
    var dut_v_cls := sample(sut.car.state.speed, @change_lane.start) with:
        cover(dut_v_cls,
            text: "Speed of dut at change_lane start (in kph)",
            unit: kph,
            range: [10..200],
            every: 10)

    # Result of map.abs_distance_between_positions() for the SUT car and car1
    # road positions at change_lane start, collected in centimeters.
    var rel_d_cls := sample(map.abs_distance_between_positions(
        sut.car.state.msp_pos.road_position,
        car1.state.msp_pos.road_position), @change_lane.start) with:
        cover(rel_d_cls,
            text: "How ahead is car1 relative to dut at change_lane start (in centimeter)",
            unit: centimeter,
            range: [0..5000],
            every: 50)

# Top-level behavior: only a 5-second SUT drive.
extend top.main:
    do sut.car.drive(duration: 5s)


# Overrides the dut_v_cls item declared above with disable: true.
extend sut.cut_in_and_slow:
    cover(override: dut_v_cls,
        disable: true)

Example: target for item

OSC2 code: target for item
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Top-level behavior: the SUT car drives for 5 seconds.
extend top.main:
    do sut.car.drive(duration: 5s)


# Coverage item on the 'side' field with an item-level target of 20.
extend top.main:
    side: av_side

    cover(side, target: 20)

Example: cover() with ignore and disable

OSC2 code: cover() with ignore and disable
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# The SUT car and an NPC vehicle drive in parallel; the difference between
# their speeds is sampled on @end and covered, then the item is overridden twice.
extend top.main:
    npc: vehicle
    do parallel(overlap: equal):
        sut.car.drive() with:
            duration(5second)
            speed(40kph, run_mode: best_effort)
        npc.drive() with:
            speed([20..40]kph)

    # SUT speed minus NPC speed, sampled on @end
    var speed_diff: speed = sample(sut.car.state.speed - npc.state.speed, @end)


    # Original definition
    cover(speed_diff, unit: kph, range: [1..20], every: 5)

    # New definition adds ignore
    cover(override: speed_diff, ignore: (speed_diff in [10..13]))

    # Now speed_diff is completely disabled
    cover(override: speed_diff, disable: true)

Example: cover() with rename

OSC2 code: cover() with rename
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file and set min_test_time to 0s.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"
    set min_test_time = 0s

# Samples the SUT speed on @start, covers it, then renames the item via an override.
extend top.main:
    # SUT car speed sampled on @start
    var sut_start_speed: speed = sample(sut.car.state.speed, @start)

    # The 2-second drive uses the sampled value as its speed at start
    do sut.car.drive(duration: 2s) with:
        speed(sut_start_speed, at: start)


    # Original definition
    cover(sut_start_speed, unit: kph, range: [1..20], every: 5)

    # New definition renames item
    cover(override: sut_start_speed, rename: ego_start_speed)

Example: behavior monitoring with trace

OSC2 code: behavior monitoring with trace
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"
import "$FTX_PACKAGES/base_scenarios/scenarios/vehicle_cut_in/vehicle_cut_in_and_slow/vehicle_cut_in_and_slow_top.osc"

# Test configuration: select the map file used for this run.
extend test_config:

    set map = "$FTX_PACKAGES/maps/hooder.xodr"


# Adds the 'cut_in_params' value to signal_category.
extend signal_category: [cut_in_params]  # optional, when new category is needed to enable grouping of multiple signals together

# Declares four traced signals on the imported vehicle_cut_in_and_slow scenario.
extend sut.vehicle_cut_in_and_slow:
    # cut_in_vehicle_speed is added to the 'npc' category
    trace(cut_in_vehicle_speed, expression: cut_in_vehicle.state.speed, unit: kph, category: npc)
    # dut_speed and speed_diff are not added to any category
    trace(dut_speed, expression: sut.car.state.speed, unit:  kph)
    trace(speed_diff, expression: cut_in_vehicle.state.speed - sut.car.state.speed, unit: kph)
    # cutin_side is added to the 'cut_in_params' category
    trace(cutin_side, expression: vehicle_cut_in_at_essence.gen_cut_in_side, category: cut_in_params, enabled: true)


# Top-level behavior: run the scenario under the label 'c'.
extend top.main:

    do c: sut.vehicle_cut_in_and_slow()

Example: behavior monitoring with sut_error()

OSC2 code: behavior monitoring with sut_error()
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"


# Raises an SUT error of kind 'assertion' when the SUT car speed exceeds 100 kph.
extend top.main:

    on sut.car.state.speed > 100kph:
        call sut_error(assertion, "SUT drove too fast")

Example: behavior monitoring with log()

OSC2 code: behavior monitoring with log()
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/straight_long_road.xodr"

# Simple struct with two integer fields, used to show field interpolation in a log message.
struct my_ints:
    x: int
    y: int

# After the SUT drive (3..7 s), logs both fields of z at info level.
extend top.main:
    var z: my_ints = new  

    do serial:
        sut.car.drive() with: duration([3..7]s)
        call logger.log_info("z.x = $(z.x), z.y = $(z.y)")

Example: behavior monitoring statistics modifier

OSC2 code: behavior monitoring statistics modifier
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"




# Collects v1's speed on every @top.clk into a statistics item and logs the
# consumed value when the scenario ends.
extend top.main:
    v1: vehicle
    do v1.drive(duration: 10sec)

    # Statistics item over speed samples, configured with measurement: average
    avg_speed: statistics(sample_type: speed, measurement: average)

    # Add one speed sample per clock event
    on @top.clk:
        avg_speed.add_sample(v1.state.speed)

    # On @end, consume the statistics result and log it at info level
    on @end:
        var avg := avg_speed.consume()
        logger.log_info("v1 average speed: $(avg)")

Example: combining a watcher and statistics for collecting data during an interval

OSC2 code: combining a watcher and statistics for collecting data during an interval
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/M77_FTX_highway_straight_long_road.xodr"

# my_car performs two consecutive drives: the first starts at 40 kph,
# the second starts at 60 kph and ends at 40 kph.
extend top.main:
    my_car: vehicle

    do serial(duration: [10..30]sec):
        drv1: my_car.drive() with:
            speed(40kph, at: start)
        drv2: my_car.drive() with:
            speed(60kph, at: start)
            speed(40kph, at: end)



# Combines a watcher with a statistics item: samples are added on the watcher's
# i_clock event and the consumed result is stored in the watcher data on i_end.
extend top.main:

    # Statistics item over acceleration samples, configured with measurement: average
    avg_acc_stats: statistics(sample_type: acceleration, measurement: average)

    # Watcher of type above_w on my_car's speed (threshold 50 kph, tolerance 5 kph),
    # using my_interval_data as its data struct
    watcher above_50(my_interval_data) is above_w(sample_type: speed, sample_expression: my_car.state.speed, threshold: 50kph, tolerance: 5kph)

    # Add my_car.state.road_acceleration.lon as a sample on each i_clock event
    on @above_50.i_clock:
        avg_acc_stats.add_sample(my_car.state.road_acceleration.lon)

    # On i_end, store the consumed statistics result in the watcher's data
    on @above_50.i_end:
        above_50.data.average_acceleration = avg_acc_stats.consume()

# Watcher data struct: adds an average_acceleration field to above_w_speed_data
# and records it in mpsps.
struct my_interval_data inherits above_w_speed_data:
    var average_acceleration: acceleration
    record(average_acceleration, expression: average_acceleration, unit: mpsps)

Example: cover() with sample_if

OSC2 code: cover() with sample_if
import "$FTX_BASIC/exe_platforms/sumo_ssp/config/sumo_config.osc"

# Test configuration: select the map file used for this run.
extend test_config:
    set map = "$FTX_PACKAGES/maps/hooder.xodr"

# Covers the SUT speed sampled on @end, with a 'sample_if' condition on the
# SUT car's lane position.
extend top.main:
    do sut.car.drive() with:
        duration(5second)
        speed([40..80]kph)

    # SUT car speed sampled on @end
    var sut_speed: speed = sample(sut.car.state.speed, @end)


    # sample_if condition: sut.car.get_lane_position() == middle
    cover(sut_speed,
        unit: kph,
        range: [0..200],
        every: 10,
        sample_if: sut.car.get_lane_position() == middle)