Ana içerik geç

Python — zeq-observer-client.py

The Python form factor of the Observer Agent is a single-file ZeqObserver class that drops into any long-lived Python process — AI/ML training loops, robotics controllers, scientific pipelines, server-side workers. It speaks the same wire contract as the Web JS, Embedded C, and Network Tap form factors.

It is 17 KB, has one dependency (requests), and one optional dependency (cryptography) only when encrypt=True.

Install

curl -O https://zeqapi.com/zeq-observer-client.py

The file is the entire agent — no package, no sub-modules. Drop it in your project root or somewhere on PYTHONPATH.

Five-line drop-in

from zeq_observer_client import ZeqObserver

obs = ZeqObserver(
slug = "zeq07792026349",
key = "zsm_live_2c5e12b5b3f1ad99…",
encrypt = True,
types = ["epoch", "checkpoint", "anomaly"],
)

obs.observe(type="epoch", state_hash=epoch_state_sha, meta={"loss": 0.041})

That's the full surface area for a typical training loop. Heartbeats fire automatically every 8 zeqonds; the outbox queues to disk if the network blips; HF6/HF8/HF12/HF14/HF18 are computed on every emission.

Public API

class ZeqObserver:
def __init__(self, slug, key, *, encrypt=False, types=None,
endpoint="https://zeqapi.com", heartbeat_zeqonds=8,
outbox_path="~/.zeq-observer-outbox.json", outbox_cap=200): ...

def observe(self, *, type, state_hash, meta=None): ...
def flush(self) -> int: ...
def status(self) -> dict: ...
def shutdown(self, *, drain=True) -> None: ...

status() returns the same shape as the Web JS form factor — phase, outbox depth, last zeqond, session id, agent string — so a unified dashboard can render every form factor with the same template.

shutdown(drain=True) blocks until the outbox is empty or 30 zeqonds elapse, whichever comes first. Use it in atexit:

import atexit; atexit.register(obs.shutdown)

Outbox — file-based, atomic-write

The Python form factor persists its outbox to ~/.zeq-observer-outbox.json by default. Every flush rewrites the file via the standard tempfile + os.replace atomic-rename pattern, so a process kill mid-flush cannot corrupt the file.

Cap defaults to 200 events; overflow drops oldest first and the next successful flush carries a synthetic outbox.overflow row with the dropped count in meta.dropped.

The same path can be shared across multiple processes on the same host — the file is flock-protected on Unix, msvcrt.locking on Windows. A multi-worker training loop emitting from each rank produces one ordered queue per host without coordination.

Background threads

The agent spins up two daemon threads in __init__:

  1. Flush thread — wakes every Zeqond (0.777 s), checks if the outbox is non-empty or the heartbeat is due, batches up to 50 events into one POST.
  2. Heartbeat thread — wakes every 8 zeqonds, enqueues a type=heartbeat row carrying outbox depth, last user-event zeqond, and the five HF signals.

Both threads are daemon=True so a sys.exit() will not hang on them. For graceful shutdown, call obs.shutdown() first.

Phase-locked encryption

When encrypt=True, the agent imports cryptography.hazmat.primitives.ciphers lazily and switches every emission to:

nonce = sha256(f"phase-lock|{zeqond}|{slug}".encode()).digest()[:12]
ct = AESGCM(key_bytes).encrypt(nonce, plaintext_json.encode(), aad=None)

The agent sends payload_ct (hex) instead of payload. The receiver recomputes the same nonce from (zeqond, slug) per R3 — Cipher + HZC. Tamper either field and the GCM auth tag fails on the server side.

key_bytes is derived once at startup from the state-machine key plus the canonical HMAC label "ZeqField/PhaseLockedNonce/v1", so the encryption key never appears on the wire and a leaked payload_ct cannot be replayed against the random-IV cipher.

Use cases — the three big ones

AI/ML training loops

Drop in next to your for epoch in range(N) loop and emit per-epoch state. The entangled state becomes a tamper-evident training journal — every loss, every checkpoint, every anomaly is hashlinked into a row whose zeqond proves when it happened.

for epoch, batch in enumerate(loader):
loss = train_step(batch)
obs.observe(
type = "epoch",
state_hash = sha256(f"{epoch}|{loss:.6f}".encode()).hexdigest(),
meta = {"epoch": epoch, "loss": float(loss)},
)

Robotics controllers

A 100 Hz control loop emits a type=control-tick row every n iterations. The cadence-drift signal HF8 becomes the SCADA-grade liveness proof: if the controller's wall-clock drifts past 0.1% of τ_z, the chain row carries the drift explicitly.

Scientific pipelines

A multi-stage pipeline emits type=stage-complete rows. Replaying the chain forward reconstructs the exact ordering of stages — even across machines — because each row's zeqond is monotonic on the canonical chain. R1's UNIQUE INDEX makes that guarantee load-bearing.

File map

  • apps/zeq-dev/public/zeq-observer-client.py — the agent
  • shared/api-core/src/routes/chain.ts — receiver
  • shared/api-core/src/lib/zeqField.ts — phase-locked cipher