sync: update from blackroad-operator 2026-03-14

Synced from BlackRoad-OS-Inc/blackroad-operator/orgs/personal/lucidia
BlackRoad OS — Pave Tomorrow.

RoadChain-SHA2048: fe729062952871e7
RoadChain-Identity: alexa@sovereign
RoadChain-Full: fe729062952871e77147cf6d938b799096e87d9024d7005a14c9e209e12e8ad0c825b624c7bc649fc7eeb4c284fdcab8231af77980065cc04d9f36fca479ffc2346ed3c1b73de6f240d8f9485f47c995ad5b81142f7179b84932c67914dff1c08db039349ba28fca36cb57688093bf0199268dd1c2f3448c9383000bc77cc9663066ff57b834370afc8838b18466ea9029908018b961555cccaabf2ce21649cf3cabc7f64bdcc4abdf2da259b210c342835a2cecf92bdd3b4e109b4d6e622f6934e13b2b123607bd61ce3d0f20454c9ab594f9284cffe18716619c52db57ce5f4ee2856cb96e1fa3748fe1fe65435bec297c5ab3ab58d570ec1064aea29931dd
This commit is contained in:
2026-03-14 15:09:52 -05:00
parent f25d5c2836
commit 855585cb0e
1207 changed files with 10061 additions and 349689 deletions

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Dict, Any, Protocol
class OperatorFunc(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
@dataclass
class Operator:
"""
Represents a symbolic operator in Lucidia's Codex.
Attributes
----------
name : str
Unique identifier for the operator (e.g., "AND", "ELEVATE").
arity : int
Number of positional operands this operator expects.
impl : OperatorFunc
Concrete implementation callable.
description : str
Human-readable description of behavior and intent.
"""
name: str
arity: int
impl: OperatorFunc
description: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
def run(self, *args: Any, **kwargs: Any) -> Any:
if self.arity >= 0 and len(args) != self.arity:
raise ValueError(f"{self.name} expects arity {self.arity}, got {len(args)}")
return self.impl(*args, **kwargs)
class OperatorRegistry:
"""In-memory registry for Codex operators."""
def __init__(self) -> None:
self._ops: Dict[str, Operator] = {}
def register(self, op: Operator) -> None:
key = op.name.upper()
if key in self._ops:
raise KeyError(f"Operator already registered: {op.name}")
self._ops[key] = op
def get(self, name: str) -> Operator:
try:
return self._ops[name.upper()]
except KeyError as e:
raise KeyError(f"Unknown operator: {name}") from e
def call(self, name: str, *args: Any, **kwargs: Any) -> Any:
return self.get(name).run(*args, **kwargs)
# Minimal built-ins
def _op_identity(x: Any) -> Any:
return x
def _op_concat(a: str, b: str) -> str:
return f"{a}{b}"
REGISTRY = OperatorRegistry()
REGISTRY.register(Operator("IDENTITY", 1, _op_identity, "Return input unchanged."))
REGISTRY.register(Operator("CONCAT", 2, _op_concat, "Concatenate two strings."))
if __name__ == "__main__":
print(REGISTRY.call("IDENTITY", {"hello": "world"}))
print(REGISTRY.call("CONCAT", "Lucid", "ia"))