I have been attempting to make a bot to monitor @peakecoin.matic transactions to facilitate transactions to the Polygon blockchain.
I can not figure out a way to make this work @thecrazygm, @eoinstant, @powerpaul, @txracer, @aggroed, @cryptomancer I could use some of that brain power, lol. I am stumped, or exhausted.
To operate the peakecoin monitor app you have to input
python pek_monitor.py monitor
or
python pek_monitor.py all
#!/usr/bin/env python3
import time
import json
import argparse
from typing import Dict, List, Tuple
import requests
ACCOUNT = "peakecoin.matic"
TOKEN_SYMBOL = "PEK"
POLL_SECONDS = 10
REQUEST_TIMEOUT = 12
HE_CONTRACTS_RPC = "https://api.hive-engine.com/rpc/contracts" # for find/aggregate
HE_BLOCKCHAIN_RPC = "https://api.hive-engine.com/rpc" # for blockchain.getBlock, etc.
# ---------- Low-level RPC helpers ----------
def he_contracts_find(table: str, query: Dict, limit: int = 100, offset: int = 0) -> List[Dict]:
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "find",
"params": {
"contract": "tokens",
"table": table,
"query": query,
"limit": limit,
"offset": offset,
"indexes": []
}
}
for attempt in range(3):
try:
r = requests.post(HE_CONTRACTS_RPC, json=payload, timeout=REQUEST_TIMEOUT)
r.raise_for_status()
data = r.json()
return data.get("result", [])
except Exception as e:
if attempt == 2:
raise
time.sleep(1.5 * (attempt + 1))
def he_blockchain_rpc(method: str, params) -> Dict:
payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
for attempt in range(3):
try:
r = requests.post(HE_BLOCKCHAIN_RPC, json=payload, timeout=REQUEST_TIMEOUT)
r.raise_for_status()
return r.json()
except Exception:
if attempt == 2:
raise
time.sleep(1.5 * (attempt + 1))
# ---------- History (tokensHistory) ----------
def fetch_transfers_involving(account: str, symbol: str, limit_each: int = 500) -> List[Dict]:
"""
Pull PEK transfers involving account from tokensHistory by querying both directions:
- to == account
- from == account
Filter by operation='transfer' and symbol match.
"""
q_common = {"operation": "transfer", "symbol": symbol}
# Pull "to" transfers
to_rows = he_contracts_find(
table="tokensHistory",
query={**q_common, "to": account},
limit=limit_each,
offset=0
) or []
# Pull "from" transfers
from_rows = he_contracts_find(
table="tokensHistory",
query={**q_common, "from": account},
limit=limit_each,
offset=0
) or []
rows = (to_rows + from_rows)
# Defensive: ensure only our symbol & op
rows = [tx for tx in rows if tx.get("operation") == "transfer" and tx.get("symbol") == symbol]
# Sort newest first by timestamp
rows.sort(key=lambda x: int(x.get("timestamp", 0)), reverse=True)
return rows
def print_recent_7d(account: str, symbol: str, limit_each: int = 500):
now = int(time.time())
seven_days_ago = now - 7 * 24 * 60 * 60
rows = fetch_transfers_involving(account, symbol, limit_each=limit_each)
recent = [tx for tx in rows if int(tx.get("timestamp", 0)) >= seven_days_ago]
print(f"{symbol} transfers involving {account} in the last 7 days (count={len(recent)}):")
for tx in recent:
ts = int(tx.get("timestamp", 0))
direction = "IN " if tx.get("to") == account else "OUT"
print(
f"[{ts}] {direction} qty={tx.get('quantity')} from={tx.get('from')} to={tx.get('to')} "
f"memo={tx.get('memo','')} txid={tx.get('transactionId') or tx.get('txId')}"
)
def print_all_history(account: str, symbol: str, limit_each: int = 200):
rows = fetch_transfers_involving(account, symbol, limit_each=limit_each)
print(f"All {symbol} transfers involving {account} (up to ~{2*limit_each} rows):")
for tx in rows:
ts = int(tx.get("timestamp", 0))
direction = "IN " if tx.get("to") == account else "OUT"
print(
f"[{ts}] {direction} qty={tx.get('quantity')} from={tx.get('from')} to={tx.get('to')} "
f"memo={tx.get('memo','')} txid={tx.get('transactionId') or tx.get('txId')}"
)
# ---------- Blockchain debug ----------
def get_latest_block_number() -> int:
"""
Try multiple shapes:
- {"result":{"blockNumber": N}}
- {"blockNumber": N}
- {"result": N}
"""
resp = he_blockchain_rpc("blockchain.getLatestBlock", [])
# Shape #1
if isinstance(resp, dict):
if "result" in resp:
r = resp["result"]
if isinstance(r, dict) and "blockNumber" in r:
return int(r["blockNumber"])
if isinstance(r, int):
return int(r)
# Some nodes respond with flat field (rare)
if "blockNumber" in resp:
return int(resp["blockNumber"])
raise RuntimeError(f"Unexpected latest block response shape: {json.dumps(resp, ensure_ascii=False)[:4000]}")
def get_block(block_num: int) -> Dict:
resp = he_blockchain_rpc("blockchain.getBlock", [block_num])
return resp.get("result", resp)
def debug_blocks(start: int, end: int):
"""
Print raw block JSON for an inclusive range [start, end].
"""
print(f"DEBUG: dumping raw blocks {start}..{end}")
for b in range(start, end + 1):
blk = get_block(b)
print(f"\n=== BLOCK {b} ===")
print(json.dumps(blk, indent=2, ensure_ascii=False)[:200000]) # cap very large outputs
# ---------- Monitor (optional streaming) ----------
def monitor_incoming(account: str, symbol: str):
"""
Poll tokensHistory for new INCOMING transfers only (to == account).
Uses a timestamp watermark + (txid, logIndex) dedupe.
"""
print(f"Monitoring {account} for received {symbol} transfers...")
seen: set[Tuple[str, int]] = set()
last_seen_ts = 0
while True:
try:
rows = he_contracts_find(
table="tokensHistory",
query={"operation": "transfer", "symbol": symbol, "to": account},
limit=100,
offset=0
) or []
rows.sort(key=lambda x: int(x.get("timestamp", 0)), reverse=True)
for tx in rows:
ts = int(tx.get("timestamp", 0))
if ts <= last_seen_ts:
continue
txid = tx.get("transactionId") or tx.get("txId")
idx = int(tx.get("logIndex", 0)) if "logIndex" in tx else 0
key = (txid or "", idx)
if not txid or key in seen:
continue
print(
f"[{ts}] IN qty={tx.get('quantity')} from={tx.get('from')} "
f"memo={tx.get('memo','')} txid={txid}"
)
seen.add(key)
if rows:
newest_ts = int(rows[0].get("timestamp", last_seen_ts))
if newest_ts > last_seen_ts:
last_seen_ts = newest_ts
# prune the dedupe set occasionally
if len(seen) > 5000:
seen = set(list(seen)[-2000:])
except Exception as e:
print(f"ERROR: {e}")
time.sleep(POLL_SECONDS)
# ---------- CLI ----------
def main():
ap = argparse.ArgumentParser(description="Hive-Engine PEK history + block debug (single file).")
ap.add_argument("--account", default=ACCOUNT, help="account to inspect (default: peakecoin.matic)")
ap.add_argument("--symbol", default=TOKEN_SYMBOL, help="token symbol (default: PEK)")
sub = ap.add_subparsers(dest="cmd", required=True)
sub.add_parser("recent7d", help="print PEK transfers involving account from the last 7 days") \
.add_argument("--limit-each", type=int, default=500)
sub.add_parser("all", help="print all recent PEK transfers involving account (bounded by API limits)") \
.add_argument("--limit-each", type=int, default=200)
mon = sub.add_parser("monitor", help="stream INCOMING transfers to the account")
mon.add_argument("--poll", type=int, default=10)
blk = sub.add_parser("blocks", help="dump raw blocks in a range (inclusive)")
blk.add_argument("--start", type=int, required=True)
blk.add_argument("--end", type=int, required=True)
latest = sub.add_parser("latest", help="print latest block number")
args = ap.parse_args()
if args.cmd == "recent7d":
print_recent_7d(args.account, args.symbol, limit_each=args.limit_each)
elif args.cmd == "all":
print_all_history(args.account, args.symbol, limit_each=args.limit_each)
elif args.cmd == "monitor":
global POLL_SECONDS
POLL_SECONDS = args.poll
monitor_incoming(args.account, args.symbol)
elif args.cmd == "blocks":
debug_blocks(args.start, args.end)
elif args.cmd == "latest":
n = get_latest_block_number()
print(f"Latest block number: {n}")
if __name__ == "__main__":
main()