Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
yewman committed Sep 4, 2024
1 parent fd51a05 commit 2ddc787
Show file tree
Hide file tree
Showing 16 changed files with 618 additions and 90 deletions.
10 changes: 10 additions & 0 deletions src/core/hash.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ const sig = @import("../sig.zig");
const Sha256 = std.crypto.hash.sha2.Sha256;
const Allocator = std.mem.Allocator;

pub fn hashValue(value: []const u8) Hash {
return hashValues([_]u8{value});
}

pub fn hashValues(values: [][]const u8) Hash {
var bytes: [Hash.size]u8 = undefined;
for (values) |value| Sha256.hash(value, &bytes, .{});
return .{ .data = bytes };
}

pub const Hash = extern struct {
data: [size]u8,

Expand Down
3 changes: 3 additions & 0 deletions src/gossip/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1395,13 +1395,15 @@ pub const ThreadSafeContactInfo = struct {
shred_version: u16,
gossip_addr: ?SocketAddr,
rpc_addr: ?SocketAddr,
tvu_addr: ?SocketAddr,

pub fn fromContactInfo(contact_info: ContactInfo) ThreadSafeContactInfo {
return .{
.pubkey = contact_info.pubkey,
.shred_version = contact_info.shred_version,
.gossip_addr = contact_info.getSocket(.gossip),
.rpc_addr = contact_info.getSocket(.rpc),
.tvu_addr = contact_info.getSocket(.turbine_recv),
};
}

Expand All @@ -1411,6 +1413,7 @@ pub const ThreadSafeContactInfo = struct {
.shred_version = legacy_contact_info.shred_version,
.gossip_addr = legacy_contact_info.gossip,
.rpc_addr = legacy_contact_info.rpc,
.tvu_addr = legacy_contact_info.turbine_recv,
};
}
};
Expand Down
16 changes: 16 additions & 0 deletions src/ledger/shred.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const Allocator = std.mem.Allocator;

const BitFlags = sig.utils.bitflags.BitFlags;
const Hash = sig.core.Hash;
const Pubkey = sig.core.Pubkey;
const Nonce = sig.core.Nonce;
const Packet = sig.net.Packet;
const Signature = sig.core.Signature;
Expand Down Expand Up @@ -530,6 +531,21 @@ pub const ShredId = struct {
slot: Slot,
index: u32,
shred_type: sig.ledger.shred.ShredType,

pub fn seed(self: *const ShredId, leader: *const Pubkey) [32]u8 {
const slot_bytes: [8]u8 = undefined;
std.mem.writeInt(u64, slot_bytes, self.slot, .little);
const index_bytes: [4]u8 = undefined;
std.mem.writeInt(u32, index_bytes, self.index, .little);
const shred_bytes: [1]u8 = undefined;
std.mem.writeInt(u8, shred_bytes, @intCast(self.shred_type), .little);
return hashv(&.{
slot_bytes,
shred_bytes,
index_bytes,
leader.data,
}).data;
}
};

pub const ErasureSetId = struct {
Expand Down
1 change: 1 addition & 0 deletions src/net/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub const socket_utils = @import("socket_utils.zig");

pub const IpAddr = net.IpAddr;
pub const SocketAddr = net.SocketAddr;
pub const SocketAddrSpace = net.SocketAddrSpace;
pub const Packet = packet.Packet;
pub const SocketThread = socket_utils.SocketThread;

Expand Down
36 changes: 36 additions & 0 deletions src/net/net.zig
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,27 @@ pub const SocketAddrV6 = struct {
}
};

pub const SocketAddrSpace = enum {
Unspecified,
Global,

pub fn init(allow_private_addr: bool) SocketAddrSpace {
if (allow_private_addr) .Unspecified else .Global;
}

pub fn check(self: *const SocketAddrSpace, addr: *const SocketAddr) bool {
if (self == &.Unspecified) return true;
switch (addr.ip()) {
.ipv4 => |ip| {
return !(ip.isPrivate() or ip.isLoopback());
},
.ipv6 => |ip| {
return !ip.isLoopback();
},
}
}
};

pub const Ipv4Addr = struct {
octets: [4]u8,

Expand All @@ -376,6 +397,17 @@ pub const Ipv4Addr = struct {
};
}

pub fn isPrivate(self: *const Self) bool {
if (self.octets[0] == 10) return true;
if (self.octets[0] == 172 and self.octets[1] >= 16 and self.octets[1] <= 31) return true;
if (self.octets[0] == 192 and self.octets[1] == 168) return true;
return false;
}

pub fn isLoopback(self: *const Self) bool {
return self.octets[0] == 127;
}

pub fn eql(self: *const Self, other: *const Self) bool {
return std.mem.eql(u8, self.octets[0..], other.octets[0..]);
}
Expand Down Expand Up @@ -403,6 +435,10 @@ pub const Ipv6Addr = struct {
};
}

pub fn isLoopback(self: *const Self) bool {
return std.mem.eql(u8, &self.octets, &[_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 });
}

pub fn eql(self: *const Self, other: *const Self) bool {
return std.mem.eql(u8, &self.octets, &other.octets);
}
Expand Down
21 changes: 21 additions & 0 deletions src/poh/entry.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const std = @import("std");
const sig = @import("../lib.zig");

const Instant = std.time.Instant;

const Hash = sig.core.Hash;
const VersionedTransaction = sig.core.VersionedTransaction;

/// Analogous to Entry [TODO]
pub const Entry = struct {
/// The number of hashes since the previous Entry ID.
num_hashes: u64,

/// The SHA-256 hash `num_hashes` after the previous Entry ID.
hash: Hash,

/// An unordered list of transactions that were observed before the Entry ID was
/// generated. They may have been observed before a previous Entry ID but were
/// pushed back into this list to ensure deterministic interpretation of the ledger.
transactions: std.ArrayList(VersionedTransaction),
};
5 changes: 5 additions & 0 deletions src/poh/lib.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub const poh = @import("poh.zig");
pub const entry = @import("entry.zig");

pub const Poh = poh.Poh;
pub const Entry = entry.Entry;
77 changes: 77 additions & 0 deletions src/poh/poh.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
const std = @import("std");
const sig = @import("../lib.zig");

const Instant = std.time.Instant;

const Hash = sig.core.Hash;

const hashValue = sig.core.hash.hashValue;
const hashValues = sig.core.hash.hashValues;

pub const Poh = struct {
hash: Hash,
num_hashes: u64,
hashes_per_tick: u64,
remaining_hashes: u64,
tick_number: u64,
slot_start_time: Instant,

pub fn init(hash: Hash, hashes_per_tick: u64, tick_number: u64) !Poh {
return .{
.hash = hash,
.num_hashes = 0,
.hashes_per_tick = hashes_per_tick,
.remaining_hashes = hashes_per_tick,
.tick_number = tick_number,
.slot_start_time = try Instant.now(),
};
}

/// Repeatedly rehash until tick is required or max number of hashes completed
/// Returns true if a tick is required
pub fn increment(self: *Poh, max_num_hashes: u64) bool {
const num_hashes = @min(self.remaining_hashes - 1, max_num_hashes);
for (num_hashes) |_| self.hash = hashValue(self.hash.bytes);
self.num_hashes += num_hashes;
self.remaining_hashes -= num_hashes;
return self.remaining_hashes == 1;
}

/// Incorporate a mixin hash into the Poh chain
/// Return null if a tick is required
pub fn record(self: *Poh, mixin: Hash) ?PohEntry {
if (self.remaining_hashes == 1) return null;
self.hash = hashValues([_][]const u8{ self.hash.bytes, mixin.bytes });
const num_hashes = self.num_hashes + 1;
self.num_hashes = 0;
self.remaining_hashes -= 1;
return .{
.num_hashes = num_hashes,
.hash = self.hash,
};
}

/// Emit PohEntry and prepare Poh for next tick
/// Return null if their are still remaining hashes
pub fn tick(self: *const Poh) ?PohEntry {
self.hash = hashValue(self.hash.bytes);
self.num_hashes += 1;
self.remaining_hashes -= 1;
if (self.remaining_hashes != 0) return null;

const num_hashes = self.num_hashes;
self.remaining_hashes = self.hashes_per_tick;
self.num_hashes = 0;
self.tick_number += 1;

return .{
.num_hashes = num_hashes,
.hash = self.hash,
};
}
};

pub const PohEntry = struct {
num_hashes: u64,
hash: Hash,
};
8 changes: 8 additions & 0 deletions src/rand/weighted_shuffle.zig
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ pub fn WeightedShuffle(comptime T: type) type {
self.zeros.deinit();
}

pub fn clone(self: *const WeightedShuffle(T)) WeightedShuffle(T) {
return .{
.tree = self.tree.clone(),
.weight = self.weight,
.zeros = self.zeros.clone(),
};
}

// Removes given weight at index k.
pub fn remove(self: *WeightedShuffle(T), k: usize, weight: T) void {
std.debug.assert(self.weight >= weight);
Expand Down
2 changes: 2 additions & 0 deletions src/sig.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub const version = @import("version/version.zig");
pub const time = @import("time/lib.zig");
pub const common = @import("common/lib.zig");
pub const geyser = @import("geyser/lib.zig");
pub const turbine = @import("turbine/lib.zig");
pub const poh = @import("poh/lib.zig");

pub const VALIDATOR_DIR = "validator/";
pub const TEST_DATA_DIR = "data/test-data/";
Expand Down
3 changes: 3 additions & 0 deletions src/turbine/broadcast/lib.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub const service = @import("service.zig");

pub const StandardBroadcast = @import("standard_broadcast.zig");
110 changes: 110 additions & 0 deletions src/turbine/broadcast/service.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
const std = @import("std");
const sig = @import("../../lib.zig");

const AtomicBool = std.atomic.Value(bool);
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const Instant = std.time.Instant;

const Duration = sig.time.Duration;
const Channel = sig.sync.Channel;
const BlockstoreReader = sig.ledger.BlockstoreReader;
const BlockstoreWriter = sig.ledger.BlockstoreWriter;
const ShredInserter = sig.ledger.insert_shred.ShredInserter;
const Shred = sig.ledger.shred.Shred;
const Hash = sig.core.Hash;
const ReedSolomonCache = sig.ledger.shredder.ReedSolomonCache;
const Slot = sig.core.Slot;
const PohEntry = sig.poh.Entry;

// MISSING DATA STRUCTURES
const Bank = struct {};
const UdpSocket = struct {};
const Blockstore = struct {};
const BankForks = struct {};
const WorkingBankEntry = struct {
bank: Bank,
entry: PohEntry,
last_tick_height: u64,
};

/// Runs the broadcast service using a broadcaster implementation
/// The broadcaster must implement receive, transmit, and record functions
pub fn runBroadcastService(
allocator: std.mem.Allocator,
broadcaster: anytype,
sockets: std.ArrayList(UdpSocket),
keypair: KeyPair,
receiver: Channel(WorkingBankEntry),
retransmit_receiver: Channel(Slot),
blockstore: Blockstore,
bank_forks: BankForks,
exit: *AtomicBool,
) !void {
const socket_sender, const socket_receiver = Channel(std.ArrayList(Shred)).init(allocator, 100);
const blockstore_sender, const blockstore_receiver = Channel(std.ArrayList(Shred)).init(allocator, 100);

const broadcast_handle = try std.Thread.spawn(
.{},
broadcaster.receive,
.{
keypair,
blockstore,
receiver,
socket_sender,
blockstore_sender,
exit,
},
);

var transmit_handles = try std.ArrayList(std.Thread).init(allocator);
for (sockets.items) |socket| {
try transmit_handles.append(try std.Thread.spawn(
.{},
broadcaster.transmit,
.{
&socket_receiver,
&bank_forks,
&socket,
exit,
},
));
}

const record_handle = try std.Thread.spawn(
.{},
broadcaster.record,
.{
&blockstore_receiver,
&blockstore,
exit,
},
);

const retransmit_handle = try std.Thread.spawn(
.{},
retransmit,
.{
&blockstore,
&retransmit_receiver,
&socket_sender,
exit,
},
);

broadcast_handle.join();
for (transmit_handles) |h| h.join();
record_handle.join();
retransmit_handle.join();
}

fn retransmit(
blockstore: *Blockstore,
retransmit_receiver: *Channel(Slot),
socket_sender: *Channel(std.ArrayList(Shred)),
exit: *AtomicBool,
) !void {
_ = blockstore;
_ = retransmit_receiver;
_ = socket_sender;
_ = exit;
}
Loading

0 comments on commit 2ddc787

Please sign in to comment.