Change Log
Prior to version 1.1.3, the only way to register a function was to create a test instance of the function and register the test instance directly or as part of a test pipeline execution. You can now specify all the metadata needed for registration as part of the function definition so that you can register without instantiating.
The bif module contains a number of useful functions. You can register all functions in the bif module as follows:
import iotfunctions.bif as bif
db = Database(credentials = credentials, tenant_id=credentials['tennant_id'])
db.register_module(bif)
You can also register specific functions. In the example below, we register a function that filters data on status.
from iotfunctions.preprocessor import StatusFilter
db.register_functions(
[StatusFilter]
)
To make a function available for registration in this way you must implement the build_ui() class method. The build_ui() class method has no arguments. It returns a tuple containing a list of function inputs and a list of function outputs. Function inputs and outputs are objects that are instances of a class derived from ui.BaseUIControl. The objects describe how you want the inputs and outputs of your function to be represented in the UI. Here is an example:
# the UI controls are defined in iotfunctions.ui:
# from iotfunctions.ui import UISingleItem, UISingle, UIFunctionOutSingle
@classmethod
def build_ui(cls):
    # define arguments that behave as function inputs
    inputs = []
    inputs.append(UISingleItem(name='input_item',
                               datatype=None,
                               description='Item to alert on'
                               ).to_metadata())
    inputs.append(UISingle(name='upper_threshold',
                           datatype=float,
                           description='Alert when item value is higher than this value'
                           ).to_metadata())
    # define arguments that behave as function outputs
    outputs = []
    outputs.append(UIFunctionOutSingle(name='alert_name',
                                       datatype=bool,
                                       description='Output of alert function'
                                       ).to_metadata())
    return (inputs, outputs)
The following UI controls are currently supported:
- UISingleItem: Choose a single AS data item as a function input
- UIMultiItem: Choose multiple AS data items as a function input
- UISingle: Capture a single constant value as a function input
- UIFunctionOutSingle: Name a single data item returned as a function output
- UIFunctionOutMulti: Name an array of data items returned as a function output
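The earlier example already covers the single-item controls. As a sketch of the multi-item input control, a function that operates on several data items at once might declare its inputs as shown below; the constructor arguments simply mirror the single-item examples above and are assumptions, not a definitive signature.
# sketch only: arguments mirror the single-item controls above
# requires: from iotfunctions.ui import UIMultiItem
inputs = []
inputs.append(UIMultiItem(name='input_items',
                          datatype=float,
                          description='Items to smooth'
                          ).to_metadata())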
You can create a default sample entity called 'as_sample_entity':
import json
from iotfunctions.metadata import make_sample_entity
from iotfunctions.db import Database

with open('credentials.json', encoding='utf-8') as F:
    credentials = json.loads(F.read())
db = Database(credentials=credentials)
entity = make_sample_entity(db=db, schema=None)
If you want different columns or a different name:
db_schema = None  # or the name of a non-default schema
numeric_columns = ['fill_time','temp','humidity','wait_time','size_sd']
table_name = 'as_sample_cereal'
entity = make_sample_entity(db=db, schema=db_schema,
                            float_cols=numeric_columns,
                            name=table_name,
                            register=True)
Rather than generating a test pipeline for an entity, adding stages to the pipeline and then executing it, you can do it all in one step.
fn1 = CustomFunction1()
fn2 = CustomFunction2()
fnN = CustomFunctionN()
df = entity.exec_pipeline(fn1, fn2, ..., fnN)
There is a new series of "read_" methods on Database objects. These allow you to quickly build dataframes from tables. Here is an example showing how to use a pandas-style aggregate dict to build a SQL GROUP BY:
agg_dict = {
'temp': ['mean','std'],
'grade' : ['mean','std'],
'throttle' : ['mean','std']
}
df = db.read_agg(table_name='sample', schema = None, agg_dict = agg_dict, groupby = ['deviceid'])
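The aggregate example above is one of this series. A simpler read of a whole table might look like the sketch below; the read_table method name and its parameters are assumptions here, so check the Database class for the exact names.
# illustrative sketch: method and parameter names are assumed, not confirmed
df = db.read_table(table_name='sample', schema=None)
df.head()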
iotfunctions now supports accessing tables in a non-default schema.
db = Database(credentials = credentials)
entity = EntityType(entity_name,db,
Column('company_code',String(50)),
Column('temp',Float()),
Column('grade',Float()),
Column('throttle',Float()),
**{
'_timestamp' : 'evt_timestamp',
'_db_schema' : 'DASH555'
})
iotfunctions has switched from IAM credentials to HMAC credentials for COS. Refer to the notebook for an example of how to use them. The IAM credentials still work when using util.loadCos etc., but all of the base classes and sample functions now use Database.cos_load, Database.cos_save etc. After this change ibm_boto3 is no longer a mandatory prerequisite.
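A minimal sketch of using these methods follows; the argument names shown are assumptions, and the notebook referenced above has the authoritative example.
# persist a small Python object to Cloud Object Storage and read it back;
# argument names are assumptions based on the method names above
payload = {'model_version': 1, 'threshold': 0.8}
db.cos_save(persisted_object=payload, filename='sample_payload')
restored = db.cos_load(filename='sample_payload')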
Previously all function classes were located in the preprocessor module. They are being gradually reorganized into base classes (base module), samples (still in the preprocessor module) and "built in functions" (bif module). The "built in functions" module contains highly reusable functions that can be registered and used as is. The base module contains abstract classes that you can inherit from, and the preprocessor module contains samples that you can adapt and learn from.
As the author of a custom function you can add trace information in your execute() method that will be reported when your function fails. See example:
import re
import numpy as np
from iotfunctions.base import BaseEvent

class IoTAlertExpression(BaseEvent):
    '''
    Create alerts that are triggered when data values reach a particular range.
    '''
    def __init__(self, input_items, expression, alert_name):
        self.input_items = input_items
        self.expression = expression
        self.alert_name = alert_name
        super().__init__()
        # registration metadata
        self.inputs = ['input_items', 'expression']
        self.constants = ['expression']
        self.outputs = ['alert_name']

    def execute(self, df):
        df = df.copy()
        if '${' in self.expression:
            # convert ${item} references into dataframe column references
            expr = re.sub(r"\$\{(\w+)\}", r"df['\1']", self.expression)
            msg = 'expression converted to %s' % expr
        else:
            expr = self.expression
            msg = 'expression was not in the form "${item}" so it will be evaluated as is (%s)' % expr
        self.trace_append(msg)
        df[self.alert_name] = np.where(eval(expr), True, np.nan)
        return df
To test the trace, I used the wrong column name in an expression: 'co2**' instead of 'co2'.
KeyError: 'co2**'
Completed stage IoTPackageInfo -> Completed stage ForecastFermentation -> pipeline failed during execution of stage IoTAlertExpression.
expression was not in the form "${item}" so it will be evaluated as is (df["co2**"]>-222)
Dataframe at start of IoTAlertExpression: | df count: 728 | df index: id,obs_timestamp
deviceid : 73001
company_code : JDI
co2 : -0.48930157557849796
ph : -2.356167019872004
temperature : 1.2924017570257476
humidity : 0.5249237599612201
entitydatagenerator : True
_timestamp : 2018-12-09 04:07:27.351009
package_url : git+https://github.com/ibm-watson-iot/functions.git@
module : iotfunctions.bif
version : 1.1
delta_days : -0.1699923513631976
n2_dose : -0.034151879573493235
o2_dose : -0.05440538757276484
temp_change : -0.23274311694669808
The function failed to execute
This function executes an expression. There is no validation of this string expression in the UI, so a syntax error, an invalid reference to a data item, or a data type error could result in failure. As a best practice we recommend that you introduce custom functions rather than rely on functions like this, but if there is a need to do something like this, you can provide feedback in your code as above using:
self.trace_append('<the message that you would like appended to whatever exists in the trace>')
At any point if you want to clear the trace and start fresh, use
self.trace_replace('<the message that you would like to see instead of whatever is in the trace>')
These methods are defined in BaseFunction. To use this technique on legacy functions that are not derived from BaseFunction you can add a _trace instance variable. The contents of this variable will be reported in the event of an error.
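For a legacy function that is not derived from BaseFunction, a minimal sketch might look like this; the class, item names and logic are hypothetical and only illustrate setting the _trace instance variable.
class LegacyStatusFlag:
    '''
    Hypothetical legacy function that does not inherit from BaseFunction.
    '''
    def __init__(self, input_item, output_item):
        self.input_item = input_item
        self.output_item = output_item
        # _trace is reported if the function fails during pipeline execution
        self._trace = ''

    def execute(self, df):
        self._trace = 'flagging rows where %s is null' % self.input_item
        df = df.copy()
        df[self.output_item] = df[self.input_item].isnull()
        return df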
Create new entity types using:
from iotfunctions.metadata import EntityType
from iotfunctions.db import Database
# if environment variables are set for credentials
db = Database(credentials=None,tenant_id='<your tenant id>')
entity = EntityType('my_entity',db)
Model additional input tables for entities
# add an operator slowly changing dimension
entity.add_slowly_changing_dimension('operator',String(50))
# add a maintenance activity table that will be used to keep track of the
# durations of scheduled and unscheduled maintenance activities
# and keep tabs on 'materials_cost'
entity.add_activity_table('widget_maintenance_activity',
                          ['PM','UM'],
                          Column('materials_cost',Float()))
Using a non-default timestamp or schema
parms = {
'schema' : '<my_db_schema>',
'_timestamp' : '<my_timestamp_col>'
}
entity = EntityType('my_entity',db,**parms)
Automatically generate sample data
entity.generate_data(days = 10, drop_existing = True)
Use local pipelines to test and register functions.
Example:
pl = entity.get_calc_pipeline()
pl.add_stage(EntityDataGenerator('temp'))
df = pl.execute(to_csv=True, start_ts=None, register=True)
After assembling a local pipeline for testing you can deliver it to AS by publishing it.
pl.publish()
Note: Publish works best when publishing to a new entity type with no calculated items. You will receive a 409 error if items published clash with items already present on the entity type.
For a walkthrough of entity types and local pipelines see: entity and local pipeline sample script
When functions are run in a pipeline (server or local), the pipeline sets the entity type for each function using an instance variable named _entity_type. This variable will contain an EntityType object. This object gives you access to a complete set of metadata about the entity type, including tenant_id and _timestamp (the name of the timestamp column). When running on the server, this metadata will be initialized by the engine. When running locally, you will need to define it as per the sample script above.
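Inside a function's execute() method, a sketch of reading this metadata could look like the following; only tenant_id and _timestamp are named in the text above, so treat anything else as an assumption.
def execute(self, df):
    entity_type = self._entity_type      # EntityType object set by the pipeline
    ts_col = entity_type._timestamp      # name of the timestamp column
    tenant = entity_type.tenant_id       # tenant that the entity type belongs to
    self.trace_append('executing for tenant %s, timestamp column %s' % (tenant, ts_col))
    return df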
The entity type object also provides access to a Database object that contains a database connection and other credentials. To get access to the database object use the convenience method get_db():
db = self.get_db()
By using the Database object provided by the pipeline there is no need to incur the cost of establishing additional database connections during function execution.
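For example, a custom function that needs reference data can reuse that connection rather than opening its own; the table name, join column, and read_table call shown here are illustrative assumptions.
def execute(self, df):
    db = self.get_db()  # Database object supplied by the pipeline
    # hypothetical lookup table read through the shared connection
    lookup = db.read_table(table_name='company_lookup', schema=None)
    return df.merge(lookup, on='company_code', how='left')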
Tag function outputs as 'DIMENSION' to build aggregate tables:
self.itemTags['output_items'] = ['DIMENSION']
For an example, see preprocessor.LookupCompany
Unregister functions by name:
db.unregister_functions(['Function1','Function2'])
iotfunctions is pre-installed on the server. There is no need to install it via pip_main in your catalog's setup.py.