tzeromq.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tzeromq.py (18114B)
---
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2011 thomasv@gitorious
5 # Copyright (C) 2021 Ivan J. <parazyd@dyne.org>
6 # Copyright (C) 2018 Harm Aarts <harmaarts@gmail.com>
7 #
8 # Permission is hereby granted, free of charge, to any person
9 # obtaining a copy of this software and associated documentation files
10 # (the "Software"), to deal in the Software without restriction,
11 # including without limitation the rights to use, copy, modify, merge,
12 # publish, distribute, sublicense, and/or sell copies of the Software,
13 # and to permit persons to whom the Software is furnished to do so,
14 # subject to the following conditions:
15 #
16 # The above copyright notice and this permission notice shall be
17 # included in all copies or substantial portions of the Software.
18 #
19 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
21 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
23 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
24 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
25 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 # SOFTWARE.
27 import asyncio
28 import logging
29 import functools
30 import hashlib
31 import struct
32 from random import randint
33 from binascii import hexlify, unhexlify
34
35 import zmq
36 import zmq.asyncio
37
38 from .logging import Logger
39 from .libbitcoin_errors import make_error_code, ErrorCode
40 from .util import bh2u
41
42
43 from datetime import datetime
44 def __(msg):
45 print("***********************")
46 print("*** DEBUG %s ***: %s" % (datetime.now().strftime("%H:%M:%S"), msg))
47
48
49 def create_random_id():
50 """Generate a random request ID"""
51 max_uint32 = 4294967295
52 return randint(0, max_uint32)
53
54
55 def checksum(hash_, index):
56 """This method takes a transaction hash and an index and returns
57 a checksum.
58 This checksum is based on 49 bits starting from the 12th byte of the
59 reversed hash. Combined with the last 15 bits of the 4 byte index.
60 """
61 mask = 0xffffffffffff8000
62 magic_start_position = 12
63
64 hash_bytes = bytes.fromhex(hash_)[::-1]
65 last_20_bytes = hash_bytes[magic_start_position:]
66
67 assert len(hash_bytes) == 32
68 assert index < 2**32
69
70 hash_upper_49_bits = to_int(last_20_bytes) & mask
71 index_lower_15_bits = index & ~mask
72
73 return hash_upper_49_bits | index_lower_15_bits
74
75
76 def to_int(some_bytes):
77 return int.from_bytes(some_bytes, byteorder='little')
78
79
80 def pack_block_index(index):
81 if isinstance(index, str):
82 index = unhexlify(index)
83 assert len(index) == 32
84 return index
85 elif isinstance(index, int):
86 return struct.pack('<I', index)
87 else:
88 raise ValueError(f"Unknown index type {type(index)} v:{index}, should be int or bytearray")
89
90
91 def unpack_table(row_fmt, data):
92 # get the number of rows
93 row_size = struct.calcsize(row_fmt)
94 nrows = len(data) // row_size
95
96 # unpack
97 rows = []
98 for idx in range(nrows):
99 offset = idx * row_size
100 row = struct.unpack_from(row_fmt, data, offset)
101 rows.append(row)
102 return rows
103
104
105 class ClientSettings:
106 """Class implementing client settings"""
107 def __init__(self, timeout=10, context=None, loop=None):
108 __("Zeromq ClientSettings: __init__")
109 self._timeout = timeout
110 self._context = context
111 self._loop = loop
112
113 @property
114 def context(self):
115 """zmq context property"""
116 if not self._context:
117 ctx = zmq.asyncio.Context()
118 ctx.linger = 500 # in milliseconds
119 self._context = ctx
120 return self._context
121
122 @context.setter
123 def context(self, context):
124 self._context = context
125
126 @property
127 def timeout(self):
128 """Set to None for no timeout"""
129 return self._timeout
130
131 @timeout.setter
132 def timeout(self, timeout):
133 self._timeout = timeout
134
135
136 class Request:
137 """Class implementing a _send_ request.
138 This is either a simple request/response affair or a subscription.
139 """
140 def __init__(self, socket, command, data):
141 __("Zeromq Request: __init__")
142 self.id_ = create_random_id()
143 self.socket = socket
144 self.command = command
145 self.data = data
146 self.future = asyncio.Future()
147 self.queue = None
148
149 async def send(self):
150 """Send the zmq request"""
151 __(f"Zeromq Request: send: {self.command}, {self.data}")
152 request = [self.command, struct.pack('<I', self.id_), self.data]
153 await self.socket.send_multipart(request)
154
155 def is_subscription(self):
156 """If the request is a subscription, then the response to this
157 request is a notification.
158 """
159 return self.queue is not None
160
161 def __str__(self):
162 return 'Request(command, ID) {}, {:d}'.format(self.command,
163 self.id_)
164
165
166 class InvalidServerResponseException(Exception): pass
167
168
169 class Response:
170 """Class implementing a request response"""
171 def __init__(self, frame):
172 __("Zeromq Response: __init__")
173 if len(frame) != 3:
174 raise InvalidServerResponseException(
175 'Length of the frame was not 3: %d' % len(frame))
176
177 self.command = frame[0]
178 self.request_id = struct.unpack('<I', frame[1])[0]
179 error_code = struct.unpack('<I', frame[2][:4])[0]
180 self.error_code = make_error_code(error_code)
181 self.data = frame[2][4:]
182
183 def is_bound_for_queue(self):
184 return len(self.data) > 0
185
186 def __str__(self):
187 return 'Response(command, request ID, error code, data):' \
188 + ' %s, %d, %s, %s' \
189 % (self.command, self.request_id, self.error_code, self.data)
190
191
192 class RequestCollection:
193 """RequestCollection carries a list of Requests and matches incoming
194 responses to them.
195 """
196 def __init__(self, socket, loop):
197 __("Zeromq RequestCollection: __init__")
198 self._socket = socket
199 self._requests = {}
200 self._task = asyncio.ensure_future(self._run(), loop=loop)
201
202 async def _run(self):
203 while True:
204 __("Zeromq RequestCollection: _run loop iteration")
205 await self._receive()
206
207 async def stop(self):
208 """Stops listening for incoming responses (or subscription) messages).
209 Returns the number of _responses_ expected but which are now dropped
210 on the floor.
211 """
212 __("Zeromq RequestCollection: stop")
213 self._task.cancel()
214 try:
215 await self._task
216 except asyncio.CancelledError:
217 return len(self._requests)
218
219 async def _receive(self):
220 __("Zeromq RequestCollection: receive")
221 frame = await self._socket.recv_multipart()
222 response = Response(frame)
223
224 if response.request_id in self._requests:
225 self._handle_response(response)
226 else:
227 __("Zeromq RequestCollection: receive: unhandled response %s:%s" % (response.command, response.request_id))
228
229 def _handle_response(self, response):
230 __("Zeromq RequestCollection: _handle_response")
231 request = self._requests[response.request_id]
232
233 if request.is_subscription():
234 if response.is_bound_for_queue():
235 # TODO: decode the data into something usable
236 request.queue.put_nowait(response.data)
237 else:
238 request.future.set_result(response)
239 else:
240 self.delete_request(request)
241 request.future.set_result(response)
242
243 def add_request(self, request):
244 __("Zeromq RequestCollection: add_request")
245 # TODO: we should maybe check if the request.id_ is unique
246 self._requests[request.id_] = request
247
248 def delete_request(self, request):
249 __("Zeromq RequestCollection: delete_request")
250 del self._requests[request.id_]
251
252
253 class Client:
254 """This class represents a connection to a libbitcoin server.
255 hostname -- the server DNS name to connect to.
256 ports -- a dictionary containing four keys; query/heartbeat/block/tx
257 """
258 # def __init__(self, hostname, ports, settings=ClientSettings()):
259 def __init__(self, hostname, ports, loop):
260 __("Zeromq Client: __init__")
261 self._hostname = hostname
262 self._ports = ports
263 # self._settings = settings
264 self._settings = ClientSettings(loop=loop)
265 self._query_socket = self._create_query_socket()
266 self._block_socket = self._create_block_socket()
267 self._request_collection = RequestCollection(
268 self._query_socket, self._settings._loop)
269
270 async def stop(self):
271 __("Zeromq Client: stop")
272 self._query_socket.close()
273 self._block_socket.close()
274 return await self._request_collection.stop()
275
276 def _create_block_socket(self):
277 __("Zeromq Client: _create_block_socket")
278 socket = self._settings.context.socket(
279 zmq.SUB, io_loop=self._settings._loop) # pylint: disable=E1101
280 socket.connect(self.__server_url(self._hostname,
281 self._ports['block']))
282 socket.setsockopt_string(zmq.SUBSCRIBE, '') # pylint: disable=E1101
283 return socket
284
285 def _create_query_socket(self):
286 __("Zeromq Client: _create_query_socket")
287 socket = self._settings.context.socket(
288 zmq.DEALER, io_loop=self._settings._loop) # pylint: disable=E1101
289 socket.connect(self.__server_url(self._hostname,
290 self._ports['query']))
291 return socket
292
293 async def _subscription_request(self, command, data):
294 __("Zeromq Client: _subscription_request")
295 request = await self._request(command, data)
296 request.queue = asyncio.Queue(loop=self._settings._loop)
297 error_code, _ = await self._wait_for_response(request)
298 return error_code, request.queue
299
300 async def _simple_request(self, command, data):
301 __("Zeromq Client: _simple_request")
302 return await self._wait_for_response(
303 await self._request(command, data))
304
305 async def _request(self, command, data):
306 """Make a generic request. Both options are byte objects
307 specified like b'blockchain.fetch_block_header' as an example.
308 """
309 __("Zeromq Client: _request")
310 request = Request(self._query_socket, command, data)
311 await request.send()
312 self._request_collection.add_request(request)
313 return request
314
315 async def _wait_for_response(self, request):
316 __("Zeromq Client: _wait_for_response")
317 try:
318 response = await asyncio.wait_for(request.future,
319 self._settings.timeout)
320 except asyncio.TimeoutError:
321 self._request_collection.delete_request(request)
322 return ErrorCode.channel_timeout, None
323
324 assert response.command == request.command
325 assert response.request_id == request.id_
326 return response.error_code, response.data
327
328 @staticmethod
329 def __server_url(hostname, port):
330 return 'tcp://' + hostname + ':' + str(port)
331
332 async def last_height(self):
333 __("Zeromq Client: last_height")
334 command = b'blockchain.fetch_last_height'
335 error_code, data = await self._simple_request(command, b'')
336 if error_code:
337 return error_code, None
338 height = struct.unpack('<I', data)[0]
339 return error_code, height
340
341 async def subscribe_to_blocks(self, queue):
342 __("Zeromq Client: subscribe_to_blocks")
343 asyncio.ensure_future(self._listen_for_blocks(queue))
344 return queue
345
346 async def _listen_for_blocks(self, queue):
347 __("Zeromq Client: _listen_for_blocks")
348 _ec, tip = await self.last_height()
349 _, header = await self.block_header(tip)
350 queue.put_nowait((0, tip, header))
351 while True:
352 __("Zeromq Client: _listen_for_blocks loop iteration")
353 frame = await self._block_socket.recv_multipart()
354 seq = struct.unpack('<H', frame[0])[0]
355 height = struct.unpack('<I', frame[1])[0]
356 block_data = frame[2]
357 block_header = block_data[:80]
358 # block_header = raw[:80]
359 # version = block_header[:4]
360 # prev_merkle_root = block_header[4:36]
361 # merkle_root = block_header[36:68]
362 # timestamp = block_header[68:72]
363 # bits = block_header[72:76]
364 # nonce = blockheader[76:80]
365 queue.put_nowait((seq, height, block_header))
366
367 async def _subscribe_to_scripthash(self, sh, queue):
368 __("Zeromq Client: _subscribe_to_scripthash (stub)")
369 # TODO: libbitcoin here get history and make status (also review this entire func)
370 # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-subscribe
371 # https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status
372 # https://parazyd.org/git/electrum-obelisk/file/electrumobelisk/protocol.py.html#l57
373 # queue.put_nowait((something,))
374 # while True:
375 # recv and put in queue
376
377
378 async def block_header(self, index):
379 """Fetches the block header by height or integer index"""
380 __("Zeromq Client: block_header")
381 command = b'blockchain.fetch_block_header'
382 data = pack_block_index(index)
383 error_code, data = await self._simple_request(command, data)
384 if error_code:
385 return error_code, None
386 return error_code, data
387
388 async def block_transaction_hashes(self, index):
389 __("Zeromq Client: block_transaction_hashes")
390 command = b'blockchain.fetch_block_transaction_hashes'
391 data = pack_block_index(index)
392 error_code, data = await self._simple_request(command, data)
393 if error_code:
394 return error_code, None
395 data = unpack_table('32s', data)
396 return error_code, data
397
398 async def transaction(self, hash_):
399 __("Zeromq Client: transaction")
400 command = b'blockchain.fetch_transaction2'
401 error_code, data = await self._simple_request(
402 command, bytes.fromhex(hash_)[::-1])
403 if error_code:
404 return error_code, None
405 return None, data
406
407 async def mempool_transaction(self, hash_):
408 __("Zeromq Client: mempool_transaction")
409 command = b'transaction_pool.fetch_transaction2'
410 error_code, data = await self._simple_request(
411 command, bytes.fromhex(hash_)[::-1])
412 if error_code:
413 return error_code, None
414 return None, bh2u(data)
415
416 async def broadcast_transaction(self, rawtx):
417 __("Zeromq Client: broadcast_transaction")
418 __(rawtx)
419 command = b'transaction_pool.broadcast'
420 return await self._simple_request(command, unhexlify(rawtx))
421
422 async def history4(self, scripthash, height=0):
423 __("Zeromq Client: history4")
424 command = b'blockchain.fetch_history4'
425 decoded_address = unhexlify(scripthash)[::-1] # TODO: check byte order
426 error_code, raw_points = await self._simple_request(
427 command, decoded_address + struct.pack('<I', height))
428 if error_code:
429 return error_code, None
430
431 def make_tuple(row):
432 kind, tx_hash, index, height, value = row
433 return (
434 kind,
435 #COutPoint(tx_hash, index), # TODO: libbitcoin XXX:
436 (tx_hash, index),
437 height,
438 value,
439 checksum(tx_hash[::-1].hex(), index),
440 )
441
442 rows = unpack_table('<B32sIIQ', raw_points)
443 points = [make_tuple(row) for row in rows]
444 correlated_points = Client.__correlate(points)
445 return None, correlated_points
446
447 async def balance(self, scripthash):
448 __("Zeromq Client: balance")
449 error, hist = await self.history4(scripthash)
450 if error:
451 return error, None
452 utxo = Client.__receives_without_spends(hist)
453 return None, functools.reduce(
454 lambda accumulator, point: accumulator + point['value'], utxo, 0)
455
456 async def unspent(self, scripthash):
457 __("Zeromq Client: unspent")
458 error, hist = await self.history4(scripthash)
459 if error:
460 return error, None
461 return None, Client.__receives_without_spends(hist)
462
463 @staticmethod
464 def __receives_without_spends(hist):
465 return (point for point in hist if 'spent' not in point)
466
467 @staticmethod
468 def __correlate(points):
469 transfers, checksum_to_index = Client.__find_receives(points)
470 transfers = Client.__correlate_spends_to_receives(
471 points, transfers, checksum_to_index)
472 return transfers
473
474 @staticmethod
475 def __correlate_spends_to_receives(points, transfers, checksum_to_index):
476 for point in points:
477 if point[0] == 0: # receive
478 continue
479
480 spent = {
481 'hash': point[1].hash,
482 'height': point[2],
483 'index': point[1].n,
484 }
485 if point[3] not in checksum_to_index:
486 transfers.append({'spent': spent})
487 else:
488 transfers[checksum_to_index[point[3]]]['spent'] = spent
489
490 return transfers
491
492 @staticmethod
493 def __find_receives(points):
494 transfers = []
495 checksum_to_index = {}
496
497 for point in points:
498 if point[0] == 1: # spent
499 continue
500
501 transfers.append({
502 'received': {
503 'hash': point[1].hash,
504 'height': point[2],
505 'index': point[1].n,
506 },
507 'value': point[3],
508 })
509
510 checksum_to_index[point[4]] = len(transfers) - 1
511
512 return transfers, checksum_to_index