Skip to content

Commit

Permalink
Merge pull request #1714 from BallAerospace/logging_optimizations
Browse files Browse the repository at this point in the history
Logging Optimizations
  • Loading branch information
ryanmelt authored Jun 25, 2022
2 parents 228241b + 09f3962 commit 406e47b
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 118 deletions.
28 changes: 14 additions & 14 deletions cosmos/ext/cosmos/ext/cosmos_io/cosmos_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ VALUE mCosmosIO = Qnil;
static ID id_method_read = 0;

/* Reads a length field and then return the String resulting from reading the
* number of bytes the length field indicates
*
* For example:
* io = StringIO.new
* # where io is "\x02\x01\x02\x03\x04...."
* result = io.read_length_bytes(1)
* # result will be "\x01x02" because the length field was given
* # to be 1 byte. We read 1 byte which is a 2. So we then read two
* # bytes and return.
*
* @param length_num_bytes [Integer] Number of bytes in the length field
* @return [String] A String of "length field" number of bytes
*/
* number of bytes the length field indicates
*
* For example:
* io = StringIO.new
* # where io is "\x02\x01\x02\x03\x04...."
* result = io.read_length_bytes(1)
* # result will be "\x01x02" because the length field was given
* # to be 1 byte. We read 1 byte which is a 2. So we then read two
* # bytes and return.
*
* @param length_num_bytes [Integer] Number of bytes in the length field
* @return [String] A String of "length field" number of bytes
*/
static VALUE read_length_bytes(int argc, VALUE *argv, VALUE self)
{
int length_num_bytes = 0;
Expand Down Expand Up @@ -134,7 +134,7 @@ static VALUE read_length_bytes(int argc, VALUE *argv, VALUE self)
}

/* Read String */
temp_string_length = UINT2NUM(string_length);
temp_string_length = UINT2NUM((unsigned int)string_length);
return_value = rb_funcall(self, id_method_read, 1, temp_string_length);
if (NIL_P(return_value) || (RSTRING_LEN(return_value) != string_length))
{
Expand Down
6 changes: 3 additions & 3 deletions cosmos/ext/cosmos/ext/packet/packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,23 +196,23 @@ static VALUE packet_initialize(int argc, VALUE *argv, VALUE self)
packet_name = argv[1];
default_endianness = symbol_BIG_ENDIAN;
description = Qnil;
buffer = rb_str_new2("");
buffer = Qnil;
item_class = cPacketItem;
break;
case 3:
target_name = argv[0];
packet_name = argv[1];
default_endianness = argv[2];
description = Qnil;
buffer = rb_str_new2("");
buffer = Qnil;
item_class = cPacketItem;
break;
case 4:
target_name = argv[0];
packet_name = argv[1];
default_endianness = argv[2];
description = argv[3];
buffer = rb_str_new2("");
buffer = Qnil;
item_class = cPacketItem;
break;
case 5:
Expand Down
62 changes: 31 additions & 31 deletions cosmos/ext/cosmos/ext/structure/structure.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ static ID id_method_reverse = 0;
static ID id_method_Integer = 0;
static ID id_method_Float = 0;
static ID id_method_kind_of = 0;
static ID id_method_allocate_buffer_if_needed = 0;

static ID id_ivar_buffer = 0;
static ID id_ivar_bit_offset = 0;
Expand Down Expand Up @@ -520,7 +521,8 @@ static VALUE binary_accessor_read(VALUE self, VALUE param_bit_offset, VALUE para
{
string_length = upper_bound - lower_bound + 1;
string = malloc(string_length + 1);
if (string == NULL) {
if (string == NULL)
{
rb_raise(rb_eNoMemError, "malloc of %d returned NULL", string_length + 1);
}
memcpy(string, buffer + lower_bound, string_length);
Expand Down Expand Up @@ -577,7 +579,8 @@ static VALUE binary_accessor_read(VALUE self, VALUE param_bit_offset, VALUE para
string_length = ((bit_size - 1) / 8) + 1;
array_length = string_length + 4; /* Required number of bytes plus slack */
unsigned_char_array = (unsigned char *)malloc(array_length);
if (unsigned_char_array == NULL) {
if (unsigned_char_array == NULL)
{
rb_raise(rb_eNoMemError, "malloc of %d returned NULL", array_length);
}
read_bitfield(lower_bound, upper_bound, bit_offset, bit_size, given_bit_offset, given_bit_size, param_endianness, buffer, (int)buffer_length, unsigned_char_array);
Expand Down Expand Up @@ -692,7 +695,8 @@ static VALUE binary_accessor_read(VALUE self, VALUE param_bit_offset, VALUE para
string_length = ((bit_size - 1) / 8) + 1;
array_length = string_length + 4; /* Required number of bytes plus slack */
unsigned_char_array = (unsigned char *)malloc(array_length);
if (unsigned_char_array == NULL) {
if (unsigned_char_array == NULL)
{
rb_raise(rb_eNoMemError, "malloc of %d returned NULL", array_length);
}
read_bitfield(lower_bound, upper_bound, bit_offset, bit_size, given_bit_offset, given_bit_size, param_endianness, buffer, (int)buffer_length, unsigned_char_array);
Expand Down Expand Up @@ -1147,7 +1151,8 @@ static VALUE binary_accessor_write(VALUE self, VALUE value, VALUE param_bit_offs
string_length = ((bit_size - 1) / 8) + 1;
array_length = string_length + 4; /* Required number of bytes plus slack */
unsigned_char_array = (unsigned char *)malloc(array_length);
if (unsigned_char_array == NULL) {
if (unsigned_char_array == NULL)
{
rb_raise(rb_eNoMemError, "malloc of %d returned NULL", array_length);
}

Expand Down Expand Up @@ -1275,15 +1280,8 @@ static VALUE binary_accessor_write(VALUE self, VALUE value, VALUE param_bit_offs
*/
static int get_int_length(VALUE self)
{
volatile VALUE buffer = rb_ivar_get(self, id_ivar_buffer);
if (RTEST(buffer))
{
return (int)RSTRING_LEN(buffer);
}
else
{
return 0;
}
rb_funcall(self, id_method_allocate_buffer_if_needed, 0);
return (int)RSTRING_LEN(rb_ivar_get(self, id_ivar_buffer));
}

/*
Expand All @@ -1310,24 +1308,21 @@ static VALUE read_item_internal(VALUE self, VALUE item, VALUE buffer)
return Qnil;
}

if (RTEST(buffer))
if (!(RTEST(buffer)))
{
bit_offset = rb_ivar_get(item, id_ivar_bit_offset);
bit_size = rb_ivar_get(item, id_ivar_bit_size);
array_size = rb_ivar_get(item, id_ivar_array_size);
endianness = rb_ivar_get(item, id_ivar_endianness);
if (RTEST(array_size))
{
return rb_funcall(cBinaryAccessor, id_method_read_array, 6, bit_offset, bit_size, data_type, array_size, buffer, endianness);
}
else
{
return binary_accessor_read(cBinaryAccessor, bit_offset, bit_size, data_type, buffer, endianness);
}
buffer = rb_funcall(self, id_method_allocate_buffer_if_needed, 0);
}
bit_offset = rb_ivar_get(item, id_ivar_bit_offset);
bit_size = rb_ivar_get(item, id_ivar_bit_size);
array_size = rb_ivar_get(item, id_ivar_array_size);
endianness = rb_ivar_get(item, id_ivar_endianness);
if (RTEST(array_size))
{
return rb_funcall(cBinaryAccessor, id_method_read_array, 6, bit_offset, bit_size, data_type, array_size, buffer, endianness);
}
else
{
rb_raise(rb_eRuntimeError, "No buffer given to read_item");
return binary_accessor_read(cBinaryAccessor, bit_offset, bit_size, data_type, buffer, endianness);
}
}

Expand Down Expand Up @@ -1404,8 +1399,8 @@ static VALUE structure_item_spaceship(VALUE self, VALUE other_item)
if ((bit_offset == 0) && (other_bit_offset == 0))
{
/* Both bit_offsets are 0 so sort by bit_size
* This allows derived items with bit_size of 0 to be listed first
* Compare based on bit size */
* This allows derived items with bit_size of 0 to be listed first
* Compare based on bit size */
bit_size = FIX2INT(rb_ivar_get(self, id_ivar_bit_size));
other_bit_size = FIX2INT(rb_ivar_get(other_item, id_ivar_bit_size));
if (bit_size == other_bit_size)
Expand Down Expand Up @@ -1518,12 +1513,12 @@ static VALUE structure_initialize(int argc, VALUE *argv, VALUE self)
{
case 0:
default_endianness = HOST_ENDIANNESS;
buffer = rb_str_new2("");
buffer = Qnil;
item_class = cStructureItem;
break;
case 1:
default_endianness = argv[0];
buffer = rb_str_new2("");
buffer = Qnil;
item_class = cStructureItem;
break;
case 2:
Expand Down Expand Up @@ -1592,6 +1587,10 @@ static VALUE resize_buffer(VALUE self)
rb_str_concat(buffer, rb_str_times(ZERO_STRING, INT2FIX(defined_length - current_length)));
}
}
else
{
rb_funcall(self, id_method_allocate_buffer_if_needed, 0);
}

return self;
}
Expand All @@ -1618,6 +1617,7 @@ void Init_structure(void)
id_method_Integer = rb_intern("Integer");
id_method_Float = rb_intern("Float");
id_method_kind_of = rb_intern("kind_of?");
id_method_allocate_buffer_if_needed = rb_intern("allocate_buffer_if_needed");

MIN_INT8 = INT2NUM(-128);
MAX_INT8 = INT2NUM(127);
Expand Down
103 changes: 76 additions & 27 deletions cosmos/lib/cosmos/logs/log_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,41 @@ class LogWriter
# @return [true/false] Whether logging is enabled
attr_reader :logging_enabled

# @return cycle_time [Integer] The amount of time in seconds before creating
# a new log file. This can be combined with cycle_size but is better used
# independently.
attr_reader :cycle_time

# @return cycle_hour [Integer] The time at which to cycle the log. Combined with
# cycle_minute to cycle the log daily at the specified time. If nil, the log
# will be cycled hourly at the specified cycle_minute.
attr_reader :cycle_hour

# @return cycle_minute [Integer] The time at which to cycle the log. See cycle_hour
# for more information.
attr_reader :cycle_minute

# @return [Time] Time that the current log file started
attr_reader :start_time

# @return [Mutex] Instance mutex protecting file
attr_reader :mutex

# The cycle time interval. Cycle times are only checked at this level of
# granularity.
CYCLE_TIME_INTERVAL = 2
CYCLE_TIME_INTERVAL = 10

# Mutex protecting class variables
@@mutex = Mutex.new

# Array of instances used to keep track of cycling logs
@@instances = []

# Thread used to cycle logs across all log writers
@@cycle_thread = nil

# Sleeper used to delay cycle thread
@@cycle_sleeper = nil

# @param remote_log_directory [String] The s3 path to store the log files
# @param logging_enabled [Boolean] Whether to start with logging enabled
Expand Down Expand Up @@ -89,11 +121,15 @@ def initialize(
# each time we create an entry which we do a LOT!
@entry = String.new

@cycle_thread = nil
if @cycle_time or @cycle_hour or @cycle_minute
@cycle_sleeper = Sleeper.new
@cycle_thread = Cosmos.safe_thread("Log cycle") do
cycle_thread_body()
@@mutex.synchronize do
@@instances << self

unless @@cycle_thread
@@cycle_thread = Cosmos.safe_thread("Log cycle") do
cycle_thread_body()
end
end
end
end
end
Expand All @@ -113,10 +149,13 @@ def stop
# Stop all logging, close the current log file, and kill the logging threads.
def shutdown
stop()
if @cycle_thread
@cycle_sleeper.cancel
Cosmos.kill_thread(self, @cycle_thread)
@cycle_thread = nil
@@mutex.synchronize do
@@instances.delete(self)
if @@instances.length <= 0
@@cycle_sleeper.cancel if @@cycle_sleeper
Cosmos.kill_thread(self, @@cycle_thread) if @@cycle_thread
@@cycle_thread = nil
end
end
end

Expand All @@ -143,28 +182,38 @@ def create_unique_filename(ext = extension)
end

def cycle_thread_body
@@cycle_sleeper = Sleeper.new
while true
# The check against start_time needs to be mutex protected to prevent a packet coming in between the check
# and closing the file
@mutex.synchronize do
utc_now = Time.now.utc
# Logger.debug("start:#{@start_time.to_f} now:#{utc_now.to_f} cycle:#{@cycle_time} new:#{(utc_now - @start_time) > @cycle_time}")
if @logging_enabled and
(
# Cycle based on total time logging
(@cycle_time and (utc_now - @start_time) > @cycle_time) or

# Cycle daily at a specific time
(@cycle_hour and @cycle_minute and utc_now.hour == @cycle_hour and utc_now.min == @cycle_minute and @start_time.yday != utc_now.yday) or

# Cycle hourly at a specific time
(@cycle_minute and not @cycle_hour and utc_now.min == @cycle_minute and @start_time.hour != utc_now.hour)
)
close_file(false)
start_time = Time.now
@@mutex.synchronize do
@@instances.each do |instance|
# The check against start_time needs to be mutex protected to prevent a packet coming in between the check
# and closing the file
instance.mutex.synchronize do
utc_now = Time.now.utc
# Logger.debug("start:#{@start_time.to_f} now:#{utc_now.to_f} cycle:#{@cycle_time} new:#{(utc_now - @start_time) > @cycle_time}")
if instance.logging_enabled and
(
# Cycle based on total time logging
(instance.cycle_time and (utc_now - instance.start_time) > instance.cycle_time) or

# Cycle daily at a specific time
(instance.cycle_hour and instance.cycle_minute and utc_now.hour == instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.yday != utc_now.yday) or

# Cycle hourly at a specific time
(instance.cycle_minute and not instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.hour != utc_now.hour)
)
instance.close_file(false)
end
end
end
end

# Only check whether to cycle at a set interval
break if @cycle_sleeper.sleep(CYCLE_TIME_INTERVAL)
run_time = Time.now - start_time
sleep_time = CYCLE_TIME_INTERVAL - run_time
sleep_time = 0 if sleep_time < 0
break if @@cycle_sleeper.sleep(sleep_time)
end
end

Expand Down
2 changes: 1 addition & 1 deletion cosmos/lib/cosmos/packets/packet.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Packet < Structure
# @param buffer [String] String buffer to hold the packet data
# @param item_class [Class] Class used to instantiate items (Must be a
# subclass of PacketItem)
def initialize(target_name, packet_name, default_endianness = :BIG_ENDIAN, description = nil, buffer = '', item_class = PacketItem)
def initialize(target_name, packet_name, default_endianness = :BIG_ENDIAN, description = nil, buffer = nil, item_class = PacketItem)
super(default_endianness, buffer, item_class)
# Explictly call the defined setter methods
self.target_name = target_name
Expand Down
Loading

0 comments on commit 406e47b

Please sign in to comment.