Skip to content

Hello World

Creating a py4pd Object

To define a new py4pd object, create a subclass of puredata.NewObject, name the object, and finally name the script with the name + .pd_py. For example, pymetro.pd_py for the object pymetro. Place the file in a place where Pd can find it.

Don't forget to load py4pd first

Example

import puredata as pd

class pymetro_class(pd.NewObject):
    name: str = "pymetro"  # Name of the Pure Data object

    def __init__(self, args):
        self.inlets = 2    # Number of inlets
        self.outlets = 1   # Number of outlets

Key Points

The Python class name (e.g., pymetro_class) can be any valid class name. The name attribute determines the name of the object inside Pure Data. self.inlets and self.outlets define the number of inlets and outlets for the object. When loading this object in Pure Data, use the name attribute value (pymetro in this example) as the object name.

Input and Output

Input

The input design is inspired by the mature pd-lua project. For methods, use the format in_<inlet_number>_<method>. For example, to execute code when a float is received on first inlet, define a method called in_0_float. Pd provides predefined methods that do not require a custom selector: bang, float, symbol, list, and anything. You can also create custom selectors (prefixes); for instance, in_0_mymethod will be executed when the message mymethod is sent to the first inlet of the object.

Output

To produce output, use the method self.out. For example, self.out(0, pd.SYMBOL, "test238") sends the symbol "test238" to the firist outlet. The second argument specifies the data type, which can be pd.SYMBOL or pd.FLOAT. To output a list, use pd.LIST instead. To output numpy.ndarray, class and others you must use pd.PYOBJECT.

To receive this pd.PYOBJECT you need to use the PyObject message, which allows you to share Python data types between py4pd objects. This enables the transfer of class instances, NumPy arrays, and other Python objects that are not supported by Pure Data’s traditional data types.

Metronome Example

import puredata as pd


class pymetro(pd.NewObject):
    name: str = "pymetro"

    def __init__(self, args):
        self.inlets = 2
        self.outlets = 1
        self.toggle = False
        if len(args) > 0:
            self.time = float(args[0])
        else:
            self.time = 1000
        self.metro = pd.new_clock(self, self.tick)
        self.args = args

    def in_1_float(self, f: float):
        self.time = f

    def in_0_float(self, f: float):
        if f:
            self.toggle = True
            self.tick()
        else:
            self.metro.unset()
            self.toggle = False

    def tick(self):
        if self.toggle:
            self.metro.delay(self.time)
        self.out(0, pd.SYMBOL, "test238")

Oscillator Example

This is a simple oscillator example

import puredata as pd
import math


class pytest_tilde(pd.NewObject):
    name: str = "pytest~"

    def __init__(self, args):
        self.inlets = pd.SIGNAL
        self.outlets = pd.SIGNAL
        self.phase = 0

    def perform(self, input):
        blocksize = self.blocksize
        samplerate = self.samplerate

        out_buffer = []
        for i in range(blocksize):
            phase_increment = 2 * math.pi * input[0][i] / samplerate
            sample = math.sin(self.phase)
            out_buffer.append(sample)
            self.phase += phase_increment
            if self.phase > 2 * math.pi:
                self.phase -= 2 * math.pi
        out_tuple = tuple(out_buffer)
        return out_tuple

    def dsp(self, sr, blocksize, inchans):
        self.samplerate = sr
        self.blocksize = blocksize
        self.inchans = inchans
        return True

Train AI Example

This is a complex and complet example to train AI using objects from timbreLIBId. This use threading to avoid block the Pd audio thread.

import puredata as pd

import os
import random
import threading
import numpy as np
import librosa

from sklearn.metrics import classification_report
from catboost import CatBoostClassifier


class pytrain(pd.NewObject):
    name = "py.train"

    def __init__(self, args):
        # pd
        self.inlets = 2
        self.outlets = 2
        self.tabname = "train"
        self.redraw_tab_after = 120

        # train parameters
        self.max_offset = 256
        self.n_windows = 10
        self.iterations = 150
        self.fn_estimators = 150
        self.random_state = 42
        self.test_fraction = 0.2

        # folders
        self.trainfolder = ""
        self.folders = {}
        self.currtraindata = []
        self.currtestdata = []

        # datasets
        self.x_train, self.y_train = [], []
        self.x_test, self.y_test = [], []

        # model
        self.clf = self._init_model()

    # ----------------------------
    # Model
    # ----------------------------
    def _init_model(self):
        # TODO: Implement more models
        return CatBoostClassifier(
            iterations=self.iterations,
            depth=6,
            learning_rate=0.1,
            loss_function="MultiClass",
            random_seed=self.random_state,
            verbose=100,
            early_stopping_rounds=20,
        )

    # ----------------------------
    # Folder / Dataset Management
    # ----------------------------
    def _resolve_trainfolder(self, path):
        if os.path.exists(path):
            return path
        candidate = os.path.join(self.get_current_dir(), path)
        if not os.path.exists(candidate):
            raise FileNotFoundError(f"{path} folder not found")
        return candidate

    def in_1_trainfolder(self, args):
        self.trainfolder = self._resolve_trainfolder(args[0])
        self.folders = {
            f: os.path.join(self.trainfolder, f)
            for f in os.listdir(self.trainfolder)
            if os.path.isdir(os.path.join(self.trainfolder, f))
        }
        self.logpost(2, "Train folder: " + self.trainfolder)

    # ----------------------------
    # Audio processing
    # ----------------------------
    def _load_audio(self, filepath):
        sr = pd.get_sample_rate()
        y, _ = librosa.load(filepath, sr=sr)
        return y

    def _split_train_test(self, files):
        n_test = max(1, int(len(files) * self.test_fraction))
        test_files = random.sample(files, n_test)
        train_files = [f for f in files if f not in test_files]
        return train_files, test_files

    def _generate_variants(self, y, sr, mode="traindata"):
        """Retorna uma lista de versões do áudio (original + augmentadas se treino)"""
        variants = [y]  # sempre inclui o original

        if mode == "traindata":
            # Exemplo: time stretch
            value = random.uniform(0.7, 1.2)
            variants.append(librosa.effects.time_stretch(y=y, rate=value))
            value = random.uniform(0.7, 1.2)
            variants.append(librosa.effects.time_stretch(y=y, rate=value))

            # Exemplo: pitch shift
            value = random.uniform(-2, 2)
            variants.append(librosa.effects.pitch_shift(y=y, sr=sr, n_steps=value))
            value = random.uniform(-2, 2)
            variants.append(librosa.effects.pitch_shift(y=y, sr=sr, n_steps=value))

            # Exemplo: adicionar ruído
            value = random.uniform(0.005, 0.009)
            noise = np.random.normal(0, value, len(y))
            variants.append(y + noise)

        return variants

    def _process_file(self, filepath, label, target_list, mode):
        y = self._load_audio(filepath)
        sr = pd.get_sample_rate()
        variants = self._generate_variants(y, sr, mode=mode)

        for signal in variants:
            self._write_tab(signal)

            idx = random.randint(0, 1024)
            while True:
                self.out(1, pd.LIST, [idx, mode])
                if idx >= len(signal) - 2048:
                    break

                if mode == "testdata":
                    assert len(self.currtestdata) > 0
                    target_list.append((self.currtestdata, label))
                    self.currtestdata = []
                else:
                    assert len(self.currtraindata) > 0
                    target_list.append((self.currtraindata, label))
                    self.currtraindata = []

                idx += random.randint(512, 1024)

    def _write_tab(self, y):
        self.redraw += 1

        self.tabwrite(
            "train",
            y.tolist(),
            resize=True,
            redraw=(self.redraw % self.redraw_tab_after == 0),
        )

    # ----------------------------
    # Dataset Build
    # ----------------------------
    def get_train_mir(self):
        self.redraw = 0
        train_data, test_data = [], []

        for label, folder in self.folders.items():
            self.logpost(2, f"Processing {label}")
            all_files = [
                os.path.join(folder, f)
                for f in os.listdir(folder)
                if f.endswith((".aif", ".aiff", ".wav"))
            ]
            train_files, test_files = self._split_train_test(all_files)

            for f in test_files:
                self._process_file(f, label, test_data, "testdata")

            for f in train_files:
                self._process_file(f, label, train_data, "traindata")

        self.x_train, self.y_train = zip(*train_data) if train_data else ([], [])
        self.x_test, self.y_test = zip(*test_data) if test_data else ([], [])

        self.x_np_train = np.array(self.x_train)
        self.y_np_train = np.array(self.y_train)
        self.x_np_test = np.array(self.x_test)
        self.y_np_test = np.array(self.y_test)

        self.logpost(2, "Done!")

    # ----------------------------
    # Training / Export
    # ----------------------------
    def _train(self):
        self.clf.fit(
            self.x_np_train,
            self.y_np_train,
            eval_set=(self.x_np_test, self.y_np_test),
        )
        y_pred = self.clf.predict(self.x_np_test)
        self.logpost(2, classification_report(self.y_np_test, y_pred), prefix=False)
        self.logpost(2, "", prefix=False)
        self.logpost(2, "Training finished!")

    def in_0_train(self, args):
        self.logpost(2, "Training, wait...")
        t = threading.Thread(target=self._train, daemon=True)
        t.start()

    def in_0_export(self, args):
        file = args[0]
        path = os.path.join(self.get_current_dir(), file)
        if os.path.exists(path):
            self.logpost(1, "Model already exists, will replace it!")
        self.clf.save_model(path, format="onnx")
        self.logpost(2, f"Model exported to {path}")

    # ----------------------------
    # Utils
    # ----------------------------
    def in_0_printdata(self, _):
        self.logpost(2, f"Train samples: {len(self.x_train)}")
        self.logpost(2, f"Train labels: {len(self.y_train)}")
        assert len(self.x_train) == len(self.y_train)

        self.logpost(2, f"Test samples: {len(self.x_test)}")
        self.logpost(2, f"Test labels: {len(self.y_test)}")
        assert len(self.x_test) == len(self.y_test)

    def in_1_testdata(self, data):
        self.currtestdata = data

    def in_1_traindata(self, data):
        self.currtraindata = data

    def in_0_analyze(self, _):
        t = threading.Thread(target=self.get_train_mir, daemon=True)
        t.start()

    def in_0_randomload(self, args):
        all_files = [
            os.path.join(folder, f)
            for label, folder in self.folders.items()
            for f in os.listdir(folder)
            if f.endswith((".aif", ".aiff"))
        ]
        if not all_files:
            return None
        random_file = random.choice(all_files)
        y = self._load_audio(random_file)
        self.tabwrite("train", y.tolist(), resize=True)

For patch where I use this object and more info check pd-onnx. Inside the resources folder also this youtube example.