Source code for deeptone.deeptone

import logging
import platform
import sys

from collections import deque
from dataclasses import make_dataclass, field
from itertools import zip_longest
from math import ceil
from typing import Optional

import numpy as np
from scipy.io import wavfile
from scipy.signal import resample

from deeptone import utils
from deeptone.version import __version__
from deeptone.models import MODELS_BY_NAME, Model
from deeptone.models.arousal_model import LOW, HIGH, NEUTRAL, NO_SPEECH
from deeptone.models.emotions_model import HAPPY, TIRED, IRRITATED, NEUTRAL as EMO_NEUTRAL
from deeptone.models.gender_model import FEMALE, MALE, UNKNOWN
from deeptone.models.language_model import DE, EN, ES, FR, IT
from deeptone.models.language_model import UNKNOWN as LANG_UNKNOWN
from deeptone.models.model import EXCLUDED_MODELS
from deeptone.models.base_three_class_speech_model import SPEECH, MUSIC, OTHER, SILENCE
from deeptone.prediction_engine import PredictionEngine, MAX_INT16_VALUE

SAMPLING_RATE = 16000
SAMPLES_PER_MS = SAMPLING_RATE / 1000  # fixed because deeptone only uses 16kHz audio data
MIN_OUTPUT_PERIOD = 64
MODEL_RESOLUTION = 64  # how many milliseconds of audio each prediction covers
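
# Worked example of the timing constants above (illustrative only): an
# output_period of 512 ms is valid (512 = 8 * 64) and corresponds to
# 512 * SAMPLES_PER_MS = 512 * 16 = 8192 audio samples per output window.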

# model outputs
AROUSAL_LOW = LOW
AROUSAL_HIGH = HIGH
AROUSAL_NEUTRAL = NEUTRAL
AROUSAL_NO_SPEECH = NO_SPEECH
AROUSAL_SILENCE = SILENCE

GENDER_FEMALE = FEMALE
GENDER_MALE = MALE
GENDER_UNKNOWN = UNKNOWN
GENDER_NO_SPEECH = NO_SPEECH
GENDER_SILENCE = SILENCE

SPEECH_SPEECH = SPEECH
SPEECH_MUSIC = MUSIC
SPEECH_OTHER = OTHER
SPEECH_SILENCE = SILENCE

SPEECH_RT_SPEECH = SPEECH
SPEECH_RT_MUSIC = MUSIC
SPEECH_RT_OTHER = OTHER
SPEECH_RT_SILENCE = SILENCE

EMOTIONS_HAPPY = HAPPY
EMOTIONS_TIRED = TIRED
EMOTIONS_IRRITATED = IRRITATED
EMOTIONS_NEUTRAL = EMO_NEUTRAL
EMOTIONS_NO_SPEECH = NO_SPEECH
EMOTIONS_SILENCE = SILENCE

LANGUAGE_DE = DE
LANGUAGE_EN = EN
LANGUAGE_ES = ES
LANGUAGE_FR = FR
LANGUAGE_IT = IT
LANGUAGE_UNKNOWN = LANG_UNKNOWN
LANGUAGE_NO_SPEECH = NO_SPEECH
LANGUAGE_SILENCE = SILENCE

logger = logging.getLogger(__name__)


class DeeptoneError(Exception):
    pass


class ModelNotFoundError(AttributeError):
    pass


class ChannelNotFound(AttributeError):
    pass


class MismatchedOutputSizeError(Exception):
    pass


class Deeptone:
    """
    Entry point for the Deeptone SDK.

    Once this class is initialized, it provides access to the Deeptone Deep Learning models, which allow you to
    extract insights from your audio files.

    Three processing modes are supported:

    * File Processing: This mode allows you to provide a file to Deeptone, which will provide a time series
      analysis, alongside a summary and list of transitions for the entire file.
    * Audio Bytes Processing: This mode allows you to provide audio bytes to Deeptone. The output will be the
      same as in the File Processing case.
    * Stream Processing: This mode allows you to provide a real-time audio stream, which results in a
      continuous analysis that periodically generates insights as the stream progresses.

    Performance Considerations:
        Initialization of the Deep Learning models that power Deeptone is a time-consuming operation. As such,
        the initialization process of this class can be costly, and we therefore recommend that instances be
        long-lived.

    Thread Safety Considerations:
        Instances of `Deeptone` are thread-safe. However, the actual inference process is done within a
        critical section, meaning that performance might be limited when using a single instance across
        multiple threads. If performance is a critical requirement, you should ensure each thread has its own
        `Deeptone` instance (usage of a pool is recommended).

    Raises:
        LicenseManagerError: When the License Key is invalid or cannot be validated
    """

    def __init__(self, license_key: str, prediction_engine=None):
        """
        Default Constructor

        Args:
            license_key (str): The Deeptone License Key
        """
        # Check if the tensorflow module was imported
        if "tensorflow" in sys.modules:
            raise DeeptoneError(
                'ERROR: You have imported the "tensorflow" module. The DeepTone python SDK\n'
                "cannot be used in the same context together with the tensorflow module.\n"
                "For more information visit https://docs.oto.ai/sdk/troubleshooting#deeptone-sdk-and-tensorflow."
            )

        _platform, platform_version = Deeptone._get_platform_data()
        self.prediction_engine = (
            PredictionEngine(license_key, _platform, platform_version)
            if prediction_engine is None
            else prediction_engine
        )
        self.models = self.__expose_available_models()
        logger.debug("Deeptone Engine is initialised.")
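
    # Minimal usage sketch (illustrative; "YOUR_LICENSE_KEY" is a placeholder, and
    # tensorflow must not have been imported in the same process):
    #
    #   engine = Deeptone(license_key="YOUR_LICENSE_KEY")
    #   print(engine.get_available_models())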

    def is_model_available(self, model_name: str) -> bool:
        """
        Check if a model with the given name is available

        Args:
            model_name (str): Model name to validate

        Returns:
            bool: True if the model name provided is available
        """
        return self.prediction_engine.is_model_available(model_name)

    def get_available_models(self) -> set:
        """
        Retrieve the names of all available models

        Returns:
            set: The names of the available models
        """
        return {x for x in self.models.__dict__.keys() if not x.startswith("__")}
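
    # Sketch of model discovery: per `__expose_available_models`, the `models`
    # dataclass maps friendly names to model identifiers (e.g. engine.models.Arousal
    # == "arousal"); "arousal" is used illustratively here:
    #
    #   if engine.is_model_available("arousal"):
    #       requested = [engine.models.Arousal]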

    def process_file(
        self,
        filename: str,
        models: list,
        output_period: int,
        channel: Optional[int] = None,
        include_summary: bool = False,
        include_transitions: bool = False,
        include_raw_values: bool = False,
        use_chunking: bool = False,
        volume_threshold: float = 0.005,
    ) -> dict:
        """
        Analyse a WAV file with the list of requested models.

        Args:
            filename (str): Path to the file to analyse
            models (list): List of models to use for the audio analysis
            output_period (int): How often (in milliseconds) the output of the models should be returned.
                The provided value must be a positive multiple of 64.
            channel (int, optional): The channel to analyse. If no channel is provided, all channels will
                be analysed
            include_summary (bool, optional): Should the file summary be returned
            include_transitions (bool, optional): Should the file transitions be returned
            include_raw_values (bool, optional): Should raw model outputs be included
            use_chunking (bool, optional): Should data be chunked before making predictions. Use this if
                the file being analysed is large, to avoid issues with high memory consumption
            volume_threshold (float, optional): Threshold below which input data will be considered as no
                sound. Should be a number between 0 and 1, where 0 will treat all data as sound and 1 will
                treat all data as no sound. Defaults to 0.005, which should exclude very quiet fragments
                from analysis.

        Returns:
            dict: The results of the analysis for the requested channels. For each channel, a time series
            is returned, containing the aggregated results for each time window.
            If `include_summary` is set to `True`, the output will contain a summary for the entire file.
            If `include_transitions` is set to `True`, the transitions output groups the raw model output
            (1 prediction every 64 ms) into phases where the predicted classification remains the same.
            If `include_raw_values` is set to `True`, all possible classes with their respective
            probabilities will be returned for each model in addition to the most likely one.
            Refer to `Models <https://docs.oto.ai/sdk/models/models-overview>`__ for details on the outputs
            for each individual model.

        Example:
            .. code-block:: python

                {
                    "channels": {
                        "0": {
                            "time_series": [
                                {
                                    "timestamp": 0,
                                    "results": {
                                        "gender": {"result": "female", "confidence": 0.6255},
                                        "arousal": {"result": "high", "confidence": 0.9245},
                                    },
                                    "raw": {
                                        "gender": {"female": 0.8, "male": 0.2},
                                        "arousal": {"high": 0.9245, "neutral": 0.0245, "low": 0.01},
                                    },
                                },
                            ],
                            "summary": {
                                "gender": {...},
                                "arousal": {
                                    "high_fraction": 0.9451,
                                    "low_fraction": 0.0124,
                                    "neutral_fraction": 0.0425,
                                },
                            },
                            "transitions": {
                                "gender": [
                                    {"timestamp_start": 0, "timestamp_end": 1500, "result": "female", "confidence": 0.96},
                                    {"timestamp_start": 1500, "timestamp_end": 3420, "result": "male", "confidence": 0.87},
                                    ...
                                    {"timestamp_start": 8560, "timestamp_end": 10000, "result": "female", "confidence": 0.89},
                                ],
                                "arousal": [
                                    {"timestamp_start": 0, "timestamp_end": 2500, "result": "high", "confidence": 0.92},
                                    {"timestamp_start": 2500, "timestamp_end": 3420, "result": "low", "confidence": 0.85},
                                    ...
                                    {"timestamp_start": 7560, "timestamp_end": 10000, "result": "neutral", "confidence": 0.87},
                                ],
                            },
                        }
                    }
                }
        """
        if output_period < MIN_OUTPUT_PERIOD or output_period % MIN_OUTPUT_PERIOD != 0:
            raise ValueError(
                f"Invalid output_period: expected a value >= 64 and multiple of 64 but got {output_period}"
            )

        requested_models = set(models)
        self.__check_model_availability(requested_models)

        # read in the wav file
        rate_in, data = wavfile.read(filename)
        logger.debug(f"Read file {filename} with sample rate {rate_in}.")
        self.prediction_engine.increment_metric("processed_file_count")

        channels = {}
        # select the requested channel if more than one is available
        if len(data.shape) > 1:
            channel_count = data.shape[1]
            if channel is not None and channel > channel_count - 1:
                raise ChannelNotFound(
                    "Requested channel was not found. You requested channel %d but the given file only has %d channels."
                    % (channel, channel_count)
                )
            # if no channel is specified, process all of them
            if channel is None:
                for i in range(channel_count):
                    logger.debug(f"Processing channel {i}")
                    channel_data = data[:, i]
                    channels[str(i)] = self.process_audio_bytes(
                        channel_data,
                        models,
                        output_period,
                        include_summary,
                        include_transitions,
                        include_raw_values,
                        rate_in,
                        use_chunking,
                        volume_threshold,
                    )
            else:
                logger.debug(f"Processing channel {channel}")
                channel_data = data[:, channel]
                channels[str(channel)] = self.process_audio_bytes(
                    channel_data,
                    models,
                    output_period,
                    include_summary,
                    include_transitions,
                    include_raw_values,
                    rate_in,
                    use_chunking,
                    volume_threshold,
                )
        else:
            logger.debug("Processing the single channel")
            channels["0"] = self.process_audio_bytes(
                data,
                models,
                output_period,
                include_summary,
                include_transitions,
                include_raw_values,
                rate_in,
                use_chunking,
                volume_threshold,
            )

        return {"channels": channels}

    def process_audio_bytes(
        self,
        data: np.ndarray,
        models: list,
        output_period: int,
        include_summary: bool = False,
        include_transitions: bool = False,
        include_raw_values: bool = False,
        rate_in: int = None,
        use_chunking: bool = False,
        volume_threshold: float = 0.005,
    ) -> dict:
        """
        Analyse audio data with the list of requested models.

        This method can be used to generate timestamped predictions directly from audio bytes provided as
        a numpy array, rather than an audio file.

        Args:
            data (np.ndarray): Data to analyse
            models (list): List of models to use for the audio analysis
            output_period (int): How often (in milliseconds) the output of the model should be returned
            include_summary (bool, optional): Should the summary be included
            include_transitions (bool, optional): Should the file transitions be returned
            include_raw_values (bool, optional): Should raw model outputs be included
            rate_in (int, optional): Sample rate of the original audio (in Hz). Should only be specified if
                the rate differs from the recommended one (16000).
            use_chunking (bool, optional): Should data be chunked before making predictions. Chunking is
                only recommended in case of very large data arrays, to avoid memory issues.
            volume_threshold (float, optional): Threshold below which input data will be considered as no
                sound. Should be a number between 0 and 1, where 0 will treat all data as sound and 1 will
                treat all data as no sound.

        Returns:
            dict: A dictionary containing timestamped results and summary/transitions/raw values, if
            applicable.
            If `include_summary` is set to `True`, the output will contain a summary for the entire data
            array.
            If `include_transitions` is set to `True`, the transitions output groups the raw model output
            (1 prediction every 64 ms) into phases where the predicted classification remains the same.
            If `include_raw_values` is set to `True`, all possible classes with their respective
            probabilities will be returned for each model in addition to the most likely one.

        Example:
            .. code-block:: python

                {
                    "time_series": [
                        {
                            "timestamp": 100,
                            "results": {
                                "gender": {"result": "female", "confidence": 0.6255},
                                "another_model": {"result": <result>, "confidence": <confidence>},
                            },
                        },
                        {
                            "timestamp": 105,
                            "results": {
                                "gender": {...},
                                "another_model": {...},
                            },
                        },
                    ]
                }
        """
        if output_period < MIN_OUTPUT_PERIOD or output_period % MIN_OUTPUT_PERIOD != 0:
            raise ValueError(
                f"Invalid output_period: expected a value >= 64 and multiple of 64 but got {output_period}"
            )

        # re-sample to 16 kHz
        if rate_in is not None and rate_in != SAMPLING_RATE:
            logger.debug(f"Input sampling rate is {rate_in}, re-sampling to {SAMPLING_RATE}")
            sample_count = round(len(data) * float(SAMPLING_RATE) / rate_in)
            # Convert int16 to float32 and scale values with the max int16 value
            if data.dtype == np.int16:
                data = data.astype(np.float32) / MAX_INT16_VALUE
            data = resample(data, sample_count)
            self.prediction_engine.increment_metric("resample_count")

        _, models_for_engine, optional_models = self.__get_model_names(models)
        additional_data = utils.initialise_additional_data(models)

        logger.debug("Generate predictions with requested models" + (" with chunking" if use_chunking else ""))
        predictions = (
            utils.predict_with_chunking(data, models_for_engine, optional_models, self.prediction_engine)
            if use_chunking
            else utils.post_process_all_predictions(
                self.prediction_engine.predict(
                    data, models_for_engine, optional_models, additional_data=additional_data
                )
            )
        )

        return self.__process_predictions(
            predictions,
            len(data),
            models,
            output_period,
            include_summary,
            include_transitions,
            include_raw_values,
            volume_threshold,
        )
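
    # Usage sketch for raw audio bytes, assuming a 44.1 kHz int16 recording that
    # this method re-samples to 16 kHz ("recording.wav" is a placeholder path):
    #
    #   rate_in, data = wavfile.read("recording.wav")
    #   result = engine.process_audio_bytes(
    #       data, models=["arousal"], output_period=512, rate_in=rate_in
    #   )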

    def __process_predictions(
        self,
        predictions: dict,
        sample_count: int,
        models: list,
        output_period: int,
        include_summary: bool = False,
        include_transitions: bool = False,
        include_raw_values: bool = False,
        volume_threshold: float = 0.005,
    ) -> dict:
        self.prediction_engine.increment_metric("file_sample_count", sample_count)

        model_predictions = {}
        output_size = -1
        for index, model_name in enumerate(models):
            self.prediction_engine.increment_metric(model_name + "_sample_count", sample_count)
            model_predictions[model_name] = self.__get_model_output(
                model_name,
                predictions,
                output_period,
                include_summary,
                include_transitions,
                include_raw_values,
                volume_threshold,
            )
            time_series_length = len(model_predictions[model_name]["time_series"])
            if index == 0:
                output_size = time_series_length
            elif output_size != time_series_length:
                raise MismatchedOutputSizeError(
                    "Output size for model %s is %d when it was expected to be %d."
                    % (model_name, time_series_length, output_size)
                )

        time_series = []
        # Combine predictions and raw results from multiple models as iterators of tuples
        zipped_raw = zip(*[value["raw"] for value in model_predictions.values()]) if include_raw_values else []
        zipped_time_series = zip(*[value["time_series"] for value in model_predictions.values()])

        # Extract the timestamp one level up, group predictions under "results" and raw values under "raw"
        for result_tuple, raw_tuple in zip_longest(zipped_time_series, zipped_raw):
            combined_dict = {k: v for d in result_tuple for k, v in d.items()}
            time_step_result = {"timestamp": combined_dict.pop("timestamp"), "results": combined_dict}
            if include_raw_values and raw_tuple is not None:
                combined_raw_dict = {k: v for d in raw_tuple for k, v in d.items() if k != "timestamp"}
                time_step_result["raw"] = combined_raw_dict
            time_series.append(time_step_result)

        output = {"time_series": time_series}

        if include_summary:
            self.prediction_engine.increment_metric("summary_request_count")
            output["summary"] = {k: v["summary"][k] for k, v in model_predictions.items()}

        if include_transitions:
            self.prediction_engine.increment_metric("transitions_request_count")
            output["transitions"] = {k: v["transitions"][k] for k, v in model_predictions.items()}

        return output

    def process_stream(
        self,
        input_generator,
        models: list,
        output_period: int,
        include_raw_values: bool = False,
        volume_threshold: float = 0.005,
    ):
        """
        Analyse a real-time audio stream with the list of requested models.

        Args:
            input_generator (generator): Generator that yields byte arrays representing audio data sampled
                at 16kHz
            models (list): List of models to use for the audio analysis
            output_period (int): How often (in milliseconds) the returned generator should yield results.
                The provided value must be a positive multiple of 64.
            include_raw_values (bool, optional): Should raw model outputs be included
            volume_threshold (float): Threshold below which input data will be considered as no sound.
                Should be a number between 0 and 1, where 0 will treat all data as sound and 1 will treat
                all data as no sound.

        Returns:
            generator: A generator that yields aggregated results for every `output_period` milliseconds of
            audio data received by the `input_generator`. Refer to
            `Models <https://docs.oto.ai/sdk/models/models-overview>`__ for details on the outputs for each
            individual model.

        Example:
            .. code-block:: python

                {
                    "timestamp": 0,
                    "results": {
                        "gender": {"result": "female", "confidence": 0.6255},
                        "arousal": {"result": "high", "confidence": 0.9431},
                    },
                    "raw": {
                        "gender": {"female": 0.8, "male": 0.2},
                        "arousal": {"high": 0.9245, "neutral": 0.0245, "low": 0.01},
                    },
                }

        Raises:
            ModelNotFoundError: if any of the models are invalid
            ValueError: if the output_period provided is invalid
        """
        if output_period < MIN_OUTPUT_PERIOD or output_period % MIN_OUTPUT_PERIOD != 0:
            raise ValueError(
                f"Invalid output_period: expected a value >= 64 and multiple of 64 but got {output_period}"
            )

        self.__check_model_availability(set(models))

        # minimum amount of audio (in ms) needed for all provided models to produce reliable output
        min_time = max(MODELS_BY_NAME.get(model_name).reliability_threshold for model_name in models)
        buffer_time_size = min_time + output_period
        buffer_samples_size = SAMPLES_PER_MS * buffer_time_size
        buffer = deque(maxlen=ceil(buffer_samples_size))
        update_every = ceil(SAMPLES_PER_MS * output_period)
        timestamp = 0
        new_data_count = 0

        self.prediction_engine.increment_metric("processed_stream_count")
        for chunk in input_generator:
            data = np.frombuffer(chunk, dtype=np.int16)
            index = 0
            while index < len(data):
                missing_data_count = update_every - new_data_count
                read_size = min(len(data[index:]), missing_data_count)
                buffer.extend(data[index : index + read_size])
                new_data_count += read_size
                index += read_size

                if new_data_count == update_every:
                    self.prediction_engine.increment_metric("stream_sample_count", new_data_count)
                    new_data_count = 0
                    result_dict = {"timestamp": timestamp}
                    context_samples = len(buffer) - update_every
                    result_dict.update(
                        self.__process_chunk(
                            np.asarray(buffer), models, include_raw_values, volume_threshold, context_samples
                        )
                    )
                    yield result_dict
                    timestamp += output_period
        else:
            logger.debug("Input generator is exhausted.")
            if new_data_count != 0:
                self.prediction_engine.increment_metric("stream_sample_count", new_data_count)
                logger.debug("Final prediction is generated for leftover data.")
                result_dict = {"timestamp": timestamp}
                context_samples = len(buffer) - new_data_count
                result_dict.update(
                    self.__process_chunk(
                        np.asarray(buffer), models, include_raw_values, volume_threshold, context_samples
                    )
                )
                yield result_dict
            return
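
    # Usage sketch for streaming, where `microphone_chunks()` is a hypothetical
    # generator yielding int16 byte arrays sampled at 16 kHz:
    #
    #   for result in engine.process_stream(
    #       input_generator=microphone_chunks(), models=["arousal"], output_period=512
    #   ):
    #       print(result["timestamp"], result["results"]["arousal"]["result"])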

    def process_audio_chunk(
        self,
        data: np.ndarray,
        models: list,
        include_raw_values: bool = False,
        volume_threshold: float = 0.005,
        context_samples: int = 0,
    ) -> dict:
        """
        Analyse an audio chunk with the list of requested models.

        This method should be used when a single prediction is needed for the whole chunk. For reliable
        predictions, the duration of the audio should be at least the size of the receptive field of the
        requested model (approximately 2s for most models). For more info on receptive fields, check
        `Models <https://docs.oto.ai/sdk/models/models-overview>`__

        Args:
            data (np.ndarray): Data to analyse, representing audio data sampled at 16kHz
            models (list): List of models to use for the audio analysis
            include_raw_values (bool, optional): Should raw model outputs be included
            volume_threshold (float): Threshold below which input data will be considered as no sound.
                Should be a number between 0 and 1, where 0 will treat all data as sound and 1 will treat
                all data as no sound.
            context_samples (int): Number of samples that are used as context (receptive field), the
                predictions for which should be removed from the final result. Defaults to 0, so that
                nothing is removed.

        Returns:
            dict: A dictionary with the results from each model. Refer to
            `Models <https://docs.oto.ai/sdk/models/models-overview>`__ for details on the outputs for each
            individual model.

        Example:
            .. code-block:: python

                {
                    "results": {
                        "gender": {"result": "female", "confidence": 0.6255},
                        "arousal": {"result": "high", "confidence": 0.9431},
                    },
                    "raw": {
                        "gender": {"female": 0.8, "male": 0.2},
                        "arousal": {"high": 0.9245, "neutral": 0.0245, "low": 0.01},
                    },
                }

        Raises:
            ModelNotFoundError: if any of the models are invalid
        """
        self.__check_model_availability(set(models))

        self.prediction_engine.increment_metric("processed_chunk_count")
        self.prediction_engine.increment_metric("chunk_sample_count", len(data))

        for model in models:
            if not self.__assert_enough_data(data, model):
                logger.warning("There is not enough data to make a reliable prediction using %s model", model)

        return self.__process_chunk(data, models, include_raw_values, volume_threshold, context_samples)
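
    # Usage sketch for a single aggregated prediction over roughly 2 s of audio
    # (a zero-filled array is used purely for illustration):
    #
    #   chunk = np.zeros(2 * SAMPLING_RATE, dtype=np.float32)
    #   result = engine.process_audio_chunk(chunk, models=["arousal"])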

    # Private Methods
    def __process_chunk(
        self,
        data: np.ndarray,
        models: list,
        include_raw_values: bool = False,
        volume_threshold: float = 0.005,
        context_samples: int = 0,
    ) -> dict:
        """
        Analyse a chunk of audio with the list of requested models

        Args:
            data (np.ndarray): Data to analyse
            models (list): List of models to use for the audio analysis
            include_raw_values (bool, optional): Should raw model outputs be included
            volume_threshold (float): Threshold below which input data will be considered as no sound.
                Should be a number between 0 and 1, where 0 will treat all data as sound and 1 will treat
                all data as no sound.
            context_samples (int): Number of samples that are used as context (receptive field), the
                predictions for which should be removed from the final result. Defaults to 0, so that
                nothing is removed.

        Returns:
            dict: The aggregated results for the requested models. Refer to
            `Models <https://docs.oto.ai/sdk/models/models-overview>`__ for details on the outputs for each
            individual model.

        Example:
            .. code-block:: python

                {
                    "gender": Result(value="female", confidence=0.6255),
                    "arousal": Result(value="high", confidence=0.9431),
                }
        """
        if set(models).intersection({"speaker-map"}):
            raise ValueError("Invalid model: speaker map cannot be used for streaming")

        requested_models, models_for_engine, optional_models = self.__get_model_names(models)

        original_context_samples = context_samples
        # Reduce context samples to the nearest multiple of samples per model resolution so that the
        # prediction length matches for models with different resolutions.
        samples_per_model_resolution = int(MODEL_RESOLUTION * SAMPLES_PER_MS)
        context_samples = int(context_samples - context_samples % samples_per_model_resolution)
        # Reduce context samples such that at least one prediction is always returned, except if not
        # enough data is available
        if data.shape[0] - context_samples < samples_per_model_resolution:
            context_samples = max(0, context_samples - samples_per_model_resolution)

        logger.debug("Generate predictions with requested models")
        predictions = utils.post_process_all_predictions(
            self.prediction_engine.predict(data, models_for_engine, optional_models, context_samples)
        )

        json_output = {"results": {}}
        for model_name in requested_models:
            self.prediction_engine.increment_metric(
                model_name + "_sample_count", len(data) - original_context_samples
            )
            logger.debug(f"Transforming output of model {model_name}")
            model_output = self.__get_model_output(
                model_name, predictions, volume_threshold=volume_threshold, include_raw_values=include_raw_values
            )
            json_output["results"][model_name] = {
                "result": model_output["results"]["result"],
                "confidence": model_output["results"]["confidence"],
            }
            if include_raw_values:
                if "raw" not in json_output.keys():
                    json_output["raw"] = {}
                json_output["raw"].update({model_name: model_output["raw"]})

        return json_output

    def __get_model_output(
        self,
        model_name: str,
        predictions: dict,
        output_period: int = None,
        include_summary: bool = False,
        include_transitions: bool = False,
        include_raw_values: bool = False,
        volume_threshold: float = 0.005,
    ) -> dict:
        """
        Args:
            model_name (str): Name of model to use for predictions
            predictions (dict): Dictionary containing the predictions returned by Tensorflow
            output_period (int, optional): How often the results should be returned (in milliseconds)
            include_summary (bool, optional): Should the summary be included
            include_transitions (bool, optional): Should the file transitions be returned
            include_raw_values (bool, optional): Should raw model outputs be included
            volume_threshold (float): Threshold below which input data will be considered as no sound.
                Should be a number between 0 and 1, where 0 will treat all data as sound and 1 will treat
                all data as no sound.

        Returns:
            dict: Dictionary with results and confidence

        Example:
            {
                "another_model": {"prediction": <prediction>, "confidence": <confidence>},
            }

        Raises:
            ModelNotFoundError: if unavailable models are requested
        """
        model: Model = MODELS_BY_NAME.get(model_name)
        if model is None:
            raise ModelNotFoundError(f"A model with name {model_name} is not activated")

        aggregated = not output_period
        return model.get_pretty_output(
            predictions,
            aggregated=aggregated,
            output_period=output_period,
            include_summary=include_summary,
            include_transitions=include_transitions,
            include_raw_values=include_raw_values,
            volume_threshold=volume_threshold,
        )

    def __check_model_availability(self, requested_models: set):
        """
        Check whether all of the requested models are available

        Args:
            requested_models (set): All the models that were requested by the user

        Raises:
            ModelNotFoundError: if unavailable models are requested
        """
        available_models = self.__get_available_models()
        unavailable_models = requested_models - available_models
        if unavailable_models:
            raise ModelNotFoundError(
                "Some of the requested models are not available: %s, please choose from %s"
                % (",".join(unavailable_models), ",".join(available_models))
            )

    @staticmethod
    def __get_model_names(models):
        requested_models = set(models)
        models_for_engine = [
            model_name for model in models for model_name in MODELS_BY_NAME.get(model).model_dependencies
        ]
        optional_models = [
            model_name for model in models for model_name in MODELS_BY_NAME.get(model).optional_dependencies
        ]
        return requested_models, models_for_engine, optional_models

    def __expose_available_models(self):
        """
        Prepare a data class containing a user-friendly mapping of model names

        Returns:
            dataclass: Mapping of model names (e.g. dataclass.Arousal = "arousal")
        """
        available_models = sorted(m for m in self.__get_available_models() if m not in EXCLUDED_MODELS)
        available_model_names = sorted({MODELS_BY_NAME.get(x).get_model_class_name() for x in available_models})
        return make_dataclass(
            "Models",
            [(x, str, field(default=y)) for x, y in zip(available_model_names, available_models)],
        )

    def __get_available_models(self) -> set:
        """
        Retrieve the names of all available models

        Returns:
            set: The names of the available models
        """
        return self.prediction_engine.get_available_models()

    def __assert_enough_data(self, data: np.ndarray, model_name: str) -> bool:
        """
        Check whether there is enough data to make a prediction, given the receptive field size of a
        given model.

        Args:
            data (np.ndarray): data array that would be submitted to the model
            model_name (str): name of the model that the check should be performed against

        Returns:
            bool: Whether there is enough data for making a reliable prediction
        """
        receptive_field = MODELS_BY_NAME.get(model_name).reliability_threshold
        return data.shape[0] >= receptive_field * SAMPLING_RATE / 1000

    @staticmethod
    def _get_platform_data():
        return "python_sdk", f"python={platform.python_version()} deeptone={__version__}"