Author

Topic: Attacking ECDSA by Lattice Sieving: Bridging Gap with Fourier Analysis Attacks (Read 578 times)

member
Activity: 873
Merit: 22
$$P2P BTC BRUTE.JOIN NOW ! https://uclck.me/SQPJk
to test if the script is functional there are 3 signatures with the k nonce of just 20 bits.

r =  0x6f007fe0bc2a1a0f6d944b8d4a2353e48833e1f6be178f87f293068b25d6d96e
s =  0xea5459e9fb80101fbc261b90c59bf20dc497f5f71c90356c9530d035d32bd023
z =  0x840131231b57268049ccbd6b2c84408251b935cb951c069ecb86cf8d4755bf34
k =  0xb37b1

r =  0xdc52f02499d8859b4be4a517898e23f42bd409d5d1ef0f9e49edf775c49aab93
s =  0x120e526332349b7ebe20565240feeb224ed1f0886990d70513a83cb6d941c648
z =  0xa4075d10756be846ad1156e878de2d43b8585e3baf7617b1f164e4c52956419
k =  0x9d59

r =  0xb52b260367f658f3527bfca721358daaa8a115d9566ba966ffdff77e103fd6b1
s =  0x1e7454c7321b93e0c8b85bfdc5801a2e0f29dd62724c3cd1b26a699a486661c5
z =  0xbd0636aac51faf14a7d3439ea4abd1d28095f2836e95d5eea3fbe68a4baaee52
k =  0x2a78a

private key; 0x1a838b13505b26867

pubkey: 0430210c23b1a047bc9bdbb13448e67deddc108946de6de639bcc75d47c0216b1be383c4a8ed4fa c77c0d2ad737d8499a362f483f8fe39d1e86aaed578a9455dfc

you try scrypt ? Result ?

Try recover nonce or priv from this sighnatures ?


r=861a00ee94221ccd4a4557f558f989642657187aad741dcb4298a13571859d49

nonce start with 0x4000000...

s = 1

z= 0

r = fda403bfcb379b95a27ac76754fe77808e96b43419e33583cf5aba7d49c72f2d

s = 1

z = 0

nonce start with 0x04000000...

r =3031b819c8bbdb7484bf70ca79542da5566900dca414b9519302ca74b332bf72

nonce start with 0x00400000...

s = 1

z = 0


Nonce from 254 to 255 bit

What is a full nonce or privkey  ?

Pubkey were this R used:
    
    049c48b9367e0f42212c05e97d7af3534fe94673713c1cded5393fd6705f89fa5a05f0f60069f50 0e81490f26bcaf25d45e21a1a95954a416711555b05fab15941
    


newbie
Activity: 10
Merit: 0
to test if the script is functional there are 3 signatures with the k nonce of just 20 bits.

r =  0x6f007fe0bc2a1a0f6d944b8d4a2353e48833e1f6be178f87f293068b25d6d96e
s =  0xea5459e9fb80101fbc261b90c59bf20dc497f5f71c90356c9530d035d32bd023
z =  0x840131231b57268049ccbd6b2c84408251b935cb951c069ecb86cf8d4755bf34
k =  0xb37b1

r =  0xdc52f02499d8859b4be4a517898e23f42bd409d5d1ef0f9e49edf775c49aab93
s =  0x120e526332349b7ebe20565240feeb224ed1f0886990d70513a83cb6d941c648
z =  0xa4075d10756be846ad1156e878de2d43b8585e3baf7617b1f164e4c52956419
k =  0x9d59

r =  0xb52b260367f658f3527bfca721358daaa8a115d9566ba966ffdff77e103fd6b1
s =  0x1e7454c7321b93e0c8b85bfdc5801a2e0f29dd62724c3cd1b26a699a486661c5
z =  0xbd0636aac51faf14a7d3439ea4abd1d28095f2836e95d5eea3fbe68a4baaee52
k =  0x2a78a

private key; 0x1a838b13505b26867

pubkey: 0430210c23b1a047bc9bdbb13448e67deddc108946de6de639bcc75d47c0216b1be383c4a8ed4fa c77c0d2ad737d8499a362f483f8fe39d1e86aaed578a9455dfc

member
Activity: 873
Merit: 22
$$P2P BTC BRUTE.JOIN NOW ! https://uclck.me/SQPJk
newbie
Activity: 0
Merit: 0
I made some adjustments to the code, but there are small nuances with the selection of signatures

Code:
import os
os.environ['CUDA_PATH'] = '/usr/local/cuda-11.5'

import time
import logging
import multiprocessing
import cupy as cp
import numpy as np
import pickle
import ecdsa
from ecdsa.ellipticcurve import Point
from ecdsa.curves import SECP256k1
import itertools

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Print CUDA version for verification
logger.info(f"CUDA version: {cp.cuda.runtime.runtimeGetVersion()}")
logger.info(f"CuPy version: {cp.__version__}")

# Definition of the SECP256k1 curve and the base point G
curve = SECP256k1.curve
G = SECP256k1.generator
n = SECP256k1.order
p = SECP256k1.curve.p()
b = SECP256k1.curve.b()

def ecdsa_to_hnp(signatures, l):
    logger.info(f"Converting {len(signatures)} ECDSA signatures to HNP instances")
    q = n
    w = q // (2**(l+1))
    samples = []
    for sig in signatures:
        r = int(sig['r'], 16)
        s = int(sig['s'], 16)
        z = int(sig['z'], 16)

        t = (r * pow(s, -1, q)) % q
        a = (z * pow(s, -1, q) - w) % q
       
        samples.append((t, a))
    logger.info(f"Conversion complete. Generated {len(samples)} HNP samples")
    return samples

def construct_lattice(samples, x, l):
    logger.info(f"Constructing lattice with {len(samples)} samples, x={x}, l={l}")
    m = len(samples)
    q = int(n)
    w = q // (2**(l+1))
   
    B = np.zeros((m+1, m+1), dtype=object)
    for i in range(m):
        B[i, i] = q
    B[m-1, :m] = np.array([x * t for t, _ in samples], dtype=object)
    B[m-1, m-1] = x
    B[m, :m] = np.array([a for _, a in samples], dtype=object)
    B[m, m] = int((w * w // 3)**0.5)   
    logger.info(f"Lattice construction complete. Dimension: {B.shape[0]} x {B.shape[1]}")
    return B

def gpu_lll(B, delta=0.99):
    logger.info(f"Starting GPU-accelerated LLL reduction on {B.shape[0]}x{B.shape[1]} matrix")
    Q, R = qr(B)
    n = B.shape[0]
    k = 1
    while k < n:
        for j in range(k-1, -1, -1):
            mu = cp.round(R[j,k] / R[j,j])
            if mu != 0:
                R[:j+1,k] -= mu * R[:j+1,j]
                Q[:,k] -= mu * Q[:,j]
        if delta * R[k-1,k-1]**2 > R[k,k]**2 + R[k-1,k]**2:
            R[[k-1,k]] = R[[k,k-1]]
            Q[:,[k-1,k]] = Q[:,[k,k-1]]
            k = max(k-1, 1)
        else:
            k += 1
    logger.info("GPU-accelerated LLL reduction complete")
    return Q @ R

def gpu_bkz(B, block_size=20):
    logger.info(f"Starting GPU-accelerated BKZ reduction with block size {block_size}")
    n = B.shape[0]
    for i in range(0, n - block_size + 1):
        logger.debug(f"Processing block {i}/{n - block_size}")
        block = B[i:i+block_size, i:i+block_size]
        block = gpu_lll(block)
        B[i:i+block_size, i:i+block_size] = block
    logger.info("GPU-accelerated BKZ reduction complete")
    return B

def gauss_sieve(B, target_norm, max_list_size=None):
    logger.info(f"Starting Gauss sieve with target norm {target_norm}")
    L = []
    S = []
    C = B.get().tolist()  # Convert CuPy array to list
   
    while C:
        v = C.pop(0)
        v = cp.array(v)  # Convert back to CuPy array for GPU operations
        if cp.linalg.norm(v) > target_norm:
            continue
       
        if not L:
            L.append(v)
            continue
       
        changed = True
        while changed:
            changed = False
            for w in L:
                if cp.linalg.norm(v - w) < cp.linalg.norm(v):
                    v = v - w
                    changed = True
                    break
                elif cp.linalg.norm(v + w) < cp.linalg.norm(v):
                    v = v + w
                    changed = True
                    break
       
        if cp.linalg.norm(v) <= target_norm:
            L.append(v)
            if max_list_size and len(L) > max_list_size:
                L.sort(key=lambda x: cp.linalg.norm(x))
                L = L[:max_list_size]
        else:
            S.append(v)
   
    logger.info(f"Gauss sieve complete. Found {len(L)} vectors")
    return L

def interval_reduction(low, high, samples, q, l):
    logger.info(f"Starting interval reduction: [{low}, {high}]")
    M = high - low + 1
    N = int(np.log2(M).ceil())
    R = [(low, high)]
   
    for t, a in samples[:N]:
        R_new = []
        for interval in R:
            low, high = interval
            n_min = ((t * low - a - q/(2**(l+1))) // q).ceil()
            n_max = ((t * high - a + q/(2**(l+1))) // q).floor()
            for n in range(n_min, n_max + 1):
                new_low = max(low, ((a + n*q - q/(2**(l+1))) // t).ceil())
                new_high = min(high, ((a + n*q + q/(2**(l+1))) // t).floor())
                if new_low <= new_high:
                    R_new.append((new_low, new_high))
        R = R_new
   
    logger.info(f"Interval reduction complete. Resulting intervals: {len(R)}")
    return R

def pre_screening(alpha0, samples, q, l, x):
    logger.debug(f"Pre-screening candidate α₀: {alpha0}")
    w = q // (2**(l+1))
    result = all(abs(((x * t * alpha0 - a + q//2) % q) - q//2) <= w + q//(2**(l+4)) for t, a in samples)
    logger.debug(f"Pre-screening result: {'Passed' if result else 'Failed'}")
    return result

def improved_linear_predicate(v, samples, q, l, tau):
    logger.debug(f"Checking improved linear predicate for v: {v}")
    if v[0] == 0 or abs(v[0]) > q/(2**(l+1)) or abs(v[1]) != tau:
        logger.debug("Predicate failed initial checks")
        return None
   
    k0 = -np.sign(v[1]) * v[0] + q/(2**(l+1))
    alpha = (Mod(samples[0][1] + k0, q) * Mod(samples[0][0], q)**(-1)) % q
   
    N = 2 * int(np.log2(q).ceil())
    M = sum(1 for t, a in samples[:N] if abs((t * alpha - a) % q) < q/(2**l))
   
    if M > N * (1 - np.log2(q)/(2**l) + 2**(-l)) / 2:
        logger.debug(f"Predicate passed. Potential α: {alpha}")
        return int(alpha)
    logger.debug("Predicate failed final check")
    return None

def decomposition_predicate(v, samples, q, l, tau, x):
    logger.debug(f"Checking decomposition predicate for v: {v}")
    if v[0] == 0 or abs(v[0]) > q/(2**(l+1)) or abs(v[1]) != tau:
        logger.debug("Decomposition predicate failed initial checks")
        return None
   
    low = -np.sign(v[1]) * v[0] - x//2
    high = -np.sign(v[1]) * v[0] + x//2
   
    R = interval_reduction(low, high, samples, q, l)
   
    for interval in R:
        for h in range(interval[0], interval[1] + 1):
            alpha = improved_linear_predicate((h, -tau), samples, q, l, tau)
            if alpha is not None and pre_screening(alpha, samples, q, l, x):
                logger.info(f"Decomposition predicate found potential solution: {alpha}")
                return alpha
   
    logger.debug("Decomposition predicate failed to find a solution")
    return None

def progressive_bkz_sieve(B, predicate, start_dim=20, step=5, max_dim=None):
    if max_dim is None:
        max_dim = B.shape[0]
   
    for d in range(start_dim, max_dim + 1, step):
        logger.info(f"Processing dimension {d}")
        B_sub = B[:d, :d]
       
        B_sub = gpu_bkz(B_sub, block_size=min(20, d))
       
        target_norm = cp.sqrt(4/3) * cp.linalg.det(B_sub)**(1/d)
        logger.info(f"Target norm for dimension {d}: {target_norm}")
        sieved_vectors = gauss_sieve(B_sub, target_norm, max_list_size=d*10)
       
        logger.info(f"Checking predicates for {len(sieved_vectors)} vectors")
        for v in sieved_vectors:
            sk = predicate(v[-2:])
            if sk is not None:
                logger.info(f"Found potential solution: {sk}")
                return sk
   
    logger.info("Progressive BKZ-sieve completed without finding a solution")
    return None


def try_nonce_patterns(args):
    signatures, l, x, pubkey, patterns = args
    logger.info(f"Trying nonce pattern: {patterns}")
    modified_sigs = [{**sig, 'r': hex(int(sig['r'], 16) ^ (p << (256 - l)))[2:].zfill(64)}
                     for sig, p in zip(signatures, patterns)]
    samples = ecdsa_to_hnp(modified_sigs, l)
    B = construct_lattice(samples, x, l)
   
    def predicate(v):
        return decomposition_predicate(v, samples, int(n), l, int(B[-1, -1].get()), x)
   
    sk = progressive_bkz_sieve(B, predicate, start_dim=20)
   
    if sk:
        recovered_pubkey_point = sk * G
        recovered_pubkey = '04' + hex(recovered_pubkey_point[0])[2:].zfill(64) + hex(recovered_pubkey_point[1])[2:].zfill(64)
        if recovered_pubkey == pubkey:
            logger.info(f"Successfully recovered private key: {hex(sk)}")
            return sk
    logger.info(f"Failed to recover private key for pattern: {patterns}")
    return None

def save_progress(completed_patterns):
    logger.info(f"Saving progress. Completed patterns: {len(completed_patterns)}")
    with open("progress.pkl", "wb") as f:
        pickle.dump(completed_patterns, f)

def load_progress():
    if os.path.exists("progress.pkl"):
        with open("progress.pkl", "rb") as f:
            completed_patterns = pickle.load(f)
        logger.info(f"Loaded progress. Completed patterns: {len(completed_patterns)}")
        return completed_patterns
    logger.info("No previous progress found. Starting from scratch.")
    return set()

def main():
    signatures = [
        {
            'r': '1e8e175bd4fe465c4be9999840cc5bc50d8da9195d10e3350ebf388e429df874',
            's': 'dedd1a4422041d6d2a5c2dabba51d45b4bb9d233baed5cd4caf54e3d0a80d47e',
            'z': '941e563da856ee60678e59c7fdb71d3ed476c9322b3fcd4133dd677d07c82ff7',
        },
        {
            'r': '48939db78d89e510ce280efb8ec47c11af39bcd58d59b87b690a33b0322fd73e',
            's': '62eda7479b658e06bb83d0135d69553d838ca9f7bd63ed7294ed59e2bd37c492',
            'z': 'ce4b9ad74ce61b4ac087f2b0404d313f61d86eed00923806b0d83e9a4559140f',
        },
        {
            'r': '58347d292315a1c7a273b66d7bde268f2c8daad892bddcfe77df4891af48e4ea',
            's': 'c3bbcf5912b25738f7bd2379b57e40f290ca84ed87380c05326b49635f7ad1fc',
            'z': 'fd74693fed61ef0cd6b7bd7284057fe747ee29e39b663520b227e56f8ce1f9bc',
        }
    ]
   
    l = 7  # 6-bit guessing
    x = 2**15
    pubkey = "04f22b7f1e9990eeac8570517a567d46c1f173f6670244cca6184f59e350312129671e4f5a614e164d151a5836bab8684e24bfe247141b7e30251bb7290e275e69"
   
    all_patterns = list(itertools.product(range(2**l), repeat=len(signatures)))
    completed_patterns = load_progress()
    patterns_to_try = [p for p in all_patterns if p not in completed_patterns]
   
    num_processors = min(24, multiprocessing.cpu_count())
    logger.info(f"Using {num_processors} processors")

    try:
        with multiprocessing.Pool(num_processors) as pool:
            args = [(signatures, l, x, pubkey, pattern) for pattern in patterns_to_try]
            for i, result in enumerate(pool.imap_unordered(try_nonce_patterns, args)):
                if result is not None:
                    print(f"Successfully recovered private key: {hex(result)}")
                    return
                completed_patterns.add(patterns_to_try[i])
                if (i+1) % 1000 == 0:
                    save_progress(completed_patterns)
                    logger.info(f"Completed {i+1}/{len(patterns_to_try)} pattern combinations")
    except KeyboardInterrupt:
        logger.info("Interrupted by user. Saving progress...")
        save_progress(completed_patterns)
   
    print("Failed to recover the private key.")

if __name__ == "__main__":
    main()

member
Activity: 127
Merit: 14
Life aint interesting without any cuts and bruises
Well after going through it the only point where i feel your method could fall short is in the potential noise in signature
The ECDSA signature  may have noise and if the signature you are using is not perfectly aligned with the assumptions made in your model it could disrupt the entire process and this will make it hard for the lattice based approach to solve the problem accurately
Aside that I really can't think of anything
You should consider examining the impact of signature noise

I was warn about this yesterday. I totally forgot about it when i was coding. thank you so much for the reminder. I appreciate it.
member
Activity: 127
Merit: 14
Life aint interesting without any cuts and bruises
I had chance upon this research.. https://eprint.iacr.org/2024/296

The author had through multiple signatures found the private key through less than 4 bits of nonce leakage.

My problem is, i do not know the k nonce. there is no leakage. I do not know the private key. i do not know the message that was used..  So i modify the script accordingly.  i believe if it takes less than 4 bits of nonce leakage to eventually gain the private key,

I could 'leak' the 6 bits of the k nonce. try on every pattern (Total This script will try all 262,144 (64^3) possible combinations) and eventually get the right 'leak' for the full k nonce.

Notwithstanding using GPU, CPU ONLY, it will take me roughly 22 hours to complete the full pattern.

I tried following the method describe in the research very closely and as much as i can.

What i want to ask is, why do you think my method won't work?
Where am i going wrong with my code?

Thank You. I Appreciate your opinion.

Code:
 Written in Sagemath / Python

import os
os.environ['CUDA_PATH'] = '/usr/local/cuda-11.5'

import time
import logging
import multiprocessing
import cupy as cp
import pickle

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Print CUDA version for verification
logger.info(f"CUDA version: {cp.cuda.runtime.runtimeGetVersion()}")
logger.info(f"CuPy version: {cp.__version__}")

p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
a, b = 0, 7
E = EllipticCurve(GF(p), [a, b])
G = E(0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798,
      0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8)
n = G.order()

def ecdsa_to_hnp(signatures, l):
    logger.info(f"Converting {len(signatures)} ECDSA signatures to HNP instances")
    q = n
    w = q // (2**(l+1))
    samples = []
    for sig in signatures:
        r = int(sig['r'], 16)
        s = int(sig['s'], 16)
        z = int(sig['z'], 16)
        t = (Mod(r, q) * Mod(s, q)**(-1)) % q
        a = (Mod(z, q) * Mod(s, q)**(-1) - w) % q
        samples.append((t, a))
    logger.info(f"Conversion complete. Generated {len(samples)} HNP samples")
    return samples

def construct_lattice(samples, x, l):
    logger.info(f"Constructing lattice with {len(samples)} samples, x={x}, l={l}")
    m = len(samples)
    q = int(n)
    w = q // (2**(l+1))
    B = matrix(ZZ, m+1, m+1)
    for i in range(m-1):
        B[i, i] = q
    B[m-1, :m-1] = [x * t for t, _ in samples]
    B[m-1, m-1] = x
    B[m, :m-1] = [a for _, a in samples]
    B[m, m] = int((w * w // 3)**0.5)
    logger.info(f"Lattice construction complete. Dimension: {B.nrows()} x {B.ncols()}")
    return cp.array(B.numpy())

def gpu_lll(B, delta=0.99):
    logger.info(f"Starting GPU-accelerated LLL reduction on {B.shape[0]}x{B.shape[1]} matrix")
    Q, R = qr(B)
    n = B.shape[0]
    k = 1
    while k < n:
        for j in range(k-1, -1, -1):
            mu = cp.round(R[j,k] / R[j,j])
            if mu != 0:
                R[:j+1,k] -= mu * R[:j+1,j]
                Q[:,k] -= mu * Q[:,j]
        if delta * R[k-1,k-1]**2 > R[k,k]**2 + R[k-1,k]**2:
            R[[k-1,k]] = R[[k,k-1]]
            Q[:,[k-1,k]] = Q[:,[k,k-1]]
            k = max(k-1, 1)
        else:
            k += 1
    logger.info("GPU-accelerated LLL reduction complete")
    return Q @ R

def gpu_bkz(B, block_size=20):
    logger.info(f"Starting GPU-accelerated BKZ reduction with block size {block_size}")
    n = B.shape[0]
    for i in range(0, n - block_size + 1):
        logger.debug(f"Processing block {i}/{n - block_size}")
        block = B[i:i+block_size, i:i+block_size]
        block = gpu_lll(block)
        B[i:i+block_size, i:i+block_size] = block
    logger.info("GPU-accelerated BKZ reduction complete")
    return B

def gauss_sieve(B, target_norm, max_list_size=None):
    logger.info(f"Starting Gauss sieve with target norm {target_norm}")
    L = []
    S = []
    C = B.get().tolist()  # Convert CuPy array to list
    
    while C:
        v = C.pop(0)
        v = cp.array(v)  # Convert back to CuPy array for GPU operations
        if cp.linalg.norm(v) > target_norm:
            continue
        
        if not L:
            L.append(v)
            continue
        
        changed = True
        while changed:
            changed = False
            for w in L:
                if cp.linalg.norm(v - w) < cp.linalg.norm(v):
                    v = v - w
                    changed = True
                    break
                elif cp.linalg.norm(v + w) < cp.linalg.norm(v):
                    v = v + w
                    changed = True
                    break
        
        if cp.linalg.norm(v) <= target_norm:
            L.append(v)
            if max_list_size and len(L) > max_list_size:
                L.sort(key=lambda x: cp.linalg.norm(x))
                L = L[:max_list_size]
        else:
            S.append(v)
    
    logger.info(f"Gauss sieve complete. Found {len(L)} vectors")
    return L

def interval_reduction(low, high, samples, q, l):
    logger.info(f"Starting interval reduction: [{low}, {high}]")
    M = high - low + 1
    N = int(np.log2(M).ceil())
    R = [(low, high)]
    
    for t, a in samples[:N]:
        R_new = []
        for interval in R:
            low, high = interval
            n_min = ((t * low - a - q/(2**(l+1))) // q).ceil()
            n_max = ((t * high - a + q/(2**(l+1))) // q).floor()
            for n in range(n_min, n_max + 1):
                new_low = max(low, ((a + n*q - q/(2**(l+1))) // t).ceil())
                new_high = min(high, ((a + n*q + q/(2**(l+1))) // t).floor())
                if new_low <= new_high:
                    R_new.append((new_low, new_high))
        R = R_new
    
    logger.info(f"Interval reduction complete. Resulting intervals: {len(R)}")
    return R

def pre_screening(alpha0, samples, q, l, x):
    logger.debug(f"Pre-screening candidate α₀: {alpha0}")
    w = q // (2**(l+1))
    result = all(abs(((x * t * alpha0 - a + q//2) % q) - q//2) <= w + q//(2**(l+4)) for t, a in samples)
    logger.debug(f"Pre-screening result: {'Passed' if result else 'Failed'}")
    return result

def improved_linear_predicate(v, samples, q, l, tau):
    logger.debug(f"Checking improved linear predicate for v: {v}")
    if v[0] == 0 or abs(v[0]) > q/(2**(l+1)) or abs(v[1]) != tau:
        logger.debug("Predicate failed initial checks")
        return None
    
    k0 = -np.sign(v[1]) * v[0] + q/(2**(l+1))
    alpha = (Mod(samples[0][1] + k0, q) * Mod(samples[0][0], q)**(-1)) % q
    
    N = 2 * int(np.log2(q).ceil())
    M = sum(1 for t, a in samples[:N] if abs((t * alpha - a) % q) < q/(2**l))
    
    if M > N * (1 - np.log2(q)/(2**l) + 2**(-l)) / 2:
        logger.debug(f"Predicate passed. Potential α: {alpha}")
        return int(alpha)
    logger.debug("Predicate failed final check")
    return None

def decomposition_predicate(v, samples, q, l, tau, x):
    logger.debug(f"Checking decomposition predicate for v: {v}")
    if v[0] == 0 or abs(v[0]) > q/(2**(l+1)) or abs(v[1]) != tau:
        logger.debug("Decomposition predicate failed initial checks")
        return None
    
    low = -np.sign(v[1]) * v[0] - x//2
    high = -np.sign(v[1]) * v[0] + x//2
    
    R = interval_reduction(low, high, samples, q, l)
    
    for interval in R:
        for h in range(interval[0], interval[1] + 1):
            alpha = improved_linear_predicate((h, -tau), samples, q, l, tau)
            if alpha is not None and pre_screening(alpha, samples, q, l, x):
                logger.info(f"Decomposition predicate found potential solution: {alpha}")
                return alpha
    
    logger.debug("Decomposition predicate failed to find a solution")
    return None

def progressive_bkz_sieve(B, predicate, start_dim=20, step=5, max_dim=None):
    if max_dim is None:
        max_dim = B.shape[0]
    
    for d in range(start_dim, max_dim + 1, step):
        logger.info(f"Processing dimension {d}")
        B_sub = B[:d, :d]
        
        B_sub = gpu_bkz(B_sub, block_size=min(20, d))
        
        target_norm = cp.sqrt(4/3) * cp.linalg.det(B_sub)**(1/d)
        logger.info(f"Target norm for dimension {d}: {target_norm}")
        sieved_vectors = gauss_sieve(B_sub, target_norm, max_list_size=d*10)
        
        logger.info(f"Checking predicates for {len(sieved_vectors)} vectors")
        for v in sieved_vectors:
            sk = predicate(v[-2:])
            if sk is not None:
                logger.info(f"Found potential solution: {sk}")
                return sk
    
    logger.info("Progressive BKZ-sieve completed without finding a solution")
    return None

def try_nonce_patterns(args):
    signatures, l, x, pubkey, patterns = args
    logger.info(f"Trying nonce pattern: {patterns}")
    modified_sigs = [{**sig, 'r': hex(int(sig['r'], 16) ^ (p << (256 - l)))[2:].zfill(64)}
                     for sig, p in zip(signatures, patterns)]
    samples = ecdsa_to_hnp(modified_sigs, l)
    B = construct_lattice(samples, x, l)
    
    def predicate(v):
        return decomposition_predicate(v, samples, int(n), l, int(B[-1, -1].get()), x)
    
    sk = progressive_bkz_sieve(B, predicate, start_dim=20)
    
    if sk:
        recovered_pubkey_point = sk * G
        recovered_pubkey = '04' + hex(recovered_pubkey_point[0])[2:].zfill(64) + hex(recovered_pubkey_point[1])[2:].zfill(64)
        if recovered_pubkey == pubkey:
            logger.info(f"Successfully recovered private key: {hex(sk)}")
            return sk
    logger.info(f"Failed to recover private key for pattern: {patterns}")
    return None

def save_progress(completed_patterns):
    logger.info(f"Saving progress. Completed patterns: {len(completed_patterns)}")
    with open("progress.pkl", "wb") as f:
        pickle.dump(completed_patterns, f)

def load_progress():
    if os.path.exists("progress.pkl"):
        with open("progress.pkl", "rb") as f:
            completed_patterns = pickle.load(f)
        logger.info(f"Loaded progress. Completed patterns: {len(completed_patterns)}")
        return completed_patterns
    logger.info("No previous progress found. Starting from scratch.")
    return set()


def main():
    signatures = [
        {
            'r': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            's': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'z': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        },
        {
            'r': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            's': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'z': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        },
        {
            'r': '5caxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            's': '21bxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'z': 'hexadecimalxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        }
    ]
    
    l = 6  # 6-bit guessing
    x = 2**15
    pubkey = "04xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    
    all_patterns = list(itertools.product(range(2**l), repeat=len(signatures)))
    completed_patterns = load_progress()
    patterns_to_try = [p for p in all_patterns if p not in completed_patterns]
    
    num_processors = min(24, multiprocessing.cpu_count())
    logger.info(f"Using {num_processors} processors")

    try:
        with multiprocessing.Pool(num_processors) as pool:
            args = [(signatures, l, x, pubkey, pattern) for pattern in patterns_to_try]
            for i, result in enumerate(pool.imap_unordered(try_nonce_patterns, args)):
                if result is not None:
                    print(f"Successfully recovered private key: {hex(result)}")
                    return
                completed_patterns.add(patterns_to_try[i])
                if (i+1) % 1000 == 0:
                    save_progress(completed_patterns)
                    logger.info(f"Completed {i+1}/{len(patterns_to_try)} pattern combinations")
    except KeyboardInterrupt:
        logger.info("Interrupted by user. Saving progress...")
        save_progress(completed_patterns)
    
    print("Failed to recover the private key.")

if __name__ == "__main__":
    main()


The code is pretty much self explanatory. i already extended some definitions that has no #comments so its easier for you to read and understand what the code is achieving..

PS: i had remove some library imports.
Jump to: