16. Асинхронное общение нод с помощью ZeroMQ - Часть 2Доделал общение нод с помощью ZeroMQ, принцип работы по итогу получается такой:
- нода имеет ip и открытый порт 5556 (по умолчанию)
- ZeroMQ создает сокет в который сваливается все что возможно от других нод (каждое сообщение помечено идентификатором и само тело сообщения)
- сервер ноды создает асинхронные потоки, которые все как пираньи хватают новые пакеты из сокет и обрабатывают их (так создается максимальная скорость обработки данных)
Взаимодействие общение нод работает, проверено на двух серверах. Так же сделан вектор файл (файл который хранит данные по другим нодам, чтобы понимать, кто в сети и с кем общаться для консенсуса), об этом напишу в следующей статье.
Пример кода:
class ServerTask(threading.Thread):
def __init__(self):
threading.Thread.__init__ (self)
def run(self):
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind('tcp://*:5556')
backend = context.socket(zmq.DEALER)
backend.bind('inproc://backend')
workers = []
for i in range(5):
worker = ServerWorker(context)
worker.start()
workers.append(worker)
zmq.proxy(frontend, backend)
frontend.close()
backend.close()
context.term()
class ServerWorker(threading.Thread):
def __init__(self, context):
threading.Thread.__init__ (self)
self.context = context
def run(self):
worker = self.context.socket(zmq.DEALER)
worker.connect('inproc://backend')
identity_w = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000)) # это внутренний id, не zmq
print('Worker '+identity_w+' started')
while True:
ident, msg = worker.recv_multipart()
print('Worker received %s from %s' % (msg, ident))
self.test_task(worker,ident,msg)
#replies = randint(0,3)
#for i in range(replies):
#time.sleep((randint(0,2)))
#msg = eval(msg)
#msg['worker'] = identity_w
#msg = str(msg).encode()
#worker.send_multipart([ident, msg])
worker.close()
def test_task(self,worker,ident,msg):
msg = eval(msg)
# запрос last vector для поиска nonce
if 'nonce' in msg['task']:
print('Запрос vector')
msg['task'] = 'vector-file'
msg['msg'] = node.vector_only_my_node_GET()
msg = str(msg).encode()
worker.send_multipart([ident, msg])
return
if 'vector-pow' in msg['task']:
print('Запрос vector pow')
msg = node.vector_check_POW(msg['msg'])
msg = str(msg).encode()
worker.send_multipart([ident, msg])
worker.send_multipart([ident, PPP_END])
return
Теперь приступил к разработке способа хранения блоков в блокчейне.
- это классический блокчейн, связка блоков с помощью хеша
- обработка на лету в redis
- внедрение алгоритма консенсуса POH
Материалы:
Хорошая статья о многопоточности и асинхронностиАсинхронный Python: различные формы конкурентностиZeroMQ The Asynchronous Client/Server Pattern