feat: KafkaStream that takes instances from a Kafka Producer #206

Open

wants to merge 24 commits into main

Conversation

risheetperi
Contributor

No description provided.

risheetperi and others added 23 commits August 11, 2024 15:25
@risheetperi
Contributor Author

For reference in case of any issues, here are the relevant changes for KafkaStream in src/capymoa/stream/_stream.py:

from typing import Any, Dict, List, Optional, Sequence, Union
import json

import numpy as np
from confluent_kafka import Consumer, KafkaError, KafkaException

# Schema, Stream, LabeledInstance, RegressionInstance and
# _init_moa_stream_and_create_moa_header are assumed to be available in
# capymoa.stream._stream, where this class is defined.
class KafkaStream(Stream):
    """A Kafka-based data stream that buffers instances before processing.

    This assumes that the Kafka producer sends instances as separate JSON objects, one per line.
    """

    def __init__(
            self,
            dtypes: Optional[list] = None,  # e.g. [('column1', np.float64), ('column2', np.int32), ('column3', np.float64), ('column4', str)]; nominal attributes are read as str
            values_for_nominal_features: Optional[Dict[int, list]] = None,  # e.g. {1: [1, 2, 3], 4: ['Aa', 'BB']}; keys are integer column indices, values are turned into strings
            target_type: Optional[str] = None,  # 'numeric' for regression, 'categorical' for classification
            target_index: int = -1,
            class_labels: Optional[Sequence[str]] = None,
            server: str = "localhost:9092",
            topic: str = "topic1",
            group_id: str = "group1",
            buffer_size: int = 100,
            schema: Optional[Schema] = None,
    ):
        """Initialize the Kafka stream.

        :param dtypes: Data types for each column.
        :param values_for_nominal_features: Possible values for categorical (nominal) features.
        :param target_type: 'numeric' for regression tasks, 'categorical' for classification tasks.
        :param target_index: Index of the column containing the target/label variable.
        :param class_labels: Possible class labels for classification tasks.
        :param server: Kafka server that hosts the Kafka topics.
        :param topic: Kafka topic that contains the JSON-formatted instances.
        :param group_id: Kafka consumer group id for the topic.
        :param buffer_size: Size of the stream's buffer, which stores messages.
        :param schema: Dataset schema.

        Note: the Kafka producer must send JSON-formatted messages, one instance per line.
        For example:

        {"period": 0, "nswprice": 0.056443, "nswdemand": 0.439155, "vicprice": 0.003467, "vicdemand": 0.422915, "transfer": 0.414912, "class": 1}
        {"period": 0.021277, "nswprice": 0.051699, "nswdemand": 0.415055, "vicprice": 0.003467, "vicdemand": 0.422915, "transfer": 0.414912, "class": 0}
        {"period": 0.042553, "nswprice": 0.051489, "nswdemand": 0.385004, "vicprice": 0.003467, "vicdemand": 0.422915, "transfer": 0.414912, "class": 0}
        """

        self.dtypes = dtypes
        self.values_for_nominal_features = values_for_nominal_features or {}
        self.target_type = target_type
        if self.target_type not in ('numeric', 'categorical'):
            raise ValueError("target_type must be 'numeric' for regression or 'categorical' for classification")
        self.target_index = target_index
        self.class_labels = class_labels
        self.server = server
        self.topic = topic
        self.group_id = group_id
        self.buffer_size = buffer_size
        self.schema = schema

        self.features = None
        self.current_instance_index = 0  # Index for iterating over buffered data
        self.number_processed = 0

        kafka_config = {  # Kafka configuration for the consumer
            'bootstrap.servers': self.server,
            'group.id': self.group_id,
            'auto.offset.reset': 'earliest'
        }

        self.buffer: List[Any] = []  # Buffer to store parsed Kafka messages
        self.consumer = Consumer(kafka_config)  # Initialise the consumer
        try:
            self.consumer.subscribe([self.topic])  # Subscribe to the specified topic
        except Exception:
            self.consumer.close()  # Close the consumer if the subscription fails
            raise


    def poll_kafka(self, timeout: float = 1.0) -> None:
        """Poll Kafka for new messages and add them to the buffer."""
        self.buffer.clear() # Clear current buffer
        added_count = 0
        while len(self.buffer) < self.buffer_size:
            msg = self.consumer.poll(timeout)
            if msg is None:
                break  # No message received within timeout
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f"End of partition reached {msg.topic()} [{msg.partition()}]")
                else:
                    raise KafkaException(msg.error())
            else:
                # Add valid message to buffer
                try:
                    parsed_message = json.loads(msg.value())
                    self.buffer.append(parsed_message)
                    added_count += 1
                    # Generate a schema the first time a message is seen
                    if self.schema is None:
                        self.features = list(parsed_message)
                        # Normalise a negative target_index (e.g. the default -1) so that
                        # the `i != self.target_index` comparisons below and in
                        # next_instance() work against enumerate() indices
                        self.target_index = self.target_index % len(self.features)
                        if self.target_type == 'categorical': # For Classification Data
                            if self.dtypes is None: # If data type is unknown
                                self.schema = Schema.from_custom(
                                                feature_names=[f for i, f in enumerate(self.features) if i != self.target_index],
                                                values_for_nominal_features = self.values_for_nominal_features,
                                                dataset_name="KafkaClassificationDataset",
                                                target_attribute_name=self.features[self.target_index],
                                                values_for_class_label=self.class_labels)
                            else: # If datatype is known
                                self.__moa_stream_with_only_header, self.moa_header = (
                                    _init_moa_stream_and_create_moa_header(
                                        number_of_instances=1,  # we only need this to initialize the MOA header
                                        feature_names=[data_info[0] for data_info in self.dtypes],
                                        values_for_nominal_features=self.values_for_nominal_features,
                                        values_for_class_label=self.class_labels,
                                        dataset_name="KafkaClassificationDataset",
                                        target_attribute_name=self.features[self.target_index],
                                        target_type=self.target_type,
                                    )
                                )
                                self.schema = Schema(moa_header=self.moa_header)

                        elif self.target_type == "numeric": # For regression datasets
                            if self.dtypes is None: # If datatype is not known
                                self.schema = Schema.from_custom(
                                                feature_names=[f for i, f in enumerate(self.features) if i != self.target_index],
                                                values_for_nominal_features = self.values_for_nominal_features,
                                                dataset_name="KafkaRegressionDataset",
                                                target_attribute_name=self.features[self.target_index],
                                                target_type='numeric')
                            else: # If datatype is known
                                self.__moa_stream_with_only_header, self.moa_header = (
                                    _init_moa_stream_and_create_moa_header(
                                        number_of_instances=1,  # we only need this to initialize the MOA header
                                        feature_names=[data_info[0] for data_info in self.dtypes],
                                        values_for_nominal_features=self.values_for_nominal_features,
                                        dataset_name="KafkaRegressionDataset",
                                        target_attribute_name=self.features[self.target_index],
                                        target_type=self.target_type,
                                    )
                                )
                                self.schema = Schema(moa_header=self.moa_header)
                    
                except json.JSONDecodeError as e:
                    raise ValueError(f"Unable to decode JSON message: {msg.value()!r}") from e
        self.current_instance_index = 0 # Reset Buffer Index
        print(f"Added {added_count} items to buffer")



    def has_more_instances(self) -> bool:
        """Check if more instances are available in the buffer."""
        return self.current_instance_index < len(self.buffer)


    def next_instance(self) -> Union[LabeledInstance, RegressionInstance]:
        """Get the next instance from the buffer.

        :raises ValueError: If no more instances are available.
        """
        if not self.has_more_instances():
            raise ValueError("No more instances in the buffer. Call poll_kafka() to fetch more if available.")
        
        # Extract the next message from the buffer
        message = self.buffer[self.current_instance_index]
        self.current_instance_index += 1

        # Extract features and target based on the schema; the target column is
        # excluded from the feature vector to match the schema's feature names
        features = np.array([message[f] for i, f in enumerate(self.features) if i != self.target_index])
        target = message[self.features[self.target_index]]
        
        # Convert Kafka message to appropriate instance type
        if self.schema.is_classification():
            self.number_processed += 1
            return LabeledInstance.from_array(self.schema, features, target)
        elif self.schema.is_regression():
            self.number_processed += 1
            return RegressionInstance.from_array(self.schema, features, target)
        else:
            raise ValueError("Unsupported task type: Must be regression or classification.")


    def close(self):
        """Close the Kafka consumer."""
        self.consumer.close()


    def __del__(self):
        """Ensures that the Kafka consumer is closed when the object is deleted."""
        if hasattr(self, "consumer"):
            self.close()


    def get_schema(self):
        """Returns the schema of the KafkaStream"""
        return self.schema
    

    def get_buffer_size(self):
        """Returns the maximum size of the buffer"""
        return self.buffer_size
    

    def get_processed_instances(self):
        """Returns the number of processed instances from initiation"""
        return self.number_processed


    def get_moa_stream(self):
        raise ValueError("KafkaStream is not backed by a MOA stream; it reads from a Kafka producer")
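
To test this locally, here is a minimal sketch of a matching producer written with confluent_kafka. The broker address, topic name, and sample instances are assumptions for illustration, mirroring the Electricity-style example in the docstring:

import json
from confluent_kafka import Producer

# Hypothetical broker and topic, matching the KafkaStream defaults above
producer = Producer({"bootstrap.servers": "localhost:9092"})

instances = [
    {"period": 0, "nswprice": 0.056443, "nswdemand": 0.439155,
     "vicprice": 0.003467, "vicdemand": 0.422915, "transfer": 0.414912, "class": 1},
    {"period": 0.021277, "nswprice": 0.051699, "nswdemand": 0.415055,
     "vicprice": 0.003467, "vicdemand": 0.422915, "transfer": 0.414912, "class": 0},
]

for instance in instances:
    # One JSON object per message, the format KafkaStream expects
    producer.produce("topic1", value=json.dumps(instance))
producer.flush()  # Block until all queued messages are delivered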

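And a minimal usage sketch of the stream itself, assuming the producer above is running and the label is in the last column:

stream = KafkaStream(
    target_type="categorical",
    target_index=-1,
    class_labels=["0", "1"],
    server="localhost:9092",
    topic="topic1",
    group_id="group1",
    buffer_size=100,
)

stream.poll_kafka(timeout=1.0)  # Fill the buffer from Kafka
while stream.has_more_instances():
    instance = stream.next_instance()
    # ... pass `instance` to a CapyMOA learner here ...
stream.close()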