Skip to content

Commit

Permalink
Merge pull request #1358 from pguyot/w44/encode_and_decode_local_pids
Browse files Browse the repository at this point in the history
Add support for encoded local pids in external terms

Fixes #1350

These changes are made under both the "Apache 2.0" and the "GNU Lesser General
Public License 2.1 or later" license terms (dual license).

SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
  • Loading branch information
bettio committed Dec 21, 2024
2 parents 1e6248f + 941b581 commit fae8f60
Show file tree
Hide file tree
Showing 11 changed files with 517 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `code:all_loaded/0` and `code:all_available/0`
- Added `erlang:split_binary/2`
- Added `inet:getaddr/2`
- Added support for external pids and encoded pids in external terms

## [0.6.6] - Unreleased

Expand Down
19 changes: 16 additions & 3 deletions src/libAtomVM/ets_hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ static uint32_t hash_float(term t, int32_t h, GlobalContext *global)
return h * LARGE_PRIME_FLOAT;
}

static uint32_t hash_pid(term t, int32_t h, GlobalContext *global)
static uint32_t hash_local_pid(term t, int32_t h, GlobalContext *global)
{
UNUSED(global);
uint32_t n = (uint32_t) term_to_local_process_id(t);
Expand All @@ -255,6 +255,17 @@ static uint32_t hash_pid(term t, int32_t h, GlobalContext *global)
return h * LARGE_PRIME_PID;
}

static uint32_t hash_external_pid(term t, int32_t h, GlobalContext *global)
{
UNUSED(global);
uint32_t n = (uint32_t) term_get_external_pid_process_id(t);
while (n) {
h = h * LARGE_PRIME_PID + (n & 0xFF);
n >>= 8;
}
return h * LARGE_PRIME_PID;
}

static uint32_t hash_reference(term t, int32_t h, GlobalContext *global)
{
UNUSED(global);
Expand Down Expand Up @@ -285,8 +296,10 @@ static uint32_t hash_term_incr(term t, int32_t h, GlobalContext *global)
return hash_integer(t, h, global);
} else if (term_is_float(t)) {
return hash_float(t, h, global);
} else if (term_is_pid(t)) {
return hash_pid(t, h, global);
} else if (term_is_local_pid(t)) {
return hash_local_pid(t, h, global);
} else if (term_is_external_pid(t)) {
return hash_external_pid(t, h, global);
} else if (term_is_reference(t)) {
return hash_reference(t, h, global);
} else if (term_is_binary(t)) {
Expand Down
84 changes: 84 additions & 0 deletions src/libAtomVM/externalterm.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@
#include <stdlib.h>

#include "bitstring.h"
#include "defaultatoms.h"
#include "term.h"
#include "unicode.h"
#include "utils.h"

#define NEW_FLOAT_EXT 70
#define NEW_PID_EXT 88
#define SMALL_INTEGER_EXT 97
#define INTEGER_EXT 98
#define ATOM_EXT 100
#define PID_EXT 103
#define SMALL_TUPLE_EXT 104
#define LARGE_TUPLE_EXT 105
#define NIL_EXT 106
Expand Down Expand Up @@ -390,6 +394,33 @@ static int serialize_term(uint8_t *buf, term t, GlobalContext *glb)
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, mfa, glb);
}
return k;
} else if (term_is_local_pid(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = NEW_PID_EXT;
}
size_t k = 1;
term node_name = glb->node_name;
uint32_t creation = node_name == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node_name, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_32_UNALIGNED(buf + k, term_to_local_process_id(t));
WRITE_32_UNALIGNED(buf + k + 4, 0); // serial is 0 for local pids
WRITE_32_UNALIGNED(buf + k + 8, creation);
}
return k + 12;
} else if (term_is_external_pid(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = NEW_PID_EXT;
}
size_t k = 1;
term node = term_get_external_node(t);
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_32_UNALIGNED(buf + k, term_get_external_pid_process_id(t));
WRITE_32_UNALIGNED(buf + k + 4, term_get_external_pid_serial(t));
WRITE_32_UNALIGNED(buf + k + 8, term_get_external_node_creation(t));
}
return k + 12;
} else {
fprintf(stderr, "Unknown external term type: %" TERM_U_FMT "\n", t);
AVM_ABORT();
Expand Down Expand Up @@ -659,6 +690,32 @@ static term parse_external_terms(const uint8_t *external_term_buf, size_t *eterm
return term_from_atom_index(global_atom_id);
}

case NEW_PID_EXT: {
size_t node_size;
term node = parse_external_terms(external_term_buf + 1, &node_size, copy, heap, glb);
if (UNLIKELY(!term_is_atom(node))) {
return term_invalid_term();
}
uint32_t number = READ_32_UNALIGNED(external_term_buf + node_size + 1);
uint32_t serial = READ_32_UNALIGNED(external_term_buf + node_size + 5);
uint32_t creation = READ_32_UNALIGNED(external_term_buf + node_size + 9);
*eterm_size = node_size + 13;
if (node != NONODE_AT_NOHOST_ATOM) {
term this_node = glb->node_name;
uint32_t this_creation = this_node == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
if (node == this_node && creation == this_creation) {
return term_from_local_process_id(number);
} else {
return term_make_external_process_id(node, number, serial, creation, heap);
}
} else {
if (UNLIKELY(serial != 0 || creation != 0)) {
return term_invalid_term();
}
return term_from_local_process_id(number);
}
}

default:
return term_invalid_term();
}
Expand Down Expand Up @@ -948,6 +1005,33 @@ static int calculate_heap_usage(const uint8_t *external_term_buf, size_t remaini
return 0;
}

case NEW_PID_EXT: {
if (UNLIKELY(remaining < 1)) {
return INVALID_TERM_SIZE;
}
remaining -= 1;
int buf_pos = 1;
size_t heap_size = EXTERNAL_PID_SIZE;
size_t node_size = 0;
int u = calculate_heap_usage(external_term_buf + buf_pos, remaining, &node_size, copy);
if (UNLIKELY(u == INVALID_TERM_SIZE)) {
return INVALID_TERM_SIZE;
}
if (external_term_buf[1] == SMALL_ATOM_UTF8_EXT) {
// Check if it's non-distributed node, in which case it's always a local pid
if (external_term_buf[2] == strlen("nonode@nohost") && memcmp(external_term_buf + 3, "nonode@nohost", strlen("nonode@nohost")) == 0) {
heap_size = 0;
}
// If this is our node, but we're distributed, we'll allocate more memory and may not use it.
// This way we're sure to not go out of bounds if distribution changes between now and when we deserialize
} else if (UNLIKELY(external_term_buf[1] != ATOM_EXT)) {
return INVALID_TERM_SIZE;
}
buf_pos += node_size;
*eterm_size = buf_pos + 12;
return heap_size + u;
}

default:
return INVALID_TERM_SIZE;
}
Expand Down
29 changes: 18 additions & 11 deletions src/libAtomVM/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ unsigned long memory_estimate_usage(term t)
} else if (term_is_nil(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_pid(t)) {
} else if (term_is_local_pid(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_nonempty_list(t)) {
Expand Down Expand Up @@ -587,19 +587,19 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
TRACE("Found NIL (%" TERM_X_FMT ")\n", t);
ptr++;

} else if (term_is_pid(t)) {
} else if (term_is_local_pid(t)) {
TRACE("Found PID (%" TERM_X_FMT ")\n", t);
ptr++;

} else if ((t & 0x3) == 0x0) {
TRACE("Found boxed header (%" TERM_X_FMT ")\n", t);

size_t arity = term_get_size_from_boxed_header(t);
switch (t & TERM_BOXED_TAG_MASK) {
case TERM_BOXED_TUPLE: {
int arity = term_get_size_from_boxed_header(t);
TRACE("- Boxed is tuple (%" TERM_X_FMT "), arity: %i\n", t, arity);
TRACE("- Boxed is tuple (%" TERM_X_FMT "), arity: %i\n", t, (int) arity);

for (int i = 1; i <= arity; i++) {
for (size_t i = 1; i <= arity; i++) {
TRACE("-- Elem: %" TERM_X_FMT "\n", ptr[i]);
ptr[i] = memory_shallow_copy_term(old_fragment, ptr[i], &new_heap, move);
}
Expand All @@ -620,13 +620,16 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
TRACE("- Found ref.\n");
break;

case TERM_BOXED_EXTERNAL_PID:
TRACE("- Found external pid.\n");
break;

case TERM_BOXED_FUN: {
int fun_size = term_get_size_from_boxed_header(t);
TRACE("- Found fun, size: %i.\n", fun_size);
TRACE("- Found fun, size: %i.\n", (int) arity);

// first term is the boxed header, followed by module and fun index.

for (int i = 3; i <= fun_size; i++) {
for (size_t i = 3; i <= arity; i++) {
TRACE("-- Frozen: %" TERM_X_FMT "\n", ptr[i]);
ptr[i] = memory_shallow_copy_term(old_fragment, ptr[i], &new_heap, move);
}
Expand Down Expand Up @@ -658,7 +661,7 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co

case TERM_BOXED_MAP: {
TRACE("- Found map.\n");
size_t map_size = term_get_size_from_boxed_header(t) - 1;
size_t map_size = arity - 1;
size_t keys_offset = term_get_map_keys_offset();
size_t value_offset = term_get_map_value_offset();
TRACE("-- Map keys: %" TERM_X_FMT "\n", ptr[keys_offset]);
Expand All @@ -674,7 +677,7 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
AVM_ABORT();
}

ptr += term_get_size_from_boxed_header(t) + 1;
ptr += arity + 1;

} else if (term_is_nonempty_list(t)) {
TRACE("Found nonempty list (%p)\n", (void *) t);
Expand Down Expand Up @@ -740,6 +743,10 @@ static void memory_scan_and_rewrite(size_t count, term *terms, const term *old_s
ptr += term_get_size_from_boxed_header(t);
break;

case TERM_BOXED_EXTERNAL_PID:
ptr += term_get_size_from_boxed_header(t);
break;

case TERM_BOXED_FUN:
// Skip header and module and process next terms
ptr++;
Expand Down Expand Up @@ -810,7 +817,7 @@ HOT_FUNC static term memory_shallow_copy_term(HeapFragment *old_fragment, term t
} else if (term_is_nil(t)) {
return t;

} else if (term_is_pid(t)) {
} else if (term_is_local_pid(t)) {
return t;

} else if (term_is_cp(t)) {
Expand Down
28 changes: 14 additions & 14 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[])
term reg_name_term = argv[0];
VALIDATE_VALUE(reg_name_term, term_is_atom);
term pid_or_port_term = argv[1];
VALIDATE_VALUE(pid_or_port_term, term_is_pid);
VALIDATE_VALUE(pid_or_port_term, term_is_local_pid);

int atom_index = term_to_atom_index(reg_name_term);
int32_t pid = term_to_local_process_id(pid_or_port_term);
Expand Down Expand Up @@ -1407,7 +1407,7 @@ static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
term target = argv[0];
GlobalContext *glb = ctx->global;

if (term_is_pid(target)) {
if (term_is_local_pid(target)) {
int32_t local_process_id = term_to_local_process_id(target);

globalcontext_send_message(glb, local_process_id, argv[1]);
Expand Down Expand Up @@ -2748,7 +2748,7 @@ static term nif_erlang_process_flag(Context *ctx, int argc, term argv[])
flag = argv[1];
value = argv[2];

VALIDATE_VALUE(pid, term_is_pid);
VALIDATE_VALUE(pid, term_is_local_pid);
int local_process_id = term_to_local_process_id(pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
if (IS_NULL_PTR(target)) {
Expand Down Expand Up @@ -3225,7 +3225,7 @@ static term nif_binary_split(Context *ctx, int argc, term argv[])

if (num_segments == 1) {
// not found
if (UNLIKELY(memory_ensure_free_with_roots(ctx, 2, 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, 0), 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}

Expand Down Expand Up @@ -3477,11 +3477,11 @@ static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[])

term t = argv[0];
VALIDATE_VALUE(t, term_is_pid);
size_t max_len = term_is_external(t) ? EXTERNAL_PID_AS_CSTRING_LEN : LOCAL_PID_AS_CSTRING_LEN;

char buf[PID_AS_CSTRING_LEN];
int str_len = term_snprint(buf, PID_AS_CSTRING_LEN, t, ctx->global);
char buf[max_len];
int str_len = term_snprint(buf, max_len, t, ctx->global);
if (UNLIKELY(str_len < 0)) {
// TODO: change to internal error or something like that
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}

Expand Down Expand Up @@ -3593,7 +3593,7 @@ static term nif_erlang_garbage_collect(Context *ctx, int argc, term argv[])
} else {
// argc == 1
term t = argv[0];
VALIDATE_VALUE(t, term_is_pid);
VALIDATE_VALUE(t, term_is_local_pid);

int local_id = term_to_local_process_id(t);
Context *target = globalcontext_get_process_lock(ctx->global, local_id);
Expand Down Expand Up @@ -3636,7 +3636,7 @@ static term nif_erlang_exit(Context *ctx, int argc, term argv[])
RAISE(LOWERCASE_EXIT_ATOM, reason);
} else {
term target_process = argv[0];
VALIDATE_VALUE(target_process, term_is_pid);
VALIDATE_VALUE(target_process, term_is_local_pid);
term reason = argv[1];
GlobalContext *glb = ctx->global;
Context *target = globalcontext_get_process_lock(glb, term_to_local_process_id(target_process));
Expand Down Expand Up @@ -3749,7 +3749,7 @@ static term nif_erlang_monitor(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}

VALIDATE_VALUE(target_pid, term_is_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3817,7 +3817,7 @@ static term nif_erlang_link(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3848,7 +3848,7 @@ static term nif_erlang_unlink(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3879,8 +3879,8 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[])
} else {
term leader = argv[0];
term pid = argv[1];
VALIDATE_VALUE(pid, term_is_pid);
VALIDATE_VALUE(leader, term_is_pid);
VALIDATE_VALUE(pid, term_is_local_pid);
VALIDATE_VALUE(leader, term_is_local_pid);

int local_process_id = term_to_local_process_id(pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down
4 changes: 2 additions & 2 deletions src/libAtomVM/opcodesswitch.h
Original file line number Diff line number Diff line change
Expand Up @@ -2404,7 +2404,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
term recipient_term = x_regs[0];
int local_process_id;
if (term_is_pid(recipient_term)) {
if (term_is_local_pid(recipient_term)) {
local_process_id = term_to_local_process_id(recipient_term);
} else if (term_is_atom(recipient_term)) {
local_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term));
Expand Down Expand Up @@ -3004,7 +3004,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
TRACE("is_port/2, label=%i, arg1=%lx\n", label, arg1);

if (term_is_pid(arg1)) {
if (term_is_local_pid(arg1)) {
int local_process_id = term_to_local_process_id(arg1);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
bool is_port_driver = false;
Expand Down
2 changes: 1 addition & 1 deletion src/libAtomVM/posix_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ static term nif_atomvm_posix_write(Context *ctx, int argc, term argv[])
static term nif_atomvm_posix_select(Context *ctx, term argv[], enum ErlNifSelectFlags mode)
{
term process_pid_term = argv[1];
VALIDATE_VALUE(process_pid_term, term_is_pid);
VALIDATE_VALUE(process_pid_term, term_is_local_pid);
int32_t process_pid = term_to_local_process_id(process_pid_term);
term select_ref_term = argv[2];
if (select_ref_term != UNDEFINED_ATOM) {
Expand Down
Loading

0 comments on commit fae8f60

Please sign in to comment.