EDF-to-BIDS conversion
Derived from `edf_to_bids` and `convert.py` in the HCPh SOPs: "Converting eye-tracking into BIDS" — Standard Operating Procedures of the HCPh project (axonlab.org), TheAxonLab/hcph-sops (github.com).
The EyeLink eye-tracking system produces EDF recording files. In this step we first convert the raw EDF files to the BIDS format to enforce a standardized structure, naming convention, and metadata description. This makes it easier for researchers to understand and use data from different sources.
0 Package preparation¶
-
To use the pyedfread Python package, the EyeLink software (SR Research Developers Kit) must first be properly installed on the device/laptop. You can find the instructions here.
-
Import the environments
from __future__ import annotations

import json
import re
from collections import defaultdict
from itertools import product, groupby
from pathlib import Path
from typing import List
from warnings import warn

import numpy as np
import pandas as pd

from pyedfread import read_edf
1 Load raw ET data¶
- Use the `pyedfread` package to open the EDF file and read its contents.
# Load the raw EDF recording with pyedfread.
DATA_PATH = Path("/Path/to/EDF folder")
edf_name = "file1.EDF"  # plain string (was an f-string with no placeholders)
file_path = str(DATA_PATH / edf_name)
print(file_path)
# read_edf returns three dataframes: samples, events, and log messages
ori_recording, ori_events, ori_messages = read_edf(file_path)
ori_recording
- `ori_recording`: the ET recordings with trajectory information, pupil area, and other information.
- `ori_events`: contains information about task events.
- `ori_messages`: log messages, including ET calibration, validation, and user-defined task messages sent from the Psychopy program to the device.
# Normalize pyedfread's message headers and set up working aliases.
_message_columns = {
    # Normalize weird header names generated by pyedfread
    "message": "trialid",
    "trial": "trial",
    # Convert some BIDS columns
    "time": "timestamp",
}
ori_messages = ori_messages.rename(columns=_message_columns)

# Aliases used throughout the rest of the conversion
recording, messages, events = ori_recording, ori_messages, ori_events

print(f'\nThe entire info of `message`: \n{messages}')
recording.columns
2 Parsing the messages¶
- Drop the duplicated codes.
- Check the information about calibration of ET data.
# Extract the calibration log lines ("!CAL"-prefixed messages)
_cal_hdr = ori_messages.trialid.str.startswith("!CAL")
calibration = ori_messages.loc[_cal_hdr]
# messages = messages.drop(messages.index[_cal_hdr])
print(calibration)
-
Extract the start time and stop time from the metadata. If no information is extracted from the metadata, keep them as `None`.
# Extracting the StartTime and StopTime metadata.
message_first_trigger = 'MODE RECORD'
message_last_trigger = 'end'
metadata = {
    'StopTime': None,
    'StartTime': None,
}

# BUGFIX: drop the calibration rows *before* building the start/stop masks.
# Previously the masks were computed on the un-dropped dataframe, so indexing
# the reduced ``messages`` with them raised pandas' "Unalignable boolean
# Series" error whenever "!CAL" rows were present.
_cal_hdr = messages.trialid.str.startswith("!CAL")
calibration = messages[_cal_hdr]
messages = messages.drop(messages.index[_cal_hdr])

# Find start/stop trigger messages
start_rows = messages.trialid.str.contains(
    message_first_trigger, case=False, regex=True
)
stop_rows = messages.trialid.str.contains(
    message_last_trigger, case=False, regex=True
)

# Pick the LAST of the start messages
metadata["StartTime"] = (
    int(messages[start_rows].timestamp.values[-1])
    if start_rows.any()
    else None
)
# Pick the FIRST of the stop messages
metadata["StopTime"] = (
    int(messages[stop_rows].timestamp.values[0])
    if stop_rows.any()
    else None
)

# Drop start and stop messages from messages dataframe
messages = messages.loc[~start_rows & ~stop_rows, :]
-
Extracting basic metadata
# Extracting basic metadata.
# !MODE RECORD CR 1000 2 0 R
mode_record = messages.trialid.str.startswith("!MODE RECORD")

# Fallback values; DEFAULT_FREQUENCY / DEFAULT_MODE / DEFAULT_EYE and
# EYE_CODE_MAP are presumably defined elsewhere in the project -- TODO confirm.
meta_record = {
    "freq": DEFAULT_FREQUENCY,
    "mode": DEFAULT_MODE,
    "eye": DEFAULT_EYE,
}

if mode_record.any():
    try:
        # Parse the last !MODE RECORD message (re.match returns None on
        # mismatch, hence the AttributeError guard below)
        meta_record = re.match(
            r"\!MODE RECORD (?P<mode>\w+) (?P<freq>\d+) \d \d (?P<eye>[RL]+)",
            messages[mode_record].trialid.iloc[-1].strip(),
        ).groupdict()
        meta_record["eye"] = EYE_CODE_MAP[meta_record["eye"]]
        meta_record["mode"] = (
            "P-CR" if meta_record["mode"] == "CR" else meta_record["mode"]
        )
    except AttributeError:
        warn(
            "Error extracting !MODE RECORD message, "
            "using default frequency, mode, and eye"
        )
    finally:
        # The message is consumed either way
        messages = messages.loc[~mode_record]

# Tuple of recorded eyes, in BIDS order
eye = (
    ("right", "left")
    if meta_record["eye"] == "both"
    else (meta_record["eye"],)
)
metadata["SamplingFrequency"] = int(meta_record["freq"])
metadata["EyeTrackingMethod"] = meta_record["mode"]
metadata["RecordedEye"] = meta_record["eye"]
-
Extracting screen parameters
# Extracting screen parameters.
# GAZE_COORDS 0.00 0.00 800.00 600.00
# Extract GAZE_COORDS message signaling start of recording
gaze_msg = messages.trialid.str.startswith("GAZE_COORDS")

# DEFAULT_SCREEN is presumably defined elsewhere in the project -- TODO confirm
metadata["ScreenAOIDefinition"] = [
    "square",
    DEFAULT_SCREEN,
]

if gaze_msg.any():
    try:
        gaze_record = re.match(
            r"GAZE_COORDS (\d+\.\d+) (\d+\.\d+) (\d+\.\d+) (\d+\.\d+)",
            messages[gaze_msg].trialid.iloc[-1].strip(),
        ).groups()
        # Reorder (x0, y0, x1, y1) -> [x0, x1, y0, y1]
        metadata["ScreenAOIDefinition"][1] = [
            int(round(float(gaze_record[0]))),
            int(round(float(gaze_record[2]))),
            int(round(float(gaze_record[1]))),
            int(round(float(gaze_record[3]))),
        ]
    except AttributeError:
        # re.match returned None
        warn("Error extracting GAZE_COORDS")
    finally:
        messages = messages.loc[~gaze_msg]

print(metadata)
-
Extracting parameters of the pupil fit model.
# Extracting parameters of the pupil fit model.
# ELCL_PROC ELLIPSE (5)
# ELCL_EFIT_PARAMS 1.01 4.00 0.15 0.05 0.65 0.65 0.00 0.00 0.30
# Extract ELCL_PROC AND ELCL_EFIT_PARAMS to extract pupil fit method
pupilfit_msg = messages.trialid.str.startswith("ELCL_PROC")
if pupilfit_msg.any():
    try:
        # Tokenize the last ELCL_PROC message, dropping empty tokens
        pupilfit_method = [
            val
            for val in messages[pupilfit_msg]
            .trialid.iloc[-1]
            .strip()
            .split(" ")[1:]
            if val
        ]
        metadata["PupilFitMethod"] = pupilfit_method[0].lower()
        metadata["PupilFitMethodNumberOfParameters"] = int(
            pupilfit_method[1].strip("(").strip(")")
        )
    # BUGFIX: the operations above raise IndexError (too few tokens) or
    # ValueError (non-numeric token) -- never AttributeError -- so the
    # warning branch was unreachable for the actual failure modes.
    except (AttributeError, IndexError, ValueError):
        warn("Error extracting ELCL_PROC (pupil fitting method)")
    finally:
        # Consume the message whether or not parsing succeeded
        messages = messages.loc[~pupilfit_msg]

pupilfit_msg_params = messages.trialid.str.startswith("ELCL_EFIT_PARAMS")
if pupilfit_msg_params.any():
    rows = messages[pupilfit_msg_params]
    row = rows.trialid.values[-1].strip().split(" ")[1:]
    try:
        # Group runs of consecutive non-empty tokens into float tuples
        metadata["PupilFitParameters"] = [
            tuple(float(val) for val in vals)
            for k, vals in groupby(row, key=bool)
            if k
        ]
    # BUGFIX: float() raises ValueError on malformed tokens, which the
    # original AttributeError-only handler did not catch.
    except (AttributeError, ValueError):
        warn("Error extracting ELCL_EFIT_PARAMS (pupil fitting parameters)")
    finally:
        messages = messages.loc[~pupilfit_msg_params]
- Parsing validation messages
# Calibration validation.
# VALIDATE R 4POINT 4 RIGHT at 752,300 OFFSET 0.35 deg. -8.7,-3.8 pix.
# Extract VALIDATE messages for a calibration validation
validation_msg = messages.trialid.str.startswith("VALIDATE")
if validation_msg.any():
    metadata["ValidationPosition"] = []
    metadata["ValidationErrors"] = []
    # NOTE(review): ``eye.index(...)`` raises ValueError if the keyword in
    # the message does not match a recorded eye -- confirm monocular
    # recordings never produce the other eye's VALIDATE message.
    for validate_row in messages[validation_msg].trialid.values:
        prefix, suffix = validate_row.split("OFFSET")
        which_eye = "right" if "RIGHT" in prefix else "left"
        validation_eye = f"eye{eye.index(which_eye) + 1}"
        # Validation point coordinates, e.g. "at 752,300"
        validation_coords = [
            int(val.strip())
            for val in prefix.rsplit("at", 1)[-1].split(",")
            if val.strip()
        ]
        metadata["ValidationPosition"].append(
            [validation_eye, validation_coords]
        )
        # Offset error: degrees, then the (x, y) offset in pixels
        offset_values = [
            float(val)
            for val in re.match(
                r"(-?\d+\.\d+) deg\.\s+(-?\d+\.\d+),(-?\d+\.\d+) pix\.",
                suffix.strip(),
            ).groups()
        ]
        metadata["ValidationErrors"].append(
            (validation_eye, offset_values[0], tuple(offset_values[1:]))
        )
    messages = messages.loc[~validation_msg]
print(messages)
print(metadata)
- Extract final bits of metadata and THRESHOLDS messages prior to the recording.
# Extract the THRESHOLDS messages logged prior to the recording.
thresholds_msg = messages.trialid.str.startswith("THRESHOLDS")
if thresholds_msg.any():
    # One slot per recorded eye
    metadata["PupilThreshold"] = [None] * len(eye)
    metadata["CornealReflectionThreshold"] = [None] * len(eye)
    # NOTE(review): only the LAST THRESHOLDS message is parsed, so for a
    # binocular recording one eye's slot stays None -- confirm intended.
    thresholds_chunks = (
        messages[thresholds_msg].trialid.iloc[-1].strip().split(" ")[1:]
    )
    eye_index = eye.index(EYE_CODE_MAP[thresholds_chunks[0]])
    metadata["PupilThreshold"][eye_index] = int(thresholds_chunks[-2])
    metadata["CornealReflectionThreshold"][eye_index] = int(
        thresholds_chunks[-1]
    )
    messages = messages.loc[~thresholds_msg]
print(messages)
print(metadata)
- Consume the remaining messages
3 Parsing the recording dataframe¶
- Curation of the input dataframe
# Normalize timestamps (should be int and strictly positive)
recording = recording.astype({"time": int})
recording = recording[recording["time"] > 0]
raw_recording_len = len(recording)
print(f'raw_recording length: {raw_recording_len}')

# Normalize pyedfread's sample headers to BIDS-friendly names
_recording_columns = {
    # # Fix buggy header names generated by pyedfread
    # "fhxyvel": "fhxvel",
    # "frxyvel": "frxvel",
    "rx": "screen_ppdeg_x_coordinate",
    "ry": "screen_ppdeg_y_coordinate",
    "time": "timestamp",
}
recording = recording.rename(columns=_recording_columns)

# Split extra columns from the dataframe
extra = recording[["flags", "input", "htype"]]
recording = recording.drop(columns=["flags", "input", "htype"])
print(len(recording))

# Keep a column only if some value is meaningfully non-zero...
recording = recording.loc[:, (recording.abs() > 1e-8).any(axis=0)]
# ...and some value is below the 1e8 sentinel
recording = recording.loc[:, (recording.abs() < 1e8).any(axis=0)]
# The 1e8 sentinel marks missing samples -> NaN
recording = recording.replace({1e8: np.nan})

# Column-wise curation must not change the number of rows
assert len(recording) == raw_recording_len
- Clean-up pupil size and gaze position
# Special curation for the parameters we care most about: pupil size and
# gaze position.
# NOTE(review): screen size is hard-coded here; presumably it should match
# metadata["ScreenAOIDefinition"] extracted above -- confirm.
screen_resolution = [800, 600]
for eye_idx, eye_name in enumerate(eye):
    bids_eye = f"eye{eye_idx + 1}"
    # Pupil areas below 1 are implausible -> mask them out
    pa_col = f"pa_{eye_name}"
    recording.loc[recording[pa_col] < 1, pa_col] = np.nan
    recording = recording.rename(columns={pa_col: f"{bids_eye}_pupil_size"})
    print(f"pa_{eye_name} renamed as: {bids_eye}_pupil_size")
    # Gaze x must lie within the screen width
    gx_col = f"gx_{eye_name}"
    off_screen_x = (recording[gx_col] < 0) | (
        recording[gx_col] > screen_resolution[0]
    )
    recording.loc[off_screen_x, gx_col] = np.nan
    # Gaze y must lie within the screen height
    # NOTE(review): x uses "< 0" but y uses "<= 0" -- confirm the asymmetry
    # is intended.
    gy_col = f"gy_{eye_name}"
    off_screen_y = (recording[gy_col] <= 0) | (
        recording[gy_col] > screen_resolution[1]
    )
    recording.loc[off_screen_y, gy_col] = np.nan
print(recording)
assert len(recording) == raw_recording_len
- Shape the columns to comply with BIDS format.
# Munging columns to comply with BIDS.
# At this point, the dataframe is almost ready for writing out as BIDS.
# Interpolate BIDS column names; columns that already carry their final
# BIDS name are excluded from the renaming.
columns = list(
    set(recording.columns)
    - set(
        (
            "timestamp",
            "screen_ppdeg_x_coordinate",
            "screen_ppdeg_y_coordinate",
            "eye1_pupil_size",  # pa, renamed above
            "eye2_pupil_size",  # pa, renamed above
        )
    )
)
# Build ONE BIDS name per EDF column.
# BUGFIX: the previous implementation appended len(eye) candidate names per
# column and then zipped against ``columns``, silently truncating the
# mapping and producing wrong names whenever both eyes were recorded.
rename_map = {}
for name in columns:
    # Prefix with eyeN only when the column is eye-specific
    colprefix = ""
    for eyenum, eyename in enumerate(eye):
        if name.endswith(f"_{eyename}"):
            colprefix = f"eye{eyenum + 1}"
            break
    _newname = name.split("_")[0]
    _newname = re.sub(r"([xy])$", r"_\1_coordinate", _newname)
    _newname = re.sub(r"([xy])vel$", r"_\1_velocity", _newname)
    parts = _newname.split("_", 1)
    # EDF2BIDS_COLUMNS is a project mapping, e.g. "g" -> "gaze" -- TODO confirm
    parts[0] = EDF2BIDS_COLUMNS[parts[0]]
    parts.insert(0, colprefix)
    rename_map[name] = "_".join(p for p in parts if p)
# Rename columns to be BIDS-compliant
recording = recording.rename(columns=rename_map)
# Reorder columns to render nicely (tracking first, pupil size after)
ordered = sorted(
    set(recording.columns.values).intersection(BIDS_COLUMNS_ORDER),
    key=BIDS_COLUMNS_ORDER.index,
)
ordered += [c for c in recording.columns.values if c not in ordered]
recording = recording.reindex(columns=ordered)
print(recording)
assert len(recording) == raw_recording_len
4 Parsing the calibration messages¶
# Parse calibration metadata
metadata["CalibrationCount"] = 0
if not calibration.empty:
    warn("Calibration of more than one eye is not implemented")
    # BUGFIX: operate on a copy -- ``calibration`` is a slice of the
    # messages dataframe, and assigning through attribute access triggered
    # pandas' SettingWithCopyWarning (and could silently fail to write).
    calibration = calibration.copy()
    calibration["trialid"] = calibration["trialid"].str.replace("!CAL", "")
    calibration["trialid"] = calibration["trialid"].str.strip()
    # BUGFIX: ``.tolist()`` yields native Python ints -- np.int64 values
    # are not JSON-serializable.
    metadata["CalibrationLog"] = list(
        zip(
            calibration.timestamp.values.astype(int).tolist(),
            calibration.trialid.values,
        )
    )
    calibrations_msg = calibration.trialid.str.startswith(
        "VALIDATION"
    ) & calibration.trialid.str.contains("ERROR")
    # BUGFIX: .sum() returns np.int64; cast for JSON serialization
    metadata["CalibrationCount"] = int(calibrations_msg.sum())
    # BUGFIX: guard against an empty selection -- taking [-1] of an empty
    # index raised IndexError outside the try block below.
    if calibrations_msg.any():
        calibration_last = calibration.index[calibrations_msg][-1]
        try:
            # Parse the last VALIDATION ... ERROR message for the results
            meta_calib = re.match(
                r"VALIDATION (?P<ctype>[\w\d]+) (?P<eyeid>[RL]+) (?P<eye>RIGHT|LEFT) "
                r"(?P<result>\w+) ERROR (?P<avg>-?\d+\.\d+) avg\. (?P<max>-?\d+\.\d+) max\s+"
                r"OFFSET (?P<offsetdeg>-?\d+\.\d+) deg\. "
                r"(?P<offsetxpix>-?\d+\.\d+),(?P<offsetypix>-?\d+\.\d+) pix\.",
                calibration.loc[calibration_last, "trialid"].strip(),
            ).groupdict()
            metadata["CalibrationType"] = meta_calib["ctype"]
            metadata["AverageCalibrationError"] = [float(meta_calib["avg"])]
            metadata["MaximalCalibrationError"] = [float(meta_calib["max"])]
            metadata["CalibrationResultQuality"] = [meta_calib["result"]]
            metadata["CalibrationResultOffset"] = [
                float(meta_calib["offsetdeg"]),
                (float(meta_calib["offsetxpix"]), float(meta_calib["offsetypix"])),
            ]
            metadata["CalibrationResultOffsetUnits"] = ["deg", "pixels"]
        except AttributeError:
            # re.match returned None
            warn("Calibration data found but unsuccessfully parsed for results")
print(calibration)
5 Parsing the events dataframe¶
There are three types of eye movements:
- fixation
- saccade
- blinks
The mask for each event is recorded, with the mask value indicating whether an event occurred at a specific timestamp. A mask value of 1 at a given timestamp signifies that the event was detected at that moment.
# print(events)
print(recording)
# Process events: initialize the three event masks to 0 (event absent)
for _mask_col in ("eye1_fixation", "eye1_saccade", "eye1_blink"):
    recording[_mask_col] = 0


def _flag_interval(event_row, column):
    # Set mask=1 for all samples within the event's [start, end] window
    in_event = (recording["timestamp"] >= event_row["start"]) & (
        recording["timestamp"] <= event_row["end"]
    )
    recording.loc[in_event, column] = 1


# Add fixations
for _, fixation_event in events[events["type"] == "fixation"].iterrows():
    _flag_interval(fixation_event, "eye1_fixation")

# Add saccades, and blinks, which are a sub-event of saccades
for _, saccade_event in events[events["type"] == "saccade"].iterrows():
    _flag_interval(saccade_event, "eye1_saccade")
    # Note: some pyedfread versions name this column "blink" instead
    if saccade_event["contains_blink"] == 1:
        _flag_interval(saccade_event, "eye1_blink")
6 Write the data into BIDS structure¶
from copy import deepcopy

# Record the final column listing in the JSON sidecar metadata
metadata['Columns'] = list(recording.columns)
print(metadata)
# Keep an untouched copy before the numpy -> native-int conversion below
save_metadata = deepcopy(metadata)
# metadata.pop('CalibrationLog', None)
# print(metadata)
Convert numpy integer values to the native Python `int` type before the JSON conversion.
def convert_to_int(metadata):
    """Cast numpy integer values in *metadata* to native Python ``int``.

    ``json.dumps`` cannot serialize numpy scalar types, so the
    ``CalibrationCount`` value and the timestamps inside ``CalibrationLog``
    (both produced by pandas/numpy operations) are converted in place.
    Non-integer values are left untouched.

    Parameters
    ----------
    metadata : dict
        The metadata dictionary extracted from the EDF file.

    Returns
    -------
    dict
        The same dictionary, with numpy integers replaced by ``int``.
    """
    # np.integer covers np.int32 AND np.int64 (and every other width)
    count = metadata.get('CalibrationCount')
    if isinstance(count, (int, np.integer)):
        metadata['CalibrationCount'] = int(count)
    if 'CalibrationLog' in metadata:
        metadata['CalibrationLog'] = [
            (int(entry[0]), entry[1])
            if isinstance(entry[0], (int, np.integer))
            else entry
            for entry in metadata['CalibrationLog']
        ]
    return metadata
# Make the metadata JSON-serializable (numpy ints -> Python ints)
convert_metadata = convert_to_int(metadata)
Write the dataframe into BIDS files.
# Output location and base filename for the BIDS files.
out_dir = DATA_PATH
edf_extension = 'EDF'
# Strip the extension to obtain the base name, e.g. "file1.EDF" -> "file1"
# (removed the redundant self-assignment ``edf_name = edf_name``)
filename = edf_name.split('.')[0]
# BUGFIX: the original print was an f-string with no placeholder
print(f'bids filename: {filename}')
def write_bids_from_df(
    recording, metadata,
    out_dir,
    filename,
    # exp_run: str | Path,
) -> List[str]:
    """
    Directly save the eye-tracking recording/metadata into a BIDS structure.

    Parameters
    ----------
    recording : :obj:`pandas.DataFrame`
        The recording data extracted from the EDF file.
    metadata : dict
        The metadata extracted from the EDF file; must be JSON-serializable
        (see ``convert_to_int``).
    out_dir : obj:`os.pathlike`
        The path of EDF file. Refers to the folder (not the EDF file).
    filename: str
        The filename of the EDF file. The file name without the suffix, eg: "Subject001"

    Returns
    -------
    List[str]
        A list of generated files: ``[<tsv.gz path>, <json path>]``.
    """
    # Accept both str and PathLike for robustness
    out_dir = Path(out_dir)
    # Write out the JSON sidecar
    out_json = out_dir / (filename + ".json")
    out_json.write_text(
        json.dumps(metadata, sort_keys=True, indent=2)
    )
    # Write out data (BIDS requires "n/a" for missing values)
    out_tsvgz = out_dir / (filename + ".tsv.gz")
    recording.to_csv(
        out_tsvgz,
        sep="\t",
        index=True,
        header=True,
        compression="gzip",
        na_rep="n/a",
    )
    # BUGFIX: previously returned a tuple despite the List[str] annotation
    # and the docstring's promise of a list.
    return [str(out_tsvgz), str(out_json)]
# Write the curated recording and the JSON-serializable metadata to disk
write_bids_from_df(
    recording, convert_metadata,
    out_dir,
    filename,
)
Now the BIDS files are generated in the EDF folder:
- <filename>.json
- <filename>.tsv.gz