Skip to content
This repository was archived by the owner on Jul 9, 2025. It is now read-only.
12 changes: 7 additions & 5 deletions core/controller/relval_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def get_task_dict(self, relval, step, step_index):
"Getting step %s dict for %s", step_index, relval.get_prepid()
)
task_dict = {}
# If it's firtst step and not input file - it is generator
# If it's first step and not input file - it is generator
# set Seeding to AutomaticSeeding, RequestNumEvets, EventsPerJob and EventsPerLumi
# It expects --relval attribute
if step_index == 0:
Expand All @@ -264,6 +264,9 @@ def get_task_dict(self, relval, step, step_index):
task_dict["EventsPerLumi"] = int(events_per_job)
else:
input_step = relval.get("steps")[step.get_input_step_index()]

task_dict["SplittingAlgo"] = "LumiBased"

if input_step.get_step_type() == "input_file":
input_dict = input_step.get("input")
# Input file step is not a task
Expand All @@ -273,6 +276,7 @@ def get_task_dict(self, relval, step, step_index):
task_dict["LumiList"] = input_dict["lumisection"]
elif input_dict["run"]:
task_dict["RunWhitelist"] = input_dict["run"]

else:
task_dict["InputTask"] = input_step.get_short_name()
_, input_module = step.get_input_eventcontent(input_step)
Expand All @@ -281,8 +285,6 @@ def get_task_dict(self, relval, step, step_index):
if step.get("lumis_per_job") != "":
task_dict["LumisPerJob"] = int(step.get("lumis_per_job"))

task_dict["SplittingAlgo"] = "LumiBased"

task_dict["TaskName"] = step.get_short_name()
task_dict["ConfigCacheID"] = step.get("config_id")
task_dict["KeepOutput"] = step.get("keep_output")
Expand Down Expand Up @@ -328,15 +330,15 @@ def get_request_priority(self, relval: RelVal) -> int:
priority
)
return priority

if "HighPrio" in campaign:
priority = 500000
self.logger.info(
"Setting `RequestPriority` to %s because it includes the placeholder `HighPrio` in the campaign name",
priority
)
return priority

return 450000

def get_job_dict(self, relval):
Expand Down
17 changes: 17 additions & 0 deletions core/controller/ticket_controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Module that contains TicketController class
"""

import json
from copy import deepcopy
from environment import (
Expand Down Expand Up @@ -365,12 +366,28 @@ def generate_workflows(self, ticket, ssh_executor):
"core/utils/run_the_matrix_pdmv.py",
f"{remote_directory}/run_the_matrix_pdmv.py",
)

ssh_executor.upload_file(
"core/utils/dqm.py",
f"{remote_directory}/dqm.py",
)
ssh_executor.upload_file(
"core/utils/das.py",
f"{remote_directory}/das.py",
)
command = [
f"cd {remote_directory}",
"voms-proxy-init -voms cms --valid 4:00 --out $(pwd)/proxy.txt",
]
ssh_executor.execute_command(command)

# Defined a name for output file
file_name = f"{ticket_prepid}.json"
# Execute run_the_matrix_pdmv.py
matrix_command = run_commands_in_cmsenv(
[
f"cd {remote_directory}",
"export X509_USER_PROXY=$(pwd)/proxy.txt",
"$PYTHON_INT run_the_matrix_pdmv.py "
f"-l={workflow_ids} "
f"-w={matrix} "
Expand Down
26 changes: 4 additions & 22 deletions core/model/relval_step.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""
Module that contains RelValStep class
"""
import weakref
import json
import weakref
from copy import deepcopy

from core.model.model_base import ModelBase
from core_lib.utils.common_utils import get_scram_arch

Expand Down Expand Up @@ -111,14 +112,14 @@ def __init__(self, json_input=None, parent=None, check_attributes=True):
json_input['gpu'] = schema.get('gpu')
json_input['gpu']['requires'] = 'forbidden'
step_input = json_input['input']

for key, default_value in schema['input'].items():
if key not in step_input:
step_input[key] = default_value
else:
json_input['driver'] = {k.lstrip('-'): v for k, v in json_input['driver'].items()}
json_input['input'] = schema.get('input')

if json_input.get('gpu', {}).get('requires') not in ('optional', 'required'):
json_input['gpu'] = schema.get('gpu')
json_input['gpu']['requires'] = 'forbidden'
Expand Down Expand Up @@ -283,25 +284,6 @@ def __build_das_command(self, step_index):
command += f'>> {files_name}\n'
return (comment + '\n' + command).strip()

events = input_dict['events']

## N.B. das-up-to-nevents.py exists only from 14_1_0_pre7
cmssw_components = lambda x: x.strip().split("_")
cmssw_release = cmssw_components(self.get_release())
check_das_up_to_nevents = cmssw_release >= cmssw_components("14_1_0_pre7")

if events > 0 and check_das_up_to_nevents:
self.logger.info('Making a DAS command for step %s with max events', step_index)
files_name = f'step{step_index + 1}_files.txt'
comment = f'# Arguments for step {step_index + 1}:\n'
command = f'# Command for step {step_index + 1}:\n'
comment += f'# dataset: {dataset}\n'
comment += f'# events : {events}\n'
command += f'echo "" > {files_name}\n'
command += f'das-up-to-nevents.py -d {dataset} -e {events} '
command += f'>> {files_name}\n'
return (comment + '\n' + command).strip()

return f'# Step {step_index + 1} is input dataset for next step: {dataset}'

def get_command(self, custom_fragment=None, for_submission=False):
Expand Down
214 changes: 214 additions & 0 deletions core/utils/das.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#!/usr/bin/env python3
import itertools
import subprocess

import numpy as np # pylint: disable=import-error
import pandas as pd # pylint: disable=import-error


def get_lumi_ranges(i):
    """
    Collapse a sorted list of lumisections into contiguous ranges.

    Args:
        i: a list of ints.

    Returns:
        list[list[int]]: one [first, last] pair per run of consecutive
        values in the input, in input order.
    """
    ranges = []
    for value in i:
        if ranges and value == ranges[-1][1] + 1:
            # Extends the current contiguous run.
            ranges[-1][1] = value
        else:
            # Starts a new run (also handles the very first element).
            ranges.append([value, value])
    return ranges


def das_do_command(query):
    """
    A simple wrapper for dasgoclient.

    Args:
        query: a dasgoclient query string.

    Returns:
        list[str]: the dasgoclient command output split by newlines.

    Raises:
        subprocess.CalledProcessError: if dasgoclient exits non-zero.
    """
    # Pass the query as a single argv element with shell=False so shell
    # metacharacters in dataset names can never be interpreted by bash
    # (the original interpolated the query into a shell command string).
    out = subprocess.check_output(
        ["dasgoclient", f"--query={query}"]
    ).decode("utf8")
    return out.split("\n")


def das_file_data(dataset):
    """
    Given a dataset, build a pandas DataFrame with the list of file
    names and the number of events per file.

    Args:
        dataset: the dataset name '/PD/GTString/DATA-TIER'

    Returns:
        A pandas DataFrame with one row per file and columns:
        - file: the file name;
        - events: the number of events in the file (int).
    """
    lines = das_do_command(f"file dataset={dataset} | grep file.name, file.nevents")

    rows = []
    for line in lines:
        if not line:
            continue
        fields = line.split(" ")
        # Fields 0 and 3 are the positions the dasgoclient output uses for
        # name and event count — NOTE(review): verify against the client version.
        rows.append([fields[0], fields[3]])

    df = pd.DataFrame(rows, columns=["file", "events"])
    df.events = df.events.values.astype(int)

    return df


def das_lumi_data(dataset):
    """
    Produce a file + run + lumis pandas DataFrame for a dataset.

    Args:
        dataset: the dataset name '/PD/GTString/DATA-TIER'

    Returns:
        A pandas DataFrame with one row per file and columns:
        - file: the file name;
        - run: the run number;
        - lumis: the lumisections (as printed by dasgoclient).
    """
    raw = das_do_command(f"file,lumi,run dataset={dataset} ")

    # Column order follows dasgoclient's output, not the query order —
    # NOTE(review): confirm run/lumis labels match the client's columns.
    rows = [line.split(" ") for line in raw if line]

    return pd.DataFrame(rows, columns=["file", "run", "lumis"])


def get_events_df(golden, dataset, events):
    """
    Produce a file-by-file pandas DataFrame restricted to certified
    runs/lumisections and capped at a maximum number of events.

    Args:
        golden: a run-by-run certification dict (run -> good lumisections).
        dataset: the dataset name '/PD/GTString/DATA-TIER'.
        events: max number of events (an int).

    Returns:
        A pandas DataFrame with one row per selected file and columns:
        - file, run, lumis, events, min_lumi, max_lumi, sum_evs.
        An empty DataFrame when no certified run survives the selection.
    """

    lumi_df = das_lumi_data(dataset)
    file_df = das_file_data(dataset)

    df = lumi_df.merge(
        file_df, on="file", how="inner"
    )  # merge file informations with run and lumis
    # Parse the "[l1,l2,...]" lumi strings into lists of ints.
    df["lumis"] = [
        [int(ff) for ff in f.replace("[", "").replace("]", "").split(",")]
        for f in df.lumis.values
    ]

    df_rs = []

    for r in golden:
        cut = df["run"] == r
        if not any(cut):
            continue

        df_r = df[cut]

        # jumping very low event count runs
        if df_r["events"].sum() < 10000:
            continue

        # Keep only files whose lumisections are ALL certified for this run.
        good_lumis = np.array(
            [len([ll for ll in l if ll in golden[r]]) for l in df_r.lumis]
        )
        n_lumis = np.array([len(l) for l in df_r.lumis])
        df_rs.append(df_r[good_lumis == n_lumis])

    if len(df_rs) == 0:
        return pd.DataFrame([])

    # BUG FIX: the original assigned the *list* itself (`df = df_rs`) when it
    # held exactly one DataFrame, which crashed on the `.loc` calls below.
    # pd.concat handles the single- and multi-run cases alike; .copy() avoids
    # chained-assignment warnings on the .loc writes that follow.
    df = pd.concat(df_rs).copy()

    ## lumi sorting
    df.loc[:, "min_lumi"] = [min(f) for f in df.lumis]
    df.loc[:, "max_lumi"] = [max(f) for f in df.lumis]
    df = df.sort_values(["run", "min_lumi", "max_lumi"])

    ## events skimming: drop files that alone exceed the cap, then keep the
    ## leading files whose cumulative event count stays below it
    df = df[df["events"] <= events].copy()
    df.loc[:, "sum_evs"] = df.loc[:, "events"].cumsum()
    df = df[df["sum_evs"] < events]

    return df


def get_run_lumi(df):
    """
    Produce a CMS-style lumi mask dict from a file-by-file DataFrame.

    Args:
        df: a pandas DataFrame with one row per file and columns:
            - file: the file name;
            - lumis: the lumisections;
            - run: the run number;
            - events: the number of events.

    Returns:
        A "CMS"-like lumi mask dict mapping each run number to its list
        of good lumisection ranges, e.g.
        {run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]}.
        Returns {} for an empty DataFrame.
    """
    if len(df) == 0:
        return {}

    lumi_ranges = {}
    for run in np.unique(df.run.values).tolist():
        # Collect every lumisection of every file in this run, flatten and
        # sort them, then collapse to contiguous ranges.
        per_run = np.concatenate(df.loc[df["run"] == run, "lumis"].values)
        lumi_ranges[run] = get_lumi_ranges(np.sort(per_run.ravel()).tolist())

    return lumi_ranges


def get_lumi_dict(golden, dataset, events):
    """
    Produce a lumi mask for a dataset, up to `events` events, using a
    certification json.

    Args:
        golden: a run by run certification json.
        dataset: the dataset name '/PD/GTString/DATA-TIER'.
        events: max number of events (an int).

    Returns:
        A "CMS"-like lumi mask dict mapping each run number to its list
        of good lumisection ranges, e.g.
        {run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]}.
    """
    # Select the certified files first, then reduce them to a lumi mask.
    return get_run_lumi(get_events_df(golden, dataset, events))
Loading
Loading