# copyright ############################### #
# This file is part of the Xboinc Package. #
# Copyright (c) CERN, 2025. #
# ######################################### #
import json
from warnings import warn
import pandas as pd
from tqdm.auto import tqdm
from xaux import FsPath, eos_accessible
from .df_wu import query_work_units_by_user
from .server.tools import untar
from .simulation_io import XbState, assert_versions
from .user import get_directory, get_domain
class ResultRetriever:
    """
    Class to retrieve and manage results from Xboinc simulations.

    This class provides functionality to retrieve, index, and manage simulation
    results from BOINC work units. It can untar result files, create indexes,
    and provide various views and statistics about completed jobs.

    Attributes
    ----------
    _user : str
        The user that submitted the BOINC jobs
    _domain : str
        The domain where results are stored (e.g., 'eos')
    _directory : FsPath
        Path to the directory containing results
    _dev_server : bool
        Whether using development server
    _df : pd.DataFrame
        Indexed DataFrame of all available results

    Examples
    --------
    >>> retriever = ResultRetriever('myuser', dev_server=True)
    >>> studies = retriever.get_study_list()
    >>> for job_name, particles in retriever.iterate_results('my_study'):
    ...     # Process particles data
    ...     pass
    """
def _untar_results(self, path: FsPath, silent: bool = False) -> None:
    """
    Untar all compressed result files in the given path.

    Parameters
    ----------
    path : FsPath
        Directory path containing .tar.gz files to extract
    silent : bool, optional
        If True, suppress progress bar output (default: False)
    """
    tar_list = list(path.glob("*.tar.gz"))
    # Skip entirely (including the progress bar) when there is nothing to extract
    if tar_list:
        for tar_file in tqdm(tar_list, desc="Untarring results", disable=silent):
            untar(tar_file)
def _index_results(self, path: FsPath, silent: bool = False) -> pd.DataFrame:
    """
    Index all result files in the given path and create a DataFrame.

    Scans for .bin files in subdirectories and extracts metadata from
    filenames to create a structured index of available results.

    Parameters
    ----------
    path : FsPath
        Directory path to scan for result files
    silent : bool, optional
        If True, suppress progress bar output (default: False)

    Returns
    -------
    pd.DataFrame
        DataFrame with columns: user, study_name, job_name, wu_name, bin_file
        Each row represents one available result file
    """
    columns = ["user", "study_name", "job_name", "wu_name", "bin_file"]
    # Collect rows in a plain list and build the DataFrame once at the end:
    # pd.concat inside the loop is quadratic in the number of result files.
    rows = []
    for folder in tqdm(
        list(path.glob("*/")), desc="Indexing results in folders", disable=silent
    ):
        if not folder.is_dir():
            continue
        # Each result binary is named <user>__<study>__<job>[__...].bin
        for bin_file in folder.glob("*.bin"):
            parts = bin_file.name.split("__")
            if len(parts) < 3:
                # Not a recognised result file name; skip it
                continue
            rows.append({
                "user": parts[0],
                "study_name": parts[1],
                "job_name": parts[2],
                "wu_name": bin_file.name.replace(".bin", ""),
                "bin_file": bin_file,
            })
    return pd.DataFrame(rows, columns=columns)
def __init__(self, user, dev_server=False, silent=False):
    """
    Initialize the ResultRetriever for a specific user.

    Parameters
    ----------
    user : str
        The user that submitted to BOINC. User should be member of the
        CERN xboinc-submitters egroup with proper permissions.
    dev_server : bool, optional
        Whether to retrieve from the development server (default: False)
    silent : bool, optional
        Whether to suppress output messages and progress bars (default: False)

    Raises
    ------
    NotImplementedError
        If dev_server=False (regular server not yet operational)
    ConnectionError
        If EOS is not accessible when domain is 'eos'

    Examples
    --------
    >>> retriever = ResultRetriever('myuser', dev_server=True, silent=True)
    >>> overview = retriever.get_overview()
    """
    if not dev_server:
        raise NotImplementedError(
            "Regular server not yet operational. Please use dev_server=True."
        )
    assert_versions()
    # BUGFIX: the user was never stored, but get_study_status() reads self._user
    self._user = user
    self._domain = get_domain(user)
    if self._domain == "eos" and not eos_accessible:
        raise ConnectionError(
            "EOS is not accessible! Please check your connection."
        )
    # Dev-server results live in a separate output directory
    # (dev_server is always True here; kept conditional for when the
    # regular server becomes operational)
    subdir = "output_dev" if dev_server else "output"
    self._directory = get_directory(user) / subdir
    self._dev_server = dev_server
    # Extract any pending tarballs, then build the result index
    self._untar_results(self._directory, silent=silent)
    self._df = self._index_results(self._directory, silent=silent)
def get_overview(self):
    """
    Get a comprehensive overview of all available results.

    Returns
    -------
    pd.DataFrame
        DataFrame containing all indexed results with columns:
        user, study_name, job_name, wu_name, bin_file
    """
    return self._df
def get_study_list(self):
    """
    Get a list of all unique study names in the available results.

    Returns
    -------
    list of str
        Unique study names found in the results, in order of first
        appearance (note: NOT sorted — pandas ``unique`` preserves
        encounter order).
    """
    return self._df["study_name"].unique().tolist()
def get_study_status(self, study_name, verbose=False):
    """
    Get detailed status information for a specific study.

    Compares local results with server work units to provide comprehensive
    status information including completion rates and missing jobs.

    Parameters
    ----------
    study_name : str
        Name of the study to check status for
    verbose : bool, optional
        If True, print detailed job lists (default: False)

    Returns
    -------
    tuple of (list, set)
        - list: Job names available in results
        - set: Job names missing from results but present on server

    Raises
    ------
    ValueError
        If study_name is not found in results or server work units

    Warnings
    --------
    UserWarning
        If there are mismatches between local results and server status
    """
    # Is the study name valid?
    if study_name not in self._df["study_name"].unique():
        raise ValueError(f"Study name {study_name} not found in results.")
    # Get the current state of the work units from the server
    server_df = query_work_units_by_user(self._user, dev_server=self._dev_server)
    if study_name not in server_df["study_name"].unique():
        raise ValueError(f"Study name {study_name} not found in server work units.")
    result_job_names = sorted(
        self._df.loc[self._df["study_name"] == study_name, "job_name"]
    )
    study_mask = server_df["study_name"] == study_name
    remote_job_names = sorted(server_df.loc[study_mask, "job_name"])
    completed_remote_job_names = sorted(
        server_df.loc[study_mask & (server_df["status"] == "completed"), "job_name"]
    )
    # Check 1: local results should exactly match the completed jobs on the server
    if result_job_names != completed_remote_job_names:
        warn(
            f"Warning: The job names in the results ({result_job_names}) "
            "do not match the completed job names on the server! "
            "This might indicate that some results were deleted or not retrieved correctly. "
            "Please contact the Xboinc team if you think this is an error.",
            UserWarning,
        )
    # Check 2: every local result should correspond to a job known to the server
    if not all(job in remote_job_names for job in result_job_names):
        warn(
            f"Warning: The job names in the results ({result_job_names}) "
            f"are not all present in the remote server job names ({remote_job_names}). "
            "This might indicate that some jobs were not submitted correctly or were deleted.",
            UserWarning,
        )
    # Jobs the server knows about but with no local result yet
    diff_jobs = set(remote_job_names) - set(result_job_names)
    # BUGFIX: was `set(result_job_names) - diff_jobs`, which (the two sets being
    # disjoint by construction) always equalled ALL local results. We want the
    # local results that are unknown to the server.
    err_jobs = set(result_job_names) - set(remote_job_names)
    # Print statistics
    print(f"Study: {study_name}")
    print(f"Total jobs in results: {len(result_job_names)}")
    print(f"Total jobs on server: {len(remote_job_names)}")
    # BUGFIX: guard against a zero denominator, and compute the completion rate
    # over all jobs on the server (matching the printed label) instead of over
    # the locally retrieved results.
    if remote_job_names:
        pct = len(completed_remote_job_names) / len(remote_job_names) * 100
        print(f"Percentage of jobs completed: {pct:.2f}%")
    if verbose:
        print("Results available by job name:")
        for job_name in result_job_names:
            print(f"{job_name}", end=", ")
        print("\n")
        if diff_jobs:
            print("Results not available by job name:")
            for job_name in diff_jobs:
                print(f"{job_name}", end=", ")
            print("\n")
        if err_jobs:
            print(
                "WARNING: The following jobs do not match the remote server job names:"
            )
            for job_name in err_jobs:
                print(f"{job_name}", end=", ")
            print("\n")
    return result_job_names, diff_jobs
def iterate_results(self, study_name):
    """
    Iterate over all results for a specific study.

    Yields tuples of job names and their corresponding particle data
    for all completed jobs in the specified study.

    Parameters
    ----------
    study_name : str
        Name of the study to iterate over

    Yields
    ------
    tuple of (str, xpart.Particles)
        Job name and corresponding particles object for each result

    Raises
    ------
    ValueError
        If study_name is not found in available results

    Warnings
    --------
    UserWarning
        If a binary file is incompatible with current Xboinc version

    Examples
    --------
    >>> retriever = ResultRetriever('myuser', dev_server=True)
    >>> for job_name, particles in retriever.iterate_results('my_study'):
    ...     print(f"Processing job: {job_name}")
    ...     print(f"Number of particles: {len(particles.x)}")
    """
    if study_name not in self._df["study_name"].unique():
        raise ValueError(f"Study name {study_name} not found in results.")
    for row in self._df[self._df["study_name"] == study_name].itertuples():
        # raise_version_error=False makes from_binary return None on mismatch
        result = XbState.from_binary(row.bin_file, raise_version_error=False)
        if result is None:
            warn(
                f"Warning: The binary file {row.bin_file} is not compatible with the current Xboinc version. "
                "Skipping this result.",
                UserWarning,
            )
            continue
        yield row.job_name, result.particles
def clean(self, study_name):
    """
    Clean up results for a specific study.

    Removes all binary result files, empty directories, and clears
    the study from the internal DataFrame index.

    Parameters
    ----------
    study_name : str
        Name of the study to clean up

    Raises
    ------
    ValueError
        If study_name is not found in available results

    Warning
    -------
    This operation is irreversible. All result files for the study
    will be permanently deleted.
    """
    if study_name not in self._df["study_name"].unique():
        raise ValueError(f"Study name {study_name} not found in results.")
    # Delete every binary result file belonging to this study
    for row in self._df[self._df["study_name"] == study_name].itertuples():
        if row.bin_file.exists():
            row.bin_file.unlink()
    # Remove any result directories that are now empty
    for folder in self._directory.glob("*/"):
        if not any(folder.iterdir()):
            folder.rmdir()
    # Drop the study from the in-memory index
    self._df = self._df[self._df["study_name"] != study_name]
    print(f"Cleaned up results for study {study_name}!")
@classmethod
def iterate(cls, user, study_name, dev_server=False, silent=False):
    """
    Class method to directly iterate over results for a user and study.

    Convenient method that creates a ResultRetriever instance and immediately
    starts iterating over results without requiring explicit instantiation.

    Parameters
    ----------
    user : str
        The user that submitted the BOINC jobs
    study_name : str
        Name of the study to iterate over
    dev_server : bool, optional
        Whether to use development server (default: False)
    silent : bool, optional
        Whether to suppress output messages (default: False)

    Yields
    ------
    tuple of (str, xpart.Particles)
        Job name and corresponding particles object for each result

    Examples
    --------
    >>> for job_name, particles in ResultRetriever.iterate('myuser', 'my_study', dev_server=True):
    ...     # Process particles data
    ...     pass
    """
    instance = cls(user, dev_server=dev_server, silent=silent)
    return instance.iterate_results(study_name)
@classmethod
def overview(cls, user, dev_server=False, silent=False):
    """
    Class method to get an overview of results for a specific user.

    Parameters
    ----------
    user : str
        The user that submitted the BOINC jobs
    dev_server : bool, optional
        Whether to use development server (default: False)
    silent : bool, optional
        Whether to suppress output messages (default: False)

    Returns
    -------
    pd.DataFrame
        DataFrame with overview of all available results

    Examples
    --------
    >>> overview_df = ResultRetriever.overview('myuser', dev_server=True)
    >>> print(overview_df.groupby('study_name').size())
    """
    instance = cls(user, dev_server=dev_server, silent=silent)
    return instance.get_overview()
@classmethod
def status(cls, user, study_name, dev_server=False, silent=False, verbose=False):
    """
    Class method to get status of results for a specific user and study.

    Parameters
    ----------
    user : str
        The user that submitted the BOINC jobs
    study_name : str
        Name of the study to check status for
    dev_server : bool, optional
        Whether to use development server (default: False)
    silent : bool, optional
        Whether to suppress output messages (default: False)
    verbose : bool, optional
        If True, print detailed job lists (default: False)

    Returns
    -------
    tuple of (list, set)
        - list: Job names available in results
        - set: Job names missing from results but present on server

    Examples
    --------
    >>> available, missing = ResultRetriever.status('myuser', 'my_study', dev_server=True)
    >>> print(f"Available jobs: {len(available)}, Missing jobs: {len(missing)}")
    """
    instance = cls(user, dev_server=dev_server, silent=silent)
    return instance.get_study_status(study_name=study_name, verbose=verbose)
@classmethod
def study_list(cls, user, dev_server=False, silent=False):
    """
    Class method to get a list of all studies for a specific user.

    Parameters
    ----------
    user : str
        The user that submitted the BOINC jobs
    dev_server : bool, optional
        Whether to use development server (default: False)
    silent : bool, optional
        Whether to suppress output messages (default: False)

    Returns
    -------
    list of str
        Unique study names found in the results, in order of first appearance

    Examples
    --------
    >>> studies = ResultRetriever.study_list('myuser', dev_server=True)
    >>> print(studies)
    """
    instance = cls(user, dev_server=dev_server, silent=silent)
    return instance.get_study_list()