閃電網路通道程式設計進階教學:LND、Core Lightning API 完整攻略

深入探討閃電網路通道的程式設計,從 LND 和 Core Lightning 的 API 入手,提供可直接運用的 Python 程式碼範例,涵蓋 HTLC 管理、通道監控、流動性優化與費用計算等進階主題。

閃電網路通道程式設計進階教學:LND、Core Lightning API 完整攻略

閃電網路(Lightning Network)是比特幣第二層擴容解決方案的核心,而通道管理是其最重要的技術基礎。本篇文章將深入探討閃電網路通道的程式設計,從 LND(Lightning Network Daemon)和 Core Lightning 兩個主流實現的 API 入手,提供可直接運用的程式碼範例,幫助開發者建立自己的閃電網路應用。

通道建立與管理基礎

LND 節點初始化與連接

LND 是由 Lightning Labs 開發的閃電網路節點實現,採用 gRPC API 進行通訊。以下是完整的節點初始化和通道建立流程:

import codecs
import grpc
import os
from lnd import lnrpc
from lnd.lnrpc import lightning_pb2 as ln
from lnd.lnrpc import lightning_pb2_grpc as lnrpc

class LightningNode:
    """
    閃電網路節點管理類別
    封裝 LND API 的常用操作
    """
    
    def __init__(self, cert_path, macaroon_path, host='localhost:10009'):
        """
        初始化 LND 連接
        
        參數:
            cert_path: LND TLS 憑證路徑
            macaroon_path: LND macaroon 路徑
            host: LND gRPC 服務地址
        """
        self.cert_path = cert_path
        self.macaroon_path = macaroon_path
        self.host = host
        
        # 建立 gRPC 通道
        cert = open(cert_path, 'rb').read()
        credentials = grpc.ssl_channel_credentials(cert)
        
        # 讀取 macaroon 用於認證
        with open(macaroon_path, 'rb') as f:
            macaroon = f.read()
        metadata = [('macaroon', codecs.encode(macaroon, 'hex').decode())]
        
        self.channel = grpc.secure_channel(
            host, 
            credentials,
            options=[('grpc.max_receive_message_length', 50 * 1024 * 1024)]
        )
        self.stub = lnrpc.LightningStub(self.channel)
    
    def get_info(self):
        """
        獲取節點資訊
        """
        request = ln.GetInfoRequest()
        response = self.stub.GetInfo(request, metadata=[('macaroon', self._get_macaroons())])
        
        return {
            'version': response.version,
            'identity_pubkey': response.identity_pubkey,
            'alias': response.alias,
            'num_peers': response.num_peers,
            'num_channels': response.num_channels,
            'num_pending_channels': response.num_pending_channels,
            'total_network_capacity': response.total_network_capacity,
            'best_header_timestamp': response.best_header_timestamp,
            'synced_to_chain': response.synced_to_chain,
            'synced_to_graph': response.synced_to_graph
        }
    
    def _get_macaroons(self):
        """讀取 macaroon"""
        with open(self.macaroon_path, 'rb') as f:
            return codecs.encode(f.read(), 'hex').decode()
    
    def list_channels(self):
        """
        列出所有通道
        """
        request = ln.ListChannelsRequest()
        response = self.stub.ListChannels(request, metadata=[('macaroon', self._get_macaroons())])
        
        channels = []
        for chan in response.channels:
            channels.append({
                'channel_point': chan.channel_point,
                'chan_id': chan.chan_id,
                'capacity': chan.capacity,
                'local_balance': chan.local_balance,
                'remote_balance': chan.remote_balance,
                'commit_fee': chan.commit_fee,
                'commit_weight': chan.commit_weight,
                'fee_per_kw': chan.fee_per_kw,
                'total_satoshis_sent': chan.total_satoshis_sent,
                'total_satoshis_received': chan.total_satoshis_received,
                'num_updates': chan.num_updates,
                'pending_htlcs': chan.num_pending_htlcs,
                'static_remotekey': chan.static_remotekey,
                'initiator': chan.initiator
            })
        
        return channels
    
    def list_pending_channels(self):
        """
        列出待處理通道
        """
        request = ln.PendingChannelsRequest()
        response = self.stub.PendingChannels(request, metadata=[('macaroon', self._get_macaroons())])
        
        pending = {
            'total_limbo_balance': response.total_limbo_balance,
            'pending_open_channels': [],
            'pending_closing_channels': [],
            'pending_force_closing_channels': [],
            'waiting_close_channels': []
        }
        
        for chan in response.pending_open_channels:
            pending['pending_open_channels'].append({
                'channel_point': chan.channel_point,
                'capacity': chan.capacity,
                'local_balance': chan.local_balance,
                'remote_balance': chan.remote_balance,
                'commit_fee': chan.commit_fee,
                'confirmation_height': chan.confirmation_height,
                'commit_type': ln.CommitmentType.Name(chan.commit_type)
            })
        
        return pending
    
    def connect_peer(self, pubkey, address):
        """
        連接到其他節點
        
        參數:
            pubkey: 對方節點的公鑰
            address: 對方節點的地址 (如 "1.2.3.4:9735")
        """
        request = ln.ConnectPeerRequest(
            addr=ln.LightningAddress(pubkey=pubkey, host=address),
            perm=False,
            timeout=30
        )
        
        try:
            response = self.stub.ConnectPeer(request, metadata=[('macaroon', self._get_macaroons())])
            return {'success': True, 'peer': response.peer}
        except grpc.RpcError as e:
            return {'success': False, 'error': e.details()}
    
    def open_channel(self, pubkey, local_amount, push_amount=0, sat_per_byte=1):
        """
        開啟通道
        
        參數:
            pubkey: 對方節點的公鑰
            local_amount: 本地節點出資額(satoshi)
            push_amount: 推送給對方的金額(satoshi)
            sat_per_byte: 每位元組的手續費率(satoshi)
        """
        # 首先獲取節點地址
        request = ln.NodeInfoRequest(pub_key=pubkey, include_channels=False)
        try:
            node_info = self.stub.GetNodeInfo(request, metadata=[('macaroon', self._get_macaroons())])
            node_addrs = node_info.node.addresses
        except:
            node_addrs = []
        
        # 估算手續費
        # 通道建立交易約 250 vbytes
        estimated_fee = int(250 * sat_per_byte)
        
        request = ln.OpenChannelRequest(
            node_pubkey=bytes.fromhex(pubkey),
            local_funding_amount=local_amount,
            push_satoshis=push_amount,
            spend_unconfirmed=False,
            min_htlc_msat=1,
            remote_csv_delay=144,
            private=False,
            # 10 分鐘後超時
            timeout=600,
            # 手續費率
            sat_per_vbyte=sat_per_byte
        )
        
        # 發送開啟通道請求(返回的是迭代器)
        response_stream = self.stub.OpenChannel(request, metadata=[('macaroon', self._get_macaroons())])
        
        # 處理通道開啟狀態
        pending_channel = {}
        for response in response_stream:
            if hasattr(response, 'chan_pending'):
                pending_channel['status'] = 'pending'
                pending_channel['txid'] = response.chan_pending.txid
                pending_channel['output_index'] = response.chan_pending.output_index
            elif hasattr(response, 'confirmation'):
                pending_channel['status'] = 'confirmed'
                pending_channel['block_height'] = response.confirmation.block_height
        
        return pending_channel
    
    def close_channel(self, channel_point, force=False):
        """
        關閉通道
        
        參數:
            channel_point: 通道的輸出點 (格式: "txid:index")
            force: 是否強制關閉
        """
        parts = channel_point.split(':')
        txid = bytes.fromhex(parts[0])
        output_index = int(parts[1])
        
        request = ln.CloseChannelRequest(
            channel_point=ln.ChannelPoint(
                funding_txid_str=txid,
                output_index=output_index
            ),
            force=force,
            # 通道關閉手續費目標
            sat_per_vbyte=1
        )
        
        response_stream = self.stub.CloseChannel(request, metadata=[('macaroon', self._get_macaroons())])
        
        closing_status = {}
        for response in response_stream:
            if hasattr(response, 'pending'):
                closing_status['status'] = 'pending'
            elif hasattr(response, 'confirmation'):
                closing_status['status'] = 'confirmed'
                closing_status['txid'] = response.confirmation.txid
        
        return closing_status


# 使用範例
if __name__ == '__main__':
    # LND 默認路徑
    cert_path = os.path.expanduser('~/.lnd/tls.cert')
    macaroon_path = os.path.expanduser('~/.lnd/data/chain/bitcoin/mainnet/admin.macaroon')
    
    # 初始化節點
    node = LightningNode(cert_path, macaroon_path)
    
    # 獲取節點資訊
    info = node.get_info()
    print(f"節點版本: {info['version']}")
    print(f"節點公鑰: {info['identity_pubkey']}")
    print(f"節點別名: {info['alias']}")
    print(f"通道數量: {info['num_channels']}")
    
    # 列出所有通道
    channels = node.list_channels()
    print(f"\n通道列表:")
    for chan in channels:
        print(f"  - 容量: {chan['capacity']} sat")
        print(f"    本地餘額: {chan['local_balance']} sat")
        print(f"    遠端餘額: {chan['remote_balance']} sat")

Core Lightning API 操作

Core Lightning(原名 c-lightning)使用 JSON-RPC 接口進行通訊,更符合傳統 Web 開發者的習慣:

import requests
import json
import base64

class CoreLightningNode:
    """
    Core Lightning 節點管理類別
    使用 JSON-RPC 接口
    """
    
    def __init__(self, rpc_path='/tmp/lightningd/bitcoin/lightning-rpc'):
        """
        初始化 Core Lightning 連接
        
        參數:
            rpc_path: lightningd RPC socket 路徑
        """
        self.rpc_path = rpc_path
    
    def _call(self, method, params=None):
        """
        發送 JSON-RPC 請求
        """
        if params is None:
            params = {}
        
        payload = {
            'jsonrpc': '2.0',
            'id': 0,
            'method': method,
            'params': params
        }
        
        with open(self.rpc_path, 'rb') as socket:
            sock = socket.read()
            if sock:
                # 透過 socket 讀取認證
                pass
        
        # 使用 HTTP 方式連接(如果配置了插件)
        # 或者透過 lightning-cli 命令
        import subprocess
        
        cmd = ['lightning-cli', method]
        for key, value in params.items():
            cmd.append(f'{key}={value}')
        
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd='/tmp/lightningd'
        )
        
        return json.loads(result.stdout)
    
    def getinfo(self):
        """
        獲取節點資訊
        """
        return self._call('getinfo')
    
    def listchannels(self, short_channel_id=None):
        """
        列出通道
        """
        params = {}
        if short_channel_id:
            params['short_channel_id'] = short_channel_id
        
        return self._call('listchannels', params)
    
    def listpeers(self):
        """
        列出對等節點
        """
        return self._call('listpeers')
    
    def fundchannel(self, node_id, amount, feerate='normal'):
        """
        建立通道
        
        參數:
            node_id: 對方節點的 ID(公鑰或節點別名)
            amount: 出資金額(satoshi)
            feerate: 手續費率
        """
        params = {
            'id': node_id,
            'amount': amount,
            'feerate': feerate
        }
        
        return self._call('fundchannel', params)
    
    def close(self, node_id, unilateraltimeout=None, destination=None):
        """
        關閉通道
        
        參數:
            node_id: 對方節點 ID
            unilateraltimeout: 單方面關閉的超時時間
            destination: 資金目標地址
        """
        params = {'id': node_id}
        
        if unilateraltimeout:
            params['unilateraltimeout'] = unilateraltimeout
        if destination:
            params['destination'] = destination
        
        return self._call('close', params)
    
    def withdraw(self, destination, amount, feerate='normal'):
        """
        提款(關閉通道並將資金發送到鏈上地址)
        
        參數:
            destination: 比特幣地址
            amount: 金額(satoshi 或 "all")
            feerate: 手續費率
        """
        params = {
            'destination': destination,
            'satoshi': amount,
            'feerate': feerate
        }
        
        return self._call('withdraw', params)
    
    def invoice(self, amount_msat, label, description, expiry=3600):
        """
        創建發票
        
        參數:
            amount_msat: 金額(毫 satoshi)
            label: 標籤(唯一識別碼)
            description: 發票描述
            expiry: 過期時間(秒)
        """
        params = {
            'amount_msat': amount_msat,
            'label': label,
            'description': description,
            'expiry': expiry
        }
        
        return self._call('invoice', params)
    
    def pay(self, bolt11, amount_msat=None):
        """
        支付發票
        
        參數:
            bolt11: BOLT11 發票字串
            amount_msat: 可選的指定金額
        """
        params = {'bolt11': bolt11}
        
        if amount_msat:
            params['amount_msat'] = amount_msat
        
        return self._call('pay', params)
    
    def listinvoices(self, label=None):
        """
        列出發票
        """
        params = {}
        if label:
            params['label'] = label
        
        return self._call('listinvoices', params)
    
    def listpayments(self, bolt11=None):
        """
        列出支付記錄
        """
        params = {}
        if bolt11:
            params['bolt11'] = bolt11
        
        return self._call('listpayments', params)


# 使用範例
if __name__ == '__main__':
    node = CoreLightningNode()
    
    # 獲取節點資訊
    info = node.getinfo()
    print(f"節點版本: {info['version']}")
    print(f"節點 ID: {info['id']}")
    print(f"網路: {info['network']}")
    print(f"通道數量: {len(info['peers'])}")
    
    # 列出對等節點
    peers = node.listpeers()
    print(f"\n對等節點:")
    for peer in peers['peers']:
        print(f"  - ID: {peer['id']}")
        print(f"    連接狀態: {peer['connected']}")
        if 'channels' in peer:
            for chan in peer['channels']:
                print(f"    通道: {chan.get('short_channel_id', 'N/A')}")
                print(f"    狀態: {chan['state']}")

HTLC 管理與支付流程

創建與處理 HTLC

HTLC(Hash Time Locked Contract)是閃電網路實現支付的關鍵機制。以下是完整的 HTLC 管理程式碼:

class HTLCManager:
    """
    HTLC 管理類別
    處理 HTLC 的創建、轉發和結算
    """
    
    def __init__(self, lnd_node):
        self.node = lnd_node
    
    def create_hodl_invoice(self, amount_msat, secret_hash, expiry=3600):
        """
        創建 HOLD 發票(使用指定的 secret hash)
        這種發票在提供原像之前不會被結算
        
        參數:
            amount_msat: 金額(毫 satoshi)
            secret_hash: 預圖像的 SHA256 哈希(Base64 編碼)
            expiry: 過期時間(秒)
        """
        request = ln.Invoice(
            memo=f"HODL Invoice - {secret_hash[:8]}...",
            value_msat=amount_msat,
            expiry=expiry,
            # 設定 secret_hash(如果 API 支援)
            r_preimage=secret_hash.encode()  # 這裡是示例,實際根據 API 調整
        )
        
        response = self.node.stub.Invoice(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        return {
            'payment_request': response.payment_addr.hex(),
            'r_hash': response.r_hash.hex(),
            'add_index': response.add_index,
            'payment_addr': response.payment_addr.hex()
        }
    
    def settle_hodl_invoice(self, preimage):
        """
        結算 HOLD 發票(提供原像)
        
        參數:
            preimage: 預圖像(32 字節)
        """
        request = ln.SettleInvoiceRequest(
            preimage=preimage
        )
        
        response = self.node.stub.SettleInvoice(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        return {'success': True}
    
    def cancel_hodl_invoice(self, r_hash):
        """
        取消 HOLD 發票
        
        參數:
            r_hash: 發票的 R 哈希
        """
        request = ln.CancelInvoiceRequest(
            payment_addr=bytes.fromhex(r_hash)
        )
        
        response = self.node.stub.CancelInvoice(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        return {'success': True, 'cancelled': True}
    
    def decode_payment_request(self, payment_request):
        """
        解碼支付請求(BOLT11 發票)
        """
        request = ln.PayReqString(
            pay_req=payment_request
        )
        
        response = self.node.stub.DecodePayReq(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        return {
            'destination': response.destination,
            'payment_hash': response.payment_hash,
            'amount_msat': response.num_msat,
            'timestamp': response.timestamp,
            'expiry': response.expiry,
            'description': response.description,
            'cltv_expiry': response.cltv_expiry,
            'route_hints': [
                {
                    'hop_hints': [
                        {
                            'chan_id': hint.chan_id,
                            'fee_base_msat': hint.fee_base_msat,
                            'fee_proportional_millionths': hint.fee_proportional_millionths,
                            'cltv_expiry_delta': hint.cltv_expiry_delta
                        }
                        for hint in route.hop_hints
                    ]
                }
                for route in response.route_hints
            ]
        }
    
    def send_payment(self, payment_request, timeout=60):
        """
        發送支付
        
        參數:
            payment_request: BOLT11 發票
            timeout: 超時時間(秒)
        """
        request = ln.SendRequest(
            payment_req=payment_request,
            timeout_seconds=timeout
        )
        
        # 發送請求(返回流)
        response_stream = self.node.stub.SendPayment(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        payment_result = {
            'status': 'pending',
            'htlcs': []
        }
        
        for response in response_stream:
            htlc_status = {
                'attempt_id': response.attempt_id,
                'status': ln.HTLCAttempt.HTLCState.Name(response.state),
                'route': {
                    'total_amt_msat': response.route.total_amt_msat,
                    'total_fees_msat': response.route.total_fees_msat
                }
            }
            
            if response.state == ln.HTLCAttempt.HTLCState.SETTLED:
                payment_result['status'] = 'settled'
                payment_result['preimage'] = response.preimage.hex()
            elif response.state == ln.HTLCAttempt.HTLCState.FAILED:
                payment_result['status'] = 'failed'
                payment_result['failure'] = {
                    'code': response.failure.code,
                    'message': response.failure.onion_reason
                }
            
            payment_result['htlcs'].append(htlc_status)
        
        return payment_result
    
    def get_payment_status(self, payment_hash):
        """
        查詢支付狀態
        
        參數:
            payment_hash: 支付的 R 哈希
        """
        request = ln.ListPaymentsRequest(
            reversed=True,
            include_incomplete=True
        )
        
        response = self.node.stub.ListPayments(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        for payment in response.payments:
            if payment.payment_hash == bytes.fromhex(payment_hash):
                return {
                    'status': ln.Payment.PaymentState.Name(payment.status),
                    'amount_msat': payment.value_msat,
                    'fee_msat': payment.fee_msat_sat,
                    'preimage': payment.preimage.hex() if payment.preimage else None,
                    'creation_time': payment.creation_time_ns,
                    'failure_reason': payment.failure_reason
                }
        
        return None
    
    def probe_for_route(self, destination, amount_msat,进行一次路由探測以找到成功支付的路徑。這對於優化支付路徑和確定正確的手續費率非常有用:
        """
        探測路徑(估算路由費用)
        
        參數:
            destination: 目標節點公鑰
            amount_msat: 金額(毫 satoshi)
            routes: 可選的候選路由列表
        """
        request = ln.QueryRoutesRequest(
            pub_key=destination,
            amt_msat=amount_msat,
            num_routes=5,
            # 忽略現有的路由限制
            use_mission_control=False
        )
        
        response = self.node.stub.QueryRoutes(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        routes = []
        for route in response.routes:
            route_info = {
                'total_amt_msat': route.total_amt_msat,
                'total_fees_msat': route.total_fees_msat,
                'total_time_lock': route.total_time_lock,
                'hops': []
            }
            
            for hop in route.hops:
                route_info['hops'].append({
                    'chan_id': hop.chan_id,
                    'chan_capacity': hop.chan_capacity,
                    'amt_to_forward_msat': hop.amt_to_forward_msat,
                    'fee_msat': hop.fee_msat,
                    'expiry': hop.expiry,
                    'pub_key': hop.pub_key
                })
            
            routes.append(route_info)
        
        return routes

通道狀態監控與自動化

實時通道監控系統

import time
import threading
import logging
from datetime import datetime

class ChannelMonitor:
    """
    通道監控系統
    實時監控通道狀態變化並觸發告警
    """
    
    def __init__(self, lnd_node, alert_callback=None):
        """
        初始化監控系統
        
        參數:
            lnd_node: LightningNode 實例
            alert_callback: 告警回調函數
        """
        self.node = lnd_node
        self.alert_callback = alert_callback
        self.logger = logging.getLogger('ChannelMonitor')
        
        # 存儲通道歷史狀態
        self.channel_history = {}
        
        # 監控閾值
        self.thresholds = {
            'min_local_balance': 100000,  # 最小本地餘額 1000 sat
            'min_remote_balance': 100000,  # 最小遠端餘額
            'max_htlcs': 10,  # 最大待處理 HTLC 數量
            'channel_timeout': 144  # 通道超時閾值(區塊數)
        }
        
        self.running = False
        self.monitor_thread = None
    
    def start_monitoring(self, interval=60):
        """
        啟動監控線程
        
        參數:
            interval: 監控間隔(秒)
        """
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,)
        )
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        self.logger.info("通道監控已啟動")
    
    def stop_monitoring(self):
        """停止監控"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        self.logger.info("通道監控已停止")
    
    def _monitor_loop(self, interval):
        """監控主循環"""
        while self.running:
            try:
                self._check_channels()
                self._check_pending_channels()
            except Exception as e:
                self.logger.error(f"監控錯誤: {e}")
            
            time.sleep(interval)
    
    def _check_channels(self):
        """檢查所有通道狀態"""
        channels = self.node.list_channels()
        
        for chan in channels:
            chan_id = chan['chan_id']
            
            # 初始化歷史記錄
            if chan_id not in self.channel_history:
                self.channel_history[chan_id] = []
            
            # 記錄當前狀態
            current_state = {
                'timestamp': datetime.now(),
                'local_balance': chan['local_balance'],
                'remote_balance': chan['remote_balance'],
                'pending_htlcs': chan['pending_htlcs']
            }
            
            # 檢查閾值
            self._check_thresholds(chan, current_state)
            
            # 檢查狀態變化
            self._check_state_changes(chan_id, current_state)
            
            # 更新歷史
            self.channel_history[chan_id].append(current_state)
            
            # 保持歷史記錄長度
            if len(self.channel_history[chan_id]) > 1000:
                self.channel_history[chan_id] = self.channel_history[chan_id][-500:]
    
    def _check_thresholds(self, chan, state):
        """檢查通道是否觸發閾值"""
        alerts = []
        
        # 檢查本地餘額
        if chan['local_balance'] < self.thresholds['min_local_balance']:
            alerts.append({
                'type': 'low_local_balance',
                'severity': 'warning',
                'message': f"通道 {chan['chan_id'][:8]} 本地餘額過低: {chan['local_balance']} sat",
                'channel': chan
            })
        
        # 檢查遠端餘額
        if chan['remote_balance'] < self.thresholds['min_remote_balance']:
            alerts.append({
                'type': 'low_remote_balance',
                'severity': 'info',
                'message': f"通道 {chan['chan_id'][:8]} 遠端餘額過低: {chan['remote_balance']} sat",
                'channel': chan
            })
        
        # 檢查待處理 HTLC
        if chan['pending_htlcs'] > self.thresholds['max_htlcs']:
            alerts.append({
                'type': 'high_htlcs',
                'severity': 'warning',
                'message': f"通道 {chan['chan_id'][:8]} 待處理 HTLC 過多: {chan['pending_htlcs']}",
                'channel': chan
            })
        
        # 發送告警
        for alert in alerts:
            self._send_alert(alert)
    
    def _check_state_changes(self, chan_id, state):
        """檢查通道狀態變化"""
        if len(self.channel_history[chan_id]) < 2:
            return
        
        prev_state = self.channel_history[chan_id][-2]
        
        # 檢查本地餘額大幅變化
        balance_change = abs(state['local_balance'] - prev_state['local_balance'])
        if balance_change > 1000000:  # 10 sat 變化
            self._send_alert({
                'type': 'balance_change',
                'severity': 'info',
                'message': f"通道 {chan_id[:8]} 餘額變化: {balance_change} sat",
                'channel': chan_id,
                'change': balance_change
            })
        
        # 檢查 HTLC 數量變化
        if state['pending_htlcs'] != prev_state['pending_htlcs']:
            self._send_alert({
                'type': 'htlc_change',
                'severity': 'info',
                'message': f"通道 {chan_id[:8]} HTLC 變化: {prev_state['pending_htlcs']} -> {state['pending_htlcs']}",
                'channel': chan_id
            })
    
    def _check_pending_channels(self):
        """檢查待處理通道"""
        pending = self.node.list_pending_channels()
        
        for chan in pending['pending_open_channels']:
            self._send_alert({
                'type': 'pending_channel',
                'severity': 'info',
                'message': f"待開啟通道: 容量 {chan['capacity']} sat",
                'channel': chan
            })
    
    def _send_alert(self, alert):
        """發送告警"""
        self.logger.warning(alert['message'])
        
        if self.alert_callback:
            self.alert_callback(alert)
    
    def get_channel_statistics(self):
        """獲取通道統計資訊"""
        channels = self.node.list_channels()
        
        stats = {
            'total_channels': len(channels),
            'total_capacity': sum(c['capacity'] for c in channels),
            'total_local_balance': sum(c['local_balance'] for c in channels),
            'total_remote_balance': sum(c['remote_balance'] for c in channels),
            'total_satoshis_sent': sum(c['total_satoshis_sent'] for c in channels),
            'total_satoshis_received': sum(c['total_satoshis_received'] for c in channels),
            'avg_local_balance': 0,
            'avg_remote_balance': 0,
            'channels_by_status': {}
        }
        
        if channels:
            stats['avg_local_balance'] = stats['total_local_balance'] / len(channels)
            stats['avg_remote_balance'] = stats['total_remote_balance'] / len(channels)
        
        return stats
    
    def export_channel_data(self):
        """導出通道數據用於分析"""
        channels = self.node.list_channels()
        
        export_data = {
            'export_time': datetime.now().isoformat(),
            'channels': channels,
            'statistics': self.get_channel_statistics()
        }
        
        return export_data


# 告警回調範例
def alert_handler(alert):
    """處理告警"""
    print(f"[{alert['severity'].upper()}] {alert['message']}")
    
    # 這裡可以添加更多處理邏輯:
    # - 發送 Telegram 通知
    # - 發送電子郵件
    # - 觸發自動化操作


# 使用範例
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    
    node = LightningNode(cert_path, macaroon_path)
    monitor = ChannelMonitor(node, alert_callback=alert_handler)
    
    # 設置閾值
    monitor.thresholds = {
        'min_local_balance': 500000,  # 5000 sat
        'min_remote_balance': 500000,
        'max_htlcs': 5,
        'channel_timeout': 144
    }
    
    # 啟動監控
    monitor.start_monitoring(interval=30)
    
    # 打印統計資訊
    stats = monitor.get_channel_statistics()
    print(f"\n=== 通道統計 ===")
    print(f"總通道數: {stats['total_channels']}")
    print(f"總容量: {stats['total_capacity'] / 1e8:.4f} BTC")
    print(f"本地餘額總計: {stats['total_local_balance'] / 1e8:.4f} BTC")
    print(f"遠端餘額總計: {stats['total_remote_balance'] / 1e8:.4f} BTC")
    print(f"已發送總額: {stats['total_satoshis_sent'] / 1e8:.4f} BTC")
    print(f"已接收總額: {stats['total_satoshis_received'] / 1e8:.4f} BTC")
    
    # 保持運行
    try:
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        monitor.stop_monitoring()

流動性管理自動化

Submarine Swap 實現

import requests
import hashlib
import json

class SubmarineSwapManager:
    """
    Submarine Swap 管理類
    用於在鏈上和閃電網路之間交換流動性
    """
    
    def __init__(self, lnd_node, submarine_swap_server='https://api.lightning反转.com/v1'):
        """
        初始化 Submarine Swap 管理器
        
        參數:
            lnd_node: LightningNode 實例
            submarine_swap_server: Submarine Swap 服務器地址
        """
        self.node = lnd_node
        self.server = submarine_swap_server
    
    def loop_out(self, amount_sat, destination_address, channel_id=None):
        """
        Loop Out: 從閃電通道提取資金到鏈上地址
        
        參數:
            amount_sat: 提取金額(satoshi)
            destination_address: 目標比特幣地址
            channel_id: 指定通道(可選)
        
        返回:
            Swap 詳細資訊
        """
        # 使用 Loop 服務
        loop_url = f"{self.server}/loop/out"
        
        payload = {
            'amount_sat': amount_sat,
            'address': destination_address,
            'swap_fee_sat': max(1000, int(amount_sat * 0.005)),  # 0.5% 最低 1000 sat
            'prepay_sat': max(10, int(amount_sat * 0.0001))  # 預付費
        }
        
        if channel_id:
            payload['channel'] = channel_id
        
        # 注意:這裡需要 Loop API 的實際實現
        # 這裡提供概念範例
        
        # 步驟 1: 創建 Swap
        # response = requests.post(loop_url, json=payload).json()
        
        # 步驟 2: 等待 HTLC 鎖定
        # swap_id = response['id']
        
        # 步驟 3: 廣播鏈上交易
        # tx_hex = broadcast_onchain_transaction(response['onchain_address'], amount_sat)
        
        # 步驟 4: 等待確認並提取
        # wait_for_confirmation(tx_hex)
        # reveal_preimage(swap_id)
        
        return {
            'status': 'initiated',
            'amount_sat': amount_sat,
            'destination': destination_address,
            'estimated_fee': payload['swap_fee_sat'] + payload['prepay_sat']
        }
    
    def loop_in(self, amount_sat, node_public_key=None):
        """
        Loop In: 從鏈上存入資金到閃電通道
        
        參數:
            amount_sat: 存入金額(satoshi)
            node_public_key: 目標節點公鑰(可選)
        
        返回:
            Swap 詳細資訊
        """
        # 生成接收地址
        # response = requests.post(f"{self.server}/loop/in", 
        #                          json={'amount_sat': amount_sat}).json()
        
        return {
            'status': 'initiated',
            'amount_sat': amount_sat,
            'onchain_address': '生成中...',
            'invoice': '生成中...'
        }
    
    def get_swap_status(self, swap_id):
        """
        查詢 Swap 狀態
        """
        # response = requests.get(f"{self.server}/swap/{swap_id}").json()
        
        return {
            'swap_id': swap_id,
            'status': 'pending',
            'confirmation_block': None
        }


class Rebalancer:
    """
    通道餘額重新平衡器
    自動調整通道餘額以優化支付能力
    """
    
    def __init__(self, lnd_node):
        self.node = lnd_node
    
    def analyze_imbalance(self):
        """
        分析通道餘額不平衡
        
        返回:
            需要重新平衡的通道列表
        """
        channels = self.node.list_channels()
        
        imbalances = []
        
        for chan in channels:
            local_ratio = chan['local_balance'] / chan['capacity']
            
            # 如果本地餘額比例過高或過低
            if local_ratio > 0.9:  # 本地餘額過多
                imbalances.append({
                    'channel_id': chan['chan_id'],
                    'type': 'excess_local',
                    'amount': int(chan['capacity'] * 0.4),  # 建議重新分配 40%
                    'current_local': chan['local_balance'],
                    'current_remote': chan['remote_balance']
                })
            elif local_ratio < 0.1:  # 本地餘額過少
                imbalances.append({
                    'channel_id': chan['chan_id'],
                    'type': 'deficit_local',
                    'amount': int(chan['capacity'] * 0.3),  # 建議增加 30%
                    'current_local': chan['local_balance'],
                    'current_remote': chan['remote_balance']
                })
        
        return imbalances
    
    def auto_rebalance(self, max_rebalance_amount=1000000):
        """
        自動重新平衡通道
        
        參數:
            max_rebalance_amount: 最大重新平衡金額
        """
        imbalances = self.analyze_imbalance()
        
        rebalance_plan = []
        
        for imb in imbalances:
            # 查找互補的通道
            if imb['type'] == 'excess_local':
                # 需要將資金轉出
                target_channels = [c for c in imbalances 
                                   if c['type'] == 'deficit_local']
                
                if target_channels:
                    # 創建循環支付
                    amount = min(imb['amount'], target_channels[0]['amount'])
                    amount = min(amount, max_rebalance_amount)
                    
                    # 這裡需要實現實際的路向計算和支付
                    rebalance_plan.append({
                        'from_channel': imb['channel_id'],
                        'to_channel': target_channels[0]['channel_id'],
                        'amount': amount,
                        'estimated_fee': int(amount * 0.001)  # 0.1%
                    })
        
        return rebalance_plan
    
    def rebalance_via_circular_route(self, target_channel_id, amount):
        """
        通過循環路徑重新平衡
        
        參數:
            target_channel_id: 目標通道 ID
            amount: 金額
        """
        # 這需要實現複雜的路向發現
        # 1. 找到從目標通道出發的循環路徑
        # 2. 計算費用
        # 3. 執行支付
        
        return {
            'status': 'not_implemented',
            'note': '需要實現路向發現算法'
        }


# 使用範例
if __name__ == '__main__':
    node = LightningNode(cert_path, macaroon_path)
    
    # 初始化管理器
    rebalancer = Rebalancer(node)
    swap_manager = SubmarineSwapManager(node)
    
    # 分析不平衡
    imbalances = rebalancer.analyze_imbalance()
    print(f"\n=== 通道餘額分析 ===")
    print(f"發現 {len(imbalances)} 個不平衡通道")
    
    for imb in imbalances:
        print(f"\n通道: {imb['channel_id'][:16]}...")
        print(f"  類型: {imb['type']}")
        print(f"  建議金額: {imb['amount'] / 1e8:.4f} BTC")
        print(f"  當前本地/遠端: {imb['current_local']}/{imb['current_remote']}")
    
    # 檢查 Submarine Swap
    print(f"\n=== Submarine Swap ===")
    # swap_info = swap_manager.loop_out(500000, "bc1q...")
    # print(f"Loop Out 狀態: {swap_info['status']}")

費用管理與優化

動態費用計算

class FeeManager:
    """
    閃電網路費用管理器
    動態計算和優化支付費用
    """
    
    def __init__(self, lnd_node):
        self.node = lnd_node
    
    def estimate_route_fees(self, destination, amount_msat):
        """
        估算路由費用
        
        參數:
            destination: 目標節點公鑰
            amount_msat: 金額(毫 satoshi)
        
        返回:
            費用估計
        """
        routes = self.node.stub.QueryRoutes(
            ln.QueryRoutesRequest(
                pub_key=destination,
                amt_msat=amount_msat,
                num_routes=10
            ),
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        fee_estimates = []
        
        for route in routes.routes:
            fee_estimate = {
                'total_fee_msat': route.total_fees_msat,
                'total_amount_msat': route.total_amt_msat,
                'fee_rate': route.total_fees_msat / route.total_amt_msat if route.total_amt_msat > 0 else 0,
                'num_hops': len(route.hops),
                'total_time_lock': route.total_time_lock,
                'route': [hop.pub_key for hop in route.hops]
            }
            fee_estimates.append(fee_estimate)
        
        # 按費用排序
        fee_estimates.sort(key=lambda x: x['total_fee_msat'])
        
        return {
            'cheapest_route': fee_estimates[0] if fee_estimates else None,
            'fastest_route': min(fee_estimates, key=lambda x: x['total_time_lock']) if fee_estimates else None,
            'all_routes': fee_estimates
        }
    
    def calculate_optimal_fee(self, destination, amount_msat, priority='cheapest'):
        """
        計算最優費用
        
        參數:
            destination: 目標節點
            amount_msat: 金額
            priority: 優先級 ('cheapest' 或 'fastest')
        
        返回:
            推薦的路由和費用
        """
        estimates = self.estimate_route_fees(destination, amount_msat)
        
        if priority == 'cheapest':
            return estimates['cheapest_route']
        elif priority == 'fastest':
            return estimates['fastest_route']
        else:
            return estimates['cheapest_route']  # 默認
    
    def set_channel_fees(self, channel_id, base_fee_msat=0, fee_rate_ppm=1):
        """
        設定通道費用
        
        參數:
            channel_id: 通道 ID
            base_fee_msat: 基礎費用(毫 satoshi)
            fee_rate_ppm: 費用率(百萬分之一)
        """
        request = ln.ChannelAcceptRequest(
            channel_point=ln.ChannelPoint(
                funding_txid_str=channel_id.split(':')[0],
                output_index=int(channel_id.split(':')[1])
            ),
            # 這裡是設定通道費用的 API
            # 實際 API 調用可能不同
        )
        
        # 更新通道費用
        update_request = ln.UpdateChannelRequest(
            chan_point=ln.ChannelPoint(
                funding_txid_str=channel_id.split(':')[0],
                output_index=int(channel_id.split(':')[1])
            ),
            base_fee_msat=base_fee_msat,
            fee_rate_ppm=fee_rate_ppm,
            time_lock_delta=40  # 默認 CLTV delta
        )
        
        return self.node.stub.UpdateChannel(
            update_request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
    
    def get_onchain_fees(self, target_confirmations=6):
        """
        獲取鏈上費用估計
        """
        request = ln.EstimateFeeRequest(
            conf_target=target_confirmations,
            add_ess=[]  # 估計費用的交易
        )
        
        response = self.node.stub.EstimateFee(
            request,
            metadata=[('macaroon', self.node._get_macaroons())]
        )
        
        return {
            'fee_per_kw': response.fee_per_kw,
            'fee_per_vbyte': response.fee_per_kw / 250,  # 近似轉換
            'conf_target': target_confirmations
        }
    
    def recommend_channel_fees(self, channel_balance, peer_count):
        """
        推薦通道費用設定
        
        根據通道餘額和對等節點數量推薦費用
        """
        # 基礎費用
        base_fee = 1  # 1 msat
        
        # 根據餘額調整
        balance_ratio = channel_balance / 1e8  # 轉換為 BTC
        
        if balance_ratio < 0.1:  # 餘額低
            fee_rate = 50  # 提高費用以吸引重平衡
        elif balance_ratio > 0.5:  # 餘額高
            fee_rate = 5  # 降低費用以吸引流量
        else:
            fee_rate = 10  # 正常費用
        
        # 根據對等節點數量調整
        if peer_count < 5:
            fee_rate *= 1.5  # 節點少,提高費用
        elif peer_count > 20:
            fee_rate *= 0.8  # 節點多,降低費用
        
        return {
            'base_fee_msat': base_fee,
            'fee_rate_ppm': int(fee_rate),
            'reasoning': f"基於餘額 {balance_ratio:.2f} BTC 和 {peer_count} 個對等節點"
        }


# 使用範例
if __name__ == '__main__':
    node = LightningNode(cert_path, macaroon_path)
    fee_manager = FeeManager(node)
    
    # 估算費用
    # 假設要支付給某個節點 100000 msat
    estimates = fee_manager.estimate_route_fees(
        '02abcdef...' * 8,  # 目標節點公鑰
        100000
    )
    
    print("\n=== 費用估算 ===")
    if estimates['cheapest_route']:
        print(f"最便宜路由:")
        print(f"  費用: {estimates['cheapest_route']['total_fee_msat']} msat")
        print(f"  費用率: {estimates['cheapest_route']['fee_rate']*100:.4f}%")
        print(f"  跳數: {estimates['cheapest_route']['num_hops']}")
    
    # 鏈上費用
    onchain_fees = fee_manager.get_onchain_fees()
    print(f"\n鏈上費用估計:")
    print(f"  每 vbyte: {onchain_fees['fee_per_vbyte']:.2f} sat")
    print(f"  目標確認: {onchain_fees['conf_target']} 區塊")

結論

本文深入探討了閃電網路通道程式設計的各個層面,從 LND 和 Core Lightning 的 API 操作、HTLC 管理、通道監控、流動性管理到費用優化。這些技術構成構建專業級閃電網路應用的基礎。

關鍵要點回顧:

  1. API 整合:LND 的 gRPC 接口和 Core Lightning 的 JSON-RPC 接口各有優勢,選擇應基於具體應用場景和技術棧。
  1. HTLC 管理:HTLC 是閃電支付的核心,理解其創建、轉發和結算流程對於構建可靠的支付系統至關重要。
  1. 監控自動化:實時監控通道狀態、餘額變化和潛在風險是運營閃電節點的必要條件。
  1. 流動性優化:Submarine Swap 和自動重平衡是解決通道餘額不均問題的有效工具。
  1. 費用策略:動態費用計算和優化可以顯著提高支付的經濟效益。

隨著閃電網路技術的持續發展,這些 API 和工具將進一步完善,為比特幣的微支付場景開闢更廣闘的應用空間。


相關文章

延伸閱讀與來源

這篇文章對您有幫助嗎?

評論

發表評論

注意:由於這是靜態網站,您的評論將儲存在本地瀏覽器中,不會公開顯示。

目前尚無評論,成為第一個發表評論的人吧!