Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ets:update_counter #1406

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Added the ability to run beams from the CLI for Generic Unix platform (it was already possible with nodejs and emscripten).
- Added support for 'erlang:--/2'.
- Added support for list insertion in 'ets:insert/2'.
- Added support for list insertion in 'ets:update_counter/3' and 'ets:update_counter/4'.

### Fixed

Expand Down
44 changes: 42 additions & 2 deletions libs/estdlib/src/ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
insert/2,
lookup/2,
lookup_element/3,
delete/2
delete/2,
update_counter/3,
update_counter/4
]).

-export_type([
Expand Down Expand Up @@ -63,7 +65,7 @@ new(_Name, _Options) ->
%% @doc Insert an entry into an ets table.
%% @end
%%-----------------------------------------------------------------------------
-spec insert(Table :: table(), Entry :: tuple()) -> true.
-spec insert(Table :: table(), Entry :: tuple() | [tuple()]) -> true.
insert(_Table, _Entry) ->
erlang:nif_error(undefined).

Expand Down Expand Up @@ -101,3 +103,41 @@ lookup_element(_Table, _Key, _Pos) ->
-spec delete(Table :: table(), Key :: term()) -> true.
delete(_Table, _Key) ->
erlang:nif_error(undefined).

%%-----------------------------------------------------------------------------
%% @param Table a reference to the ets table
%% @param Key the key used to look up the entry expecting to contain a tuple of integers or a single integer
%% @param Params the increment value or a tuple {Pos, Increment} or {Pos, Increment, Treshold, SetValue},
%% where Pos is an integer (1-based index) specifying the position in the tuple to increment. Value is clamped to SetValue if it exceeds Threshold after update.
%% @returns the updated element's value after performing the increment, or the default value if applicable
%% @doc Updates a counter value at Key in the table. If Params is a single integer, it increments the direct integer value at Key or the first integer in a tuple. If Params is a tuple {Pos, Increment}, it increments the integer at the specified position Pos in the tuple stored at Key.
%% @end
%%-----------------------------------------------------------------------------
-spec update_counter(
Table :: table(),
Key :: term(),
Params ::
integer() | {pos_integer(), integer()} | {pos_integer(), integer(), integer(), integer()}
) -> integer().
update_counter(_Table, _Key, _Params) ->
erlang:nif_error(undefined).

%%-----------------------------------------------------------------------------
%% @param Table a reference to the ets table
%% @param Key the key used to look up the entry expecting to contain a tuple of integers or a single integer
%% @param Params the increment value or a tuple {Pos, Increment} or {Pos, Increment, Treshold, SetValue},
%% where Pos is an integer (1-based index) specifying the position in the tuple to increment. If after incrementation value exceeds the Treshold, it is set to SetValue.
%% @param Default the default value used if the entry at Key doesn't exist or doesn't contain a valid tuple with a sufficient size or integer at Pos
%% @returns the updated element's value after performing the increment, or the default value if applicable
%% @doc Updates a counter value at Key in the table. If Params is a single integer, it increments the direct integer value at Key or the first integer in a tuple. If Params is a tuple {Pos, Increment}, it increments the integer at the specified position Pos in the tuple stored at Key. If the needed element does not exist, uses Default value as a fallback.
%% @end
%%-----------------------------------------------------------------------------
-spec update_counter(
Table :: table(),
Key :: term(),
Params ::
integer() | {pos_integer(), integer()} | {pos_integer(), integer(), integer(), integer()},
Default :: integer()
) -> integer().
update_counter(_Table, _Key, _Params, _Default) ->
erlang:nif_error(undefined).
209 changes: 187 additions & 22 deletions src/libAtomVM/ets.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "ets_hashtable.h"
#include "list.h"
#include "memory.h"
#include "overflow_helpers.h"
#include "term.h"

#ifndef AVM_NO_SMP
Expand Down Expand Up @@ -252,15 +253,9 @@ static void ets_delete_all_tables(struct Ets *ets, GlobalContext *global)
ets_delete_tables_internal(ets, true_pred, NULL, global);
}

EtsErrorCode ets_insert(term ref, term entry, Context *ctx)
static EtsErrorCode ets_table_insert(struct EtsTable *ets_table, term entry, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessWrite) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessWrite);
if (ets_table == NULL) {
return EtsTableNotFound;
}

if (ets_table->access_type != EtsAccessPublic && ets_table->owner_process_id != ctx->process_id) {
SMP_UNLOCK(ets_table);
return EtsPermissionDenied;
}

Expand All @@ -271,39 +266,79 @@ EtsErrorCode ets_insert(term ref, term entry, Context *ctx)

Heap *heap = malloc(sizeof(Heap));
if (IS_NULL_PTR(heap)) {
SMP_UNLOCK(ets_table);
return EtsAllocationFailure;
}
size_t size = (size_t) memory_estimate_usage(entry);
if (memory_init_heap(heap, size) != MEMORY_GC_OK) {
free(heap);
SMP_UNLOCK(ets_table);
return EtsAllocationFailure;
}

term new_entry = memory_copy_term_tree(heap, entry);
term key = term_get_tuple_element(new_entry, (int) ets_table->keypos);

EtsErrorCode ret = EtsOk;
EtsErrorCode result = EtsOk;
EtsHashtableErrorCode res = ets_hashtable_insert(ets_table->hashtable, key, new_entry, EtsHashtableAllowOverwrite, heap, ctx->global);
if (UNLIKELY(res != EtsHashtableOk)) {
ret = EtsAllocationFailure;
result = EtsAllocationFailure;
}

SMP_UNLOCK(ets_table);
return result;
}

return ret;
static EtsErrorCode ets_table_insert_list(struct EtsTable *ets_table, term list, Context *ctx)
{
term list1 = list;

while (term_is_nonempty_list(list1)) {
term tuple = term_get_list_head(list1);
list1 = term_get_list_tail(list1);
if (!term_is_tuple(tuple) || term_get_tuple_arity(tuple) < 1) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
}
if (!term_is_nil(list1)) {
return EtsBadEntry;
}

while (term_is_nonempty_list(list)) {
term tuple = term_get_list_head(list);
EtsErrorCode result = ets_table_insert(ets_table, tuple, ctx);
if (UNLIKELY(result != EtsOk)) {
AVM_ABORT(); // Abort because operation might not be atomic.
}

list = term_get_list_tail(list);
}

return EtsOk;
}

EtsErrorCode ets_lookup(term ref, term key, term *ret, Context *ctx)
EtsErrorCode ets_insert(term name_or_ref, term entry, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessRead);
struct EtsTable *ets_table = term_is_atom(name_or_ref) ? ets_get_table_by_name(&ctx->global->ets, name_or_ref, TableAccessWrite) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(name_or_ref), TableAccessWrite);
if (ets_table == NULL) {
return EtsTableNotFound;
}
EtsErrorCode result;

if (term_is_tuple(entry) && term_get_tuple_arity(entry) > 0) {
result = ets_table_insert(ets_table, entry, ctx);
} else if (term_is_list(entry)) {
result = ets_table_insert_list(ets_table, entry, ctx);
} else {
result = EtsBadEntry;
}

SMP_UNLOCK(ets_table);

return result;
}

static EtsErrorCode ets_table_lookup(struct EtsTable *ets_table, term key, term *ret, Context *ctx)
{
if (ets_table->access_type == EtsAccessPrivate && ets_table->owner_process_id != ctx->process_id) {
SMP_UNLOCK(ets_table);
return EtsPermissionDenied;
}

Expand All @@ -316,24 +351,35 @@ EtsErrorCode ets_lookup(term ref, term key, term *ret, Context *ctx)
size_t size = (size_t) memory_estimate_usage(res);
// allocate [object]
if (UNLIKELY(memory_ensure_free_opt(ctx, size + CONS_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
SMP_UNLOCK(ets_table);
return EtsAllocationFailure;
}
term new_res = memory_copy_term_tree(&ctx->heap, res);
*ret = term_list_prepend(new_res, term_nil(), &ctx->heap);
}
SMP_UNLOCK(ets_table);

return EtsOk;
}

EtsErrorCode ets_lookup_element(term ref, term key, size_t pos, term *ret, Context *ctx)
EtsErrorCode ets_lookup(term name_or_ref, term key, term *ret, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(name_or_ref) ? ets_get_table_by_name(&ctx->global->ets, name_or_ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(name_or_ref), TableAccessRead);
if (ets_table == NULL) {
return EtsTableNotFound;
}

EtsErrorCode result = ets_table_lookup(ets_table, key, ret, ctx);
SMP_UNLOCK(ets_table);

return result;
}

EtsErrorCode ets_lookup_element(term name_or_ref, term key, size_t pos, term *ret, Context *ctx)
{
if (UNLIKELY(pos == 0)) {
return EtsBadPosition;
}

struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessRead);
struct EtsTable *ets_table = term_is_atom(name_or_ref) ? ets_get_table_by_name(&ctx->global->ets, name_or_ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(name_or_ref), TableAccessRead);
if (ets_table == NULL) {
return EtsTableNotFound;
}
Expand Down Expand Up @@ -368,9 +414,9 @@ EtsErrorCode ets_lookup_element(term ref, term key, size_t pos, term *ret, Conte
return EtsOk;
}

EtsErrorCode ets_delete(term ref, term key, term *ret, Context *ctx)
EtsErrorCode ets_delete(term name_or_ref, term key, term *ret, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessRead);
struct EtsTable *ets_table = term_is_atom(name_or_ref) ? ets_get_table_by_name(&ctx->global->ets, name_or_ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(name_or_ref), TableAccessRead);
if (ets_table == NULL) {
return EtsTableNotFound;
}
Expand All @@ -388,3 +434,122 @@ EtsErrorCode ets_delete(term ref, term key, term *ret, Context *ctx)

return EtsOk;
}

static bool operation_to_tuple4(term operation, term *position, term *increment, term *threshold, term *set_value)
{
if (term_is_integer(operation)) {
*increment = operation;
*position = term_from_int(2);
*threshold = term_invalid_term();
*set_value = term_invalid_term();
return true;
}

if (!term_is_tuple(operation)) {
return false;
}
int n = term_get_tuple_arity(operation);
if (n != 2 && n != 4) {
return false;
}

term pos = term_get_tuple_element(operation, 0);
term incr = term_get_tuple_element(operation, 1);
if (!term_is_integer(pos) || !term_is_integer(incr)) {
return false;
}

if (n == 2) {
*position = pos;
*increment = incr;
*threshold = term_invalid_term();
*set_value = term_invalid_term();
return true;
}

term tresh = term_get_tuple_element(operation, 2);
term set_val = term_get_tuple_element(operation, 3);
if (!term_is_integer(tresh) || !term_is_integer(set_val)) {
return false;
}

*position = pos;
*increment = incr;
*threshold = tresh;
*set_value = set_val;
return true;
}

EtsErrorCode ets_update_counter(term ref, term key, term operation, term default_value, term *ret, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessWrite) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessWrite);
if (IS_NULL_PTR(ets_table)) {
return EtsTableNotFound;
}

term list = term_invalid_term();
EtsErrorCode result = ets_table_lookup(ets_table, key, &list, ctx);
if (result != EtsOk) {
SMP_UNLOCK(ets_table);
return result;
}

term to_insert;
if (term_is_nil(list)) {
if (term_is_invalid_term(default_value)) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
to_insert = default_value;
} else {
to_insert = term_get_list_head(list);
}

if (!(term_is_tuple(to_insert))) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
term position_term, increment_term, threshold_term, set_value_term;

if (!operation_to_tuple4(operation, &position_term, &increment_term, &threshold_term, &set_value_term)) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
int arity = term_get_tuple_arity(to_insert);
int position = term_to_int(position_term) - 1;
if (arity <= position || position < 1) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}

term elem = term_get_tuple_element(to_insert, position);
if (!term_is_integer(elem)) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
int increment = term_to_int(increment_term);
int elem_value;
if (BUILTIN_ADD_OVERFLOW_INT(increment, term_to_int(elem), &elem_value)) {
SMP_UNLOCK(ets_table);
return EtsOverlfow;
}
if (!term_is_invalid_term(threshold_term) && !term_is_invalid_term(set_value_term)) {
int threshold = term_to_int(threshold_term);
int set_value = term_to_int(set_value_term);

if (increment >= 0 && elem_value > threshold) {
elem_value = set_value;
} else if (increment < 0 && elem_value < threshold) {
elem_value = set_value;
}
}

elem = term_from_int(elem_value);
term_put_tuple_element(to_insert, position, elem);
EtsErrorCode insert_result = ets_table_insert(ets_table, to_insert, ctx);
if (insert_result == EtsOk) {
*ret = elem;
}
SMP_UNLOCK(ets_table);
return insert_result;
}
5 changes: 3 additions & 2 deletions src/libAtomVM/ets.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ typedef enum EtsErrorCode
EtsBadEntry,
EtsAllocationFailure,
EtsEntryNotFound,
EtsBadPosition
EtsBadPosition,
EtsOverlfow
} EtsErrorCode;
struct Ets
{
Expand All @@ -77,7 +78,7 @@ EtsErrorCode ets_insert(term ref, term entry, Context *ctx);
EtsErrorCode ets_lookup(term ref, term key, term *ret, Context *ctx);
EtsErrorCode ets_lookup_element(term ref, term key, size_t pos, term *ret, Context *ctx);
EtsErrorCode ets_delete(term ref, term key, term *ret, Context *ctx);

EtsErrorCode ets_update_counter(term ref, term key, term value, term pos, term *ret, Context *ctx);
#ifdef __cplusplus
}
#endif
Expand Down
Loading
Loading