Python — Examples
Query Example
from __future__ import annotations
import logging
import os
logging.basicConfig(
level=logging.DEBUG, # Min level
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.FileHandler("sapi.log", mode="a", encoding="utf-8"), # append to file
logging.StreamHandler(), # console
],
)
# Map native levels to Python logging
LEVEL_MAP = {
1: logging.DEBUG, # VERBOSE -> DEBUG
2: logging.DEBUG,
3: logging.INFO, # INFORMATIONAL -> INFO
4: logging.WARNING, # WARNING -> WARNING
5: logging.ERROR, # ERROR -> ERROR
6: logging.CRITICAL, # FATAL -> CRITICAL
}
def log_message(level: int, cat: str, msg: str) -> None:
"""KXS log callback to Python logging."""
py_level = LEVEL_MAP.get(level, logging.INFO)
logging.getLogger(cat or "CLIENT").log(py_level, f"[{cat}] {msg}")
import pykx
from pysapi import (
SAPI,
SAPIConfig,
KxsResponse,
SAPITLS,
SAPIRTConfig,
SAPIRetryPolicy,
)
def read_binary_file_to_bytes(filepath: str) -> bytes:
"""Read a binary file as bytes."""
with open(filepath, "rb") as f:
return f.read()
def main() -> None:
# Optional: direct the native loader / debugging
os.environ["SAPI_DEBUG"] = "1"
os.environ["SAPI_LIB_PATH"] = r"C:\pysapi\libs"
# Set up the connection properties
cfg = SAPIConfig(
instanceName="pysapi",
discoverySvcHosts=["your-discovery-node.com:20000"],
discoverySvcHostsCount=1,
advertiseHosts=["10.10.0.2"],
advertiseHostsCount=1,
listenPorts="10000-10200",
)
# TLS (optional)
cfg.tls = SAPITLS(
CAFile=r"benchmarks\\config\\certs\\ca-cert.pem",
keyFile=r"benchmarks\\config\\certs\\client-private-key.pem",
certFile=r"benchmarks\\config\\certs\\client-cert.pem",
verifyClient=False,
verifyServer=False,
)
cfg.useTLS = True
# Real-time / bridge (optional)
cfg.rtConfig = SAPIRTConfig(useBridge=True)
# Create SAPI instance and register logging
sapi = SAPI()
sapi.register_log_handler(log_message)
try:
# Validate connection
resp = KxsResponse()
if not sapi.connect(cfg, resp):
log_message(5, "CLIENT", "Unable to connect!")
return
log_message(3, "CLIENT", f"Connected to server! endpoint={sapi.get_tcp_endpoint()}")
# Execute query returning dictionary
dict_req = pykx.Dictionary({pykx.SymbolAtom("query"): pykx.CharVector("(`a`b`c)!(1 2 3)")})
dict_resp = KxsResponse()
ok = sapi.execute(req_corr=0, api_name=".kxs.execute", request=dict_req, header=None, response=dict_resp)
if ok and dict_resp.rc == 0:
if isinstance(dict_resp.header, pykx.Dictionary):
for k, v in dict_resp.header.items():
log_message(3, "CLIENT", f"Header {str(k)}={v.py()}")
if isinstance(dict_resp.payload, pykx.Dictionary):
for k, v in dict_resp.payload.items():
log_message(3, "CLIENT", f"Payload {str(k)}={v.py()}")
else:
log_message(5, "CLIENT", f"RC code not OK: rc={dict_resp.rc} ac={dict_resp.ac} ai={dict_resp.ai}")
# Execute query returning table
flip_req = pykx.Dictionary({pykx.SymbolAtom("query"): pykx.CharVector("10#enlist (`a`b`c)!(1 2 3)")})
flip_resp = KxsResponse()
ok = sapi.execute(req_corr=0, api_name=".kxs.execute", request=flip_req, header=None, response=flip_resp)
if ok and flip_resp.rc == 0 and isinstance(flip_resp.payload, pykx.Table):
for col in flip_resp.payload.keys():
vals = flip_resp.payload[col]
log_message(3, "CLIENT", f"{col.py()} = {vals.py()}")
else:
log_message(5, "CLIENT", f"Table query failed: rc={flip_resp.rc}")
finally:
sapi.disconnect()
if __name__ == "__main__":
main()
Publish Example
from __future__ import annotations
import logging
import os
# Logging setup
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.FileHandler("sapi.log", "a", "utf-8"), logging.StreamHandler()],
)
LEVEL_MAP = {1: logging.DEBUG, 2: logging.DEBUG, 3: logging.INFO, 4: logging.WARNING, 5: logging.ERROR, 6: logging.CRITICAL}
def log_message(level: int, cat: str, msg: str) -> None:
logging.getLogger(cat or "CLIENT").log(LEVEL_MAP.get(level, logging.INFO), f"[{cat}] {msg}")
os.environ["PYKX_UNLICENSED"] = "true"
import pykx
from pysapi import SAPI, SAPIConfig, KxsResponse, SAPIRTConfig, SAPIRetryPolicy
def read_binary_file_to_bytes(filepath: str) -> bytes:
with open(filepath, "rb") as f:
return f.read()
def main() -> None:
cfg = SAPIConfig(
instanceName="pysapi",
discoverySvcHosts=["your-discovery-node.com:20000"],
discoverySvcHostsCount=1,
listenPorts="10000-10200",
rtConfig=SAPIRTConfig(useBridge=True),
)
sapi = SAPI()
sapi.register_log_handler(log_message)
# Start connection
response = KxsResponse()
if not sapi.connect(cfg, response):
log_message(5, "CLIENT", "Unable to connect!")
return
log_message(3, "CLIENT", "Connected to server!")
pub_resp = KxsResponse()
publisher = sapi.register_publisher("feed1", pub_resp)
if not publisher:
log_message(5, "CLIENT", f"RegisterPublisher failed rc={pub_resp.rc}")
return
payload_bytes = read_binary_file_to_bytes(r"data\\acme_publish_payload.bin")
payload_obj = pykx.deserialize(payload_bytes)
# Asynchronous publish
async_resp = KxsResponse()
ok = sapi.publish(publisher, req_corr=0, msg_type=100, payload=payload_obj, header=None, retry_policy=None, response=async_resp)
if not ok:
log_message(4, "CLIENT", f"Async publish failed rc={async_resp.rc} ac={async_resp.ac} ai={async_resp.ai}")
# Synchronous publish
retry = SAPIRetryPolicy(maxAttempts=3, timeout=10_000, retryInterval=10_000, retryScaling=1)
sync_resp = KxsResponse()
ok = sapi.publish(publisher, req_corr=0, msg_type=100, payload=payload_obj, header=None, retry_policy=retry, response=sync_resp)
if not ok:
log_message(4, "CLIENT", f"Sync publish failed rc={sync_resp.rc} ac={sync_resp.ac} ai={sync_resp.ai}")
sapi.sapi_unregister_publisher(publisher, KxsResponse())
sapi.disconnect()
if __name__ == "__main__":
main()