PeakeCoin CrossChain Transfer - hive_bridge

@peakecoin · 2025-08-23 03:14 · Programming & Dev

Working on and debugging our backend cross-chain swap logic. As always, we try to keep all of the code open and accessible.

hive_bridge/main.py

```python

from common.config import HIVE_BRIDGE_ACCOUNT, PEK_TOKEN_SYMBOL
from common.logger import logger
from common.db import init_db, get_db

from beem import Hive
from beem.account import Account
from beem.nodelist import NodeList
import hashlib
import json as pyjson
import re
import signal
import sys
import time
import sqlite3

HIVE_NODES_FALLBACK = [
    "https://api.hive.blog",
    "https://anyx.io",
    "https://rpc.ausbit.dev",
    "https://hive.roelandp.nl",
]

# ---- graceful shutdown -------------------------------------------------------

_shutdown = False

def _handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True

signal.signal(signal.SIGINT, _handle_sigint)
signal.signal(signal.SIGTERM, _handle_sigint)

# ---- simple KV state (last processed block) ----------------------------------

def _get_state(key: str, default=None):
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT)")
        c.execute("SELECT value FROM state WHERE key = ?", (key,))
        row = c.fetchone()
        return row[0] if row else default
    finally:
        conn.close()

def _set_state(key: str, value: str):
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute(
            "INSERT INTO state(key, value) VALUES(?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (key, str(value)),
        )
        conn.commit()
    finally:
        conn.close()

# ---- event insert with dedupe ------------------------------------------------

def _insert_event(source: str, tx_id: str, amount: float, to_address: str):
    conn = get_db()
    try:
        c = conn.cursor()
        # Make sure your common.db init created a UNIQUE index like:
        # CREATE UNIQUE INDEX IF NOT EXISTS ux_events_source_txid ON events(source, tx_id);
        c.execute(
            "INSERT OR IGNORE INTO events (source, tx_id, amount, to_address, status) VALUES (?, ?, ?, ?, ?)",
            (source, tx_id, amount, to_address, "pending"),
        )
        conn.commit()
        return c.rowcount > 0
    finally:
        conn.close()

# ---- validation --------------------------------------------------------------

_polygon_addr_re = re.compile(r"^0x[a-fA-F0-9]{40}$")

def _is_polygon_address(s: str) -> bool:
    return bool(s and _polygon_addr_re.match(s.strip()))

# ---- hive client w/ rotation -------------------------------------------------

def _make_hive(nodes):
    # Do not broadcast, short timeouts, auto-retries
    return Hive(node=nodes, nobroadcast=True, timeout=30, num_retries=3)

def monitor_hive_transfers(poll_interval_sec: int = 2, backoff_max: int = 16):
    logger.info("Monitoring Hive Engine for PEK → bridge transfers …")

    # Build node list (prefer live list, fall back to static)
    try:
        nl = NodeList()
        nl.update_nodes()
        hive_nodes = nl.get_nodes(normal=True, appbase=True) or HIVE_NODES_FALLBACK[:]
    except Exception:
        hive_nodes = HIVE_NODES_FALLBACK[:]

    hive = _make_hive(hive_nodes)
    backoff = 1

    # Start from last irreversible block to reduce dupes; persist progress
    last_processed = _get_state("hive_last_processed_block")
    try:
        dgp = hive.get_dynamic_global_properties()
        lib = dgp["last_irreversible_block_num"]
    except Exception as e:
        logger.warning(f"Could not fetch DGP at start: {e}")
        lib = None

    if last_processed is None:
        # First run: start a bit behind LIB (or head) to be safe
        try:
            head = hive.get_dynamic_global_properties()["head_block_number"]
        except Exception:
            head = None
        start_from = (lib or head or 0) - 5
        last_processed = max(start_from, 0)
        _set_state("hive_last_processed_block", last_processed)
        logger.info(f"Initialized last_processed to {last_processed}")

    last_processed = int(last_processed)

    while not _shutdown:
        try:
            dgp = hive.get_dynamic_global_properties()
            head = dgp["head_block_number"]
            lib = dgp["last_irreversible_block_num"]

            # Only process up to LIB to avoid reorgs
            target = lib

            if target <= last_processed:
                time.sleep(poll_interval_sec)
                continue

            # Process blocks (cap each loop to avoid huge catch-ups)
            end = min(last_processed + 200, target)
            for block_num in range(last_processed + 1, end + 1):
                block = hive.rpc.get_block(block_num)
                if not block:
                    continue

                txs = block.get("transactions", [])
                for tx in txs:
                    # NOTE: Hive "transactions" don't always include tx_id here.
                    # We use the transaction_id when the node provides it; some
                    # RPCs won't. Fallback: compose a stable pseudo-id for dedupe
                    # (sha256, not Python's hash(), which is salted per process
                    # and would break dedupe across restarts).
                    tx_id = tx.get("transaction_id") or (
                        f"{block_num}:"
                        + hashlib.sha256(pyjson.dumps(tx, sort_keys=True).encode()).hexdigest()[:16]
                    )

                    for op in tx.get("operations", []):
                        if not isinstance(op, (list, tuple)) or len(op) != 2:
                            continue
                        op_name, op_data = op
                        if op_name != "custom_json":
                            continue
                        if op_data.get("id") != "ssc-mainnet-hive":
                            continue

                        raw_json = op_data.get("json")
                        try:
                            data = pyjson.loads(raw_json) if isinstance(raw_json, str) else (raw_json or {})
                        except Exception as e:
                            logger.debug(f"custom_json parse error at block {block_num}: {e}")
                            continue

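                        # For reference, a matching Hive Engine transfer decodes
                        # to roughly the shape below (all values here are made
                        # up for illustration; the memo is a placeholder):
                        #   {"contractName": "tokens",
                        #    "contractAction": "transfer",
                        #    "contractPayload": {
                        #        "symbol": "PEK",
                        #        "to": "<bridge account>",
                        #        "quantity": "12.345",
                        #        "memo": "0x0123456789abcdef0123456789abcdef01234567"}}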
                        # Looking for: tokens.transfer to bridge account with PEK symbol
                        if data.get("contractName") == "tokens" and data.get("contractAction") == "transfer":
                            payload = data.get("contractPayload") or {}
                            if (
                                payload.get("symbol") == PEK_TOKEN_SYMBOL
                                and payload.get("to") == HIVE_BRIDGE_ACCOUNT
                            ):
                                # quantity can be string; memo should be polygon address
                                try:
                                    amount = float(payload.get("quantity", 0))
                                except Exception:
                                    amount = 0.0
                                memo = (payload.get("memo") or "").strip()

                                if not _is_polygon_address(memo):
                                    logger.warning(
                                        f"Ignoring transfer (invalid Polygon memo): memo='{memo}' tx={tx_id} blk={block_num}"
                                    )
                                    continue

                                if _insert_event("hive", tx_id, amount, memo):
                                    logger.info(
                                        f"Bridge event recorded: {amount:g} {PEK_TOKEN_SYMBOL} → {memo} (tx={tx_id}, blk={block_num})"
                                    )
                                else:
                                    logger.debug(f"Duplicate event ignored (tx={tx_id})")

                last_processed = block_num
                if block_num % 20 == 0:
                    _set_state("hive_last_processed_block", last_processed)

            # Persist after each batch
            _set_state("hive_last_processed_block", last_processed)

            # Healthy pass → reset backoff
            backoff = 1
            time.sleep(poll_interval_sec)

        except Exception as e:
            logger.error(f"Hive monitor error: {e}")
            # Rotate nodes
            try:
                hive_nodes.append(hive_nodes.pop(0))
                hive = _make_hive(hive_nodes)
            except Exception as e2:
                logger.error(f"Node rotation failed: {e2}")

            # Backoff (capped)
            time.sleep(backoff)
            backoff = min(backoff * 2, backoff_max)

    logger.info("Shutdown requested; saving state …")
    _set_state("hive_last_processed_block", last_processed)

if __name__ == "__main__":
    # Ensure the DB has the structures we need. Your common.db should create:
    #   - events(id PK, source TEXT, tx_id TEXT, amount REAL, to_address TEXT, status TEXT, created_at default now)
    #   - a UNIQUE index on (source, tx_id)
    #   - state(key TEXT PRIMARY KEY, value TEXT)
    init_db()
    monitor_hive_transfers()
```
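
For anyone following along without the rest of the repo: common/config.py just defines HIVE_BRIDGE_ACCOUNT and PEK_TOKEN_SYMBOL, and main.py assumes common/db.py exposes init_db() and get_db() with the schema described in the comments above. Here's a minimal SQLite sketch that satisfies those assumptions; DB_PATH and the column details are placeholders, not our actual module:

```python
# common/db.py — minimal sketch matching what main.py assumes.
import sqlite3

DB_PATH = "bridge.db"  # placeholder path

def get_db():
    # New connection per call; main.py closes it after each use.
    return sqlite3.connect(DB_PATH)

def init_db():
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute(
            """CREATE TABLE IF NOT EXISTS events (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   source TEXT NOT NULL,
                   tx_id TEXT NOT NULL,
                   amount REAL NOT NULL,
                   to_address TEXT NOT NULL,
                   status TEXT NOT NULL DEFAULT 'pending',
                   created_at TEXT DEFAULT CURRENT_TIMESTAMP
               )"""
        )
        # The INSERT OR IGNORE dedupe in main.py relies on this unique index
        c.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS ux_events_source_txid "
            "ON events(source, tx_id)"
        )
        c.execute("CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT)")
        conn.commit()
    finally:
        conn.close()
```

The UNIQUE index on (source, tx_id) is what turns the INSERT OR IGNORE in _insert_event into a dedupe, so re-scanning a block range never double-records a bridge event.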

#hive-169321 #pimp #neoxian #waivio #proofofbrain #palnet #archon #ctp #lassecash #bee