Skip to content

Python — Examples

Query Example

from __future__ import annotations

import logging
import os

logging.basicConfig(
    level=logging.DEBUG,  # Min level
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[
        logging.FileHandler("sapi.log", mode="a", encoding="utf-8"),  # append to file
        logging.StreamHandler(),  # console
    ],
)

# Map native levels to Python logging
LEVEL_MAP = {
    1: logging.DEBUG,       # VERBOSE -> DEBUG
    2: logging.DEBUG,
    3: logging.INFO,        # INFORMATIONAL -> INFO
    4: logging.WARNING,     # WARNING -> WARNING
    5: logging.ERROR,       # ERROR -> ERROR
    6: logging.CRITICAL,    # FATAL -> CRITICAL
}

def log_message(level: int, cat: str, msg: str) -> None:
    """KXS log callback to Python logging.""" 
    py_level = LEVEL_MAP.get(level, logging.INFO)
    logging.getLogger(cat or "CLIENT").log(py_level, f"[{cat}] {msg}")


import pykx

from pysapi import (
    SAPI,
    SAPIConfig,
    KxsResponse,
    SAPITLS,
    SAPIRTConfig,
    SAPIRetryPolicy,
)


def read_binary_file_to_bytes(filepath: str) -> bytes:
    """Read a binary file as bytes.""" 
    with open(filepath, "rb") as f:
        return f.read()


def main() -> None:
    # Optional: direct the native loader / debugging
    os.environ["SAPI_DEBUG"] = "1"
    os.environ["SAPI_LIB_PATH"] = r"C:\pysapi\libs"

    # Set up the connection properties
    cfg = SAPIConfig(
        instanceName="pysapi",
        discoverySvcHosts=["your-discovery-node.com:20000"],
        discoverySvcHostsCount=1,
        advertiseHosts=["10.10.0.2"],
        advertiseHostsCount=1,
        listenPorts="10000-10200",
    )

    # TLS (optional)
    cfg.tls = SAPITLS(
        CAFile=r"benchmarks\\config\\certs\\ca-cert.pem",
        keyFile=r"benchmarks\\config\\certs\\client-private-key.pem",
        certFile=r"benchmarks\\config\\certs\\client-cert.pem",
        verifyClient=False,
        verifyServer=False,
    )
    cfg.useTLS = True

    # Real-time / bridge (optional)
    cfg.rtConfig = SAPIRTConfig(useBridge=True)

    # Create SAPI instance and register logging
    sapi = SAPI()
    sapi.register_log_handler(log_message)

    try:
        # Validate connection
        resp = KxsResponse()
        if not sapi.connect(cfg, resp):
            log_message(5, "CLIENT", "Unable to connect!")
            return

        log_message(3, "CLIENT", f"Connected to server! endpoint={sapi.get_tcp_endpoint()}")

        # Execute query returning dictionary
        dict_req = pykx.Dictionary({pykx.SymbolAtom("query"): pykx.CharVector("(`a`b`c)!(1 2 3)")})
        dict_resp = KxsResponse()

        ok = sapi.execute(req_corr=0, api_name=".kxs.execute", request=dict_req, header=None, response=dict_resp)
        if ok and dict_resp.rc == 0:
            if isinstance(dict_resp.header, pykx.Dictionary):
                for k, v in dict_resp.header.items():
                    log_message(3, "CLIENT", f"Header {str(k)}={v.py()}")
            if isinstance(dict_resp.payload, pykx.Dictionary):
                for k, v in dict_resp.payload.items():
                    log_message(3, "CLIENT", f"Payload {str(k)}={v.py()}")
        else:
            log_message(5, "CLIENT", f"RC code not OK: rc={dict_resp.rc} ac={dict_resp.ac} ai={dict_resp.ai}")

        # Execute query returning table
        flip_req = pykx.Dictionary({pykx.SymbolAtom("query"): pykx.CharVector("10#enlist (`a`b`c)!(1 2 3)")})
        flip_resp = KxsResponse()

        ok = sapi.execute(req_corr=0, api_name=".kxs.execute", request=flip_req, header=None, response=flip_resp)
        if ok and flip_resp.rc == 0 and isinstance(flip_resp.payload, pykx.Table):
            for col in flip_resp.payload.keys():
                vals = flip_resp.payload[col]
                log_message(3, "CLIENT", f"{col.py()} = {vals.py()}")
        else:
            log_message(5, "CLIENT", f"Table query failed: rc={flip_resp.rc}")

    finally:
        sapi.disconnect()


if __name__ == "__main__":
    main()

Publish Example

from __future__ import annotations

import logging
import os

# Logging setup
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[logging.FileHandler("sapi.log", "a", "utf-8"), logging.StreamHandler()],
)
LEVEL_MAP = {1: logging.DEBUG, 2: logging.DEBUG, 3: logging.INFO, 4: logging.WARNING, 5: logging.ERROR, 6: logging.CRITICAL}

def log_message(level: int, cat: str, msg: str) -> None:
    logging.getLogger(cat or "CLIENT").log(LEVEL_MAP.get(level, logging.INFO), f"[{cat}] {msg}")

os.environ["PYKX_UNLICENSED"] = "true"
import pykx

from pysapi import SAPI, SAPIConfig, KxsResponse, SAPIRTConfig, SAPIRetryPolicy

def read_binary_file_to_bytes(filepath: str) -> bytes:
    with open(filepath, "rb") as f:
        return f.read()

def main() -> None:
    cfg = SAPIConfig(
        instanceName="pysapi",
        discoverySvcHosts=["your-discovery-node.com:20000"],
        discoverySvcHostsCount=1,
        listenPorts="10000-10200",
        rtConfig=SAPIRTConfig(useBridge=True),
    )

    sapi = SAPI()
    sapi.register_log_handler(log_message)

    # Start connection
    response = KxsResponse()
    if not sapi.connect(cfg, response):
        log_message(5, "CLIENT", "Unable to connect!")
        return

    log_message(3, "CLIENT", "Connected to server!")

    pub_resp = KxsResponse()
    publisher = sapi.register_publisher("feed1", pub_resp)
    if not publisher:
        log_message(5, "CLIENT", f"RegisterPublisher failed rc={pub_resp.rc}")
        return

    payload_bytes = read_binary_file_to_bytes(r"data\\acme_publish_payload.bin")
    payload_obj = pykx.deserialize(payload_bytes)

    # Asynchronous publish
    async_resp = KxsResponse()
    ok = sapi.publish(publisher, req_corr=0, msg_type=100, payload=payload_obj, header=None, retry_policy=None, response=async_resp)
    if not ok:
        log_message(4, "CLIENT", f"Async publish failed rc={async_resp.rc} ac={async_resp.ac} ai={async_resp.ai}")

    # Synchronous publish
    retry = SAPIRetryPolicy(maxAttempts=3, timeout=10_000, retryInterval=10_000, retryScaling=1)
    sync_resp = KxsResponse()
    ok = sapi.publish(publisher, req_corr=0, msg_type=100, payload=payload_obj, header=None, retry_policy=retry, response=sync_resp)
    if not ok:
        log_message(4, "CLIENT", f"Sync publish failed rc={sync_resp.rc} ac={sync_resp.ac} ai={sync_resp.ai}")

    sapi.sapi_unregister_publisher(publisher, KxsResponse())
    sapi.disconnect()

if __name__ == "__main__":
    main()