Author

Topic: I've implemented various algorithms. Bitcoin Core node via RPC (Read 21 times)

?
Activity: -
Merit: -
I installed Bitcoin Core on my laptop, synchronized it, and successfully ran it. Additionally, I developed a Python program to perform solo mining by connecting to Bitcoin Core via RPC.

In this program, I implemented an algorithm that generates various nonce values and applies them to the mining process. While I am aware that the probability of successfully mining a block is extremely low, I would like to confirm whether my solo mining Python program is technically correct and operates without errors on the actual Bitcoin network.

It would be greatly helpful if you could review my program and let me know if there are any issues or potential improvements. I sincerely appreciate your time and assistance with this request.

Thank you once again, and I look forward to your response.

I've implemented various algorithms. Could you please verify whether they actually function when integrated with a real Bitcoin Core node via RPC

import hashlib
import requests
import json
import struct
import time
import random
import threading
from binascii import hexlify, unhexlify
from typing import Optional, Dict, List, Tuple
from concurrent.futures import ThreadPoolExecutor
import base58

# RPC settings
RPC_USER = "userID"
RPC_PASSWORD = "userPasswd"
RPC_URL = "http://127.0.0.1:8332/"

# User-defined address
MINING_ADDRESS = "your_bitcoin_address"

# Elliptic Curve SECP256K1 settings
P = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
A = 0
B = 7
Gx = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798
Gy = 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8
G = (Gx, Gy)

# Global variables
current_template = None
template_lock = threading.Lock()
stop_event = threading.Event()

def mod_inverse(a, p):
    """Calculate modular inverse."""
    return pow(a, p - 2, p)

def point_addition(P1, P2):
    """Elliptic curve point addition."""
    if P1 is None:
        return P2
    if P2 is None:
        return P1

    x1, y1 = P1
    x2, y2 = P2

    if x1 == x2 and y1 != y2:
        return None

    if x1 == x2:
        s = (3 * x1 * x1 + A) * mod_inverse(2 * y1, P) % P
    else:
        s = (y2 - y1) * mod_inverse(x2 - x1, P) % P

    x3 = (s * s - x1 - x2) % P
    y3 = (s * (x1 - x3) - y1) % P
    return (x3, y3)

def scalar_multiplication(k, point):
    """Scalar multiplication."""
    result = None
    addend = point

    while k:
        if k & 1:
            result = point_addition(result, addend)
        addend = point_addition(addend, addend)
        k >>= 1

    return result

def decode_bitcoin_address(address: str) -> bytes:
    """Convert a Bitcoin address to a script pubkey."""
    try:
        decoded = base58.b58decode(address)
        version = decoded[0]
        hash160 = decoded[1:-4]

        if version == 0:  # P2PKH
            return b'\x76\xa9\x14' + hash160 + b'\x88\xac'
        elif version == 5:  # P2SH
            return b'\xa9\x14' + hash160 + b'\x87'
        else:
            raise ValueError(f"Unsupported address version: {version}")
    except Exception as e:
        print(f"Address decoding error: {e}")
        return b'\x76\xa9\x14' + b'\x00' * 20 + b'\x88\xac'

def create_merkle_root(transactions: List[Dict]) -> bytes:
    """Create Merkle root."""
    if not transactions:
        return None

    def double_sha256(data: bytes) -> bytes:
        return hashlib.sha256(hashlib.sha256(data).digest()).digest()

    hashes = [unhexlify(tx["txid"])[::-1] for tx in transactions]

    while len(hashes) > 1:
        if len(hashes) % 2 == 1:
            hashes.append(hashes[-1])
        new_hashes = []
        for i in range(0, len(hashes), 2):
            new_hashes.append(double_sha256(hashes + hashes[i+1]))
        hashes = new_hashes

    return hashes[0][::-1] if hashes else None

def create_coinbase_transaction(height: int, coinbase_value: int, mining_address: str) -> bytes:
    """Create a coinbase transaction."""
    height_script = bytes([len(hex(height)[2:])]) + \
                   int(height).to_bytes((height.bit_length() + 7) // 8, byteorder='little')

    script_pubkey = decode_bitcoin_address(mining_address)

    tx = (
        b"\x01\x00\x00\x00"  # version
        + b"\x01"  # input count
        + b"\x00" * 32  # prev_output_hash
        + b"\xff" * 4   # prev_output_index
        + bytes([len(height_script)])  # script length
        + height_script  # height script
        + b"\xff\xff\xff\xff"  # sequence
        + b"\x01"  # output count
        + struct.pack("        + bytes([len(script_pubkey)])  # script length
        + script_pubkey  # output script
        + b"\x00\x00\x00\x00"  # locktime
    )
    return tx

def build_block_header(version: int, prev_block: str, merkle_root: bytes, timestamp: int, bits: str) -> bytes:
    """Create a block header."""
    header = struct.pack("    header += unhexlify(prev_block)[::-1]
    header += merkle_root
    header += struct.pack("    header += b"\x00\x00\x00\x00"  # nonce placeholder
    return header

class MiningAlgorithm:
    """Parent class for common mining algorithms."""
    def __init__(self, block_header: bytes, target: int):
        self.block_header = block_header
        self.target = target  # Target value computed from bits
        self.best_nonce = None
        self.best_hash = None
        self.start_time = time.time()

    def check_nonce(self, nonce: int) -> Tuple[bool, int]:
        """Compute hash for the given nonce and compare it with the target."""
        header = self.block_header[:-4] + struct.pack("        current_hash = int.from_bytes(
            hashlib.sha256(hashlib.sha256(header).digest()).digest()[::-1],
            byteorder="big"
        )
        # Valid only if current_hash is smaller than self.target
        return current_hash < self.target, current_hash

    def print_status(self, iterations: int, extra_info: str = ""):
        """Print mining status."""
        elapsed = time.time() - self.start_time
        hash_rate = iterations / max(elapsed, 0.001)
        print(f"Iteration {iterations:,d}: Hash rate: {hash_rate:.2f} H/s {extra_info}")

class PrimeBasedMining(MiningAlgorithm):
    """Prime-based mining."""
    def mine(self, max_iterations: int = 1000000) -> Optional[int]:
        print("Starting prime-based mining...")
        primes = [
            p
            for p in range(2, 10**6)
            if all(p % d != 0 for d in range(2, int(p**0.5) + 1))
        ]
        iterations = 0
        for prime in primes:
            nonce = prime % (2**32)
            success, current_hash = self.check_nonce(nonce)
            if success:
                self.best_nonce = nonce
                self.best_hash = current_hash
                return nonce
            iterations += 1
            if iterations % 10000 == 0:
                self.print_status(iterations)
        return None

class GeneticMining(MiningAlgorithm):
    """Genetic algorithm-based mining."""
    def mine(self, max_generations: int = 1000) -> Optional[int]:
        print("Starting genetic algorithm-based mining...")
        population = [random.randint(0, 2**32 - 1) for _ in range(100)]
        for generation in range(max_generations):
            fitness_scores = [self.target - self.check_nonce(n)[1] for n in population]
            best_nonce = population[fitness_scores.index(max(fitness_scores))]
            success, current_hash = self.check_nonce(best_nonce)
            if success:
                self.best_nonce = best_nonce
                self.best_hash = current_hash
                return best_nonce
            if generation % 10 == 0:
                self.print_status(generation)
            population = [random.randint(0, 2**32 - 1) for _ in range(100)]
        return None

class BinarySearchMining(MiningAlgorithm):
    """Binary search-based mining."""
    def mine(self, max_iterations: int = 1000000) -> Optional[int]:
        print("Starting binary search-based mining...")
        left, right = 0, 2**32 - 1
        iterations = 0
        while left <= right:
            mid = (left + right) // 2
            success, current_hash = self.check_nonce(mid)
            if success:
                self.best_nonce = mid
                self.best_hash = current_hash
                return mid
            if current_hash > self.target:
                right = mid - 1
            else:
                left = mid + 1
            iterations += 1
            if iterations % 1000 == 0:
                self.print_status(iterations)
        return None

class EfficientRandomSearchMining(MiningAlgorithm):
    """Random search-based mining."""
    def mine(self, max_iterations: int = 1000000) -> Optional[int]:
        print("Starting random search-based mining...")
        iterations = 0
        for _ in range(max_iterations):
            nonce = random.randint(0, 2**32 - 1)
            success, current_hash = self.check_nonce(nonce)
            if success:
                self.best_nonce = nonce
                self.best_hash = current_hash
                return nonce
            iterations += 1
            if iterations % 1000 == 0:
                self.print_status(iterations)
        return None

class EllipticCurveMining(MiningAlgorithm):
    """Elliptic curve-based mining."""
    def mine(self, max_iterations: int = 1000000) -> Optional[int]:
        print("Starting elliptic curve-based mining...")
        private_key = random.randint(1, P - 1)
        current_point = scalar_multiplication(private_key, G)
        iterations = 0
        while iterations < max_iterations:
            x, _ = current_point
            nonce = x % (2**32)
            success, current_hash = self.check_nonce(nonce)
            if success:
                self.best_nonce = nonce
                self.best_hash = current_hash
                return nonce
            current_point = point_addition(current_point, G)
            iterations += 1
            if iterations % 1000 == 0:
                self.print_status(iterations)
        return None

def rpc_request(method: str, params: List = None) -> Dict:
    """RPC request."""
    payload = json.dumps({"jsonrpc": "2.0", "id": "mining", "method": method, "params": params or []})
    response = requests.post(RPC_URL, auth=(RPC_USER, RPC_PASSWORD), data=payload, headers={"content-type": "application/json"})
    response.raise_for_status()
    return response.json()["result"]

def get_new_template(interval: int = 60):
    """Request a new template."""
    global current_template
    while not stop_event.is_set():
        current_template = rpc_request("getblocktemplate", [{"rules": ["segwit"]}])
        print(f"New template updated: {current_template['height']}")
        time.sleep(interval)

def mine_block(block_template: Dict, algorithm: str) -> Optional[Dict]:
    """Mine a block using the specified algorithm."""
    version = block_template["version"]
    prev_block = block_template["previousblockhash"]
    timestamp = int(time.time())
    bits = block_template["bits"]
    target = int(block_template["target"], 16)

    # Create coinbase transaction
    coinbase_value = sum(tx.get("fee", 0) for tx in block_template["transactions"]) + \
                     block_template["coinbasevalue"]

    coinbase_tx = create_coinbase_transaction(
        block_template["height"],
        coinbase_value,
        MINING_ADDRESS
    )

    transactions = [{
        "txid": hexlify(
            hashlib.sha256(
                hashlib.sha256(coinbase_tx).digest()
            ).digest()
        ).decode()
    }]
    transactions.extend(block_template["transactions"])

    # Create Merkle root
    merkle_root = create_merkle_root(transactions)
    block_header = build_block_header(version, prev_block, merkle_root, timestamp, bits)

    # Select algorithm and run
    if algorithm == "prime":
        miner = PrimeBasedMining(block_header, target)
    elif algorithm == "genetic":
        miner = GeneticMining(block_header, target)
    elif algorithm == "binary":
        miner = BinarySearchMining(block_header, target)
    elif algorithm == "random":
        miner = EfficientRandomSearchMining(block_header, target)
    elif algorithm == "elliptic":
        miner = EllipticCurveMining(block_header, target)
    else:
        raise ValueError("Unknown algorithm: " + algorithm)

    nonce = miner.mine()

    if nonce is not None:
        # Return the block header and related information
        final_header = block_header[:-4] + struct.pack("        return {
            "header": hexlify(final_header).decode(),
            "coinbase": hexlify(coinbase_tx).decode(),
            "transactions": [tx["txid"] for tx in block_template["transactions"]]
        }

    return None

def main():
    """Main function."""
    print("Starting Bitcoin mining.")
    print("1. Prime-based")
    print("2. Genetic algorithm")
    print("3. Binary search")
    print("4. Random search")
    print("5. Elliptic curve")
    choice = input("Choose (1/2/3/4/5): ").strip()
    algorithm = {"1": "prime", "2": "genetic", "3": "binary", "4": "random", "5": "elliptic"}[choice]

    last_template = None
    template_request_time = 0
    MIN_TEMPLATE_INTERVAL = 5  # Minimum interval for requesting a template (seconds)

    while True:
        try:
            current_time = time.time()
           
            # Check if enough time has passed since the last template request
            if current_time - template_request_time >= MIN_TEMPLATE_INTERVAL:
                # Retrieve a new block template
                block_template = rpc_request("getblocktemplate", [{"rules": ["segwit"]}])
                template_request_time = current_time

                # Only handle if different from the previous template
                if last_template is None or block_template['height'] != last_template['height']:
                    print(f"\nReceived a new block template - Height: {block_template['height']}")
                    last_template = block_template

                    # Mine using the selected algorithm
                    block = mine_block(block_template, algorithm)

                    if block:
                        print("\nBlock found! Submitting...")
                        result = rpc_request("submitblock", [block["header"]])
                        if result is None:
                            print("Block submission successful!")
                        else:
                            print(f"Block submission failed: {result}")
            else:
                # Continue mining with the existing template
                if last_template:
                    block = mine_block(last_template, algorithm)
                    if block:
                        print("\nBlock found! Submitting...")
                        result = rpc_request("submitblock", [block["header"]])
                        if result is None:
                            print("Block submission successful!")
                        else:
                            print(f"Block submission failed: {result}")

        except Exception as e:
            print(f"An error occurred: {e}")
            time.sleep(5)
            continue

if __name__ == "__main__":
    main()
Jump to: