URI:
       tpeertopeer.py - electrum-personal-server - Maximally lightweight electrum server for a single user
  HTML git clone https://git.parazyd.org/electrum-personal-server
   DIR Log
   DIR Files
   DIR Refs
   DIR README
       ---
       tpeertopeer.py (16716B)
       ---
            1 #! /usr/bin/env python
            2 
            3 import socket
            4 import time
            5 import base64
            6 import threading
            7 import queue
            8 import random
            9 from struct import pack, unpack
           10 from datetime import datetime
           11 
           12 import electrumpersonalserver.bitcoin as btc
           13 from electrumpersonalserver.server.socks import (
           14     socksocket,
           15     setdefaultproxy,
           16     PROXY_TYPE_SOCKS5
           17 )
           18 from electrumpersonalserver.server.jsonrpc import JsonRpcError
           19 
           20 PROTOCOL_VERSION = 70016
           21 DEFAULT_USER_AGENT = '/Satoshi:0.21.0/'
           22 
           23 #https://github.com/bitcoin/bitcoin/blob/master/src/protocol.h
           24 NODE_NETWORK = 1
           25 NODE_BLOOM = 1 << 2
           26 NODE_WITNESS = 1 << 3
           27 NODE_NETWORK_LIMITED = 1 << 10
           28 
           29 # protocol versions above this also send a relay boolean
           30 RELAY_TX_VERSION = 70001
           31 
           32 # length of bitcoin p2p packets
           33 HEADER_LENGTH = 24
           34 
           35 # if no message has been seen for this many seconds, send a ping
           36 KEEPALIVE_INTERVAL = 2 * 60
           37 
           38 # close connection if keep alive ping isnt responded to in this many seconds
           39 KEEPALIVE_TIMEOUT = 20 * 60
           40 
           41 
           42 def ip_to_hex(ip_str):
           43     # ipv4 only for now
           44     return socket.inet_pton(socket.AF_INET, ip_str)
           45 
           46 def create_net_addr(hexip, port): # doesnt contain time as in bitcoin wiki
           47     services = 0
           48     hex = bytes(10) + b'\xFF\xFF' + hexip
           49     return pack('<Q16s', services, hex) + pack('>H', port)
           50 
           51 def create_var_str(s):
           52     return btc.num_to_var_int(len(s)) + s.encode()
           53 
           54 def read_int(ptr, payload, n, littleendian=True):
           55     data = payload[ptr[0] : ptr[0]+n]
           56     if littleendian:
           57         data = data[::-1]
           58     ret =  btc.decode(data, 256)
           59     ptr[0] += n
           60     return ret
           61 
           62 def read_var_int(ptr, payload):
           63     val = payload[ptr[0]]
           64     ptr[0] += 1
           65     if val < 253:
           66         return val
           67     return read_int(ptr, payload, 2**(val - 252))
           68 
           69 def read_var_str(ptr, payload):
           70     l = read_var_int(ptr, payload)
           71     ret = payload[ptr[0]: ptr[0] + l]
           72     ptr[0] += l
           73     return ret
           74 
           75 def ip_hex_to_str(ip_hex):
           76     # https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses
           77     # https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat
           78     if ip_hex[:14] == '\x00'*10 + '\xff'*2:
           79         # ipv4 mapped ipv6 addr
           80         return socket.inet_ntoa(ip_hex[12:])
           81     elif ip_hex[:6] == '\xfd\x87\xd8\x7e\xeb\x43':
           82         return base64.b32encode(ip_hex[6:]).lower() + '.onion'
           83     else:
           84         return socket.inet_ntop(socket.AF_INET6, ip_hex)
           85 
           86 class P2PMessageHandler(object):
           87     def __init__(self, logger):
           88         self.last_message = datetime.now()
           89         self.waiting_for_keepalive = False
           90         self.logger = logger
           91 
           92     def check_keepalive(self, p2p):
           93         if self.waiting_for_keepalive:
           94             if ((datetime.now() - self.last_message).total_seconds()
           95                     < KEEPALIVE_TIMEOUT):
           96                 return
           97             self.logger.debug('keepalive timed out, closing')
           98             p2p.sock.close()
           99         else:
          100             if ((datetime.now() - self.last_message).total_seconds()
          101                     < KEEPALIVE_INTERVAL):
          102                 return
          103             self.logger.debug('sending keepalive to peer')
          104             self.waiting_for_keepalive = True
          105             p2p.sock.sendall(p2p.create_message('ping', '\x00'*8))
          106 
          107     def handle_message(self, p2p, command, length, payload):
          108         self.last_message = datetime.now()
          109         self.waiting_for_keepalive = False
          110         ptr = [0]
          111         if command == b'version':
          112             version = read_int(ptr, payload, 4)
          113             services = read_int(ptr, payload, 8)
          114             timestamp = read_int(ptr, payload, 8)
          115             addr_recv_services = read_int(ptr, payload, 8)
          116             addr_recv_ip = payload[ptr[0] : ptr[0]+16]
          117             ptr[0] += 16
          118             addr_recv_port = read_int(ptr, payload, 2, False)
          119             addr_trans_services = read_int(ptr, payload, 8)
          120             addr_trans_ip = payload[ptr[0] : ptr[0]+16]
          121             ptr[0] += 16
          122             addr_trans_port = read_int(ptr, payload, 2, False)
          123             ptr[0] += 8 # skip over nonce
          124             user_agent = read_var_str(ptr, payload)
          125             start_height = read_int(ptr, payload, 4)
          126             if version > RELAY_TX_VERSION:
          127                 relay = read_int(ptr, payload, 1) != 0
          128             else:
          129                 # must check node accepts unconfirmed txes before broadcasting
          130                 relay = True
          131             self.logger.debug(('Received peer version message: version=%d'
          132                 + ' services=0x%x'
          133                 + ' timestamp=%s user_agent=%s start_height=%d relay=%i'
          134                 + ' them=%s:%d us=%s:%d') % (version,
          135                 services, str(datetime.fromtimestamp(timestamp)),
          136                 user_agent, start_height, relay, ip_hex_to_str(addr_trans_ip)
          137                 , addr_trans_port, ip_hex_to_str(addr_recv_ip), addr_recv_port))
          138             p2p.sock.sendall(p2p.create_message('verack', b''))
          139             self.on_recv_version(p2p, version, services, timestamp,
          140                 addr_recv_services, addr_recv_ip, addr_trans_services,
          141                 addr_trans_ip, addr_trans_port, user_agent, start_height,
          142                 relay)
          143         elif command == b'verack':
          144             self.on_connected(p2p)
          145         elif command == b'ping':
          146             p2p.sock.sendall(p2p.create_message('pong', payload))
          147 
          148     # optional override these in a subclass
          149 
          150     def on_recv_version(self, p2p, version, services, timestamp,
          151             addr_recv_services, addr_recv_ip, addr_trans_services,
          152             addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
          153         pass
          154 
          155     def on_connected(self, p2p):
          156         pass
          157 
          158     def on_heartbeat(self, p2p):
          159         pass
          160 
          161 
          162 class P2PProtocol(object):
          163     def __init__(self, p2p_message_handler, remote_hostport,
          164                  network, logger, notify_queue, user_agent=DEFAULT_USER_AGENT,
          165                  socks5_hostport=("localhost", 9050), connect_timeout=30,
          166                  heartbeat_interval=15, start_height=0):
          167         self.p2p_message_handler = p2p_message_handler
          168         self.remote_hostport = remote_hostport
          169         self.logger = logger
          170         self.notify_queue = notify_queue
          171         self.user_agent = user_agent
          172         self.socks5_hostport = socks5_hostport
          173         self.connect_timeout = connect_timeout
          174         self.heartbeat_interval = heartbeat_interval
          175         self.start_height = start_height
          176         if network == "testnet":
          177             self.magic = 0x0709110b
          178         elif network == "regtest":
          179             self.magic = 0xdab5bffa
          180         else:
          181             self.magic = 0xd9b4bef9
          182         self.closed = False
          183 
          184     def run(self):
          185         services = (NODE_NETWORK | NODE_WITNESS | NODE_NETWORK_LIMITED)
          186         st = int(time.time())
          187         nonce = random.getrandbits(64)
          188 
          189         netaddr_them = create_net_addr(ip_to_hex('0.0.0.0'), 0)
          190         netaddr_us = create_net_addr(ip_to_hex('0.0.0.0'), 0)
          191         version_message = (pack('<iQQ', PROTOCOL_VERSION, services, st)
          192                            + netaddr_them
          193                            + netaddr_us
          194                            + pack('<Q', nonce)
          195                            + create_var_str(self.user_agent)
          196                            + pack('<I', self.start_height)
          197                            + b'\x01')
          198 
          199         self.logger.debug('Connecting to bitcoin peer at ' +
          200                 str(self.remote_hostport) + ' with proxy ' +
          201                 str(self.socks5_hostport))
          202         setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
          203                         self.socks5_hostport[1], True)
          204         self.sock = socksocket()
          205         self.sock.settimeout(self.connect_timeout)
          206         self.sock.connect(self.remote_hostport)
          207         self.sock.sendall(self.create_message('version', version_message))
          208 
          209         self.sock.settimeout(self.heartbeat_interval)
          210         self.closed = False
          211         try:
          212             recv_buffer = b''
          213             payload_length = -1  # -1 means waiting for header
          214             command = None
          215             checksum = None
          216             while not self.closed:
          217                 try:
          218                     recv_data = self.sock.recv(4096)
          219                     if not recv_data or len(recv_data) == 0:
          220                         raise EOFError()
          221                     recv_buffer += recv_data
          222                     # this is O(N^2) scaling in time, another way would be to
          223                     # store in a list and combine at the end with "".join()
          224                     # but this isnt really timing critical so didnt optimize it
          225 
          226                     data_remaining = True
          227                     while data_remaining and not self.closed:
          228                         if payload_length == -1 and (len(recv_buffer)
          229                                 >= HEADER_LENGTH):
          230                             net_magic, command, payload_length, checksum =\
          231                                 unpack('<I12sI4s', recv_buffer[:HEADER_LENGTH])
          232                             recv_buffer = recv_buffer[HEADER_LENGTH:]
          233                             if net_magic != self.magic:
          234                                 self.logger.debug('wrong MAGIC: ' +
          235                                     hex(net_magic))
          236                                 self.sock.close()
          237                                 break
          238                             command = command.strip(b'\0')
          239                         else:
          240                             if payload_length >= 0 and (len(recv_buffer)
          241                                     >= payload_length):
          242                                 payload = recv_buffer[:payload_length]
          243                                 recv_buffer = recv_buffer[payload_length:]
          244                                 if btc.bin_dbl_sha256(payload)[:4] == checksum:
          245                                     self.p2p_message_handler.handle_message(
          246                                         self, command, payload_length, payload)
          247                                 else:
          248                                     self.logger.debug("wrong checksum, " +
          249                                         "dropping " +
          250                                         "message, cmd=" + command +
          251                                         " payloadlen=" + str(payload_length))
          252                                 payload_length = -1
          253                                 data_remaining = True
          254                             else:
          255                                 data_remaining = False
          256                 except socket.timeout:
          257                     self.p2p_message_handler.check_keepalive(self)
          258                     self.p2p_message_handler.on_heartbeat(self)
          259         except EOFError as e:
          260             self.closed = True
          261         except IOError as e:
          262             import traceback
          263             self.logger.debug("logging traceback from %s: \n" %
          264                 traceback.format_exc())
          265             self.closed = True
          266         finally:
          267             try:
          268                 self.sock.close()
          269             except Exception as _:
          270                 pass
          271 
          272     def close(self):
          273         self.closed = True
          274 
          275     def create_message(self, command, payload):
          276         return (pack("<I12sI", self.magic, command.encode(), len(payload))
          277             + btc.bin_dbl_sha256(payload)[:4] + payload)
          278 
          279 class P2PBroadcastTx(P2PMessageHandler):
          280     def __init__(self, txhex, logger, notify_queue):
          281         P2PMessageHandler.__init__(self, logger)
          282         self.txhex = bytes.fromhex(txhex)
          283         self.txid = btc.bin_txhash(self.txhex)
          284         self.uploaded_tx = False
          285         self.time_marker = datetime.now()
          286         self.connected = False
          287         self.notify_queue = notify_queue
          288 
          289     def on_recv_version(self, p2p, version, services, timestamp,
          290             addr_recv_services, addr_recv_ip, addr_trans_services,
          291             addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
          292         if not relay:
          293             self.logger.debug('peer not accepting unconfirmed txes, trying ' +
          294                 'another')
          295             # this happens if the other node is using blockonly=1
          296             p2p.close()
          297         if not services & NODE_WITNESS:
          298             self.logger.debug('peer not accepting witness data, trying another')
          299             p2p.close()
          300 
          301     def on_connected(self, p2p):
          302         MSG = 1 #msg_tx
          303         inv_payload = pack('<BI', 1, MSG) + self.txid
          304         p2p.sock.sendall(p2p.create_message('inv', inv_payload))
          305         self.time_marker = datetime.now()
          306         self.uploaded_tx = False
          307         self.connected = True
          308 
          309     def on_heartbeat(self, p2p):
          310         self.logger.debug('broadcaster heartbeat')
          311         VERACK_TIMEOUT = 40
          312         GETDATA_TIMEOUT = 60
          313         if not self.connected:
          314             if ((datetime.now() - self.time_marker).total_seconds()
          315                     < VERACK_TIMEOUT):
          316                 return
          317             self.logger.debug('timed out of waiting for verack')
          318         else:
          319             if ((datetime.now() - self.time_marker).total_seconds()
          320                     < GETDATA_TIMEOUT):
          321                 return
          322             self.logger.debug('timed out in waiting for getdata, node ' +
          323                 'already has tx')
          324             self.uploaded_tx = True
          325         p2p.close()
          326 
          327     def handle_message(self, p2p, command, length, payload):
          328         P2PMessageHandler.handle_message(self, p2p, command, length, payload)
          329         ptr = [0]
          330         if command == b'getdata':
          331             count = read_var_int(ptr, payload)
          332             for _ in range(count):
          333                 ptr[0] += 4
          334                 hash_id = payload[ptr[0] : ptr[0] + 32]
          335                 ptr[0] += 32
          336                 if hash_id == self.txid:
          337                     p2p.sock.sendall(p2p.create_message('tx', self.txhex))
          338                     self.uploaded_tx = True
          339                     self.logger.debug("Uploaded transaction via tor to peer at "
          340                         + str(p2p.remote_hostport))
          341                     self.notify_queue.put(True)
          342                     ##make sure the packets really got through by sleeping
          343                     ##some kernels seem to send a RST packet on close() even
          344                     ##if theres still data in the send buffer
          345                     time.sleep(5)
          346                     p2p.close()
          347 
          348 def broadcaster_thread(txhex, node_addrs, tor_hostport, network, logger,
          349         start_height, notify_queue):
          350     for node_addr in node_addrs:
          351         remote_hostport = (node_addr["address"], node_addr["port"])
          352         p2p_msg_handler = P2PBroadcastTx(txhex, logger, notify_queue)
          353         p2p = P2PProtocol(p2p_msg_handler, remote_hostport,
          354             network, logger, notify_queue, socks5_hostport=tor_hostport,
          355             heartbeat_interval=20, start_height=start_height)
          356         try:
          357             p2p.run()
          358         except IOError as e:
          359             logger.debug("p2p.run() exited: " + repr(e))
          360             continue
          361         if p2p_msg_handler.uploaded_tx:
          362             break
          363     logger.debug("Exiting tor broadcast thread, uploaded_tx = " +
          364         str(p2p_msg_handler.uploaded_tx))
          365     if not p2p_msg_handler.uploaded_tx:
          366         notify_queue.put(False)
          367     return p2p_msg_handler.uploaded_tx
          368 
          369 def chunk_list(d, n):
          370     return [d[x:x + n] for x in range(0, len(d), n)]
          371 
          372 def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
          373     CONNECTION_THREADS = 8
          374     CONNECTION_ATTEMPTS_PER_THREAD = 10
          375 
          376     required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS
          377     node_addrs_witness = []
          378     while True:
          379         try:
          380             new_node_addrs = rpc.call("getnodeaddresses",
          381                 [3*required_address_count//2])
          382         except JsonRpcError as e:
          383             logger.debug(repr(e))
          384             logger.error("Bitcoin Core v0.18.0 or higher is required "
          385                 "to broadcast through Tor")
          386             return False
          387         node_addrs_witness.extend(
          388             [a for a in new_node_addrs if a["services"] & NODE_WITNESS]
          389         )
          390         logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) +
          391             " len(node_addrs_witness) = " + str(len(node_addrs_witness)))
          392         if len(node_addrs_witness) > required_address_count:
          393             break
          394     node_addrs_chunks = chunk_list(
          395         node_addrs_witness[:required_address_count],
          396         CONNECTION_ATTEMPTS_PER_THREAD
          397     )
          398     notify_queue = queue.Queue()
          399     start_height = rpc.call("getblockcount", [])
          400     for node_addrs in node_addrs_chunks:
          401         t = threading.Thread(target=broadcaster_thread,
          402             args=(txhex, node_addrs, tor_hostport, network, logger,
          403                 start_height, notify_queue),
          404             daemon=True)
          405         t.start()
          406     try:
          407         success = notify_queue.get(block=True, timeout=20)
          408     except queue.Empty:
          409         logger.debug("Timed out getting notification for broadcasting "
          410             + "transaction")
          411         #the threads will maybe still continue to try broadcasting even
          412         # after this timeout
          413         #could time out at 20 seconds for any legitimate reason, tor is slow
          414         # so no point failing, this timeout is just so the user doesnt have
          415         # to stare at a seemingly-frozen dialog
          416         success = True
          417     return success