tpeertopeer.py - electrum-personal-server - Maximally lightweight electrum server for a single user
HTML git clone https://git.parazyd.org/electrum-personal-server
DIR Log
DIR Files
DIR Refs
DIR README
---
tpeertopeer.py (16716B)
---
1 #! /usr/bin/env python
2
3 import socket
4 import time
5 import base64
6 import threading
7 import queue
8 import random
9 from struct import pack, unpack
10 from datetime import datetime
11
12 import electrumpersonalserver.bitcoin as btc
13 from electrumpersonalserver.server.socks import (
14 socksocket,
15 setdefaultproxy,
16 PROXY_TYPE_SOCKS5
17 )
18 from electrumpersonalserver.server.jsonrpc import JsonRpcError
19
20 PROTOCOL_VERSION = 70016
21 DEFAULT_USER_AGENT = '/Satoshi:0.21.0/'
22
23 #https://github.com/bitcoin/bitcoin/blob/master/src/protocol.h
24 NODE_NETWORK = 1
25 NODE_BLOOM = 1 << 2
26 NODE_WITNESS = 1 << 3
27 NODE_NETWORK_LIMITED = 1 << 10
28
29 # protocol versions above this also send a relay boolean
30 RELAY_TX_VERSION = 70001
31
32 # length of bitcoin p2p packets
33 HEADER_LENGTH = 24
34
35 # if no message has been seen for this many seconds, send a ping
36 KEEPALIVE_INTERVAL = 2 * 60
37
38 # close connection if keep alive ping isnt responded to in this many seconds
39 KEEPALIVE_TIMEOUT = 20 * 60
40
41
42 def ip_to_hex(ip_str):
43 # ipv4 only for now
44 return socket.inet_pton(socket.AF_INET, ip_str)
45
46 def create_net_addr(hexip, port): # doesnt contain time as in bitcoin wiki
47 services = 0
48 hex = bytes(10) + b'\xFF\xFF' + hexip
49 return pack('<Q16s', services, hex) + pack('>H', port)
50
51 def create_var_str(s):
52 return btc.num_to_var_int(len(s)) + s.encode()
53
54 def read_int(ptr, payload, n, littleendian=True):
55 data = payload[ptr[0] : ptr[0]+n]
56 if littleendian:
57 data = data[::-1]
58 ret = btc.decode(data, 256)
59 ptr[0] += n
60 return ret
61
62 def read_var_int(ptr, payload):
63 val = payload[ptr[0]]
64 ptr[0] += 1
65 if val < 253:
66 return val
67 return read_int(ptr, payload, 2**(val - 252))
68
69 def read_var_str(ptr, payload):
70 l = read_var_int(ptr, payload)
71 ret = payload[ptr[0]: ptr[0] + l]
72 ptr[0] += l
73 return ret
74
75 def ip_hex_to_str(ip_hex):
76 # https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses
77 # https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat
78 if ip_hex[:14] == '\x00'*10 + '\xff'*2:
79 # ipv4 mapped ipv6 addr
80 return socket.inet_ntoa(ip_hex[12:])
81 elif ip_hex[:6] == '\xfd\x87\xd8\x7e\xeb\x43':
82 return base64.b32encode(ip_hex[6:]).lower() + '.onion'
83 else:
84 return socket.inet_ntop(socket.AF_INET6, ip_hex)
85
86 class P2PMessageHandler(object):
87 def __init__(self, logger):
88 self.last_message = datetime.now()
89 self.waiting_for_keepalive = False
90 self.logger = logger
91
92 def check_keepalive(self, p2p):
93 if self.waiting_for_keepalive:
94 if ((datetime.now() - self.last_message).total_seconds()
95 < KEEPALIVE_TIMEOUT):
96 return
97 self.logger.debug('keepalive timed out, closing')
98 p2p.sock.close()
99 else:
100 if ((datetime.now() - self.last_message).total_seconds()
101 < KEEPALIVE_INTERVAL):
102 return
103 self.logger.debug('sending keepalive to peer')
104 self.waiting_for_keepalive = True
105 p2p.sock.sendall(p2p.create_message('ping', '\x00'*8))
106
107 def handle_message(self, p2p, command, length, payload):
108 self.last_message = datetime.now()
109 self.waiting_for_keepalive = False
110 ptr = [0]
111 if command == b'version':
112 version = read_int(ptr, payload, 4)
113 services = read_int(ptr, payload, 8)
114 timestamp = read_int(ptr, payload, 8)
115 addr_recv_services = read_int(ptr, payload, 8)
116 addr_recv_ip = payload[ptr[0] : ptr[0]+16]
117 ptr[0] += 16
118 addr_recv_port = read_int(ptr, payload, 2, False)
119 addr_trans_services = read_int(ptr, payload, 8)
120 addr_trans_ip = payload[ptr[0] : ptr[0]+16]
121 ptr[0] += 16
122 addr_trans_port = read_int(ptr, payload, 2, False)
123 ptr[0] += 8 # skip over nonce
124 user_agent = read_var_str(ptr, payload)
125 start_height = read_int(ptr, payload, 4)
126 if version > RELAY_TX_VERSION:
127 relay = read_int(ptr, payload, 1) != 0
128 else:
129 # must check node accepts unconfirmed txes before broadcasting
130 relay = True
131 self.logger.debug(('Received peer version message: version=%d'
132 + ' services=0x%x'
133 + ' timestamp=%s user_agent=%s start_height=%d relay=%i'
134 + ' them=%s:%d us=%s:%d') % (version,
135 services, str(datetime.fromtimestamp(timestamp)),
136 user_agent, start_height, relay, ip_hex_to_str(addr_trans_ip)
137 , addr_trans_port, ip_hex_to_str(addr_recv_ip), addr_recv_port))
138 p2p.sock.sendall(p2p.create_message('verack', b''))
139 self.on_recv_version(p2p, version, services, timestamp,
140 addr_recv_services, addr_recv_ip, addr_trans_services,
141 addr_trans_ip, addr_trans_port, user_agent, start_height,
142 relay)
143 elif command == b'verack':
144 self.on_connected(p2p)
145 elif command == b'ping':
146 p2p.sock.sendall(p2p.create_message('pong', payload))
147
148 # optional override these in a subclass
149
150 def on_recv_version(self, p2p, version, services, timestamp,
151 addr_recv_services, addr_recv_ip, addr_trans_services,
152 addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
153 pass
154
155 def on_connected(self, p2p):
156 pass
157
158 def on_heartbeat(self, p2p):
159 pass
160
161
162 class P2PProtocol(object):
163 def __init__(self, p2p_message_handler, remote_hostport,
164 network, logger, notify_queue, user_agent=DEFAULT_USER_AGENT,
165 socks5_hostport=("localhost", 9050), connect_timeout=30,
166 heartbeat_interval=15, start_height=0):
167 self.p2p_message_handler = p2p_message_handler
168 self.remote_hostport = remote_hostport
169 self.logger = logger
170 self.notify_queue = notify_queue
171 self.user_agent = user_agent
172 self.socks5_hostport = socks5_hostport
173 self.connect_timeout = connect_timeout
174 self.heartbeat_interval = heartbeat_interval
175 self.start_height = start_height
176 if network == "testnet":
177 self.magic = 0x0709110b
178 elif network == "regtest":
179 self.magic = 0xdab5bffa
180 else:
181 self.magic = 0xd9b4bef9
182 self.closed = False
183
184 def run(self):
185 services = (NODE_NETWORK | NODE_WITNESS | NODE_NETWORK_LIMITED)
186 st = int(time.time())
187 nonce = random.getrandbits(64)
188
189 netaddr_them = create_net_addr(ip_to_hex('0.0.0.0'), 0)
190 netaddr_us = create_net_addr(ip_to_hex('0.0.0.0'), 0)
191 version_message = (pack('<iQQ', PROTOCOL_VERSION, services, st)
192 + netaddr_them
193 + netaddr_us
194 + pack('<Q', nonce)
195 + create_var_str(self.user_agent)
196 + pack('<I', self.start_height)
197 + b'\x01')
198
199 self.logger.debug('Connecting to bitcoin peer at ' +
200 str(self.remote_hostport) + ' with proxy ' +
201 str(self.socks5_hostport))
202 setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
203 self.socks5_hostport[1], True)
204 self.sock = socksocket()
205 self.sock.settimeout(self.connect_timeout)
206 self.sock.connect(self.remote_hostport)
207 self.sock.sendall(self.create_message('version', version_message))
208
209 self.sock.settimeout(self.heartbeat_interval)
210 self.closed = False
211 try:
212 recv_buffer = b''
213 payload_length = -1 # -1 means waiting for header
214 command = None
215 checksum = None
216 while not self.closed:
217 try:
218 recv_data = self.sock.recv(4096)
219 if not recv_data or len(recv_data) == 0:
220 raise EOFError()
221 recv_buffer += recv_data
222 # this is O(N^2) scaling in time, another way would be to
223 # store in a list and combine at the end with "".join()
224 # but this isnt really timing critical so didnt optimize it
225
226 data_remaining = True
227 while data_remaining and not self.closed:
228 if payload_length == -1 and (len(recv_buffer)
229 >= HEADER_LENGTH):
230 net_magic, command, payload_length, checksum =\
231 unpack('<I12sI4s', recv_buffer[:HEADER_LENGTH])
232 recv_buffer = recv_buffer[HEADER_LENGTH:]
233 if net_magic != self.magic:
234 self.logger.debug('wrong MAGIC: ' +
235 hex(net_magic))
236 self.sock.close()
237 break
238 command = command.strip(b'\0')
239 else:
240 if payload_length >= 0 and (len(recv_buffer)
241 >= payload_length):
242 payload = recv_buffer[:payload_length]
243 recv_buffer = recv_buffer[payload_length:]
244 if btc.bin_dbl_sha256(payload)[:4] == checksum:
245 self.p2p_message_handler.handle_message(
246 self, command, payload_length, payload)
247 else:
248 self.logger.debug("wrong checksum, " +
249 "dropping " +
250 "message, cmd=" + command +
251 " payloadlen=" + str(payload_length))
252 payload_length = -1
253 data_remaining = True
254 else:
255 data_remaining = False
256 except socket.timeout:
257 self.p2p_message_handler.check_keepalive(self)
258 self.p2p_message_handler.on_heartbeat(self)
259 except EOFError as e:
260 self.closed = True
261 except IOError as e:
262 import traceback
263 self.logger.debug("logging traceback from %s: \n" %
264 traceback.format_exc())
265 self.closed = True
266 finally:
267 try:
268 self.sock.close()
269 except Exception as _:
270 pass
271
272 def close(self):
273 self.closed = True
274
275 def create_message(self, command, payload):
276 return (pack("<I12sI", self.magic, command.encode(), len(payload))
277 + btc.bin_dbl_sha256(payload)[:4] + payload)
278
279 class P2PBroadcastTx(P2PMessageHandler):
280 def __init__(self, txhex, logger, notify_queue):
281 P2PMessageHandler.__init__(self, logger)
282 self.txhex = bytes.fromhex(txhex)
283 self.txid = btc.bin_txhash(self.txhex)
284 self.uploaded_tx = False
285 self.time_marker = datetime.now()
286 self.connected = False
287 self.notify_queue = notify_queue
288
289 def on_recv_version(self, p2p, version, services, timestamp,
290 addr_recv_services, addr_recv_ip, addr_trans_services,
291 addr_trans_ip, addr_trans_port, user_agent, start_height, relay):
292 if not relay:
293 self.logger.debug('peer not accepting unconfirmed txes, trying ' +
294 'another')
295 # this happens if the other node is using blockonly=1
296 p2p.close()
297 if not services & NODE_WITNESS:
298 self.logger.debug('peer not accepting witness data, trying another')
299 p2p.close()
300
301 def on_connected(self, p2p):
302 MSG = 1 #msg_tx
303 inv_payload = pack('<BI', 1, MSG) + self.txid
304 p2p.sock.sendall(p2p.create_message('inv', inv_payload))
305 self.time_marker = datetime.now()
306 self.uploaded_tx = False
307 self.connected = True
308
309 def on_heartbeat(self, p2p):
310 self.logger.debug('broadcaster heartbeat')
311 VERACK_TIMEOUT = 40
312 GETDATA_TIMEOUT = 60
313 if not self.connected:
314 if ((datetime.now() - self.time_marker).total_seconds()
315 < VERACK_TIMEOUT):
316 return
317 self.logger.debug('timed out of waiting for verack')
318 else:
319 if ((datetime.now() - self.time_marker).total_seconds()
320 < GETDATA_TIMEOUT):
321 return
322 self.logger.debug('timed out in waiting for getdata, node ' +
323 'already has tx')
324 self.uploaded_tx = True
325 p2p.close()
326
327 def handle_message(self, p2p, command, length, payload):
328 P2PMessageHandler.handle_message(self, p2p, command, length, payload)
329 ptr = [0]
330 if command == b'getdata':
331 count = read_var_int(ptr, payload)
332 for _ in range(count):
333 ptr[0] += 4
334 hash_id = payload[ptr[0] : ptr[0] + 32]
335 ptr[0] += 32
336 if hash_id == self.txid:
337 p2p.sock.sendall(p2p.create_message('tx', self.txhex))
338 self.uploaded_tx = True
339 self.logger.debug("Uploaded transaction via tor to peer at "
340 + str(p2p.remote_hostport))
341 self.notify_queue.put(True)
342 ##make sure the packets really got through by sleeping
343 ##some kernels seem to send a RST packet on close() even
344 ##if theres still data in the send buffer
345 time.sleep(5)
346 p2p.close()
347
348 def broadcaster_thread(txhex, node_addrs, tor_hostport, network, logger,
349 start_height, notify_queue):
350 for node_addr in node_addrs:
351 remote_hostport = (node_addr["address"], node_addr["port"])
352 p2p_msg_handler = P2PBroadcastTx(txhex, logger, notify_queue)
353 p2p = P2PProtocol(p2p_msg_handler, remote_hostport,
354 network, logger, notify_queue, socks5_hostport=tor_hostport,
355 heartbeat_interval=20, start_height=start_height)
356 try:
357 p2p.run()
358 except IOError as e:
359 logger.debug("p2p.run() exited: " + repr(e))
360 continue
361 if p2p_msg_handler.uploaded_tx:
362 break
363 logger.debug("Exiting tor broadcast thread, uploaded_tx = " +
364 str(p2p_msg_handler.uploaded_tx))
365 if not p2p_msg_handler.uploaded_tx:
366 notify_queue.put(False)
367 return p2p_msg_handler.uploaded_tx
368
369 def chunk_list(d, n):
370 return [d[x:x + n] for x in range(0, len(d), n)]
371
372 def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
373 CONNECTION_THREADS = 8
374 CONNECTION_ATTEMPTS_PER_THREAD = 10
375
376 required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS
377 node_addrs_witness = []
378 while True:
379 try:
380 new_node_addrs = rpc.call("getnodeaddresses",
381 [3*required_address_count//2])
382 except JsonRpcError as e:
383 logger.debug(repr(e))
384 logger.error("Bitcoin Core v0.18.0 or higher is required "
385 "to broadcast through Tor")
386 return False
387 node_addrs_witness.extend(
388 [a for a in new_node_addrs if a["services"] & NODE_WITNESS]
389 )
390 logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) +
391 " len(node_addrs_witness) = " + str(len(node_addrs_witness)))
392 if len(node_addrs_witness) > required_address_count:
393 break
394 node_addrs_chunks = chunk_list(
395 node_addrs_witness[:required_address_count],
396 CONNECTION_ATTEMPTS_PER_THREAD
397 )
398 notify_queue = queue.Queue()
399 start_height = rpc.call("getblockcount", [])
400 for node_addrs in node_addrs_chunks:
401 t = threading.Thread(target=broadcaster_thread,
402 args=(txhex, node_addrs, tor_hostport, network, logger,
403 start_height, notify_queue),
404 daemon=True)
405 t.start()
406 try:
407 success = notify_queue.get(block=True, timeout=20)
408 except queue.Empty:
409 logger.debug("Timed out getting notification for broadcasting "
410 + "transaction")
411 #the threads will maybe still continue to try broadcasting even
412 # after this timeout
413 #could time out at 20 seconds for any legitimate reason, tor is slow
414 # so no point failing, this timeout is just so the user doesnt have
415 # to stare at a seemingly-frozen dialog
416 success = True
417 return success