Skip to content

Commit

Permalink
Add support for enif_monitors
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Jun 11, 2023
1 parent 139ba21 commit 513d03c
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 96 deletions.
142 changes: 94 additions & 48 deletions src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <math.h>

#include "dictionary.h"
#include "erl_nif.h"
#include "erl_nif_priv.h"
#include "globalcontext.h"
#include "list.h"
#include "mailbox.h"
Expand Down Expand Up @@ -268,89 +270,133 @@ bool context_get_process_info(Context *ctx, term *out, term atom_key)

static void context_monitors_handle_terminate(Context *ctx)
{
GlobalContext *glb = ctx->global;
struct ListHead *item;
struct ListHead *tmp;
MUTABLE_LIST_FOR_EACH (item, tmp, &ctx->monitors_head) {
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
int local_process_id = term_to_local_process_id(monitor->monitor_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
if (IS_NULL_PTR(target)) {
// TODO: we should scan for existing monitors when a context is destroyed
// otherwise memory might be wasted for long living processes
free(monitor);
continue;
}
if (monitor->ref_ticks && term_is_boxed(monitor->monitor_obj)) {
// Resource monitor
struct ResourceMonitor *resource_monitor = (struct ResourceMonitor *) monitor;
void *resource = term_to_term_ptr(monitor->monitor_obj);
struct RefcBinary *refc = refc_binary_from_data(resource);
ErlNifEnv env;
erl_nif_env_partial_init_from_globalcontext(&env, glb);
refc->resource_type->down(&env, resource, &ctx->process_id, &monitor->ref_ticks);

struct ListHead *processes_table_list = synclist_wrlock(&glb->processes_table);
UNUSED(processes_table_list);
list_remove(&resource_monitor->resource_list_head);
synclist_unlock(&glb->processes_table);
} else {
int local_process_id = term_to_local_process_id(monitor->monitor_obj);
Context *target = globalcontext_get_process_lock(glb, local_process_id);
if (IS_NULL_PTR(target)) {
// TODO: we should scan for existing monitors when a context is destroyed
// otherwise memory might be wasted for long living processes
free(monitor);
continue;
}

if (monitor->linked && (ctx->exit_reason != NORMAL_ATOM || target->trap_exit)) {
if (target->trap_exit) {
if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) {
if (monitor->ref_ticks == 0 && (ctx->exit_reason != NORMAL_ATOM || target->trap_exit)) {
if (target->trap_exit) {
if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) {
// TODO: handle out of memory here
fprintf(stderr, "Cannot handle out of memory.\n");
globalcontext_get_process_unlock(glb, target);
AVM_ABORT();
}

// Prepare the message on ctx's heap which will be freed afterwards.
term info_tuple = term_alloc_tuple(3, &ctx->heap);
term_put_tuple_element(info_tuple, 0, EXIT_ATOM);
term_put_tuple_element(info_tuple, 1, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 2, ctx->exit_reason);
mailbox_send(target, info_tuple);
} else {
mailbox_send_term_signal(target, KillSignal, ctx->exit_reason);
}
} else if (monitor->ref_ticks) {
int required_terms = REF_SIZE + TUPLE_SIZE(5);
if (UNLIKELY(memory_ensure_free(ctx, required_terms) != MEMORY_GC_OK)) {
// TODO: handle out of memory here
fprintf(stderr, "Cannot handle out of memory.\n");
globalcontext_get_process_unlock(ctx->global, target);
globalcontext_get_process_unlock(glb, target);
AVM_ABORT();
}

// Prepare the message on ctx's heap which will be freed afterwards.
term info_tuple = term_alloc_tuple(3, &ctx->heap);
term_put_tuple_element(info_tuple, 0, EXIT_ATOM);
term_put_tuple_element(info_tuple, 1, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 2, ctx->exit_reason);
mailbox_send(target, info_tuple);
} else {
mailbox_send_term_signal(target, KillSignal, ctx->exit_reason);
}
} else if (!monitor->linked) {
int required_terms = REF_SIZE + TUPLE_SIZE(5);
if (UNLIKELY(memory_ensure_free(ctx, required_terms) != MEMORY_GC_OK)) {
// TODO: handle out of memory here
fprintf(stderr, "Cannot handle out of memory.\n");
globalcontext_get_process_unlock(ctx->global, target);
AVM_ABORT();
}

// Prepare the message on ctx's heap which will be freed afterwards.
term ref = term_from_ref_ticks(monitor->ref_ticks, &ctx->heap);
term ref = term_from_ref_ticks(monitor->ref_ticks, &ctx->heap);

term info_tuple = term_alloc_tuple(5, &ctx->heap);
term_put_tuple_element(info_tuple, 0, DOWN_ATOM);
term_put_tuple_element(info_tuple, 1, ref);
if (ctx->native_handler != NULL) {
term_put_tuple_element(info_tuple, 2, PORT_ATOM);
} else {
term_put_tuple_element(info_tuple, 2, PROCESS_ATOM);
}
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 4, ctx->exit_reason);

term info_tuple = term_alloc_tuple(5, &ctx->heap);
term_put_tuple_element(info_tuple, 0, DOWN_ATOM);
term_put_tuple_element(info_tuple, 1, ref);
if (ctx->native_handler != NULL) {
term_put_tuple_element(info_tuple, 2, PORT_ATOM);
} else {
term_put_tuple_element(info_tuple, 2, PROCESS_ATOM);
mailbox_send(target, info_tuple);
}
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 4, ctx->exit_reason);

mailbox_send(target, info_tuple);
globalcontext_get_process_unlock(glb, target);
}
globalcontext_get_process_unlock(ctx->global, target);
free(monitor);
}
}

uint64_t context_monitor(Context *ctx, term monitor_pid, bool linked)
int context_link(Context *ctx, term link_pid)
{
struct Monitor *monitor = malloc(sizeof(struct Monitor));
if (IS_NULL_PTR(monitor)) {
return -1;
}
monitor->monitor_obj = link_pid;
monitor->ref_ticks = 0;
list_append(&ctx->monitors_head, &monitor->monitor_list_head);

return 0;
}

uint64_t context_monitor(Context *ctx, term monitor_pid)
{
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);

struct Monitor *monitor = malloc(sizeof(struct Monitor));
if (IS_NULL_PTR(monitor)) {
return 0;
}
monitor->monitor_pid = monitor_pid;
monitor->monitor_obj = monitor_pid;
monitor->ref_ticks = ref_ticks;
monitor->linked = linked;
list_append(&ctx->monitors_head, &monitor->monitor_list_head);

return ref_ticks;
}

void context_demonitor(Context *ctx, term monitor_pid, bool linked)
struct ResourceMonitor *context_resource_monitor(Context *ctx, void *resource)
{
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);

struct ResourceMonitor *monitor = malloc(sizeof(struct ResourceMonitor));
if (IS_NULL_PTR(monitor)) {
return NULL;
}
// Not really boxed, but sufficient to distinguish from pids
monitor->base.monitor_obj = ((term) resource) | TERM_BOXED_VALUE_TAG;
monitor->base.ref_ticks = ref_ticks;
list_append(&ctx->monitors_head, &monitor->base.monitor_list_head);

return monitor;
}

void context_unlink(Context *ctx, term link_pid)
{
struct ListHead *item;
LIST_FOR_EACH (item, &ctx->monitors_head) {
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
if ((monitor->monitor_pid == monitor_pid) && (monitor->linked == linked)) {
if ((monitor->monitor_obj == link_pid) && (monitor->ref_ticks == 0)) {
list_remove(&monitor->monitor_list_head);
free(monitor);
return;
Expand Down
62 changes: 53 additions & 9 deletions src/libAtomVM/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,23 @@ struct Context
typedef struct Context Context;
#endif

/**
* @brief A regular monitor or a half link.
*/
struct Monitor
{
struct ListHead monitor_list_head;
uint64_t ref_ticks; // 0 for links
term monitor_obj;
};

term monitor_pid;
uint64_t ref_ticks;

// this might be replaced with a handler function, this might be useful as a replacement
// to leader process field or for any other purposes.
// TODO: we might save useful bytes by assuming that ref_links == 0 means linked
bool linked : 1;
/**
* @brief A resource monitor.
*/
struct ResourceMonitor
{
struct Monitor base;
struct ListHead resource_list_head;
};

/**
Expand Down Expand Up @@ -379,8 +385,46 @@ void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool
*/
bool context_get_process_info(Context *ctx, term *out, term atom_key);

uint64_t context_monitor(Context *ctx, term monitor_pid, bool linked);
void context_demonitor(Context *ctx, term monitor_pid, bool linked);
/**
* @brief Half-link process to another process
* @details Caller must hold the global process lock. This creates one half of
* the link.
*
* @param ctx context to link
* @param link_pid process to link ctx to
* @return 0 on success
*/
int context_link(Context *ctx, term monitor_pid);

/**
* @brief Create a monitor on a process.
* @details Caller must hold the global process lock.
*
* @param ctx context to monitor
* @param monitor_pid monitoring process
* @return the ref ticks
*/
uint64_t context_monitor(Context *ctx, term monitor_pid);

/**
* @brief Create a resource monitor on a process.
* @details Caller must hold the global process lock. The returned resource
* monitor is not added to the monitors list on the resource type.
*
* @param ctx context to monitor
* @param resource resource object
* @return the resource monitor
*/
struct ResourceMonitor *context_resource_monitor(Context *ctx, void *resource);
/**
* @brief Remove a half-link from a process.
* @details Caller must hold the global process lock. This removes one half of
* the link.
*
* @param ctx context to monitor
* @param link_id process to unlink
*/
void context_unlink(Context *ctx, term monitor_pid);

#ifdef __cplusplus
}
Expand Down
43 changes: 43 additions & 0 deletions src/libAtomVM/erl_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ typedef int32_t ErlNifPid;
*/
typedef struct ResourceType ErlNifResourceType;

/**
* @brief Opaque monitor type
*/
typedef uint64_t ErlNifMonitor;

/**
* @brief Selectable event.
*/
Expand All @@ -68,6 +73,11 @@ typedef void ErlNifResourceDtor(ErlNifEnv *caller_env, void *obj);
*/
typedef void ErlNifResourceStop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call);

/**
* @brief Resource monitor callback
*/
typedef void ErlNifResourceDown(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon);

/**
* @brief Resource callbacks.
* @details Members should be set to 0, 1 or 2 depending on provided callbacks.
Expand All @@ -78,6 +88,7 @@ typedef struct
int members;
ErlNifResourceDtor *dtor;
ErlNifResourceStop *stop;
ErlNifResourceDown *down;
} ErlNifResourceTypeInit;

/**
Expand Down Expand Up @@ -202,6 +213,38 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
*/
int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref);

/**
* @brief Monitor a process by using a resource object.
* @details The monitor is automatically removed after being triggered or if the
* associated resource is deallocated.
*
* @param env current environment
* @param obj resource to use for monitor
* @param target_pid process to monitor
* @param mon on output, monitor object (can be NULL)
* @return 0 on success, <0 if no down callback is provided with resource (badarg), >0 if the process is no longer alive
*/
int enif_monitor_process(ErlNifEnv *env, void *obj, const ErlNifPid *target_pid, ErlNifMonitor *mon);

/**
* @brief Unmonitor a process
*
* @param env current environment
* @param obj resource used by monitor
* @param mon monitor
* @return 0 on success
*/
int enif_demonitor_process(ErlNifEnv *caller_env, void *obj, const ErlNifMonitor *mon);

/**
* @brief compare two monitors
*
* @param monitor1 first monitor
* @param monitor2 second monitor
* @return 0 if equals, < 0 if `monitor1` < `monitor2`, > 0 if `monitor1` > `monitor2`.
*/
int enif_compare_monitors(const ErlNifMonitor *monitor1, const ErlNifMonitor *monitor2);

#ifdef __cplusplus
}
#endif
Expand Down
10 changes: 9 additions & 1 deletion src/libAtomVM/globalcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,16 @@ Module *globalcontext_get_module_by_index(GlobalContext *global, int index);
*/
Module *globalcontext_get_module(GlobalContext *global, AtomString module_name_atom);

/**
* @brief remove a monitor
*
* @details iterate on the list of all processes and then on each monitor
* to find a given monitor, and remove it
* @param global the global context
* @param ref_ticks the reference to the monitor
* @return true if the monitor was found
*/
bool globalcontext_demonitor(GlobalContext *global, uint64_t ref_ticks);
void globalcontext_unlink(GlobalContext *global, term pid);

#ifndef __cplusplus
static inline uint64_t globalcontext_get_ref_ticks(GlobalContext *global)
Expand Down
Loading

0 comments on commit 513d03c

Please sign in to comment.