PeakeCoin Matic - pek_monitor

@paulmoon410 · 2025-08-23 16:29 · Synergy Builders

I have been trying to build a bot that monitors @peakecoin.matic transactions in order to facilitate transfers to the Polygon blockchain.

I cannot figure out a way to make this work. @thecrazygm, @eoinstant, @powerpaul, @txracer, @aggroed, @cryptomancer: I could use some of that brain power, lol. I am stumped, or exhausted.

To run the PeakeCoin monitor app, enter one of the following commands:

 python pek_monitor.py monitor
or
 python pek_monitor.py all

#!/usr/bin/env python3
import time
import json
import argparse
from typing import Dict, List, Tuple
import requests

# Account inspected when --account is not given on the command line.
ACCOUNT = "peakecoin.matic"
# Token symbol used to filter transfers when --symbol is not given.
TOKEN_SYMBOL = "PEK"
# Seconds the monitor loop sleeps between polls; reassigned by `monitor --poll`.
POLL_SECONDS = 10
# Timeout value passed to every requests.post call.
REQUEST_TIMEOUT = 12
# JSON-RPC endpoint used by he_contracts_find for contract-table "find" queries.
HE_CONTRACTS_RPC = "https://api.hive-engine.com/rpc/contracts"  # for find/aggregate
# JSON-RPC endpoint used by he_blockchain_rpc for block-level queries.
# NOTE(review): Hive-Engine's public docs appear to serve the block API from
# ".../rpc/blockchain" -- confirm this URL against the node being used.
HE_BLOCKCHAIN_RPC = "https://api.hive-engine.com/rpc"           # for blockchain.getBlock, etc.

# ---------- Low-level RPC helpers ----------
def he_contracts_find(table: str, query: Dict, limit: int = 100, offset: int = 0) -> List[Dict]:
    """
    Run a JSON-RPC ``find`` on a table of the ``tokens`` contract.

    The request is POSTed to HE_CONTRACTS_RPC. Transport failures, non-2xx
    HTTP statuses and undecodable JSON bodies are retried up to three times
    in total, sleeping 1.5s and then 3.0s between attempts.

    Args:
        table: Name of the contract table to query.
        query: Filter document sent as the ``query`` parameter.
        limit: Maximum number of rows requested.
        offset: Number of rows to skip.

    Returns:
        The ``result`` list of the response; an empty list when ``result``
        is missing or null.

    Raises:
        requests.RequestException: The third attempt failed at the
            transport/HTTP level.
        ValueError: The third attempt returned a body that is not JSON.
        RuntimeError: The node answered with a JSON-RPC ``error`` member or
            with a payload that is not a JSON object. These are not retried
            because repeating the same request would not change the answer.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "tokens",
            "table": table,
            "query": query,
            "limit": limit,
            "offset": offset,
            "indexes": []
        }
    }
    max_attempts = 3
    failures = 0
    while True:
        try:
            r = requests.post(HE_CONTRACTS_RPC, json=payload, timeout=REQUEST_TIMEOUT)
            r.raise_for_status()
            data = r.json()
            break
        except (requests.RequestException, ValueError):
            # Only network/HTTP/decoding problems are worth retrying.
            failures += 1
            if failures >= max_attempts:
                raise
            time.sleep(1.5 * failures)

    if not isinstance(data, dict):
        raise RuntimeError(f"Unexpected RPC response: {data!r}")
    if data.get("error"):
        # Surface server-side errors instead of reporting "no rows".
        raise RuntimeError(f"Hive-Engine RPC error: {data['error']}")
    # A null result is normalised so the declared List[Dict] return type holds.
    return data.get("result") or []

def he_blockchain_rpc(method: str, params) -> Dict:
    """
    POST a JSON-RPC call to HE_BLOCKCHAIN_RPC and return the decoded reply.

    Any exception raised while sending the request, checking the HTTP status
    or decoding the body triggers a retry; the first failure is followed by
    a 1.5s pause, the second by a 3.0s pause, and the third is re-raised.

    Args:
        method: JSON-RPC method name.
        params: Value sent as the ``params`` member.

    Returns:
        The parsed JSON body of the response, unmodified.
    """
    body = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
    failures = 0
    while True:
        try:
            response = requests.post(HE_BLOCKCHAIN_RPC, json=body, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except Exception:
            failures += 1
            if failures == 3:
                raise
            # Linear back-off: 1.5s after the first failure, 3.0s after the second.
            time.sleep(1.5 * failures)

# ---------- History (tokensHistory) ----------
def fetch_transfers_involving(account: str, symbol: str, limit_each: int = 500) -> List[Dict]:
    """
    Collect transfers of `symbol` that have `account` on either side.

    Two "tokensHistory" queries are issued through he_contracts_find, first
    for rows whose ``to`` equals the account and then for rows whose ``from``
    equals it, each capped at `limit_each` rows. The merged rows are
    re-checked for operation/symbol and returned newest first by their
    ``timestamp`` field (rows without one sort as 0).

    NOTE(review): confirm that a "tokensHistory" table exists in the
    contract being queried -- Hive-Engine appears to serve account history
    from a separate history API, in which case both queries come back empty.
    """
    base_filter = {"operation": "transfer", "symbol": symbol}

    merged: List[Dict] = []
    # Incoming side first, then outgoing, mirroring the query order above.
    for side in ("to", "from"):
        batch = he_contracts_find(
            table="tokensHistory",
            query={**base_filter, side: account},
            limit=limit_each,
            offset=0
        )
        merged = merged + (batch or [])

    # Belt and braces: drop anything that is not a transfer of our symbol.
    transfers = []
    for row in merged:
        if row.get("operation") != "transfer":
            continue
        if row.get("symbol") != symbol:
            continue
        transfers.append(row)

    def stamp(row: Dict) -> int:
        return int(row.get("timestamp", 0))

    return sorted(transfers, key=stamp, reverse=True)

def print_recent_7d(account: str, symbol: str, limit_each: int = 500):
    """
    Print the transfers returned by fetch_transfers_involving whose
    ``timestamp`` falls within the seven days before the call.

    A header line with the match count is printed first, followed by one
    line per transfer labelled IN (``to`` equals the account) or OUT.
    """
    # The cutoff is taken before fetching, so fetch latency does not shift it.
    cutoff = int(time.time()) - 7 * 24 * 60 * 60
    transfers = fetch_transfers_involving(account, symbol, limit_each=limit_each)

    recent = []
    for row in transfers:
        if int(row.get("timestamp", 0)) >= cutoff:
            recent.append(row)

    print(f"{symbol} transfers involving {account} in the last 7 days (count={len(recent)}):")
    for row in recent:
        ts = int(row.get("timestamp", 0))
        if row.get("to") == account:
            direction = "IN "
        else:
            direction = "OUT"
        head = f"[{ts}] {direction} qty={row.get('quantity')} from={row.get('from')} to={row.get('to')} "
        tail = f"memo={row.get('memo','')} txid={row.get('transactionId') or row.get('txId')}"
        print(head + tail)

def print_all_history(account: str, symbol: str, limit_each: int = 200):
    """
    Print every transfer returned by fetch_transfers_involving.

    Up to `limit_each` rows are requested per direction, hence the
    "~2*limit_each" bound quoted in the header line. Each transfer is
    labelled IN when its ``to`` field equals the account, otherwise OUT.
    """
    transfers = fetch_transfers_involving(account, symbol, limit_each=limit_each)
    print(f"All {symbol} transfers involving {account} (up to ~{2*limit_each} rows):")
    for row in transfers:
        ts = int(row.get("timestamp", 0))
        if row.get("to") == account:
            direction = "IN "
        else:
            direction = "OUT"
        head = f"[{ts}] {direction} qty={row.get('quantity')} from={row.get('from')} to={row.get('to')} "
        tail = f"memo={row.get('memo','')} txid={row.get('transactionId') or row.get('txId')}"
        print(head + tail)

# ---------- Blockchain debug ----------
def get_latest_block_number() -> int:
    """
    Ask the node for its latest block and return the block number.

    The reply of "blockchain.getLatestBlock" is accepted in three layouts,
    checked in this order:
    - {"result": {"blockNumber": N}}
    - {"result": N}
    - {"blockNumber": N}

    Raises:
        RuntimeError: The reply matches none of the layouts; the message
            carries the reply serialised as JSON, cut to 4000 characters.
    """
    resp = he_blockchain_rpc("blockchain.getLatestBlock", [])
    if isinstance(resp, dict):
        # A missing "result" yields None, which matches neither nested shape.
        nested = resp.get("result")
        if isinstance(nested, dict) and "blockNumber" in nested:
            return int(nested["blockNumber"])
        if isinstance(nested, int):
            return int(nested)
        # Flat layout, kept as the last fallback.
        if "blockNumber" in resp:
            return int(resp["blockNumber"])
    raise RuntimeError(f"Unexpected latest block response shape: {json.dumps(resp, ensure_ascii=False)[:4000]}")

def get_block(block_num: int) -> Dict:
    """
    Fetch one block via "blockchain.getBlock".

    Returns the reply's ``result`` member when the key is present, otherwise
    the whole reply.
    """
    reply = he_blockchain_rpc("blockchain.getBlock", [block_num])
    absent = object()
    inner = reply.get("result", absent)
    if inner is absent:
        return reply
    return inner

def debug_blocks(start: int, end: int):
    """
    Dump the raw JSON of every block from `start` through `end` inclusive.

    Each block is fetched with get_block and pretty-printed under a
    "=== BLOCK n ===" banner; the text of a single block is truncated to
    200000 characters.
    """
    print(f"DEBUG: dumping raw blocks {start}..{end}")
    for number in range(start, end + 1):
        block = get_block(number)
        rendered = json.dumps(block, indent=2, ensure_ascii=False)
        print(f"\n=== BLOCK {number} ===")
        # Truncate so one huge block cannot flood the terminal.
        print(rendered[:200000])

# ---------- Monitor (optional streaming) ----------
def monitor_incoming(account: str, symbol: str):
    """
    Poll tokensHistory forever and print new INCOMING transfers (to == account).

    New rows are detected with a timestamp watermark plus a (txid, logIndex)
    dedupe map:
    - rows strictly older than the watermark are ignored;
    - rows at or after the watermark are printed unless their key was
      already printed. Comparing with ``<`` rather than ``<=`` matters:
      a transfer that carries the same timestamp as the newest one already
      seen, but only shows up in a later poll, must still be reported.
    - when the watermark advances, dedupe entries older than it are dropped,
      since the watermark alone already filters them. This keeps the map
      small without discarding keys that are still needed.

    Every row returned by the first poll is printed, because the watermark
    starts at 0. Errors raised during a poll are printed and the loop
    continues after the usual POLL_SECONDS sleep. The function never returns.

    NOTE(review): the query is sent without a sort index and with limit=100;
    confirm the node returns the newest rows, and confirm that a
    "tokensHistory" table exists in the queried contract -- otherwise new
    transfers will never appear here.
    """
    print(f"Monitoring {account} for received {symbol} transfers...")
    # (txid, logIndex) -> timestamp of each transfer already printed.
    seen: Dict[Tuple[str, int], int] = {}
    last_seen_ts = 0

    while True:
        try:
            rows = he_contracts_find(
                table="tokensHistory",
                query={"operation": "transfer", "symbol": symbol, "to": account},
                limit=100,
                offset=0
            ) or []
            rows.sort(key=lambda x: int(x.get("timestamp", 0)), reverse=True)

            for tx in rows:
                ts = int(tx.get("timestamp", 0))
                if ts < last_seen_ts:
                    continue
                txid = tx.get("transactionId") or tx.get("txId")
                idx = int(tx.get("logIndex", 0))
                key = (txid or "", idx)
                # Rows without a transaction id cannot be deduped; skip them.
                if not txid or key in seen:
                    continue

                print(
                    f"[{ts}] IN qty={tx.get('quantity')} from={tx.get('from')} "
                    f"memo={tx.get('memo','')} txid={txid}"
                )
                seen[key] = ts

            if rows:
                newest_ts = int(rows[0].get("timestamp", last_seen_ts))
                if newest_ts > last_seen_ts:
                    last_seen_ts = newest_ts
                    # Entries below the new watermark can never match again.
                    seen = {k: t for k, t in seen.items() if t >= last_seen_ts}

        except Exception as e:
            # Top-level loop boundary: report and keep polling.
            print(f"ERROR: {e}")

        time.sleep(POLL_SECONDS)

# ---------- CLI ----------
def main():
    """
    Parse the command line and dispatch to the selected sub-command.

    Global options (given before the sub-command): --account, --symbol.
    Sub-commands:
    - recent7d [--limit-each N]: transfers from the last 7 days
    - all [--limit-each N]: all fetched transfers
    - monitor [--poll SECONDS]: stream incoming transfers
    - blocks --start A --end B: dump raw blocks A..B inclusive
    - latest: print the latest block number

    Exits with an argparse usage error when --poll is smaller than 1,
    because a negative value would crash time.sleep and zero would poll
    the API in a tight loop.
    """
    # Declared first: POLL_SECONDS is read for the --poll default below and
    # reassigned when the monitor command runs.
    global POLL_SECONDS

    ap = argparse.ArgumentParser(description="Hive-Engine PEK history + block debug (single file).")
    ap.add_argument("--account", default=ACCOUNT, help="account to inspect (default: peakecoin.matic)")
    ap.add_argument("--symbol", default=TOKEN_SYMBOL, help="token symbol (default: PEK)")

    sub = ap.add_subparsers(dest="cmd", required=True)

    recent = sub.add_parser("recent7d", help="print PEK transfers involving account from the last 7 days")
    recent.add_argument("--limit-each", type=int, default=500)

    everything = sub.add_parser("all", help="print all recent PEK transfers involving account (bounded by API limits)")
    everything.add_argument("--limit-each", type=int, default=200)

    mon = sub.add_parser("monitor", help="stream INCOMING transfers to the account")
    # Single source of truth for the default polling interval.
    mon.add_argument("--poll", type=int, default=POLL_SECONDS)

    blk = sub.add_parser("blocks", help="dump raw blocks in a range (inclusive)")
    blk.add_argument("--start", type=int, required=True)
    blk.add_argument("--end", type=int, required=True)

    sub.add_parser("latest", help="print latest block number")

    args = ap.parse_args()

    if args.cmd == "recent7d":
        print_recent_7d(args.account, args.symbol, limit_each=args.limit_each)
    elif args.cmd == "all":
        print_all_history(args.account, args.symbol, limit_each=args.limit_each)
    elif args.cmd == "monitor":
        if args.poll < 1:
            ap.error("--poll must be at least 1 second")
        POLL_SECONDS = args.poll
        monitor_incoming(args.account, args.symbol)
    elif args.cmd == "blocks":
        debug_blocks(args.start, args.end)
    elif args.cmd == "latest":
        n = get_latest_block_number()
        print(f"Latest block number: {n}")

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
#hive-186392 #pimp #neoxian #waivio #proofofbrain #palnet #archon #ctp #bee #leo
Payout: 0.000 HBD
Votes: 11
More interactions (upvote, reblog, reply) coming soon.