const { SCPBody, PatternStore } = require("scp-protocol")
class CartpoleBody extends SCPBody {
static bodyName = "cartpole"
static tools = {
apply_force: {
description: "push the cart to balance the pole",
parameters: {
direction: { type: "string", enum: ["left", "right"], required: true },
magnitude: { type: "number", min: 0, max: 1, required: true },
},
},
reset: { description: "recenter the cart and pick a small tilt", parameters: {} },
hold: { description: "apply no force this frame", parameters: {} },
}
constructor(opts = {}) {
super({
...opts,
patternStore: new PatternStore({
featureExtractor: (e) => ({
tilt_bucket: Math.abs(e.pole_angle) > 0.4 ? "tilted" : "upright",
drift: e.cart_pos > 0.5 ? "right" : e.cart_pos < -0.5 ? "left" : "center",
}),
}),
})
this.physics = new CartpolePhysics()
}
// -- Tools (one async method per static tool name) --
async apply_force({ direction, magnitude }) {
this.physics.apply(direction, magnitude)
return { applied: direction, magnitude }
}
async reset() { this.physics.reset(); return { ok: true } }
async hold() { this.physics.force = 0; return { ok: true } }
// -- Sensor loop (called by Plexa or by your own runner) --
async tick() {
await super.tick()
this.physics.step()
this.setState({
pole_angle: this.physics.theta,
cart_pos: this.physics.x,
})
// Reflex: emergency damping if the pole is past the danger threshold.
if (Math.abs(this.physics.theta) > 0.5) {
this.physics.apply(this.physics.theta > 0 ? "left" : "right", 0.6)
this.emit("pole_critical", { angle: this.physics.theta }, "CRITICAL")
}
}
// -- Outcome reporting (optional but recommended) --
evaluateOutcome(state) {
if (Math.abs(state.pole_angle) > 0.6) return false
if (Math.abs(state.pole_angle) < 0.1) return true
return null // unknown, skip
}
}
module.exports = { CartpoleBody }