Skip to content

EDF-to-BIDS conversion

Derived from `edf_to_bids` and `convert.py` of the HCPh SOPs: Converting eye-tracking into BIDS — Standard Operating Procedures of the HCPh project (axonlab.org), TheAxonLab/hcph-sops (github.com).

The EyeLink eye-tracking system produces EDF recording files. In this step we first convert the raw EDF files into the BIDS format to enforce a standardized structure, naming convention, and metadata description. This makes it easier for researchers to understand and use data from different sources.

0 Package preparation

  • To use the open-source pyedfread Python package, we must first properly install the EyeLink software on our device/laptop. You can find the instructions here.

  • Import the environments

from __future__ import annotations

import json
import re
from collections import defaultdict
from itertools import groupby, product
from pathlib import Path
from warnings import warn

import numpy as np
import pandas as pd
from pyedfread import read_edf

1 Load raw ET data

  • Use the pyedfread package to open the EDF file and load its contents into dataframes.

# Point DATA_PATH at the folder that contains the EDF recordings.
DATA_PATH = Path("/Path/to/EDF folder")
edf_name = "file1.EDF"  # plain string: there was nothing to interpolate in the f-string

# read_edf expects a plain string path, not a Path object.
file_path = str(DATA_PATH / edf_name)
print(file_path)
# Returns three dataframes: raw samples, parsed events, and logged messages.
ori_recording, ori_events, ori_messages = read_edf(file_path)
Reading the EDF file yields three Pandas dataframes:

  • ori_recording: The ET recordings with trajectory information, pupil area and other information.
  • ori_events: contains information of task events.
  • ori_messages: log messages including ET calibration, validation and user-defined task messages sent from Psychopy program to the device.
# Map pyedfread's header names onto the BIDS-friendly names used below.
_column_map = {
    "message": "trialid",   # free-text message payload
    "trial": "trial",       # kept as-is
    "time": "timestamp",    # BIDS column name
}
ori_messages = ori_messages.rename(columns=_column_map)

# Work on aliases so the original dataframes remain available for inspection.
recording, messages, events = ori_recording, ori_messages, ori_events
print(f'\nThe entire info of `message`: \n{messages}')
recording.columns

2 Parsing the messages

  • Drop the duplicated codes.
    # Strip stray whitespace from every column header, then remove duplicate rows.
    _stripped = {name: name.strip() for name in messages.columns.values}
    messages = messages.rename(columns=_stripped).drop_duplicates()
  • Check the information about calibration of ET data.
# Calibration lines are tagged with the "!CAL" prefix by the EyeLink device;
# keep them aside for the calibration-parsing step further below.
_cal_hdr = ori_messages.trialid.str.startswith("!CAL")
calibration = ori_messages.loc[_cal_hdr]
print(calibration)
  • Extracting the start time and stop time from metadata. If no information is extracted from metadata, we will keep them as None.

    # Extracting the StartTime and StopTime metadata.
    # Both are searched for in the free-text messages; if the trigger strings
    # are not found, the values stay None in the JSON sidecar.
    message_first_trigger = 'MODE RECORD'
    message_last_trigger = 'end'
    metadata = {
        'StopTime': None,
        'StartTime': None
    }

    # Boolean masks over the message text (case-insensitive regex match).
    start_rows = messages.trialid.str.contains(
        message_first_trigger, case=False, regex=True
    )
    stop_rows = messages.trialid.str.contains(
        message_last_trigger, case=False, regex=True
    )


    # Extract calibration headers ("!CAL" prefix) and remove them from the
    # message stream.  NOTE: start_rows/stop_rows were computed on the larger
    # frame; pandas realigns the boolean masks on the index in the selections
    # below, so labels dropped here are simply ignored.
    _cal_hdr = messages.trialid.str.startswith("!CAL")
    calibration = messages[_cal_hdr]
    messages = messages.drop(messages.index[_cal_hdr])

    # Pick the LAST of the start messages
    metadata["StartTime"] = (
        int(messages[start_rows].timestamp.values[-1])
        if start_rows.any()
        else None
    )

    # Pick the FIRST of the stop messages
    metadata["StopTime"] = (
        int(messages[stop_rows].timestamp.values[0])
        if stop_rows.any()
        else None
    )

    # Drop start and stop messages from messages dataframe
    messages = messages.loc[~start_rows & ~stop_rows, :]

  • Extracting basic metadata

    # Extracting basic metadata from the "!MODE RECORD" message, e.g.:
    # !MODE RECORD CR 1000 2 0 R
    #   CR   -> tracking mode; 1000 -> sampling frequency (Hz); R -> eye(s)

    mode_record = messages.trialid.str.startswith("!MODE RECORD")

    # Fall back to project-wide defaults when the message is absent/malformed.
    # NOTE(review): DEFAULT_FREQUENCY, DEFAULT_MODE, DEFAULT_EYE, and
    # EYE_CODE_MAP are not defined in this snippet -- they must be provided
    # earlier in the pipeline.
    meta_record = {
        "freq": DEFAULT_FREQUENCY,
        "mode": DEFAULT_MODE,
        "eye": DEFAULT_EYE,
    }

    if mode_record.any():
        try:
            # Parse the LAST !MODE RECORD message found.
            meta_record = re.match(
                r"\!MODE RECORD (?P<mode>\w+) (?P<freq>\d+) \d \d (?P<eye>[RL]+)",
                messages[mode_record].trialid.iloc[-1].strip(),
            ).groupdict()

            # Map the EyeLink eye code (R/L/LR) to a human-readable label.
            meta_record["eye"] = EYE_CODE_MAP[meta_record["eye"]]
            # "CR" is reported as pupil--corneal reflection ("P-CR") in BIDS.
            meta_record["mode"] = (
                "P-CR" if meta_record["mode"] == "CR" else meta_record["mode"]
            )
        except AttributeError:
            # re.match returned None, so .groupdict() raised AttributeError.
            warn(
                "Error extracting !MODE RECORD message, "
                "using default frequency, mode, and eye"
            )
        finally:
            # The !MODE RECORD rows are consumed either way.
            messages = messages.loc[~mode_record]

    # Tuple of eye names used throughout the rest of the conversion.
    eye = (
        ("right", "left") if meta_record["eye"] == "both" else (meta_record["eye"],)
    )

    metadata["SamplingFrequency"] = int(meta_record["freq"])
    metadata["EyeTrackingMethod"] = meta_record["mode"]
    metadata["RecordedEye"] = meta_record["eye"]

  • Extracting screen parameters

    # Extracting screen parameters from the GAZE_COORDS message, e.g.:
    # GAZE_COORDS 0.00 0.00 800.00 600.00

    # Extract GAZE_COORDS message signaling start of recording
    gaze_msg = messages.trialid.str.startswith("GAZE_COORDS")

    # Default AOI definition; the second element is replaced below when a
    # GAZE_COORDS message is successfully parsed.
    # NOTE(review): DEFAULT_SCREEN must be defined earlier in the pipeline.
    metadata["ScreenAOIDefinition"] = [
        "square",
        DEFAULT_SCREEN,
    ]
    if gaze_msg.any():
        try:
            gaze_record = re.match(
                r"GAZE_COORDS (\d+\.\d+) (\d+\.\d+) (\d+\.\d+) (\d+\.\d+)",
                messages[gaze_msg].trialid.iloc[-1].strip(),
            ).groups()
            # Reorder the matched values (x0, y0, x1, y1) -> [x0, x1, y0, y1].
            metadata["ScreenAOIDefinition"][1] = [
                int(round(float(gaze_record[0]))),
                int(round(float(gaze_record[2]))),
                int(round(float(gaze_record[1]))),
                int(round(float(gaze_record[3]))),
            ]
        except AttributeError:
            # re.match returned None (unexpected message format).
            warn("Error extracting GAZE_COORDS")
        finally:
            messages = messages.loc[~gaze_msg]

    print(metadata)

  • Extracting parameters of the pupil fit model.

# Extracting parameters of the pupil fit model, e.g.:
# ELCL_PROC ELLIPSE (5)
# ELCL_EFIT_PARAMS 1.01 4.00  0.15 0.05  0.65 0.65  0.00 0.00 0.30
# Extract ELCL_PROC AND ELCL_EFIT_PARAMS to extract pupil fit method
pupilfit_msg = messages.trialid.str.startswith("ELCL_PROC")

if pupilfit_msg.any():
    try:
        # Tokenize the LAST ELCL_PROC message, dropping empty strings left
        # by repeated spaces, e.g. ["ELLIPSE", "(5)"].
        pupilfit_method = [
            val
            for val in messages[pupilfit_msg]
            .trialid.iloc[-1]
            .strip()
            .split(" ")[1:]
            if val
        ]
        metadata["PupilFitMethod"] = pupilfit_method[0].lower()
        # "(5)" -> 5
        metadata["PupilFitMethodNumberOfParameters"] = int(
            pupilfit_method[1].strip("(").strip(")")
        )
    except AttributeError:
        # NOTE(review): a missing token or failed int() would raise
        # IndexError/ValueError, which this handler does not catch.
        warn("Error extracting ELCL_PROC (pupil fitting method)")
    finally:
        messages = messages.loc[~pupilfit_msg]

pupilfit_msg_params = messages.trialid.str.startswith("ELCL_EFIT_PARAMS")
if pupilfit_msg_params.any():
    rows = messages[pupilfit_msg_params]
    row = rows.trialid.values[-1].strip().split(" ")[1:]
    try:
        # Runs of consecutive non-empty tokens (the message separates groups
        # with double spaces) become tuples of floats.
        metadata["PupilFitParameters"] = [
            tuple(float(val) for val in vals)
            for k, vals in groupby(row, key=bool)
            if k
        ]
    except AttributeError:
        # NOTE(review): float() failures raise ValueError, not AttributeError.
        warn("Error extracting ELCL_EFIT_PARAMS (pupil fitting parameters)")
    finally:
        messages = messages.loc[~pupilfit_msg_params]
  • Parsing validation messages
# Calibration validation messages, e.g.:
# VALIDATE R 4POINT 4 RIGHT at 752,300 OFFSET 0.35 deg. -8.7,-3.8 pix.
# Extract VALIDATE messages for a calibration validation
validation_msg = messages.trialid.str.startswith("VALIDATE")

# Only create the metadata lists when there is at least one VALIDATE row;
# the loop below iterates zero times otherwise.
if validation_msg.any():
    metadata["ValidationPosition"] = []
    metadata["ValidationErrors"] = []

for i_row, validate_row in enumerate(messages[validation_msg].trialid.values):
    # Everything before "OFFSET" describes the target; after it, the error.
    prefix, suffix = validate_row.split("OFFSET")
    # Map RIGHT/LEFT onto the eye1/eye2 naming used by the BIDS columns.
    validation_eye = (
        f"eye{eye.index('right') + 1}"
        if "RIGHT" in prefix
        else f"eye{eye.index('left') + 1}"
    )
    # Target position in pixels, e.g. "at 752,300" -> [752, 300].
    validation_coords = [
        int(val.strip())
        for val in prefix.rsplit("at", 1)[-1].split(",")
        if val.strip()
    ]
    metadata["ValidationPosition"].append(
        [validation_eye, validation_coords]
    )

    # Offset magnitude in degrees plus its x/y components in pixels.
    validate_values = [
        float(val)
        for val in re.match(
            r"(-?\d+\.\d+) deg\.\s+(-?\d+\.\d+),(-?\d+\.\d+) pix\.",
            suffix.strip(),
        ).groups()
    ]

    metadata["ValidationErrors"].append(
        (validation_eye, validate_values[0], tuple(validate_values[1:]))
    )
# Consume the VALIDATE rows.
messages = messages.loc[~validation_msg]

print(messages)
print(metadata)
  • Extract final bits of metadata and THRESHOLDS messages prior to the recording.
    # THRESHOLDS messages carry the per-eye detection thresholds: the first
    # token is the eye code, the last two tokens are the pupil and the
    # corneal-reflection thresholds, respectively.
    thresholds_msg = messages.trialid.str.startswith("THRESHOLDS")
    if thresholds_msg.any():
        # One slot per recorded eye; eyes without a message stay None.
        metadata["PupilThreshold"] = [None] * len(eye)
        metadata["CornealReflectionThreshold"] = [None] * len(eye)
        thresholds_chunks = (
            messages[thresholds_msg].trialid.iloc[-1].strip().split(" ")[1:]
        )
        # Map the eye code (R/L) to its position in the `eye` tuple.
        eye_index = eye.index(EYE_CODE_MAP[thresholds_chunks[0]])
        metadata["PupilThreshold"][eye_index] = int(thresholds_chunks[-2])
        metadata["CornealReflectionThreshold"][eye_index] = int(
            thresholds_chunks[-1]
        )
    messages = messages.loc[~thresholds_msg]
    print(messages)
    print(metadata)
    
  • Consume the remaining messages
    # Anything still left in `messages` is a free-form log entry; store each
    # one as a (timestamp, text) pair under the LoggedMessages metadata key.
    if not messages.empty:
        _logged = []
        for msg_timestamp, msg in messages[["timestamp", "trialid"]].values:
            _logged.append((int(msg_timestamp), msg.strip()))
        metadata["LoggedMessages"] = _logged

    print(messages)
    print(metadata)
    

3 Parsing the recording dataframe

# Start over from the untouched raw samples dataframe.
recording = ori_recording
  • Curation of the input dataframe
# Normalize timestamps (should be int and strictly positive)
recording = recording.astype({"time": int})
recording = recording[recording["time"] > 0]
raw_recording_len = len(recording)
print(f'raw_recording length: {raw_recording_len}')

recording = recording.rename(
    columns={
#         # Fix buggy header names generated by pyedfread
#         "fhxyvel": "fhxvel",
#         "frxyvel": "frxvel",
        # Normalize weird header names generated by pyedfread
        "rx": "screen_ppdeg_x_coordinate",
        "ry": "screen_ppdeg_y_coordinate",
        # Convert some BIDS columns
        "time": "timestamp",
    }
)

# Split extra columns from the dataframe (kept aside in `extra`; not used below)
extra = recording[["flags", "input", "htype"]]
recording = recording.drop(columns=["flags", "input", "htype"])
print(len(recording))

# Remove columns that are always very close to zero (|value| <= 1e-8 everywhere)
recording = recording.loc[:, (recording.abs() > 1e-8).any(axis=0)]
# Remove columns where every value is 1e8 or larger in magnitude
recording = recording.loc[:, (recording.abs() < 1e8).any(axis=0)]
# Replace unreasonably high values with NaNs
# (1e8 is presumably the tracker's missing-data fill value -- TODO confirm)
recording = recording.replace({1e8: np.nan})

# Column filtering must never drop rows.
assert len(recording) == raw_recording_len
  • Clean-up pupil size and gaze position
# These are the parameters we most likely care for, so special curation is applied:
screen_resolution = [800, 600]

for eyenum, eyename in enumerate(eye):
    # Clean-up implausible values for pupil area (pa): values below 1 are noise
    recording.loc[
        recording[f"pa_{eyename}"] < 1, f"pa_{eyename}"
    ] = np.nan
    # Rename to the BIDS-style eyeN_pupil_size column name
    recording = recording.rename(
        columns={f"pa_{eyename}": f"eye{eyenum + 1}_pupil_size"}
    )
    print(f"pa_{eyename} renamed as: eye{eyenum + 1}_pupil_size")
    # Clean-up implausible values for gaze x position (outside the screen)
    recording.loc[
        (recording[f"gx_{eyename}"] < 0)
        | (recording[f"gx_{eyename}"] > screen_resolution[0]),
        f"gx_{eyename}",
    ] = np.nan
    # Clean-up implausible values for gaze y position
    # NOTE(review): y uses `<= 0` while x uses `< 0` -- confirm the asymmetry
    # is intended.
    recording.loc[
        (recording[f"gy_{eyename}"] <= 0)
        | (recording[f"gy_{eyename}"] > screen_resolution[1]),
        f"gy_{eyename}",
    ] = np.nan

print(recording)
assert len(recording) == raw_recording_len
  • Shape the columns to comply with BIDS format.
# Munging columns to comply with BIDS.
# At this point, the dataframe is almost ready for writing out as BIDS.
# Interpolate BIDS column names: every column except the already-final ones
# listed below gets translated from pyedfread naming to BIDS naming.
columns = list(
    set(recording.columns)
    - set(
        (
            "timestamp",
            "screen_ppdeg_x_coordinate",
            "screen_ppdeg_y_coordinate",
            "eye1_pupil_size",  # pa, renamed above
            "eye2_pupil_size",  # pa, renamed above
        )
    )
)
bids_columns = []
for eyenum, eyename in enumerate(eye):
    for name in columns:
        # Prefix with eyeN only if the column belongs to this eye.
        colprefix = f"eye{eyenum + 1}" if name.endswith(f"_{eyename}") else ""
        # e.g. "gx_right" -> "gx" -> "g_x_coordinate" -> "gaze_x_coordinate"
        _newname = name.split("_")[0]
        _newname = re.sub(r"([xy])$", r"_\1_coordinate", _newname)
        _newname = re.sub(r"([xy])vel$", r"_\1_velocity", _newname)
        _newname = _newname.split("_", 1)
        # NOTE(review): EDF2BIDS_COLUMNS (mapping the one-letter prefix,
        # e.g. "g" -> gaze) must be defined earlier in the pipeline.
        _newname[0] = EDF2BIDS_COLUMNS[_newname[0]]
        _newname.insert(0, colprefix)
        bids_columns.append("_".join((_n for _n in _newname if _n)))

# Rename columns to be BIDS-compliant.
# NOTE(review): with two recorded eyes the outer loop appends len(columns)
# names per eye, but zip() truncates to the first pass -- verify the second
# eye's columns are renamed as intended.
recording = recording.rename(columns=dict(zip(columns, bids_columns)))

# Reorder columns to render nicely (tracking first, pupil size after).
# NOTE(review): BIDS_COLUMNS_ORDER must be defined earlier in the pipeline.
columns = sorted(
    set(recording.columns.values).intersection(BIDS_COLUMNS_ORDER),
    key=lambda entry: BIDS_COLUMNS_ORDER.index(entry),
)
columns += [c for c in recording.columns.values if c not in columns]
recording = recording.reindex(columns=columns)

print(recording)
assert len(recording) == raw_recording_len

4 Parsing the calibration messages

# Parse calibration metadata.
metadata["CalibrationCount"] = 0
if not calibration.empty:
    # Only single-eye calibration parsing is implemented below.
    warn("Calibration of more than one eye is not implemented")
    # Strip the "!CAL" prefix and surrounding whitespace.
    # NOTE(review): `calibration` is a slice of the messages dataframe;
    # assigning to its columns may emit pandas' SettingWithCopyWarning.
    calibration.trialid = calibration.trialid.str.replace("!CAL", "")
    calibration.trialid = calibration.trialid.str.strip()

    # Full calibration log as (timestamp, message) pairs.
    metadata["CalibrationLog"] = list(
        zip(
            calibration.timestamp.values.astype(int),
            calibration.trialid.values,
        )
    )

    # Each "VALIDATION ... ERROR" line marks one completed calibration.
    calibrations_msg = calibration.trialid.str.startswith(
        "VALIDATION"
    ) & calibration.trialid.str.contains("ERROR")
    # NOTE: numpy integer here; converted to int later for JSON writing.
    metadata["CalibrationCount"] = calibrations_msg.sum()

    # Parse the results of the LAST validation message.
    # NOTE(review): raises IndexError if no VALIDATION/ERROR line exists.
    calibration_last = calibration.index[calibrations_msg][-1]
    try:
        meta_calib = re.match(
            r"VALIDATION (?P<ctype>[\w\d]+) (?P<eyeid>[RL]+) (?P<eye>RIGHT|LEFT) "
            r"(?P<result>\w+) ERROR (?P<avg>-?\d+\.\d+) avg\. (?P<max>-?\d+\.\d+) max\s+"
            r"OFFSET (?P<offsetdeg>-?\d+\.\d+) deg\. "
            r"(?P<offsetxpix>-?\d+\.\d+),(?P<offsetypix>-?\d+\.\d+) pix\.",
            calibration.loc[calibration_last, "trialid"].strip(),
        ).groupdict()

        metadata["CalibrationType"] = meta_calib["ctype"]
        metadata["AverageCalibrationError"] = [float(meta_calib["avg"])]
        metadata["MaximalCalibrationError"] = [float(meta_calib["max"])]
        metadata["CalibrationResultQuality"] = [meta_calib["result"]]
        metadata["CalibrationResultOffset"] = [
            float(meta_calib["offsetdeg"]),
            (float(meta_calib["offsetxpix"]), float(meta_calib["offsetypix"])),
        ]
        metadata["CalibrationResultOffsetUnits"] = ["deg", "pixels"]
    except AttributeError:
        # re.match returned None: the message did not match the pattern.
        warn("Calibration data found but unsuccessfully parsed for results")


print(calibration)

5 Parsing the events dataframe

There are three types of eye movements:

  • fixation
  • saccade
  • blinks

The mask for each event is recorded, with the mask value indicating whether an event occurred at a specific timestamp. A mask value of 1 at a given timestamp signifies that the event was detected at that moment.

# print(events)
print(recording)

# Initialize the event masks: 1 while the event is ongoing, 0 otherwise.
for _colname in ("eye1_fixation", "eye1_saccade", "eye1_blink"):
    recording[_colname] = 0


def _mark_span(start, end, column):
    """Set ``column`` to 1 for samples whose timestamp lies in [start, end]."""
    _within = (recording["timestamp"] >= start) & (recording["timestamp"] <= end)
    recording.loc[_within, column] = 1


# Fixation periods
for _, fixation_event in events[events["type"] == "fixation"].iterrows():
    _mark_span(fixation_event["start"], fixation_event["end"], "eye1_fixation")

# Saccade periods; blinks are flagged as a sub-event of saccades
for _, saccade_event in events[events["type"] == "saccade"].iterrows():
    _mark_span(saccade_event["start"], saccade_event["end"], "eye1_saccade")

    # NOTE: some pyedfread versions name this column "blink" instead
    if saccade_event["contains_blink"] == 1:
        _mark_span(saccade_event["start"], saccade_event["end"], "eye1_blink")

6 Write the data into BIDS structure

from copy import deepcopy

# Record the final column order in the sidecar metadata.
metadata['Columns'] = recording.columns.tolist()
print(metadata)
# Keep an untouched copy before the JSON-serialization clean-up below.
save_metadata = deepcopy(metadata)
# metadata.pop('CalibrationLog', None)
# print(metadata)
We need to convert the 'CalibrationCount' value (and the timestamps in 'CalibrationLog') into the built-in int type before JSON serialization.

def convert_to_int(metadata):
    """Cast numpy integers in *metadata* to built-in ``int`` for JSON output.

    ``json.dumps`` cannot serialize numpy scalar types, so the
    ``CalibrationCount`` value and the timestamps inside ``CalibrationLog``
    are converted in place.  Uses the ``np.integer`` abstract base class so
    every numpy integer width is covered, not just ``int32``/``int64``.

    Parameters
    ----------
    metadata : dict
        The metadata dictionary extracted from the EDF file (mutated in place).

    Returns
    -------
    dict
        The same dictionary, returned for convenience.

    """
    if "CalibrationCount" in metadata and isinstance(
        metadata["CalibrationCount"], (int, np.integer)
    ):
        metadata["CalibrationCount"] = int(metadata["CalibrationCount"])
    if "CalibrationLog" in metadata:
        metadata["CalibrationLog"] = [
            (int(ts), msg) if isinstance(ts, (int, np.integer)) else (ts, msg)
            for ts, msg in metadata["CalibrationLog"]
        ]
    return metadata


# Normalize numpy integer types in place so json.dumps can serialize them.
convert_metadata = convert_to_int(metadata)

Write the dataframe and the metadata into the BIDS structure.

# The BIDS files are written next to the EDF recordings.
out_dir = DATA_PATH
edf_extension = 'EDF'
# Path.stem strips only the final extension, so file names that contain
# extra dots survive intact (edf_name.split('.')[0] would truncate them).
filename = Path(edf_name).stem
print(f'bids filename: {filename}')

def write_bids_from_df(
    recording, metadata,
    out_dir,
    filename,
) -> list[str]:
    """
    Directly save the eye-tracking recording/metadata into a BIDS structure.

    Parameters
    ----------
    recording : :obj:`pandas.DataFrame`
        The recording data extracted from the EDF file.
    metadata : dict
        The metadata extracted from the EDF file (must be JSON-serializable).
    out_dir : obj:`os.pathlike`
        The folder where the BIDS files are written (not the EDF file itself).
    filename : str
        The target file name without the suffix, e.g. ``"Subject001"``.

    Returns
    -------
    list[str]
        The paths of the generated files: ``[<tsv.gz path>, <json path>]``.

    """
    # Accept plain strings as well as Path objects.
    out_dir = Path(out_dir)

    # Write out the JSON sidecar with the metadata.
    out_json = out_dir / (filename + ".json")
    out_json.write_text(
        json.dumps(metadata, sort_keys=True, indent=2)
    )

    # Write out the data as a tab-separated, gzip-compressed file.
    out_tsvgz = out_dir / (filename + ".tsv.gz")
    # NOTE(review): index=True writes the dataframe index as an extra first
    # column -- confirm this is intended for the BIDS TSV output.
    recording.to_csv(
        out_tsvgz,
        sep="\t",
        index=True,
        header=True,
        compression="gzip",
        na_rep="n/a",
    )

    # Return a list, matching the annotated return type (the original
    # returned a tuple despite declaring List[str]).
    return [str(out_tsvgz), str(out_json)]


# Write the <filename>.tsv.gz and <filename>.json BIDS pair into out_dir.
write_bids_from_df(
    recording, convert_metadata,
    out_dir,
    filename,
)

Now the BIDS files are generated in the EDF folder:

  • <filename>.json
  • <filename>.tsv.gz