Skip to content

Commit

Permalink
wip: add node's role
Browse files Browse the repository at this point in the history
  • Loading branch information
trungnotchung committed Nov 12, 2024
1 parent 59e1603 commit 013ecc7
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 120 deletions.
151 changes: 88 additions & 63 deletions packages/blueprints/src/AddWinsSet/index.ts
Original file line number Diff line number Diff line change
@@ -1,77 +1,102 @@
import {
ActionType,
type CRO,
type Operation,
type ResolveConflictsType,
SemanticsType,
type Vertex,
ActionType,
type CRO,
type Operation,
type ResolveConflictsType,
SemanticsType,
type Vertex,
} from "@topology-foundation/object";
import { Role } from "@topology-foundation/node";

export class AddWinsSet<T> implements CRO {
operations: string[] = ["add", "remove"];
state: Map<T, boolean>;
semanticsType = SemanticsType.pair;
operations: string[] = ["add", "remove"];
state: Map<T, boolean>;
roles: Map<string, number>;
semanticsType = SemanticsType.pair;

constructor() {
this.state = new Map<T, boolean>();
}
constructor(nodeIds: string[]) {
this.state = new Map<T, boolean>();
this.roles = new Map<string, number>();
for (const nodeId of nodeIds) {
this.roles.set(nodeId, Role.ADMIN);
}
}

private _add(value: T): void {
if (!this.state.get(value)) this.state.set(value, true);
}
private _add(value: T): void {
if (!this.state.get(value)) this.state.set(value, true);
}

add(value: T): void {
this._add(value);
}
add(value: T): void {
this._add(value);
}

private _remove(value: T): void {
if (this.state.get(value)) this.state.set(value, false);
}
private _remove(value: T): void {
if (this.state.get(value)) this.state.set(value, false);
}

remove(value: T): void {
this._remove(value);
}
remove(value: T): void {
this._remove(value);
}

contains(value: T): boolean {
return this.state.get(value) === true;
}
contains(value: T): boolean {
return this.state.get(value) === true;
}

values(): T[] {
return Array.from(this.state.entries())
.filter(([_, exists]) => exists)
.map(([value, _]) => value);
}
values(): T[] {
return Array.from(this.state.entries())
.filter(([_, exists]) => exists)
.map(([value, _]) => value);
}

// in this case is an array of length 2 and there are only two possible operations
resolveConflicts(vertices: Vertex[]): ResolveConflictsType {
// Both must have operations, if not return no-op
if (
vertices[0].operation &&
vertices[1].operation &&
vertices[0].operation?.type !== vertices[1].operation?.type &&
vertices[0].operation?.value === vertices[1].operation?.value
) {
return vertices[0].operation.type === "add"
? { action: ActionType.DropRight }
: { action: ActionType.DropLeft };
}
return { action: ActionType.Nop };
}
// in this case is an array of length 2 and there are only two possible operations
resolveConflicts(vertices: Vertex[]): ResolveConflictsType {
// Both must have operations, if not return no-op
if (
vertices[0].operation &&
vertices[1].operation &&
vertices[0].operation?.type !== vertices[1].operation?.type &&
vertices[0].operation?.value === vertices[1].operation?.value
) {
return vertices[0].operation.type === "add"
? { action: ActionType.DropRight }
: { action: ActionType.DropLeft };
}
return { action: ActionType.Nop };
}

// merged at HG level and called as a callback
mergeCallback(operations: Operation[]): void {
this.state = new Map<T, boolean>();
for (const op of operations) {
switch (op.type) {
case "add":
if (op.value !== null) this._add(op.value);
break;
case "remove":
if (op.value !== null) this._remove(op.value);
break;
default:
break;
}
}
}
// merged at HG level and called as a callback
mergeCallback(operations: Operation[]): void {
this.state = new Map<T, boolean>();
for (const op of operations) {
switch (op.type) {
case "add":
if (op.value !== null) this._add(op.value);
break;
case "remove":
if (op.value !== null) this._remove(op.value);
break;
default:
break;
}
}
}

hasRole(nodeId: string, role: number): boolean {
if (!this.roles.has(nodeId)) {
return false;
}
return this.roles.get(nodeId) === role;
}

grantRole(nodeId: string): void {
if (!this.roles.has(nodeId) || this.roles.get(nodeId) === Role.NONE) {
this.roles.set(nodeId, Role.GUEST);
}
}

revokeRole(nodeId: string): void {
if (this.roles.has(nodeId)) {
this.roles.set(nodeId, Role.NONE);
}
}
}
5 changes: 5 additions & 0 deletions packages/node/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export enum Role {
NONE = 0,
GUEST = 1,
ADMIN = 2,
}
128 changes: 72 additions & 56 deletions packages/node/src/operations.ts
Original file line number Diff line number Diff line change
@@ -1,84 +1,100 @@
import { NetworkPb } from "@topology-foundation/network";
import { ObjectPb, type TopologyObject } from "@topology-foundation/object";
import { CRO, ObjectPb, type TopologyObject } from "@topology-foundation/object";
import {
topologyMessagesHandler,
topologyObjectChangesHandler,
topologyMessagesHandler,
topologyObjectChangesHandler,
} from "./handlers.js";
import type { TopologyNode } from "./index.js";

/* Object operations */
enum OPERATIONS {
/* Create a new CRO */
CREATE = 0,
/* Update operation on a CRO */
UPDATE = 1,
/* Create a new CRO */
CREATE = 0,
/* Update operation on a CRO */
UPDATE = 1,

/* Subscribe to a PubSub group (either CRO or custom) */
SUBSCRIBE = 2,
/* Unsubscribe from a PubSub group */
UNSUBSCRIBE = 3,
/* Actively send the CRO RIBLT to a random peer */
SYNC = 4,
/* Subscribe to a PubSub group (either CRO or custom) */
SUBSCRIBE = 2,
/* Unsubscribe from a PubSub group */
UNSUBSCRIBE = 3,
/* Actively send the CRO RIBLT to a random peer */
SYNC = 4,
/* Grant permission guest to another node */
GRANT = 5,
/* Revoke permission guest from another node */
REVOKE = 6,
}

export function createObject(node: TopologyNode, object: TopologyObject) {
node.objectStore.put(object.id, object);
object.subscribe((obj, originFn, vertices) =>
topologyObjectChangesHandler(node, obj, originFn, vertices),
);
node.objectStore.put(object.id, object);
object.subscribe((obj, originFn, vertices) =>
topologyObjectChangesHandler(node, obj, originFn, vertices)
);
}

/* data: { id: string } */
export async function subscribeObject(node: TopologyNode, objectId: string) {
node.networkNode.subscribe(objectId);
node.networkNode.addGroupMessageHandler(objectId, async (e) =>
topologyMessagesHandler(node, undefined, e.detail.msg.data),
);
node.networkNode.subscribe(objectId);
node.networkNode.addGroupMessageHandler(objectId, async (e) =>
topologyMessagesHandler(node, undefined, e.detail.msg.data)
);
}

export function unsubscribeObject(
node: TopologyNode,
objectId: string,
purge?: boolean,
node: TopologyNode,
objectId: string,
purge?: boolean
) {
node.networkNode.unsubscribe(objectId);
if (purge) node.objectStore.remove(objectId);
node.networkNode.unsubscribe(objectId);
if (purge) node.objectStore.remove(objectId);
}

/*
data: { vertex_hashes: string[] }
*/
export async function syncObject(
node: TopologyNode,
objectId: string,
peerId?: string,
node: TopologyNode,
objectId: string,
peerId?: string
) {
const object: TopologyObject | undefined = node.objectStore.get(objectId);
if (!object) {
console.error("topology::node::syncObject", "Object not found");
return;
}
const data = NetworkPb.Sync.create({
objectId,
vertexHashes: object.vertices.map((v) => v.hash),
});
const message = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: NetworkPb.Message_MessageType.SYNC,
data: NetworkPb.Sync.encode(data).finish(),
});
const object: TopologyObject | undefined = node.objectStore.get(objectId);
if (!object) {
console.error("topology::node::syncObject", "Object not found");
return;
}
const data = NetworkPb.Sync.create({
objectId,
vertexHashes: object.vertices.map((v) => v.hash),
});
const message = NetworkPb.Message.create({
sender: node.networkNode.peerId,
type: NetworkPb.Message_MessageType.SYNC,
data: NetworkPb.Sync.encode(data).finish(),
});

if (!peerId) {
await node.networkNode.sendGroupMessageRandomPeer(
objectId,
["/topology/message/0.0.1"],
message,
);
} else {
await node.networkNode.sendMessage(
peerId,
["/topology/message/0.0.1"],
message,
);
}
if (!peerId) {
await node.networkNode.sendGroupMessageRandomPeer(
objectId,
["/topology/message/0.0.1"],
message
);
} else {
await node.networkNode.sendMessage(
peerId,
["/topology/message/0.0.1"],
message
);
}
}

export async function grantPermission(
node: TopologyNode,
objectId: string,
peerId: string
) {
const object: TopologyObject | undefined = node.objectStore.get(objectId);
if (!object) {
console.error("topology::node::syncObject", "Object not found");
return;
}
}
2 changes: 1 addition & 1 deletion packages/object/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import * as ObjectPb from "./proto/topology/object/object_pb.js";

export * as ObjectPb from "./proto/topology/object/object_pb.js";
export * from "./hashgraph/index.js";

export interface CRO {
operations: string[];
semanticsType: SemanticsType;
resolveConflicts: (vertices: Vertex[]) => ResolveConflictsType;
mergeCallback: (operations: Operation[]) => void;
hasRole: (nodeId: string, role: number) => boolean;
}

export type TopologyObjectCallback = (
Expand Down

0 comments on commit 013ecc7

Please sign in to comment.