Skip to content

Commit

Permalink
Merge branch 'development' into development_py36
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva committed Jan 10, 2025
2 parents 74e6e6b + 8120ef7 commit 6cc71bd
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 91 deletions.
11 changes: 11 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
metpx-sr3 (3.00.57rc2) UNRELEASED; urgency=medium

* fix #1366 ln -sf a b ... leave tmp file when a does not exist.
* fix #1365 better error handling when makedirs fail.
* fix #1363 hardlink support broken.
* fix #1358 status displays of intervals over 1 month causes crashes.
* fix #1351 download with arbitrary checksum.
* progress #1297 hard ordering of operations (still some use cases left.)

-- peter <[email protected]> Mon, 23 Dec 2024 00:56:15 -0500

metpx-sr3 (3.00.57rc1) unstable; urgency=medium

* New #1350 Add "down" and "disconnected" run states to sr3 status
Expand Down
16 changes: 8 additions & 8 deletions docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ count, size

duration
a floating point number indicating a quantity of seconds (0.001 is 1 milisecond)
modified by a unit suffix ( m-minute, h-hour, w-week )
modified by a unit suffix ( m-minute, h-hour, M-month, y-year )

flag
an option that has only True or False values (aka: a boolean value)
Expand Down Expand Up @@ -798,18 +798,18 @@ expire <duration> (default: 5m == five minutes. RECOMMEND OVERRIDING)
The **expire** option is expressed as a duration... it sets how long should live
a queue without connections.

A raw integer is expressed in seconds, if the suffix m,h,d,w
are used, then the interval is in minutes, hours, days, or weeks. After the queue expires,
A raw integer is expressed in seconds, if the suffix m,h,d,M
are used, then the interval is in minutes, hours, days, or months. After the queue expires,
the contents are dropped, and so gaps in the download data flow can arise. A value of
1d (day) or 1w (week) can be appropriate to avoid data loss. It depends on how long
1d (day) or 1m (month) can be appropriate to avoid data loss. It depends on how long
the subscriber is expected to shutdown, and not suffer data loss.

if no units are given, then a decimal number of seconds can be provided, such as
to indicate 0.02 to specify a duration of 20 milliseconds.

The **expire** setting must be overridden for operational use.
The default is set low because it defines how long resources on the broker will be assigned,
and in early use (when default was 1 week) brokers would often get overloaded with very
and in early use (when default was 1 month) brokers would often get overloaded with very
long queues for left-over experiments.

This *subtopic* option should appear after the *expire* setting in files
Expand Down Expand Up @@ -1304,10 +1304,10 @@ process looks for files in the cache that have not been referenced in **cache**
and deletes them, in order to keep the cache size limited. Different settings are
appropriate for different use cases.

A raw integer interval is in seconds, if the suffix m,h,d, or w are used, then the interval
is in minutes, hours, days, or weeks. After the interval expires the contents are
A raw integer interval is in seconds, if the suffix m,h,d, or M are used, then the interval
is in minutes, hours, days, or months. After the interval expires the contents are
dropped, so duplicates separated by a large enough interval will get through.
A value of 1d (day) or 1w (week) can be appropriate. Setting the option without specifying
A value of 1d (day) or 1M (Month) can be appropriate. Setting the option without specifying
a time will result in 300 seconds (or 5 minutes) being the expiry interval.

Default value in a Poll is 8 hours, should be longer than nodupe_fileAgeMax to prevent
Expand Down
10 changes: 5 additions & 5 deletions docs/source/fr/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ count

duration
un nombre à virgule flottante qui indique une quantité en secondes (0.001 est 1 milliseconde)
modifié par un suffixe unitaire ( m-minute, h-heure, w-semaine ).
modifié par un suffixe unitaire ( m-minute, h-heure, M-mois ).

flag
une option qui a la valeur soit Vrai (True ou on) ou Faux (False ou off) (une valeur booléenne).
Expand Down Expand Up @@ -786,18 +786,18 @@ expire <duration> (défaut: 5m == cinq minutes. RECOMMENDE DE REMPLACER)
L'option *expire* est exprimée sous forme d'une duration... ça fixe combien de temps une fil d’attente devrait
vivre sans connexions.

Un entier brut est exprimé en secondes, et si un des suffixe m,h,d,w est utilisés, l’intervalle est en minutes,
heures, jours ou semaines respectivement. Après l’expiration de la fil d’attente, le contenu est supprimé et
Un entier brut est exprimé en secondes, et si un des suffixe m,h,d,m est utilisés, l’intervalle est en minutes,
heures, jours ou mois respectivement. Après l’expiration de la fil d’attente, le contenu est supprimé et
des différences peuvent donc survenir dans le flux de données de téléchargement. Une valeur de
1d (jour) ou 1w (semaine) peut être approprié pour éviter la perte de données. Cela dépend de combien de temps
1d (jour) ou 1m (mois) peut être approprié pour éviter la perte de données. Cela dépend de combien de temps
l’abonné est sensé s’arrêter et ne pas subir de perte de données.

Si aucune unité n’est donnée, un nombre décimal de secondes peut être fourni, tel que
0,02 pour spécifier une durée de 20 millisecondes.

Le paramètre **expire** doit être remplacé pour une utilisation opérationnelle.
Le défaut est défini par une valeur basse car il définit combien de temps les ressources vont être
assigné au courtier, et dans les premières utilisations (lorsque le défaut était de de 1 semaine), les courtiers
assigné au courtier, et dans les premières utilisations (lorsque le défaut était de de 1 mois), les courtiers
étaient souvent surchargés de très longues files d’attente pour les tests restants.

l´option *subtopic* devrait apparaître après le paramètre *expire* dans les fichiers
Expand Down
76 changes: 59 additions & 17 deletions sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,28 +316,51 @@ def timev2tov3str(s):
else:
return s[0:8] + 'T' + s[8:]

"""
So for natural delta, tested stuff, and empirically it looks like humanize thinks this is how many days there are in a month.
At least assuming that makes calculations here match with what humanize does.
"""
days_in_a_month=30.7

def durationToString(d) -> str:
"""
given a numbner of seconds, return a short, human readable string.
naturaldelta does not do weeks...
"""
if (d < 60):
return f"{d:7.2f}s"

first_part= humanize.naturaldelta(d).replace("minutes","m").replace("seconds","s").replace("hours","h").replace("days","d").replace("an hour","1h").replace("a day","1d").replace("a minute","1m").replace(" ","")

second_part=""
if first_part[-1] == 'm':
rem=int(d-int(first_part[0:-1])*60)
if rem > 0:
second_part=f"{rem:d}s"
if first_part[-1] == 'h':
rem=int(( d-int(first_part[0:-1])*60*60 ) / 60 )
if rem > 0:
second_part=f"{rem:d}m"
if first_part[-1] == 'd':
rem=int (( d-int(first_part[0:-1])*60*60*24 ) / (60*60) )
if rem > 0:
second_part=f"{rem:d}h"
hnd = humanize.naturaldelta(d).replace("minute","m").replace("second","T").replace("hour","h").replace("day","d").replace("month","M").replace("year","y").replace(" ","").replace("s","").replace("T","s").replace("an", "1").replace("a","1")

if ',' in hnd:
( first_part, second_part ) = hnd.split(',')
else:
first_part=hnd
second_part=""

if not second_part:
if first_part[-1] == 'm':
rem=int(d-int(first_part[0:-1])*60)
if rem > 0:
second_part=f"{rem:d}s"
if first_part[-1] == 'h':
rem=int(( d-int(first_part[0:-1])*60*60 ) / 60 )
if rem > 0:
second_part=f"{rem:d}m"
if first_part[-1] == 'd':
rem=int (( d-int(first_part[0:-1])*60*60*24 ) / (60*60) )
if rem > 0:
second_part=f"{rem:d}h"
if first_part[-1] == 'M':
rem=int (( d-int(first_part[0:-1])*60*60*24*days_in_a_month ) / (60*60*24) )
if rem > 0:
second_part=f"{rem:d}d"
if first_part[-1] == 'y':
rem=int (( d-int(first_part[0:-1])*60*60*24*365.25 ) / (60*60*24*days_in_a_month ) )
if rem > 0:
second_part=f"{rem:d}M"
return first_part+second_part

def durationToSeconds(str_value, default=None) -> float:
Expand All @@ -364,7 +387,7 @@ def durationToSeconds(str_value, default=None) -> float:
return float(default)

first_unit=None
second_unit=str_value[-1].lower()
second_unit=str_value[-1]
if second_unit in 's':
factor *= 1
first_unit='m'
Expand All @@ -376,9 +399,24 @@ def durationToSeconds(str_value, default=None) -> float:
first_unit='d'
elif second_unit in 'd':
factor *= 60 * 60 * 24
first_unit='w'
if 'y' in str_value:
first_unit='y'
elif 'M' in str_value:
first_unit='M'
else:
first_unit='w'
elif second_unit in 'w':
factor *= 60 * 60 * 24 * 7
if 'y' in str_value:
first_unit='y'
elif 'M' in str_value:
first_unit='M'
elif second_unit in 'M':
factor *= 60 * 60 * 24 * days_in_a_month
if 'y' in str_value:
first_unit='y'
elif second_unit in 'y':
factor *= 60 * 60 * 24 * 365.25

if str_value[-1].isalpha(): str_value = str_value[:-1]

Expand All @@ -394,6 +432,10 @@ def durationToSeconds(str_value, default=None) -> float:
big = big*60*60*24
elif first_unit == 'w':
big = big*60*60*24*7
elif first_unit == 'M':
big = big*60*60*24*days_in_a_month
elif first_unit == 'y':
big = big*60*60*24*365.25
str_value = little
else:
big=0
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.00.57rc1"
__version__ = "3.00.57rc2"
66 changes: 52 additions & 14 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,25 @@ def write_inline_file(self, msg) -> bool:

return True

def compute_local_checksum(self, msg) -> None:
def compute_local_checksum(self, msg, lstat=None) -> None:
"""
For a file whose path is given by the msg, calculate 'local_identity' field.
when checksums for files are stored in extended attributes, it's ideal to retrieve them,
rather than having to read the entire file again and re-calculate.
The extended attributes have a field:
* 'identity' ... the field from the message when the file was written.
* 'mtime' ... the mtime of the file when it was written.
The 'identity' extended attribute should be correct/good/useful if:
* the mtime of the file on disk is not newer than the current file mtime.
If the file has been over-written afterwards, the mtime will be different,
and the local checksum must be re-calculated from scratch.
If the checksum method is arbitrary, no local recalculation is possible.
"""

if sarracenia.filemetadata.supports_extended_attributes:
try:
Expand All @@ -1436,7 +1454,8 @@ def compute_local_checksum(self, msg) -> None:

if s:
metadata_cached_mtime = x.get('mtime')
if ((metadata_cached_mtime >= msg['mtime'])):
lstat_mtime = sarracenia.timeflt2str(lstat.st_mtime)
if (lstat and (metadata_cached_mtime >= lstat_mtime)):
# file has not been modified since checksum value was stored.

if (( 'identity' in msg ) and ( 'method' in msg['identity'] ) and \
Expand All @@ -1453,12 +1472,13 @@ def compute_local_checksum(self, msg) -> None:
except:
pass

# no local recalculation possible.
if msg['identity']['method'] in [ 'arbitrary' ]:
return

local_identity = sarracenia.identity.Identity.factory(
msg['identity']['method'])

if msg['identity']['method'] == 'arbitrary':
local_identity.value = msg['identity']['value']

local_identity.update_file(msg['new_path'])
msg['local_identity'] = {
'method': msg['identity']['method'],
Expand All @@ -1468,7 +1488,7 @@ def compute_local_checksum(self, msg) -> None:

def file_should_be_downloaded(self, msg) -> bool:
"""
determine whether a comparison of local_file and message metadata indicates that it is new enough
Determine whether a comparison of local_file and message metadata indicates that it is new enough
that writing the file locally is warranted.
return True to say downloading is warranted.
Expand Down Expand Up @@ -1555,16 +1575,19 @@ def file_should_be_downloaded(self, msg) -> bool:
return True

try:
self.compute_local_checksum(msg)
self.compute_local_checksum(msg,lstat)
except:
logger.debug(
"something went wrong when computing local checksum... considered different"
)
return True

logger.debug( f"checksum in message: {msg['identity']} vs. local: {msg['local_identity']}" )
if 'local_identity' in msg:
logger.debug( f"checksum in message: {msg['identity']} vs. local: {msg['local_identity']}" )
else:
logger.debug( f"checksum in message: {msg['identity']} vs. local: None" )

if msg['local_identity'] == msg['identity']:
if 'local_identity' in msg and msg['local_identity'] == msg['identity']:
self.reject(msg, 304, f"same checksum {msg['new_path']}" )
return False
else:
Expand Down Expand Up @@ -1595,7 +1618,8 @@ def renameOneItem(self, old, path) -> bool:
for messages with an rename file operation, it is to rename a file.
"""
ok = True
if not os.path.exists(old):
# it turns out that links that exist but point to non-existent files return exists: False.
if not os.path.islink(old) and not os.path.exists(old):
logger.info(
"old file %s not found, if destination (%s) missing, then fall back to copy"
% (old, path))
Expand Down Expand Up @@ -1635,6 +1659,7 @@ def mkdir(self, msg) -> bool:
except Exception as ex:
logger.warning("making %s: %s" % (msg['new_dir'], ex))
logger.debug('Exception details:', exc_info=True)
return False

if os.path.isdir(path):
logger.debug( f"no need to mkdir {path} as it exists" )
Expand Down Expand Up @@ -1663,8 +1688,14 @@ def link1file(self, msg, symbolic=True) -> bool:
imported from v2/subscribe/doit_download "link event, try to link the local product given by message"
"""
logger.debug("message is to link %s to %s" %
(msg['new_file'], msg['fileOp']['link']))
if 'link' in msg['fileOp']:
link=msg['fileOp']['link']
elif 'hlink' in msg['fileOp']:
link=msg['fileOp']['hlink']
else:
link='MALFORMED_LINK_MESSAGE'

logger.debug( f"message is to link {msg['new_file']} to {link}" )

# redundant, check is done in caller.
#if not 'link' in self.o.fileEvents:
Expand All @@ -1679,6 +1710,7 @@ def link1file(self, msg, symbolic=True) -> bool:
except Exception as ex:
logger.warning("making %s: %s" % (msg['new_dir'], ex))
logger.debug('Exception details:', exc_info=True)
return False

ok = True
try:
Expand Down Expand Up @@ -1745,6 +1777,8 @@ def do_download(self) -> None:
except Exception as ex:
logger.warning("making %s: %s" % (msg['new_dir'], ex))
logger.debug('Exception details:', exc_info=True)
self.reject(msg, 422, f"cannot create directory {msg['new_dir']} to put file in it." )
continue

os.chdir(msg['new_dir'])
logger.debug( f"chdir {msg['new_dir']}")
Expand Down Expand Up @@ -1833,10 +1867,14 @@ def do_download(self) -> None:
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxFiles'] += 1
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
continue
else:
# as above...
self.reject(msg, 500, "link %s failed" % msg['fileOp'])
continue
if 'hlink' not in msg['fileOp']:
self.reject(msg, 500, "link %s failed" % msg['fileOp'])
continue

logger.info( f"since hard link failed, fall back to copying from source" )

# all non-files taken care of above... rest of routine is normal file download.

Expand Down
20 changes: 15 additions & 5 deletions sarracenia/flowcb/mdelaylatest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,21 @@ def after_accept(self, worklist):
new_ok_delay = []
for m2 in self.ok_delay:
if m1['relPath'] == m2['relPath']:
logger.info(
f"intermediate version suppressed: {m1['relPath']}")
self.suppressions += 1
new_ok_delay.append(m1)
worklist.rejected.append(m2)
# an mkdir, rmdir, an rm, a rename, an ln: order important, publish immediately.
if ('fileOp' in m2) or ('fileOp' in m1):
if 'fileOp' in m2:
op=m2['fileOp']
else:
op=f"being later: {m1['fileOp']}"

logger.info( f"critically ordered operation: {m2['relPath']} {op}")
new_incoming.append(m2)
new_ok_delay.append(m1)
else:
logger.info( f"intermediate version suppressed: {m1['relPath']}")
self.suppressions += 1
new_ok_delay.append(m1)
worklist.rejected.append(m2)
wait = True
else:
new_ok_delay.append(m2)
Expand Down
Loading

0 comments on commit 6cc71bd

Please sign in to comment.