Data stream classes (gaze, eye states, IMU)#

class pyneon.Stream(source: DataFrame | Path | str)#

Bases: BaseTabular

Container for continuous data streams (gaze, eye states, IMU).

Data is indexed by timestamps in nanoseconds.

Parameters:
sourcepandas.DataFrame or pathlib.Path or str

Source of the stream data. Can be either:

  • .csv: Pupil Cloud format CSV file

  • .raw: Native format (requires .time and .dtype files in the same directory)

Note: Native format columns are automatically renamed to Pupil Cloud format for consistency. For example, gyro_x -> gyro x [deg/s].

Attributes:
filepathlib.Path or None

Path to the source file(s). None if initialized from DataFrame.

datapandas.DataFrame

Stream data with timestamp [ns] as index.

type{“gaze”, “eye_states”, “imu”, “custom”}

Inferred stream type based on data columns.

Methods

annotate_events(events[, overwrite, inplace])

Annotate stream data with event IDs based on event time intervals.

compute_azimuth_and_elevation([method, ...])

Compute gaze azimuth and elevation angles (in degrees) based on gaze pixel coordinates and append them to the stream data.

concat(other[, float_kind, other_kind, inplace])

Concatenate additional columns from another Stream to this Stream.

copy()

Create a deep copy of the instance.

crop([tmin, tmax, by, inplace])

Crop data to a specific time range based on timestamps, relative times since start, or row numbers.

interpolate([new_ts, float_kind, ...])

Interpolate the stream to new timestamps.

interpolate_events(events[, buffer, ...])

Interpolate data in the duration of events in the stream data.

restrict(other[, inplace])

Temporally crop the stream to the range of timestamps in another stream.

time_to_ts(time)

Convert relative time(s) in seconds to closest timestamp(s) in nanoseconds.

window_average(new_ts[, window_size, inplace])

Take the average over a time window to obtain smoothed data at new timestamps.

Examples

Load from Pupil Cloud CSV:

>>> gaze = Stream("gaze.csv")

Load from native format:

>>> gaze = Stream("gaze ps1.raw")

Create from DataFrame:

>>> df = pd.DataFrame({"timestamp [ns]": [...], "gaze x [px]": [...]})
>>> gaze = Stream(df)
annotate_events(events: Events, overwrite: bool = False, inplace: bool = False) Stream | None#

Annotate stream data with event IDs based on event time intervals.

Parameters:
eventsEvents

Events object containing the events to annotate. The events must have a valid id_name attribute, as well as start timestamp [ns] and end timestamp [ns] columns.

overwritebool, optional

If True, overwrite existing event ID annotations in the stream data. Defaults to False.

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Annotated stream if inplace=False, otherwise None.

Raises:
ValueError

If no event ID column is known for the Events instance.

KeyError

If the expected event ID column is not found in the Events data.

property columns: Index#

Column names of the stream data.

compute_azimuth_and_elevation(method: Literal['linear'] = 'linear', overwrite: bool = False, inplace: bool = False) Stream | None#

Compute gaze azimuth and elevation angles (in degrees) based on gaze pixel coordinates and append them to the stream data.

Parameters:
method{“linear”}, optional

Method to compute gaze angles. Defaults to “linear”.

overwritebool, optional

Only applicable if azimuth and elevation columns already exist. If True, overwrite existing columns. If False, raise an error. Defaults to False.

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Stream with gaze angles computed if inplace=False, otherwise None.

Raises:
ValueError

If required gaze columns are not present in the data.

concat(other: Stream, float_kind: str | int = 'linear', other_kind: str | int = 'nearest', inplace: bool = False) Stream | None#

Concatenate additional columns from another Stream to this Stream. The other Stream will be interpolated to the timestamps of this Stream to achieve temporal alignment. See interpolate() for details.

Parameters:
otherStream

The other stream to concatenate.

float_kindstr or int, optional

Kind of interpolation applied to columns of float type. See scipy.interpolate.interp1d for details. Defaults to “linear”.

other_kindstr or int, optional

Kind of interpolation applied to columns of other types. See scipy.interpolate.interp1d for details. Only “nearest”, “nearest-up”, “previous”, and “next” are recommended. Defaults to “nearest”.

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Concatenated stream if inplace=False, otherwise None.

copy()#

Create a deep copy of the instance.

crop(tmin: Number | None = None, tmax: Number | None = None, by: Literal['timestamp', 'time', 'row'] = 'timestamp', inplace: bool = False) Stream | None#

Crop data to a specific time range based on timestamps, relative times since start, or row numbers.

Parameters:
tminnumbers.Number, optional

Start timestamp/time/row to crop the data to. If None, the minimum timestamp/time/row in the data is used. Defaults to None.

tmaxnumbers.Number, optional

End timestamp/time/row to crop the data to. If None, the maximum timestamp/time/row in the data is used. Defaults to None.

by“timestamp” or “time” or “row”, optional

Whether tmin and tmax are UTC timestamps in nanoseconds OR relative times in seconds OR row numbers of the stream data. Defaults to “timestamp”.

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Cropped stream if inplace=False, otherwise None.

property dtypes: Series#

Data types of the columns in the stream data.

property duration: float#

Duration of the stream in seconds.

property first_ts: int#

First timestamp of the stream.

interpolate(new_ts: ndarray | None = None, float_kind: str | int = 'linear', other_kind: str | int = 'nearest', inplace: bool = False) Stream | None#

Interpolate the stream to new timestamps.

Data columns of float type are interpolated using float_kind, while other columns use other_kind. This distinction allows for appropriate interpolation methods based on data type.

Parameters:
new_tsnumpy.ndarray, optional

An array of new timestamps (in nanoseconds) at which to evaluate the interpolant. If None (default), new and equally-spaced timestamps are generated according to sampling_freq_nominal.

float_kindstr or int, optional

Kind of interpolation applied to columns of float type. See scipy.interpolate.interp1d for details. Defaults to “linear”.

other_kindstr or int, optional

Kind of interpolation applied to columns of other types. See scipy.interpolate.interp1d for details. Only “nearest”, “nearest-up”, “previous”, and “next” are recommended. Defaults to “nearest”.

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Interpolated stream if inplace=False, otherwise None.

Notes

  • If new_ts contains timestamps outside the range of self.ts, the corresponding rows will contain NaN.

interpolate_events(events: Events, buffer: Number | tuple[Number, Number] = 0.05, float_kind: str | int = 'linear', other_kind: str | int = 'nearest', inplace: bool = False) Stream | None#

Interpolate data in the duration of events in the stream data. Similar to mne.preprocessing.eyetracking.interpolate_blinks().

Parameters:
eventsEvents

Events object containing the events to interpolate. The events must have start timestamp [ns] and end timestamp [ns] columns.

buffernumbers.Number or , optional

The time before and after an event (in seconds) to consider invalid. If a single number is provided, the same buffer is applied to both before and after the event. Defaults to 0.05.

float_kindstr or int, optional

Kind of interpolation applied to columns of float type. See scipy.interpolate.interp1d for details. Defaults to “linear”.

other_kindstr or int, optional

Kind of interpolation applied to columns of other types. See scipy.interpolate.interp1d for details. Only “nearest”, “nearest-up”, “previous”, and “next” are recommended. Defaults to “nearest”.

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Interpolated stream if inplace=False, otherwise None.

Examples

Interpolate eye states data during blinks with a 50 ms buffer before and after:

>>> eye_states = eye_states.interpolate_events(blinks, buffer=0.05)
property is_uniformly_sampled: bool#

Whether the stream is uniformly sampled.

property last_ts: int#

Last timestamp of the stream.

restrict(other: Stream, inplace: bool = False) Stream | None#

Temporally crop the stream to the range of timestamps in another stream. Equivalent to crop(other.first_ts, other.last_ts).

Parameters:
otherStream

The other stream whose timestamps are used to restrict the data.

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Restricted stream if inplace=False, otherwise None.

property sampling_freq_effective: float#

Effective sampling frequency of the stream.

property sampling_freq_nominal: int | None#

Nominal sampling frequency in Hz as specified by Pupil Labs (see https://pupil-labs.com/products/neon/specs). None for custom or unknown stream types.

property shape: tuple[int, int]#
time_to_ts(time: Number | ndarray) ndarray#

Convert relative time(s) in seconds to closest timestamp(s) in nanoseconds.

property times: ndarray#

Timestamps converted to seconds relative to stream start.

property timestamps: ndarray#

Timestamps of the stream in nanoseconds.

property ts: ndarray#

Alias for timestamps.

property ts_diff: ndarray#

Difference between consecutive timestamps.

window_average(new_ts: ndarray, window_size: int | None = None, inplace: bool = False) Stream | None#

Take the average over a time window to obtain smoothed data at new timestamps.

Parameters:
new_tsnumpy.ndarray

An array of new timestamps (in nanoseconds) at which to evaluate the averaged signal. Must be coarser than the source sampling, i.e.:

>>> np.median(np.diff(new_ts)) > np.median(np.diff(data.index))
window_sizeint, optional

The size of the time window (in nanoseconds) over which to compute the average around each new timestamp. If None (default), the window size is set to the median interval between the new timestamps, i.e., np.median(np.diff(new_ts)). The window size must be larger than the median interval between the original data timestamps, i.e., window_size > np.median(np.diff(data.index)).

inplacebool, optional

If True, replace current data. Otherwise returns a new instance. Defaults to False.

Returns:
Stream or None

Stream with window average applied on data if inplace=False, otherwise None.