Skip to content

Commit a088b5e

Browse files
committed
delegate specific script to generic script
1 parent f5a861b commit a088b5e

File tree

1 file changed

+11
-299
lines changed

1 file changed

+11
-299
lines changed
Lines changed: 11 additions & 299 deletions
Original file line numberDiff line numberDiff line change
@@ -1,307 +1,19 @@
11
#!/usr/bin/env python3
22

33
#
4-
# A script producing a consistent MC->RECO->AOD workflow with optional embedding.
4+
# A script producing a consistent MC->RECO->AOD workflow
5+
# with optional embedding with parameters for PWGHF.
56
#
67

7-
import argparse
8-
from os import environ
9-
import json
8+
import os
9+
import sys
1010

11-
parser = argparse.ArgumentParser(description='Create a PWGHF embedding pipeline')
12-
parser.add_argument('-nb',help='number of background events / timeframe', default=20)
13-
parser.add_argument('-ns',help='number of signal events / timeframe', default=20)
14-
parser.add_argument('-tf',help='number of timeframes', default=2)
15-
parser.add_argument('-j',help='number of workers (if applicable)', default=8)
16-
parser.add_argument('-e',help='simengine', default='TGeant4')
17-
parser.add_argument('-o',help='output workflow file', default='workflow.json')
18-
parser.add_argument('--rest-digi',action='store_true',help='treat smaller sensors in a single digitization')
19-
parser.add_argument('--embedding',help='whether to embedd into background', default=True)
20-
parser.add_argument('--noIPC',help='disable shared memory in DPL')
21-
parser.add_argument('--upload-bkg-to',help='where to upload background files (alien)')
22-
parser.add_argument('--use-bkg-from',help='use background from GRID instead of simulating from scratch')
23-
args = parser.parse_args()
24-
print (args)
11+
# we simply delegate to the main script with some PWGHF settings
12+
command='${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCMS 13000 -col pp -proc ccbar '
2513

26-
# make sure O2DPG + O2 is loaded
27-
O2DPG_ROOT=environ.get('O2DPG_ROOT')
28-
O2_ROOT=environ.get('O2_ROOT')
14+
# and append the options given by the user
15+
for i in range(1, len(sys.argv)):
16+
command += sys.argv[i]
17+
command += ' '
2918

30-
if O2DPG_ROOT == None:
31-
print('Error: This needs O2DPG loaded')
32-
# exit(1)
33-
34-
if O2_ROOT == None:
35-
print('Error: This needs O2 loaded')
36-
# exit(1)
37-
38-
# ----------- START WORKFLOW CONSTRUCTION -----------------------------
39-
40-
NSIGEVENTS=args.ns
41-
NTIMEFRAMES=int(args.tf)
42-
NWORKERS=args.j
43-
NBKGEVENTS=args.nb
44-
MODULES="--skipModules ZDC"
45-
SIMENGINE=args.e
46-
47-
workflow={}
48-
workflow['stages'] = []
49-
50-
taskcounter=0
51-
def createTask(name='', needs=[], tf=-1, cwd='./', lab=[], cpu=0, mem=0):
52-
global taskcounter
53-
taskcounter = taskcounter + 1
54-
return { 'name': name, 'cmd':'', 'needs': needs, 'resources': { 'cpu': cpu , 'mem': mem }, 'timeframe' : tf, 'labels' : lab, 'cwd' : cwd }
55-
56-
def getDPL_global_options(bigshm=False,nosmallrate=False):
57-
if args.noIPC!=None:
58-
return "-b --run --no-IPC " + ('--rate 1','')[nosmallrate]
59-
if bigshm:
60-
return "-b --run --shm-segment-size ${SHMSIZE:-50000000000} --session " + str(taskcounter) + ' --driver-client-backend ws://' + (' --rate 1','')[nosmallrate]
61-
else:
62-
return "-b --run --session " + str(taskcounter) + ' --driver-client-backend ws://' + (' --rate 1','')[nosmallrate]
63-
64-
doembedding=True if args.embedding=='True' or args.embedding==True else False
65-
usebkgcache=args.use_bkg_from!=None
66-
67-
if doembedding:
68-
if not usebkgcache:
69-
# ---- background transport task -------
70-
BKGtask=createTask(name='bkgsim', lab=["GEANT"], cpu='8')
71-
BKGtask['cmd']='o2-sim -e ' + SIMENGINE + ' -j ' + str(NWORKERS) + ' -n ' + str(NBKGEVENTS) + ' -g pythia8hi ' + str(MODULES) + ' -o bkg --configFile ${O2DPG_ROOT}/MC/config/common/ini/basic.ini; for d in tf*; do ln -nfs bkg* ${d}/; done'
72-
workflow['stages'].append(BKGtask)
73-
74-
if args.upload_bkg_to!=None:
75-
BKGuploadtask=createTask(name='bkgupload', needs=[BKGtask['name']], cpu='0')
76-
BKGuploadtask['cmd']='alien.py mkdir ' + args.upload_bkg_to + ';'
77-
BKGuploadtask['cmd']+='alien.py cp -f bkg* ' + args.upload_bkg_to + ';'
78-
workflow['stages'].append(BKGuploadtask)
79-
80-
else:
81-
# when using background caches, we have multiple smaller tasks
82-
# this split makes sense as they are needed at different stages
83-
# 1: --> download bkg_MCHeader.root + grp + geometry
84-
# 2: --> download bkg_Hit files (individually)
85-
# 3: --> download bkg_Kinematics
86-
87-
# Step 1: header and link files
88-
BKG_HEADER_task=createTask(name='bkgdownloadheader', cpu='0', lab=['BKGCACHE'])
89-
BKG_HEADER_task['cmd']='alien.py cp ' + args.use_bkg_from + 'bkg_MCHeader.root .'
90-
BKG_HEADER_task['cmd']=BKG_HEADER_task['cmd'] + ';alien.py cp ' + args.use_bkg_from + 'bkg_geometry.root .'
91-
BKG_HEADER_task['cmd']=BKG_HEADER_task['cmd'] + ';alien.py cp ' + args.use_bkg_from + 'bkg_grp.root .'
92-
workflow['stages'].append(BKG_HEADER_task)
93-
94-
# we split some detectors for improved load balancing --> the precise list needs to be made consistent with geometry and active sensors
95-
smallsensorlist = [ "ITS", "TOF", "FT0", "FV0", "FDD", "MCH", "MID", "MFT", "HMP", "EMC", "PHS", "CPV" ]
96-
97-
BKG_HITDOWNLOADER_TASKS={}
98-
for det in [ 'TPC', 'TRD' ] + smallsensorlist:
99-
if usebkgcache:
100-
BKG_HITDOWNLOADER_TASKS[det] = createTask(str(det) + 'hitdownload', cpu='0', lab=['BKGCACHE'])
101-
BKG_HITDOWNLOADER_TASKS[det]['cmd'] = 'alien.py cp ' + args.use_bkg_from + 'bkg_Hits' + str(det) + '.root .'
102-
workflow['stages'].append(BKG_HITDOWNLOADER_TASKS[det])
103-
else:
104-
BKG_HITDOWNLOADER_TASKS[det] = None
105-
106-
if usebkgcache:
107-
BKG_KINEDOWNLOADER_TASK = createTask(name='bkgkinedownload', cpu='0', lab=['BKGCACHE'])
108-
BKG_KINEDOWNLOADER_TASK['cmd'] = 'alien.py cp ' + args.use_bkg_from + 'bkg_Kine.root .'
109-
workflow['stages'].append(BKG_KINEDOWNLOADER_TASK)
110-
111-
# loop over timeframes
112-
for tf in range(1, NTIMEFRAMES + 1):
113-
timeframeworkdir='tf'+str(tf)
114-
115-
# ---- transport task -------
116-
# function encapsulating the signal sim part
117-
# first argument is timeframe id
118-
RNDSEED=0 # 0 means random seed !
119-
PTHATMIN=0. # [default = 0]
120-
PTHATMAX=-1. # [default = -1]
121-
122-
# produce the signal configuration
123-
SGN_CONFIG_task=createTask(name='gensgnconf_'+str(tf), tf=tf, cwd=timeframeworkdir)
124-
125-
SGN_CONFIG_task['cmd'] = '${O2DPG_ROOT}/MC/config/common/pythia8/utils/mkpy8cfg.py \
126-
--output=pythia8_'+ str(tf) +'.cfg \
127-
--seed='+str(RNDSEED)+' \
128-
--idA=2212 \
129-
--idB=2212 \
130-
--eCM=13000. \
131-
--process=ccbar \
132-
--ptHatMin=' + str(PTHATMIN) + ' \
133-
--ptHatMax=' + str(PTHATMAX)
134-
workflow['stages'].append(SGN_CONFIG_task)
135-
136-
# transport signals
137-
signalprefix='sgn_' + str(tf)
138-
signalneeds=[ SGN_CONFIG_task['name'] ]
139-
embeddinto= "--embedIntoFile ../bkg_MCHeader.root" if doembedding else ""
140-
if doembedding:
141-
if not usebkgcache:
142-
signalneeds = signalneeds + [ BKGtask['name'] ]
143-
else:
144-
signalneeds = signalneeds + [ BKG_HEADER_task['name'] ]
145-
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], cpu='5.')
146-
SGNtask['cmd']='o2-sim -e '+str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' -j ' + str(NWORKERS) + ' -g pythia8 '\
147-
+ ' -o ' + signalprefix + ' ' + embeddinto
148-
workflow['stages'].append(SGNtask)
149-
150-
# some tasks further below still want geometry + grp in fixed names, so we provide it here
151-
# Alternatively, since we have timeframe isolation, we could just work with standard o2sim_ files
152-
153-
# We need to be careful here and distinguish between embedding and non-embedding cases
154-
# (otherwise it can confuse itstpcmatching, see O2-2026). This is because only one of the GRPs is updated during digitization.
155-
if doembedding:
156-
LinkGRPFileTask=createTask(name='linkGRP_'+str(tf), needs=[BKG_HEADER_task['name'] if usebkgcache else BKGtask['name'] ], tf=tf, cwd=timeframeworkdir)
157-
LinkGRPFileTask['cmd']='''
158-
ln -nsf ../bkg_grp.root o2sim_grp.root;
159-
ln -nsf ../bkg_geometry.root o2sim_geometry.root;
160-
ln -nsf ../bkg_geometry.root bkg_geometry.root;
161-
ln -nsf ../bkg_grp.root bkg_grp.root
162-
'''
163-
else:
164-
LinkGRPFileTask=createTask(name='linkGRP_'+str(tf), needs=[SGNtask['name']], tf=tf, cwd=timeframeworkdir)
165-
LinkGRPFileTask['cmd']='ln -nsf ' + signalprefix + '_grp.root o2sim_grp.root ; ln -nsf ' + signalprefix + '_geometry.root o2sim_geometry.root'
166-
workflow['stages'].append(LinkGRPFileTask)
167-
168-
CONTEXTFILE='collisioncontext.root'
169-
170-
simsoption=' --sims ' + ('bkg,'+signalprefix if doembedding else signalprefix)
171-
172-
ContextTask=createTask(name='digicontext_'+str(tf), needs=[SGNtask['name'], LinkGRPFileTask['name']], tf=tf,
173-
cwd=timeframeworkdir, lab=["DIGI"], cpu='1')
174-
ContextTask['cmd'] = 'o2-sim-digitizer-workflow --only-context --interactionRate 50000 ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption
175-
workflow['stages'].append(ContextTask)
176-
177-
tpcdigineeds=[ContextTask['name'], LinkGRPFileTask['name']]
178-
if usebkgcache:
179-
tpcdigineeds += [ BKG_HITDOWNLOADER_TASKS['TPC']['name'] ]
180-
181-
TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=tpcdigineeds,
182-
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='8', mem='9000')
183-
TPCDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTPC.root . ;')[doembedding]
184-
TPCDigitask['cmd'] += 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TPC --interactionRate 50000 --tpc-lanes ' + str(NWORKERS) + ' --incontext ' + str(CONTEXTFILE) + ' --tpc-chunked-writer'
185-
workflow['stages'].append(TPCDigitask)
186-
187-
trddigineeds = [ContextTask['name']]
188-
if usebkgcache:
189-
trddigineeds += [ BKG_HITDOWNLOADER_TASKS['TRD']['name'] ]
190-
TRDDigitask=createTask(name='trddigi_'+str(tf), needs=trddigineeds,
191-
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='8', mem='8000')
192-
TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding]
193-
TRDDigitask['cmd'] += 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TRD --interactionRate 50000 --configKeyValues \"TRDSimParams.digithreads=' + str(NWORKERS) + '\" --incontext ' + str(CONTEXTFILE)
194-
workflow['stages'].append(TRDDigitask)
195-
196-
# these are digitizers which are single threaded
197-
def createRestDigiTask(name, det='ALLSMALLER'):
198-
tneeds = needs=[ContextTask['name']]
199-
if det=='ALLSMALLER':
200-
if usebkgcache:
201-
for d in smallsensorlist:
202-
tneeds += [ BKG_HITDOWNLOADER_TASKS[d]['name'] ]
203-
t = createTask(name=name, needs=tneeds,
204-
tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='8')
205-
t['cmd'] = ('','ln -nfs ../bkg_Hits*.root . ;')[doembedding]
206-
t['cmd'] += 'o2-sim-digitizer-workflow ' + getDPL_global_options(nosmallrate=True) + ' -n ' + str(args.ns) + simsoption + ' --skipDet TPC,TRD --interactionRate 50000 --incontext ' + str(CONTEXTFILE)
207-
workflow['stages'].append(t)
208-
return t
209-
210-
else:
211-
if usebkgcache:
212-
tneeds += [ BKG_HITDOWNLOADER_TASKS[det]['name'] ]
213-
t = createTask(name=name, needs=tneeds,
214-
tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1')
215-
t['cmd'] = ('','ln -nfs ../bkg_Hits' + str(det) + '.root . ;')[doembedding]
216-
t['cmd'] += 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet ' + str(det) + ' --interactionRate 50000 --incontext ' + str(CONTEXTFILE)
217-
workflow['stages'].append(t)
218-
return t
219-
220-
det_to_digitask={}
221-
222-
if args.rest_digi==True:
223-
det_to_digitask['ALLSMALLER']=createRestDigiTask("restdigi_"+str(tf))
224-
225-
for det in smallsensorlist:
226-
name=str(det).lower() + "digi_" + str(tf)
227-
t = det_to_digitask['ALLSMALLER'] if args.rest_digi==True else createRestDigiTask(name, det)
228-
det_to_digitask[det]=t
229-
230-
# -----------
231-
# reco
232-
# -----------
233-
234-
# TODO: check value for MaxTimeBin; a large value had to be set temporarily in order to avoid crashes caused by "exceeding timeframe limit"
235-
TPCRECOtask1=createTask(name='tpccluster_'+str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='3', mem='16000')
236-
TPCRECOtask1['cmd'] = 'o2-tpc-chunkeddigit-merger --rate 1 --tpc-lanes ' + str(NWORKERS) + ' --session ' + str(taskcounter)
237-
TPCRECOtask1['cmd'] += ' | o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True, nosmallrate=True) + ' --input-type digitizer --output-type clusters,send-clusters-per-sector --configKeyValues "GPU_global.continuousMaxTimeBin=100000;GPU_proc.ompThreads='+str(NWORKERS)+'"'
238-
workflow['stages'].append(TPCRECOtask1)
239-
240-
TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=[TPCRECOtask1['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='3', mem='16000')
241-
TPCRECOtask['cmd'] = 'o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True, nosmallrate=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector --configKeyValues "GPU_global.continuousMaxTimeBin=100000;GPU_proc.ompThreads='+str(NWORKERS)+'"'
242-
workflow['stages'].append(TPCRECOtask)
243-
244-
ITSRECOtask=createTask(name='itsreco_'+str(tf), needs=[det_to_digitask["ITS"]['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000')
245-
ITSRECOtask['cmd'] = 'o2-its-reco-workflow --trackerCA --tracking-mode async ' + getDPL_global_options()
246-
workflow['stages'].append(ITSRECOtask)
247-
248-
FT0RECOtask=createTask(name='ft0reco_'+str(tf), needs=[det_to_digitask["FT0"]['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
249-
FT0RECOtask['cmd'] = 'o2-ft0-reco-workflow ' + getDPL_global_options()
250-
workflow['stages'].append(FT0RECOtask)
251-
252-
ITSTPCMATCHtask=createTask(name='itstpcMatch_'+str(tf), needs=[TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='8000', cpu='3')
253-
ITSTPCMATCHtask['cmd']= 'o2-tpcits-match-workflow ' + getDPL_global_options(bigshm=True, nosmallrate=True) + ' --tpc-track-reader \"tpctracks.root\" --tpc-native-cluster-reader \"--infile tpc-native-clusters.root\"'
254-
workflow['stages'].append(ITSTPCMATCHtask)
255-
256-
# this can be combined with TRD digitization if benefical
257-
TRDTRAPtask = createTask(name='trdtrap_'+str(tf), needs=[TRDDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"])
258-
TRDTRAPtask['cmd'] = 'o2-trd-trap-sim'
259-
workflow['stages'].append(TRDTRAPtask)
260-
261-
TRDTRACKINGtask = createTask(name='trdreco_'+str(tf), needs=[TRDTRAPtask['name'], ITSTPCMATCHtask['name'], TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
262-
TRDTRACKINGtask['cmd'] = 'echo "would do TRD tracking"' # 'o2-trd-global-tracking'
263-
workflow['stages'].append(TRDTRACKINGtask)
264-
265-
TOFRECOtask = createTask(name='tofmatch_'+str(tf), needs=[ITSTPCMATCHtask['name'], det_to_digitask["TOF"]['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
266-
TOFRECOtask['cmd'] = 'o2-tof-reco-workflow ' + getDPL_global_options()
267-
workflow['stages'].append(TOFRECOtask)
268-
269-
TOFTPCMATCHERtask = createTask(name='toftpcmatch_'+str(tf), needs=[TOFRECOtask['name'], TPCRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
270-
TOFTPCMATCHERtask['cmd'] = 'o2-tof-matcher-tpc ' + getDPL_global_options()
271-
workflow['stages'].append(TOFTPCMATCHERtask)
272-
273-
PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=[ITSTPCMATCHtask['name'], FT0RECOtask['name'], TOFTPCMATCHERtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='4')
274-
PVFINDERtask['cmd'] = 'o2-primary-vertexing-workflow ' + getDPL_global_options(nosmallrate=True)
275-
workflow['stages'].append(PVFINDERtask)
276-
277-
# -----------
278-
# produce AOD
279-
# -----------
280-
aodneeds = [PVFINDERtask['name'], TOFRECOtask['name'], TRDTRACKINGtask['name']]
281-
if usebkgcache:
282-
aodneeds += [ BKG_KINEDOWNLOADER_TASK['name'] ]
283-
284-
AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='16000', cpu='1')
285-
AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding]
286-
AODtask['cmd'] += 'o2-aod-producer-workflow --aod-writer-keep dangling --aod-writer-resfile \"AO2D\" --aod-writer-resmode UPDATE --aod-timeframe-id ' + str(tf) + ' ' + getDPL_global_options(bigshm=True)
287-
AODtask['cmd'] = 'echo \"hello\"' #-> skipping for moment since not optimized
288-
workflow['stages'].append(AODtask)
289-
290-
def trimString(cmd):
291-
return ' '.join(cmd.split())
292-
293-
# insert taskwrapper stuff
294-
for s in workflow['stages']:
295-
s['cmd']='. ${O2_ROOT}/share/scripts/jobutils.sh; taskwrapper ' + s['name']+'.log \'' + s['cmd'] + '\''
296-
297-
# remove whitespaces etc
298-
for s in workflow['stages']:
299-
s['cmd']=trimString(s['cmd'])
300-
301-
302-
# write workflow to json
303-
workflowfile=args.o
304-
with open(workflowfile, 'w') as outfile:
305-
json.dump(workflow, outfile, indent=2)
306-
307-
exit (0)
19+
os.system(command)

0 commit comments

Comments
 (0)