From b5b4c265fdf004e9d0834cb96f724d1c581654b8 Mon Sep 17 00:00:00 2001 From: jzhou Date: Fri, 25 Apr 2008 16:59:55 +0000 Subject: [PATCH] add multi-thread simulator for multi-core version codes. Also add two new files in Runtime directory to hold multi-core related codes. --- Robust/src/Analysis/Scheduling/Schedule.java | 54 +- .../Analysis/Scheduling/ScheduleAnalysis.java | 36 +- Robust/src/IR/Flat/BuildCodeMultiCore.java | 188 +- Robust/src/IR/State.java | 1 + Robust/src/Main/Main.java | 7 +- Robust/src/Runtime/multicoreruntime.c | 201 +++ Robust/src/Runtime/multicoretask.c | 1526 +++++++++++++++++ Robust/src/Runtime/socket.c | 21 +- Robust/src/Runtime/task.c | 131 -- Robust/src/buildscript | 43 +- 10 files changed, 2016 insertions(+), 192 deletions(-) create mode 100644 Robust/src/Runtime/multicoreruntime.c create mode 100644 Robust/src/Runtime/multicoretask.c diff --git a/Robust/src/Analysis/Scheduling/Schedule.java b/Robust/src/Analysis/Scheduling/Schedule.java index 38d7339a..3fedbb6c 100644 --- a/Robust/src/Analysis/Scheduling/Schedule.java +++ b/Robust/src/Analysis/Scheduling/Schedule.java @@ -15,6 +15,8 @@ public class Schedule { private Vector tasks; private Hashtable> targetCores; private Hashtable targetFState; + private Vector ancestorCores; + private Vector childCores; public Schedule(int coreNum) { super(); @@ -22,6 +24,7 @@ public class Schedule { this.tasks = null; this.targetCores = null; this.targetFState = null; + this.ancestorCores = null; } public int getCoreNum() { @@ -92,5 +95,54 @@ public class Schedule { if(!this.tasks.contains(task)) { this.tasks.add(task); } - } + } + + public Vector getAncestorCores() { + return ancestorCores; + } + + public void setAncestorCores(Vector ancestorCores) { + this.ancestorCores = ancestorCores; + } + + public void addAncestorCores(Integer ancestorCore) { + if(this.ancestorCores == null) { + this.ancestorCores = new Vector(); + } + if((ancestorCore.intValue() != this.coreNum) && (!this.ancestorCores.contains(ancestorCore))) { + this.ancestorCores.addElement(ancestorCore); + } + } + + public int ancestorCoresNum() { + if(this.ancestorCores == null) { + return 0; + } + return this.ancestorCores.size(); + } + + public Vector getChildCores() { + return childCores; + } + + public void setChildCores(Vector childCores) { + this.childCores = childCores; + } + + public void addChildCores(Integer childCore) { + if(this.childCores == null) { + this.childCores = new Vector(); + } + if((childCore.intValue() != this.coreNum) && (!this.childCores.contains(childCore))) { + this.childCores.addElement(childCore); + } + } + + public int childCoresNum() { + if(this.childCores == null) { + return 0; + } + return this.childCores.size(); + } + } \ No newline at end of file diff --git a/Robust/src/Analysis/Scheduling/ScheduleAnalysis.java b/Robust/src/Analysis/Scheduling/ScheduleAnalysis.java index a44cbab6..937f7d42 100644 --- a/Robust/src/Analysis/Scheduling/ScheduleAnalysis.java +++ b/Robust/src/Analysis/Scheduling/ScheduleAnalysis.java @@ -77,7 +77,7 @@ public class ScheduleAnalysis { Vector sFStates = FlagState.DFS.topology(fStates, null); Vector rootnodes = taskanalysis.getRootNodes(cd); - if(((rootnodes != null) && (rootnodes.size() > 0)) || (cd.getSymbol().equals("StartupObject"))) { + if(((rootnodes != null) && (rootnodes.size() > 0)) || (cd.getSymbol().equals(TypeUtil.StartupClass))) { ClassNode cNode = new ClassNode(cd, sFStates); cNode.setSorted(true); classNodes.add(cNode); @@ -730,6 +730,14 @@ public class ScheduleAnalysis { for(j = 0; j < scheduleGraph.size(); j++) { sn2coreNum.put(scheduleGraph.elementAt(j), j); } + int startupcore = 0; + boolean setstartupcore = false; + Schedule startup = null; + Vector leafcores = new Vector(); + Vector[] ancestorCores = new Vector[this.coreNum]; + for(j = 0; j < ancestorCores.length; ++j) { + ancestorCores[j] = new Vector(); + } for(j = 0; j < scheduleGraph.size(); j++) { Schedule tmpSchedule = new Schedule(j); ScheduleNode sn = scheduleGraph.elementAt(j); @@ -743,22 +751,39 @@ public class ScheduleAnalysis { while(it_edges.hasNext()) { TaskDescriptor td = ((FEdge)it_edges.next()).getTask(); tmpSchedule.addTask(td); + if(td.getParamType(0).getClassDesc().getSymbol().equals(TypeUtil.StartupClass)) { + assert(!setstartupcore); + startupcore = j; + startup = tmpSchedule; + setstartupcore = true; + } } } } // For each of the ScheduleEdge out of this ScheduleNode, add the target ScheduleNode into the queue inside sn Iterator it_edges = sn.edges(); + if(!it_edges.hasNext()) { + // leaf core, considered as ancestor of startup core + if(!leafcores.contains(Integer.valueOf(j))) { + leafcores.addElement(Integer.valueOf(j)); + } + } while(it_edges.hasNext()) { ScheduleEdge se = (ScheduleEdge)it_edges.next(); ScheduleNode target = (ScheduleNode)se.getTarget(); + Integer targetcore = sn2coreNum.get(target); if(se.getIsNew()) { for(int k = 0; k < se.getNewRate(); k++) { - tmpSchedule.addTargetCore(se.getFstate(), sn2coreNum.get(target)); + tmpSchedule.addTargetCore(se.getFstate(), targetcore); } } else { // 'transmit' edge - tmpSchedule.addTargetCore(se.getFstate(), sn2coreNum.get(target), se.getTargetFState()); + tmpSchedule.addTargetCore(se.getFstate(), targetcore, se.getTargetFState()); + } + tmpSchedule.addChildCores(targetcore); + if((targetcore.intValue() != j) && (!ancestorCores[targetcore.intValue()].contains((Integer.valueOf(j))))) { + ancestorCores[targetcore.intValue()].addElement(Integer.valueOf(j)); } } it_edges = sn.getScheduleEdgesIterator(); @@ -775,6 +800,11 @@ public class ScheduleAnalysis { } scheduling.add(tmpSchedule); } + leafcores.removeElement(Integer.valueOf(startupcore)); + ancestorCores[startupcore] = leafcores; + for(j = 0; j < this.coreNum; ++j) { + scheduling.elementAt(j).setAncestorCores(ancestorCores[j]); + } this.schedulings.add(scheduling); } diff --git a/Robust/src/IR/Flat/BuildCodeMultiCore.java b/Robust/src/IR/Flat/BuildCodeMultiCore.java index 4251ae47..efa0031c 100644 --- a/Robust/src/IR/Flat/BuildCodeMultiCore.java +++ b/Robust/src/IR/Flat/BuildCodeMultiCore.java @@ -33,12 +33,14 @@ public class BuildCodeMultiCore extends BuildCode { int coreNum; Schedule currentSchedule; Hashtable[] fsate2qnames; - String objqs4startupprefix= "objqueuearray4startup"; - String objqs4socketprefix= "objqueuearray4socket"; + //String objqs4startupprefix= "objqueuearray4startup"; + //String objqs4socketprefix= "objqueuearray4socket"; + String objqarrayprefix= "objqueuearray4class"; String objqueueprefix = "objqueue4parameter_"; String taskprefix = "task_"; String taskarrayprefix = "taskarray_core"; String otqueueprefix = "___otqueue"; + int startupcorenum; // record the core containing startup task, suppose only one core can hava startup object public BuildCodeMultiCore(State st, Hashtable temptovar, TypeUtil typeutil, SafetyAnalysis sa, Vector scheduling, int coreNum) { super(st, temptovar, typeutil, sa); @@ -46,6 +48,7 @@ public class BuildCodeMultiCore extends BuildCode { this.coreNum = coreNum; this.currentSchedule = null; this.fsate2qnames = null; + this.startupcorenum = 0; } public void buildCode() { @@ -145,22 +148,20 @@ public class BuildCodeMultiCore extends BuildCode { taskits[i] = null; } int[] numtasks = new int[this.coreNum]; + int[][] numqueues = new int[this.coreNum][numclasses]; // arrays record the queues for startup object & socket object - int[][] numqueues = new int[2][this.coreNum]; - /*Vector qnames[][]= new Vector[2][this.coreNum]; - for(int i = 0; i < qnames.length; ++i) { - qnames[i] = null; - }*/ + //int[][] numqueues = new int[2][this.coreNum]; /* Output code for tasks */ for(int i = 0; i < this.scheduling.size(); ++i) { this.currentSchedule = this.scheduling.elementAt(i); - outputTaskCode(outtaskdefs, outmethod, outtask, taskits, numtasks, numqueues);//, qnames); + outputTaskCode(outtaskdefs, outmethod, outtask, taskits, numtasks, numqueues); /*outputTaskCode(outtaskdefs[this.currentSchedule.getCoreNum()], outmethod); outtaskdefs[this.currentSchedule.getCoreNum()].close();*/ } // Output task descriptors boolean comma = false; + /* for(int index = 0; index < 2; ++index) { if(index == 0) { outtaskdefs.println("struct parameterwrapper ** objq4startupobj[] = {"); @@ -174,7 +175,7 @@ public class BuildCodeMultiCore extends BuildCode { } else { comma = true; } - outtaskdefs.println("/* object queue array for core " + i + "*/"); + outtaskdefs.println("/* object queue array for core " + i + "* /"); outtaskdefs.print(this.objqs4startupprefix + "_core" + i); } outtaskdefs.println("};"); @@ -194,7 +195,51 @@ public class BuildCodeMultiCore extends BuildCode { outtaskdefs.print(tmparray[i]); } outtaskdefs.println("};"); + }*/ + outtaskdefs.println("struct parameterwrapper ** objectqueues[][NUMCLASSES] = {"); + boolean needcomma = false; + for(int i = 0; i < numqueues.length ; ++i) { + if(needcomma) { + outtaskdefs.println(","); + } else { + needcomma = true; + } + outtaskdefs.println("/* object queue array for core " + i + "*/"); + outtaskdefs.print("{"); + comma = false; + for(int j = 0; j < numclasses; ++j) { + if(comma) { + outtaskdefs.println(","); + } else { + comma = true; + } + outtaskdefs.print(this.objqarrayprefix + j + "_core" + i); + } + outtaskdefs.print("}"); + } + outtaskdefs.println("};"); + needcomma = false; + outtaskdefs.println("int numqueues[][NUMCLASSES] = {"); + for(int i = 0; i < numqueues.length; ++i) { + if(needcomma) { + outtaskdefs.println(","); + } else { + needcomma = true; + } + int[] tmparray = numqueues[i]; + comma = false; + outtaskdefs.print("{"); + for(int j = 0; j < tmparray.length; ++j) { + if(comma) { + outtaskdefs.print(","); + } else { + comma = true; + } + outtaskdefs.print(tmparray[j]); + } + outtaskdefs.print("}"); } + outtaskdefs.println("};"); for(int i = 0; i < taskits.length; ++i) { outtaskdefs.println("struct taskdescriptor * " + this.taskarrayprefix + i + "[]={"); @@ -225,12 +270,12 @@ public class BuildCodeMultiCore extends BuildCode { outtaskdefs.println("};"); outtaskdefs.print("int numtasks[]= {"); + comma = false; for(int i = 0; i < taskits.length; ++i) { - boolean first=true; - if (first) - first=false; - else + if (comma) outtaskdefs.print(","); + else + comma=true; outtaskdefs.print(numtasks[i]); } outtaskdefs.println("};"); @@ -243,11 +288,17 @@ public class BuildCodeMultiCore extends BuildCode { outtaskdefs.println("#endif"); outtaskdefs.close(); - outtask.println("#endif"); outtask.close(); /* Record maximum number of task parameters */ outstructs.println("#define MAXTASKPARAMS "+maxtaskparams); + /* Record maximum number of all types, i.e. length of classsize[] */ + outstructs.println("#define NUMTYPES "+(state.numClasses() + state.numArrays())); + /* Record number of cores */ + outstructs.println("#define NUMCORES "+this.coreNum); + /* Record number of core containing startup task */ + outstructs.println("#define STARTUPCORE "+this.startupcorenum); + //outstructs.println("#define STARTUPCORESTR \""+this.startupcorenum+"\""); } //else if (state.main!=null) { /* Generate main method */ // outputMainMethod(outmethod); @@ -343,7 +394,7 @@ public class BuildCodeMultiCore extends BuildCode { /* This method outputs code for each task. */ private void outputTaskCode(PrintWriter outtaskdefs, PrintWriter outmethod, PrintWriter outtask, Iterator[] taskits, int[] numtasks, - int[][] numqueues) {//, Vector[] qnames) { + int[][] numqueues) { /* Compile task based program */ outtaskdefs.println("#include \"task.h\""); outtaskdefs.println("#include \"methodheaders.h\""); @@ -351,7 +402,12 @@ public class BuildCodeMultiCore extends BuildCode { /* Output object transfer queues into method.c*/ generateObjectTransQueues(outmethod); - Vector[] qnames = new Vector[2]; + //Vector[] qnames = new Vector[2]; + int numclasses = numqueues[0].length; + Vector qnames[]= new Vector[numclasses]; + for(int i = 0; i < qnames.length; ++i) { + qnames[i] = null; + } Iterator taskit=this.currentSchedule.getTasks().iterator(); while(taskit.hasNext()) { TaskDescriptor td=taskit.next(); @@ -363,11 +419,11 @@ public class BuildCodeMultiCore extends BuildCode { // generate queuearray for this core int num = this.currentSchedule.getCoreNum(); boolean comma = false; - for(int i = 0; i < 2; ++i) { + /*for(int i = 0; i < 2; ++i) { if(i == 0) { - outtaskdefs.println("/* object queue array for class StartupObject on core " + num + "*/"); + outtaskdefs.println("/* object queue array for class StartupObject on core " + num + "* /"); } else { - outtaskdefs.println("/* object queue array for class Socket on core " + num + "*/"); + outtaskdefs.println("/* object queue array for class Socket on core " + num + "* /"); } if(i == 0) { outtaskdefs.println("struct parameterwrapper * " + this.objqs4startupprefix + "_core" + num + "[] = {"); @@ -391,6 +447,27 @@ public class BuildCodeMultiCore extends BuildCode { } outtaskdefs.println(); outtaskdefs.println("};"); + }*/ + for(int i = 0; i < qnames.length; ++i) { + outtaskdefs.println("/* object queue array for class " + i + " on core " + num + "*/"); + outtaskdefs.println("struct parameterwrapper * " + this.objqarrayprefix + i + "_core" + num + "[] = {"); + //outtaskdefs.print("0"); + comma = false; + Vector tmpvector = qnames[i]; + if(tmpvector != null) { + for(int j = 0; j < tmpvector.size(); ++j) { + if(comma) { + outtaskdefs.println(","); + } else { + comma = true; + } + outtaskdefs.print("&" + tmpvector.elementAt(j)); + } + numqueues[num][i] = tmpvector.size();// + 1; + } else { + numqueues[num][i] = 0;//1; + } + outtaskdefs.println("};"); } // record the iterator of tasks on this core @@ -424,7 +501,7 @@ public class BuildCodeMultiCore extends BuildCode { outtask.println("};"); outtask.println(); outtask.println("struct parameterwrapper {"); - outtask.println(" //struct parameterwrapper *next;"); + outtask.println(" //int type;"); outtask.println(" struct ObjectHash * objectset;"); outtask.println(" int numberofterms;"); outtask.println(" int * intarray;"); @@ -435,10 +512,14 @@ public class BuildCodeMultiCore extends BuildCode { outtask.println(" struct tagobjectiterator iterators[MAXTASKPARAMS-1];"); outtask.println("};"); outtask.println(); + /* outtask.println("extern struct parameterwrapper ** objq4startupobj[];"); outtask.println("extern int numqueues4startupobj[];"); outtask.println("extern struct parameterwrapper ** objq4socketobj[];"); outtask.println("extern int numqueues4socketobj[];"); + */ + outtask.println("extern struct parameterwrapper ** objectqueues[][NUMCLASSES];"); + outtask.println("extern int numqueues[][NUMCLASSES];"); outtask.println(); outtask.println("struct parameterdescriptor {"); outtask.println(" int type;"); @@ -475,32 +556,34 @@ public class BuildCodeMultiCore extends BuildCode { Hashtable flag2qname = new Hashtable(); this.fsate2qnames[num] = flag2qname; Hashtable> targetCoreTbl = this.currentSchedule.getTargetCoreTable(); - Object[] keys = targetCoreTbl.keySet().toArray(); - output.println(); - output.println("/* Object transfer queues for core" + num + ".*/"); - for(int i = 0; i < keys.length; ++i) { - FlagState tmpfstate = (FlagState)keys[i]; - Object[] targetcores = targetCoreTbl.get(tmpfstate).toArray(); - String queuename = this.otqueueprefix + tmpfstate.getClassDescriptor().getCoreSafeSymbol(num) + tmpfstate.getuid() + "___"; - String queueins = queuename + "ins"; - flag2qname.put(tmpfstate, queuename); - output.println("struct " + queuename + " {"); - output.println(" int * cores;"); - output.println(" int index;"); - output.println(" int length;"); - output.println("};"); - output.print("int " + queuename + "cores[] = {"); - for(int j = 0; j < targetcores.length; ++j) { - if(j > 0) { - output.print(", "); + if(targetCoreTbl != null) { + Object[] keys = targetCoreTbl.keySet().toArray(); + output.println(); + output.println("/* Object transfer queues for core" + num + ".*/"); + for(int i = 0; i < keys.length; ++i) { + FlagState tmpfstate = (FlagState)keys[i]; + Object[] targetcores = targetCoreTbl.get(tmpfstate).toArray(); + String queuename = this.otqueueprefix + tmpfstate.getClassDescriptor().getCoreSafeSymbol(num) + tmpfstate.getuid() + "___"; + String queueins = queuename + "ins"; + flag2qname.put(tmpfstate, queuename); + output.println("struct " + queuename + " {"); + output.println(" int * cores;"); + output.println(" int index;"); + output.println(" int length;"); + output.println("};"); + output.print("int " + queuename + "cores[] = {"); + for(int j = 0; j < targetcores.length; ++j) { + if(j > 0) { + output.print(", "); + } + output.print(((Integer)targetcores[j]).intValue()); } - output.print(((Integer)targetcores[j]).intValue()); + output.println("};"); + output.println("struct " + queuename + " " + queueins + "= {"); + output.println(/*".cores = " + */queuename + "cores,"); + output.println(/*".index = " + */"0,"); + output.println(/*".length = " +*/ targetcores.length + "};"); } - output.println("};"); - output.println("struct " + queuename + " " + queueins + "= {"); - output.println(/*".cores = " + */queuename + "cores,"); - output.println(/*".index = " + */"0,"); - output.println(/*".length = " +*/ targetcores.length + "};"); } output.println(); } @@ -677,6 +760,9 @@ public class BuildCodeMultiCore extends BuildCode { // generate object queue for this parameter String qname = this.objqueueprefix+i+"_"+task.getCoreSafeSymbol(num); if(param_type.getClassDesc().getSymbol().equals("StartupObject")) { + this.startupcorenum = num; + } + /*if(param_type.getClassDesc().getSymbol().equals("StartupObject")) { if(qnames[0] == null) { qnames[0] = new Vector(); } @@ -686,9 +772,14 @@ public class BuildCodeMultiCore extends BuildCode { qnames[1] = new Vector(); } qnames[1].addElement(qname); + }*/ + if(qnames[param_type.getClassDesc().getId()] == null) { + qnames[param_type.getClassDesc().getId()] = new Vector(); } + qnames[param_type.getClassDesc().getId()].addElement(qname); outtask.println("extern struct parameterwrapper " + qname + ";"); output.println("struct parameterwrapper " + qname + "={"); + //output.println(".type = " + param_type.getClassDesc().getId() + ","); // type output.println(".objectset = 0,"); // objectset output.println("/* number of DNF terms */ .numberofterms = "+dnfterms+","); // numberofterms output.println(".intarray = parameterdnf_"+i+"_"+task.getCoreSafeSymbol(num)+","); // intarray @@ -869,7 +960,10 @@ public class BuildCodeMultiCore extends BuildCode { Vector tmpfstates = (Vector)targetFStates[j]; for(int i = 0; i < tmpfstates.size(); ++i) { FlagState tmpFState = tmpfstates.elementAt(i); - Queue queue = targetCoreTbl.get(tmpFState); + Queue queue = null; + if(targetCoreTbl != null) { + queue = targetCoreTbl.get(tmpFState); + } if((queue != null) && ((queue.size() != 1) || ((queue.size() == 1) && (queue.element().intValue() != num)))) { @@ -899,6 +993,8 @@ public class BuildCodeMultiCore extends BuildCode { output.println("/* transfer to core " + targetcore.toString() + "*/"); // method call of transfer objects output.println("transferObject("+super.generateTemp(fm, temp, lb)+", " + targetcore.toString() + ");"); + //output.println("transferObject("+super.generateTemp(fm, temp, lb)+", " + targetcore.toString() + + // ", \"" + targetcore.toString() + "\"" + ");"); } output.println("break;"); } @@ -907,6 +1003,8 @@ public class BuildCodeMultiCore extends BuildCode { output.println("/* transfer to core " + targetcore.toString() + "*/"); // method call of transfer objectts output.println("transferObject("+super.generateTemp(fm, temp, lb)+", " + targetcore.toString() + ");"); + //output.println("transferObject("+super.generateTemp(fm, temp, lb)+", " + targetcore.toString() + + // ", \"" + targetcore.toString() + "\"" + ");"); } output.println("/* increase index*/"); output.println("++" + queueins + ".index;"); diff --git a/Robust/src/IR/State.java b/Robust/src/IR/State.java index 25c7f42d..63345fcd 100644 --- a/Robust/src/IR/State.java +++ b/Robust/src/IR/State.java @@ -62,6 +62,7 @@ public class State { public boolean INSTRUCTIONFAILURE=false; public static double TRUEPROB=0.8; public static boolean PRINTFLAT=false; + public int CORENUM = 1; public String structfile; public String main; diff --git a/Robust/src/Main/Main.java b/Robust/src/Main/Main.java index ee8abe84..d3bc0def 100644 --- a/Robust/src/Main/Main.java +++ b/Robust/src/Main/Main.java @@ -64,6 +64,10 @@ public class Main { state.excprefetch.add(args[++i]); else if (option.equals("-classlibrary")) ClassLibraryPrefix=args[++i]+"/"; + else if(option.equals("-numcore")) { + ++i; + state.CORENUM = Integer.parseInt(args[i]); + } else if (option.equals("-mainclass")) state.main=args[++i]; else if (option.equals("-trueprob")) { @@ -330,8 +334,7 @@ public class Main { scheduleAnalysis.preSchedule(); scheduleAnalysis.scheduleAnalysis(); //scheduleAnalysis.setCoreNum(scheduleAnalysis.getSEdges4Test().size()); - scheduleAnalysis.setCoreNum(1); - //scheduleAnalysis.setCoreNum(2); + scheduleAnalysis.setCoreNum(state.CORENUM); scheduleAnalysis.schedule(); //simulate these schedulings diff --git a/Robust/src/Runtime/multicoreruntime.c b/Robust/src/Runtime/multicoreruntime.c new file mode 100644 index 00000000..efd6ac71 --- /dev/null +++ b/Robust/src/Runtime/multicoreruntime.c @@ -0,0 +1,201 @@ +#include "runtime.h" +#include "structdefs.h" +#include +#include "mem.h" +#include +#include +#include +#include +#include "option.h" + +extern int classsize[]; +jmp_buf error_handler; +int instructioncount; + +char *options; +int injectfailures=0; +float failurechance=0; +int debugtask=0; +int injectinstructionfailures; +int failurecount; +float instfailurechance=0; +int numfailures; +int instaccum=0; +#ifdef DMALLOC +#include "dmalloc.h" +#endif + +void exithandler(int sig, siginfo_t *info, void * uap) { +#ifdef DEBUG + printf("exit in exithandler\n"); +#endif + exit(0); +} + +void initializeexithandler() { + struct sigaction sig; + sig.sa_sigaction=&exithandler; + sig.sa_flags=SA_SIGINFO; + sigemptyset(&sig.sa_mask); + sigaction(SIGUSR2, &sig, 0); +} + + +/* This function inject failures */ + +void injectinstructionfailure() { +#ifdef TASK + if (injectinstructionfailures) { + if (numfailures==0) + return; + instructioncount=failurecount; + instaccum+=failurecount; + if ((((double)random())/RAND_MAX)0) + numfailures--; + printf("FAILURE!!! %d\n",numfailures); + longjmp(error_handler,11); + } + } +#else +#ifdef THREADS + if (injectinstructionfailures) { + if (numfailures==0) + return; + instaccum+=failurecount; + if ((((double)random())/RAND_MAX)0) + numfailures--; + printf("FAILURE!!! %d\n",numfailures); + threadexit(); + } + } +#endif +#endif +} + +void CALL11(___System______exit____I,int ___status___, int ___status___) { +#ifdef DEBUG + printf("exit in CALL11\n"); +#endif + exit(___status___); +} + +long CALL00(___System______currentTimeMillis____) { + struct timeval tv; long long retval; + gettimeofday(&tv, NULL); + retval = tv.tv_sec; /* seconds */ + retval*=1000; /* milliseconds */ + retval+= (tv.tv_usec/1000); /* adjust milliseconds & add them in */ + return retval; +} + +void CALL01(___System______printString____L___String___,struct ___String___ * ___s___) { + struct ArrayObject * chararray=VAR(___s___)->___value___; + int i; + int offset=VAR(___s___)->___offset___; + for(i=0;i___count___;i++) { + short sc=((short *)(((char *)& chararray->___length___)+sizeof(int)))[i+offset]; + putchar(sc); + } +} + +/* Object allocation function */ + +#ifdef PRECISE_GC +void * allocate_new(void * ptr, int type) { + struct ___Object___ * v=(struct ___Object___ *) mygcmalloc((struct garbagelist *) ptr, classsize[type]); + v->type=type; +#ifdef THREADS + v->tid=0; + v->lockentry=0; + v->lockcount=0; +#endif + return v; +} + +/* Array allocation function */ + +struct ArrayObject * allocate_newarray(void * ptr, int type, int length) { + struct ArrayObject * v=mygcmalloc((struct garbagelist *) ptr, sizeof(struct ArrayObject)+length*classsize[type]); + v->type=type; + if (length<0) { + printf("ERROR: negative array\n"); + return NULL; + } + v->___length___=length; +#ifdef THREADS + v->tid=0; + v->lockentry=0; + v->lockcount=0; +#endif + return v; +} + +#else +void * allocate_new(int type) { + struct ___Object___ * v=FREEMALLOC(classsize[type]); + v->type=type; + return v; +} + +/* Array allocation function */ + +struct ArrayObject * allocate_newarray(int type, int length) { + struct ArrayObject * v=FREEMALLOC(sizeof(struct ArrayObject)+length*classsize[type]); + v->type=type; + v->___length___=length; + return v; +} +#endif + + +/* Converts C character arrays into Java strings */ +#ifdef PRECISE_GC +struct ___String___ * NewString(void * ptr, const char *str,int length) { +#else +struct ___String___ * NewString(const char *str,int length) { +#endif + int i; +#ifdef PRECISE_GC + struct ArrayObject * chararray=allocate_newarray((struct garbagelist *)ptr, CHARARRAYTYPE, length); + int ptrarray[]={1, (int) ptr, (int) chararray}; + struct ___String___ * strobj=allocate_new((struct garbagelist *) &ptrarray, STRINGTYPE); + chararray=(struct ArrayObject *) ptrarray[2]; +#else + struct ArrayObject * chararray=allocate_newarray(CHARARRAYTYPE, length); + struct ___String___ * strobj=allocate_new(STRINGTYPE); +#endif + strobj->___value___=chararray; + strobj->___count___=length; + strobj->___offset___=0; + + for(i=0;i___length___)+sizeof(int)))[i]=(short)str[i]; } + return strobj; +} + +/* Generated code calls this if we fail a bounds check */ + +void failedboundschk() { +#ifndef TASK + printf("Array out of bounds\n"); +#ifdef THREADS + threadexit(); +#else + exit(-1); +#endif +#else + longjmp(error_handler,2); +#endif +} + +/* Abort task call */ +void abort_task() { +#ifdef TASK + longjmp(error_handler,4); +#else + printf("Aborting\n"); + exit(-1); +#endif +} diff --git a/Robust/src/Runtime/multicoretask.c b/Robust/src/Runtime/multicoretask.c new file mode 100644 index 00000000..cdffe9ed --- /dev/null +++ b/Robust/src/Runtime/multicoretask.c @@ -0,0 +1,1526 @@ +#ifdef TASK +#include "runtime.h" +#include "structdefs.h" +#include "mem.h" +#include "checkpoint.h" +#include "Queue.h" +#include "SimpleHash.h" +#include "GenericHashtable.h" +#include +#include +#include +#include +#include +#include +#include +#ifdef RAW +#elif defined THREADSIMULATE +#if 0 +#include // for mmap +#include +#include +#include + +int offset_transObj = 0; +#endif + +// use POSIX message queue +// for each core, its message queue named as +// /msgqueue_corenum +#include +#include +#endif +/* +extern int injectfailures; +extern float failurechance; +*/ +extern int debugtask; +extern int instaccum; + +#ifdef CONSCHECK +#include "instrument.h" +#endif + +struct genhashtable * activetasks; +struct genhashtable * failedtasks; +struct taskparamdescriptor * currtpd; +struct RuntimeHash * forward; +struct RuntimeHash * reverse; + +int corestatus[NUMCORES]; // records status of each core + // 1: running tasks + // 0: stall +int numsendobjs[NUMCORES]; // records how many objects a core has sent out +int numreceiveobjs[NUMCORES]; // records how many objects a core has received +#ifdef THREADSIMULATE +struct thread_data { + int corenum; + int argc; + char** argv; + int numsendobjs; + int numreceiveobjs; +}; +struct thread_data thread_data_array[NUMCORES]; +mqd_t mqd[NUMCORES]; +static pthread_key_t key; +bool transStallMsg(int targetcore); +void transTerminateMsg(int targetcore); +void run(void * arg); +#endif + +int main(int argc, char **argv) { +#ifdef THREADSIMULATE + int tids[NUMCORES]; + int rc[NUMCORES]; + pthread_t threads[NUMCORES]; + int i = 0; + + // initialize three arrays and msg queue array + char * pathhead = "/msgqueue_"; + int targetlen = strlen(pathhead); + for(i = 0; i < NUMCORES; ++i) { + corestatus[i] = 1; + numsendobjs[i] = 0; + numreceiveobjs[i] = 0; + + char corenumstr[3]; + int sourcelen = 0; + if(i < 10) { + corenumstr[0] = i + '0'; + corenumstr[1] = '\0'; + sourcelen = 1; + } else if(i < 100) { + corenumstr[1] = i %10 + '0'; + corenumstr[0] = (i / 10) + '0'; + corenumstr[2] = '\0'; + sourcelen = 2; + } else { + printf("Error: i >= 100\n"); + fflush(stdout); + exit(-1); + } + char path[targetlen + sourcelen + 1]; + strcpy(path, pathhead); + strncat(path, corenumstr, sourcelen); + int oflags = O_RDONLY|O_CREAT|O_NONBLOCK; + int omodes = S_IRWXU|S_IRWXG|S_IRWXO; + mq_unlink(path); + mqd[i]= mq_open(path, oflags, omodes, NULL); + } + + // create the key + pthread_key_create(&key, NULL); + +/* if(argc < 2) { + printf("Usage: \n"); + fflush(stdout); + exit(-1); + } + + int cnum = 0; + char * number = argv[1]; + int len = strlen(number); + for(i = 0; i < len; ++i) { + cnum = (number[i] - '0') + cnum * 10; + } +*/ + for(i = 0; i < NUMCORES; ++i) { + /* if(STARTUPCORE == i) { + continue; + }*/ + thread_data_array[i].corenum = i; + thread_data_array[i].argc = argc; + thread_data_array[i].argv = argv; + thread_data_array[i].numsendobjs = 0; + thread_data_array[i].numreceiveobjs = 0; + printf("In main: creating thread %d\n", i); + rc[i] = pthread_create(&threads[i], NULL, run, (void *)&thread_data_array[i]); + if (rc[i]){ + printf("ERROR; return code from pthread_create() is %d\n", rc[i]); + fflush(stdout); + exit(-1); + } + }//*/ + + /*// do stuff of startup core + thread_data_array[STARTUPCORE].corenum = STARTUPCORE; + thread_data_array[STARTUPCORE].argc = argc;// - 1; + thread_data_array[STARTUPCORE].argv = argv;//&argv[1]; + thread_data_array[STARTUPCORE].numsendobjs = 0; + thread_data_array[STARTUPCORE].numreceiveobjs = 0; + run(&thread_data_array[STARTUPCORE]);*/ + pthread_exit(NULL); +} + +void run(void* arg) { + struct thread_data * my_tdata = (struct thread_data *)arg; + //corenum = my_tdata->corenum; + //void * ptr = malloc(sizeof(int)); + //*((int*)ptr) = my_tdata->corenum; + pthread_setspecific(key, (void *)my_tdata->corenum); + int argc = my_tdata->argc; + char** argv = my_tdata->argv; + printf("Thread %d runs: %x\n", my_tdata->corenum, (int)pthread_self()); + fflush(stdout); + +#endif + +#ifdef BOEHM_GC + GC_init(); // Initialize the garbage collector +#endif +#ifdef CONSCHECK + initializemmap(); +#endif + processOptions(); + initializeexithandler(); + /* Create table for failed tasks */ + failedtasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd, + (int (*)(void *,void *)) &comparetpd); + /* Create queue of active tasks */ + activetasks=genallocatehashtable((unsigned int (*)(void *)) &hashCodetpd, + (int (*)(void *,void *)) &comparetpd); + + /* Process task information */ + processtasks(); + + /* Create startup object */ + createstartupobject(argc, argv); + + /* Start executing the tasks */ + executetasks(); + +#ifdef THREADSIMULATE + + int i = 0; + // check if there are new objects coming + bool sendStall = false; + + int numofcore = pthread_getspecific(key); + while(true) { + switch(receiveObject()) { + case 0: { + printf("[run] receive an object\n"); + sendStall = false; + // received an object + // check if there are new active tasks can be executed + executetasks(); + break; + } + case 1: { + //printf("[run] no msg\n"); + // no msg received + if(STARTUPCORE == numofcore) { + corestatus[numofcore] = 0; + // check the status of all cores + bool allStall = true; + for(i = 0; i < NUMCORES; ++i) { + if(corestatus[i] != 0) { + allStall = false; + break; + } + } + if(allStall) { + // check if the sum of send objs and receive obj are the same + // yes->terminate + // no->go on executing + int sumsendobj = 0; + for(i = 0; i < NUMCORES; ++i) { + sumsendobj += numsendobjs[i]; + } + for(i = 0; i < NUMCORES; ++i) { + sumsendobj -= numreceiveobjs[i]; + } + if(0 == sumsendobj) { + // terminate + // TODO + /* for(i = 0; i < NUMCORES; ++i) { + if(i != corenum) { + transTerminateMsg(i); + } + } + mq_close(mqd[corenum]);*/ + printf("[run] terminate!\n"); + fflush(stdout); + exit(0); + } + } + } else { + if(!sendStall) { + // send StallMsg to startup core + sendStall = transStallMsg(STARTUPCORE); + } + } + break; + } + case 2: { + printf("[run] receive a stall msg\n"); + // receive a Stall Msg, do nothing + assert(STARTUPCORE == numofcore); // only startup core can receive such msg + sendStall = false; + break; + } + /* case 3: { + printf("[run] receive a terminate msg\n"); + // receive a terminate Msg + assert(STARTUPCORE != corenum); // only non-startup core can receive such msg + mq_close(mqd[corenum]); + fflush(stdout); + exit(0); + break; + }*/ + default: { + printf("Error: invalid message type.\n"); + fflush(stdout); + exit(-1); + break; + } + } + } +#endif +} + +void createstartupobject(int argc, char ** argv) { + int i; + + /* Allocate startup object */ +#ifdef PRECISE_GC + struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(NULL, STARTUPTYPE); + struct ArrayObject * stringarray=allocate_newarray(NULL, STRINGARRAYTYPE, argc-1); +#else + struct ___StartupObject___ *startupobject=(struct ___StartupObject___*) allocate_new(STARTUPTYPE); + struct ArrayObject * stringarray=allocate_newarray(STRINGARRAYTYPE, argc-1); +#endif + /* Build array of strings */ + startupobject->___parameters___=stringarray; + for(i=1;i___length___)+sizeof(int)))[i-1]=newstring; + } + + /* Set initialized flag for startup object */ + flagorandinit(startupobject,1,0xFFFFFFFF); + enqueueObject(startupobject, NULL, 0); + //enqueueObject(startupobject, objq4startupobj[corenum], numqueues4startupobj[corenum]); +} + +int hashCodetpd(struct taskparamdescriptor *ftd) { + int hash=(int)ftd->task; + int i; + for(i=0;inumParameters;i++){ + hash^=(int)ftd->parameterArray[i]; + } + return hash; +} + +int comparetpd(struct taskparamdescriptor *ftd1, struct taskparamdescriptor *ftd2) { + int i; + if (ftd1->task!=ftd2->task) + return 0; + for(i=0;inumParameters;i++) + if(ftd1->parameterArray[i]!=ftd2->parameterArray[i]) + return 0; + return 1; +} + +/* This function sets a tag. */ +#ifdef PRECISE_GC +void tagset(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) { +#else +void tagset(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) { +#endif + struct ___Object___ * tagptr=obj->___tags___; + if (tagptr==NULL) { + obj->___tags___=(struct ___Object___ *)tagd; + } else { + /* Have to check if it is already set */ + if (tagptr->type==TAGTYPE) { + struct ___TagDescriptor___ * td=(struct ___TagDescriptor___ *) tagptr; + if (td==tagd) + return; +#ifdef PRECISE_GC + int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd}; + struct ArrayObject * ao=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL); + obj=(struct ___Object___ *)ptrarray[2]; + tagd=(struct ___TagDescriptor___ *)ptrarray[3]; + td=(struct ___TagDescriptor___ *) obj->___tags___; +#else + struct ArrayObject * ao=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL); +#endif + ARRAYSET(ao, struct ___TagDescriptor___ *, 0, td); + ARRAYSET(ao, struct ___TagDescriptor___ *, 1, tagd); + obj->___tags___=(struct ___Object___ *) ao; + ao->___cachedCode___=2; + } else { + /* Array Case */ + int i; + struct ArrayObject *ao=(struct ArrayObject *) tagptr; + for(i=0;i___cachedCode___;i++) { + struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___*, i); + if (td==tagd) + return; + } + if (ao->___cachedCode______length___) { + ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, tagd); + ao->___cachedCode___++; + } else { +#ifdef PRECISE_GC + int ptrarray[]={2,(int) ptr, (int) obj, (int) tagd}; + struct ArrayObject * aonew=allocate_newarray(&ptrarray,TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___); + obj=(struct ___Object___ *)ptrarray[2]; + tagd=(struct ___TagDescriptor___ *) ptrarray[3]; + ao=(struct ArrayObject *)obj->___tags___; +#else + struct ArrayObject * aonew=allocate_newarray(TAGARRAYTYPE,TAGARRAYINTERVAL+ao->___length___); +#endif + aonew->___cachedCode___=ao->___length___+1; + for(i=0;i___length___;i++) { + ARRAYSET(aonew, struct ___TagDescriptor___*, i, ARRAYGET(ao, struct ___TagDescriptor___*, i)); + } + ARRAYSET(aonew, struct ___TagDescriptor___ *, ao->___length___, tagd); + } + } + } + + { + struct ___Object___ * tagset=tagd->flagptr; + if(tagset==NULL) { + tagd->flagptr=obj; + } else if (tagset->type!=OBJECTARRAYTYPE) { +#ifdef PRECISE_GC + int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd}; + struct ArrayObject * ao=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL); + obj=(struct ___Object___ *)ptrarray[2]; + tagd=(struct ___TagDescriptor___ *)ptrarray[3]; +#else + struct ArrayObject * ao=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL); +#endif + ARRAYSET(ao, struct ___Object___ *, 0, tagd->flagptr); + ARRAYSET(ao, struct ___Object___ *, 1, obj); + ao->___cachedCode___=2; + tagd->flagptr=(struct ___Object___ *)ao; + } else { + struct ArrayObject *ao=(struct ArrayObject *) tagset; + if (ao->___cachedCode______length___) { + ARRAYSET(ao, struct ___Object___*, ao->___cachedCode___++, obj); + } else { + int i; +#ifdef PRECISE_GC + int ptrarray[]={2, (int) ptr, (int) obj, (int)tagd}; + struct ArrayObject * aonew=allocate_newarray(&ptrarray,OBJECTARRAYTYPE,OBJECTARRAYINTERVAL+ao->___length___); + obj=(struct ___Object___ *)ptrarray[2]; + tagd=(struct ___TagDescriptor___ *)ptrarray[3]; + ao=(struct ArrayObject *)tagd->flagptr; +#else + struct ArrayObject * aonew=allocate_newarray(OBJECTARRAYTYPE,OBJECTARRAYINTERVAL); +#endif + aonew->___cachedCode___=ao->___cachedCode___+1; + for(i=0;i___length___;i++) { + ARRAYSET(aonew, struct ___Object___*, i, ARRAYGET(ao, struct ___Object___*, i)); + } + ARRAYSET(aonew, struct ___Object___ *, ao->___cachedCode___, obj); + tagd->flagptr=(struct ___Object___ *) aonew; + } + } + } +} + +/* This function clears a tag. */ +#ifdef PRECISE_GC +void tagclear(void *ptr, struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) { +#else +void tagclear(struct ___Object___ * obj, struct ___TagDescriptor___ * tagd) { +#endif + /* We'll assume that tag is alway there. + Need to statically check for this of course. */ + struct ___Object___ * tagptr=obj->___tags___; + + if (tagptr->type==TAGTYPE) { + if ((struct ___TagDescriptor___ *)tagptr==tagd) + obj->___tags___=NULL; + else + printf("ERROR 1 in tagclear\n"); + } else { + struct ArrayObject *ao=(struct ArrayObject *) tagptr; + int i; + for(i=0;i___cachedCode___;i++) { + struct ___TagDescriptor___ * td=ARRAYGET(ao, struct ___TagDescriptor___ *, i); + if (td==tagd) { + ao->___cachedCode___--; + if (i___cachedCode___) + ARRAYSET(ao, struct ___TagDescriptor___ *, i, ARRAYGET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___)); + ARRAYSET(ao, struct ___TagDescriptor___ *, ao->___cachedCode___, NULL); + if (ao->___cachedCode___==0) + obj->___tags___=NULL; + goto PROCESSCLEAR; + } + } + printf("ERROR 2 in tagclear\n"); + } + PROCESSCLEAR: + { + struct ___Object___ *tagset=tagd->flagptr; + if (tagset->type!=OBJECTARRAYTYPE) { + if (tagset==obj) + tagd->flagptr=NULL; + else + printf("ERROR 3 in tagclear\n"); + } else { + struct ArrayObject *ao=(struct ArrayObject *) tagset; + int i; + for(i=0;i___cachedCode___;i++) { + struct ___Object___ * tobj=ARRAYGET(ao, struct ___Object___ *, i); + if (tobj==obj) { + ao->___cachedCode___--; + if (i___cachedCode___) + ARRAYSET(ao, struct ___Object___ *, i, ARRAYGET(ao, struct ___Object___ *, ao->___cachedCode___)); + ARRAYSET(ao, struct ___Object___ *, ao->___cachedCode___, NULL); + if (ao->___cachedCode___==0) + tagd->flagptr=NULL; + goto ENDCLEAR; + } + } + printf("ERROR 4 in tagclear\n"); + } + } + ENDCLEAR: + return; +} + +/* This function allocates a new tag. */ +#ifdef PRECISE_GC +struct ___TagDescriptor___ * allocate_tag(void *ptr, int index) { + struct ___TagDescriptor___ * v=(struct ___TagDescriptor___ *) mygcmalloc((struct garbagelist *) ptr, classsize[TAGTYPE]); +#else +struct ___TagDescriptor___ * allocate_tag(int index) { + struct ___TagDescriptor___ * v=FREEMALLOC(classsize[TAGTYPE]); +#endif + v->type=TAGTYPE; + v->flag=index; + return v; +} + + + +/* This function updates the flag for object ptr. It or's the flag + with the or mask and and's it with the andmask. */ + +void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length, bool isnew); + + int flagcomp(const int *val1, const int *val2) { + return (*val1)-(*val2); + } + +void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) { + { + int oldflag=((int *)ptr)[1]; + int flag=ormask|oldflag; + flag&=andmask; + flagbody(ptr, flag, queues, length, false); + } +} + +bool intflagorand(void * ptr, int ormask, int andmask) { + { + int oldflag=((int *)ptr)[1]; + int flag=ormask|oldflag; + flag&=andmask; + if (flag==oldflag) /* Don't do anything */ + return false; + else { + flagbody(ptr, flag, NULL, 0, false); + return true; + } + } +} + +void flagorandinit(void * ptr, int ormask, int andmask) { + int oldflag=((int *)ptr)[1]; + int flag=ormask|oldflag; + flag&=andmask; + flagbody(ptr,flag,NULL,0,true); +} + +void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** vqueues, int vlength, bool isnew) { + struct parameterwrapper * flagptr = NULL; + int i = 0; + struct parameterwrapper ** queues = vqueues; + int length = vlength; + if((!isnew) && (queues == NULL)) { +#ifdef THREADSIMULATE + int numofcore = pthread_getspecific(key); + queues = objectqueues[numofcore][ptr->type]; + length = numqueues[numofcore][ptr->type]; +#else + queues = objectqueues[corenum][ptr->type]; + length = numqueues[corenum][ptr->type]; +#endif + } + ptr->flag=flag; + + /*Remove object from all queues */ + for(i = 0; i < length; ++i) { + flagptr = queues[i]; + int next; + int UNUSED, UNUSED2; + int * enterflags; + ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2); + ObjectHashremove(flagptr->objectset, (int)ptr); + if (enterflags!=NULL) + free(enterflags); + } + } + + void enqueueObject(void * vptr, struct parameterwrapper ** vqueues, int vlength) { + struct ___Object___ *ptr = (struct ___Object___ *)vptr; + + { + struct QueueItem *tmpptr; + struct parameterwrapper * parameter=NULL; + int j; + struct parameterwrapper ** queues = vqueues; + int length = vlength; + if(queues == NULL) { +#ifdef THREADSIMULATE + int numofcore = pthread_getspecific(key); + queues = objectqueues[numofcore][ptr->type]; + length = numqueues[numofcore][ptr->type]; +#else + queues = objectqueues[corenum][ptr->type]; + length = numqueues[corenum][ptr->type]; +#endif + } + int i; + struct parameterwrapper * prevptr=NULL; + struct ___Object___ *tagptr=ptr->___tags___; + + /* Outer loop iterates through all parameter queues an object of + this type could be in. */ + + for(j = 0; j < length; ++j) { + parameter = queues[j]; + /* Check tags */ + if (parameter->numbertags>0) { + if (tagptr==NULL) + goto nextloop;//that means the object has no tag but that param needs tag + else if(tagptr->type==TAGTYPE) {//one tag + struct ___TagDescriptor___ * tag=(struct ___TagDescriptor___*) tagptr; + for(i=0;inumbertags;i++) { + //slotid is parameter->tagarray[2*i]; + int tagid=parameter->tagarray[2*i+1]; + if (tagid!=tagptr->flag) + goto nextloop; /*We don't have this tag */ + } + } else {//multiple tags + struct ArrayObject * ao=(struct ArrayObject *) tagptr; + for(i=0;inumbertags;i++) { + //slotid is parameter->tagarray[2*i]; + int tagid=parameter->tagarray[2*i+1]; + int j; + for(j=0;j___cachedCode___;j++) { + if (tagid==ARRAYGET(ao, struct ___TagDescriptor___*, j)->flag) + goto foundtag; + } + goto nextloop; + foundtag: + ; + } + } + } + + /* Check flags */ + for(i=0;inumberofterms;i++) { + int andmask=parameter->intarray[i*2]; + int checkmask=parameter->intarray[i*2+1]; + if ((ptr->flag&andmask)==checkmask) { + enqueuetasks(parameter, prevptr, ptr, NULL, 0); + prevptr=parameter; + break; + } + } + nextloop: + ; + } + } +} + +// transfer an object to targetcore +// format: object +void transferObject(void * obj, int targetcore) { + int type=((int *)obj)[0]; + assert(type < NUMCLASSES); // can only transfer normal object + int size=classsize[type]; + +#ifdef RAW + +#elif defined THREADSIMULATE +#if 0 + // use shared memory to transfer objects between cores + int fd = 0; // mapped file + void * p_map = NULL; + char * filepath = "/scratch/transObj/file_" + targetcore + ".txt"; + int offset; + // open the file + fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file + offset = lseek(fd, 0, SEEK_CUR); + if(offset == -1) { + printf("fail to open file " + filepath + " in transferObject.\n"); + fflush(stdout); + exit(-1); + } + lseek(fd, size + sizeof(int)*2, SEEK_CUR); + write(fd, "", 1); + p_map = (void *)mmap(NULL,size+sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset); + close(fd); + memcpy(p_map, type, sizeof(int)); + memcpy(p_map+sizeof(int), corenum, sizeof(int)); + memcpy((p_map+sizeof(int)*2), obj, size); + munmap(p_map, size+sizeof(int)*2); + //printf( "umap ok \n" ); +#endif + + // use POSIX message queue to transfer objects between cores + mqd_t mqdnum; + char corenumstr[3]; + int sourcelen = 0; + if(targetcore < 10) { + corenumstr[0] = targetcore + '0'; + corenumstr[1] = '\0'; + sourcelen = 1; + } else if(targetcore < 100) { + corenumstr[1] = targetcore % 10 + '0'; + corenumstr[0] = (targetcore / 10) + '0'; + corenumstr[2] = '\0'; + sourcelen = 2; + } else { + printf("Error: targetcore >= 100\n"); + fflush(stdout); + exit(-1); + } + char * pathhead = "/msgqueue_"; + int targetlen = strlen(pathhead); + char path[targetlen + sourcelen + 1]; + strcpy(path, pathhead); + strncat(path, corenumstr, sourcelen); + int oflags = O_WRONLY|O_CREAT|O_NONBLOCK; + int omodes = S_IRWXU|S_IRWXG|S_IRWXO; + mqdnum = mq_open(path, oflags, omodes, NULL); + if(mqdnum==-1) { + printf("[transferObject] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno)); + fflush(stdout); + exit(-1); + } + int ret; + do { + ret=mq_send(mqdnum, obj, size, 0); // send the object into the queue + if(ret != 0) { + printf("[transferObject] mq_send returned: %d, error: %s\n", ret, strerror(errno)); + } + }while(ret!=0); + int numofcore = pthread_getspecific(key); + if(numofcore == STARTUPCORE) { + ++numsendobjs[numofcore]; + } else { + ++(thread_data_array[numofcore].numsendobjs); + } + printf("[transferObject] mq_send returned: $%x\n",ret); +#endif +} + +// send terminate message to targetcore +// format: -1 +bool transStallMsg(int targetcore) { + struct ___Object___ newobj; + // use the first four int field to hold msgtype/corenum/sendobj/receiveobj + newobj.type = -1; + +#ifdef RAW + newobj.flag = corenum; + newobj.___cachedHash___ = thread_data_array[corenum].numsendobjs; + newobj.___cachedCode___ = thread_data_array[corenum].numreceiveobjs; + +#elif defined THREADSIMULATE + int numofcore = pthread_getspecific(key); + newobj.flag = numofcore; + newobj.___cachedHash___ = thread_data_array[numofcore].numsendobjs; + newobj.___cachedCode___ = thread_data_array[numofcore].numreceiveobjs; +#if 0 + // use shared memory to transfer objects between cores + int fd = 0; // mapped file + void * p_map = NULL; + char * filepath = "/scratch/transObj/file_" + targetcore + ".txt"; + int offset; + // open the file + fd = open(filepath, O_CREAT|O_WRONLY|O_APPEND, 00777); // append to end of the file + offset = lseek(fd, 0, SEEK_CUR); + if(offset == -1) { + printf("fail to open file " + filepath + " in transferObject.\n"); + fflush(stdout); + exit(-1); + } + lseek(fd, sizeof(int)*2, SEEK_CUR); + write(fd, "", 1); + p_map = (void *)mmap(NULL,sizeof(int)*2,PROT_WRITE,MAP_SHARED,fd,offset); + close(fd); + memcpy(p_map, type, sizeof(int)); + memcpy(p_map+sizeof(int), corenum, sizeof(int)); + munmap(p_map, sizeof(int)*2); + //printf( "umap ok \n" ); +#endif + + // use POSIX message queue to send stall msg to startup core + assert(targetcore == STARTUPCORE); + mqd_t mqdnum; + char corenumstr[3]; + int sourcelen = 0; + if(targetcore < 10) { + corenumstr[0] = targetcore + '0'; + corenumstr[1] = '\0'; + sourcelen = 1; + } else if(targetcore < 100) { + corenumstr[1] = targetcore % 10 + '0'; + corenumstr[0] = (targetcore / 10) + '0'; + corenumstr[2] = '\0'; + sourcelen = 2; + } else { + printf("Error: targetcore >= 100\n"); + fflush(stdout); + exit(-1); + } + char * pathhead = "/msgqueue_"; + int targetlen = strlen(pathhead); + char path[targetlen + sourcelen + 1]; + strcpy(path, pathhead); + strncat(path, corenumstr, sourcelen); + int oflags = O_WRONLY|O_CREAT|O_NONBLOCK; + int omodes = S_IRWXU|S_IRWXG|S_IRWXO; + mqdnum = mq_open(path, oflags, omodes, NULL); + if(mqdnum==-1) { + printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno)); + fflush(stdout); + exit(-1); + } + int ret; + ret=mq_send(mqdnum, (void *)&newobj, sizeof(struct ___Object___), 0); // send the object into the queue + if(ret != 0) { + printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno)); + return false; + } else { + printf("[transStallMsg] mq_send returned: $%x\n", ret); + printf("index: %d, sendobjs: %d, receiveobjs: %d\n", newobj.flag, newobj.___cachedHash___, newobj.___cachedCode___); + return true; + } +#endif +} +#if 0 +// send terminate message to targetcore +// format: -1 +void transTerminateMsg(int targetcore) { + // use the first four int field to hold msgtype/corenum/sendobj/receiveobj + int type = -2; + +#ifdef RAW + +#elif defined THREADSIMULATE + + // use POSIX message queue to send stall msg to startup core + assert(targetcore != STARTUPCORE); + mqd_t mqdnum; + char corenumstr[3]; + int sourcelen = 0; + if(targetcore < 10) { + corenumstr[0] = targetcore + '0'; + corenumstr[1] = '\0'; + sourcelen = 1; + } else if(corenum < 100) { + corenumstr[1] = targetcore % 10 + '0'; + corenumstr[0] = (targetcore / 10) + '0'; + corenumstr[2] = '\0'; + sourcelen = 2; + } else { + printf("Error: targetcore >= 100\n"); + fflush(stdout); + exit(-1); + } + char * pathhead = "/msgqueue_"; + int targetlen = strlen(pathhead); + char path[targetlen + sourcelen + 1]; + strcpy(path, pathhead); + strncat(path, corenumstr, sourcelen); + int oflags = O_WRONLY|O_CREAT|O_NONBLOCK; + int omodes = S_IRWXU|S_IRWXG|S_IRWXO; + mqdnum = mq_open(path, oflags, omodes, NULL); + if(mqdnum==-1) { + printf("[transStallMsg] mq_open fail: %d, error: %s\n", mqdnum, strerror(errno)); + fflush(stdout); + exit(-1); + } + int ret; + do { + ret=mq_send(mqdnum, (void *)&type, sizeof(int), 0); // send the object into the queue + if(ret != 0) { + printf("[transStallMsg] mq_send returned: %d, error: %s\n", ret, strerror(errno)); + } + }while(ret != 0); +#endif +} +#endif +// receive object transferred from other cores +// or the terminate message from other cores +// format: type [+ object] +// type: -1--stall msg +// !-1--object +// return value: 0--received an object +// 1--received nothing +// 2--received a Stall Msg +int receiveObject() { +#ifdef RAW + +#elif defined THREADSIMULATE +#if 0 + char * filepath = "/scratch/transObj/file_" + corenum + ".txt"; + int fd = 0; + void * p_map = NULL; + int type = 0; + int sourcecorenum = 0; + int size = 0; + fd = open(filepath, O_CREAT|O_RDONLY, 00777); + lseek(fd, offset_transObj, SEEK_SET); + p_map = (void*)mmap(NULL,sizeof(int)*2,PROT_READ,MAP_SHARED,fd,offset_transObj); + type = *(int*)p_map; + sourcecorenum = *(int*)(p_map+sinzeof(int)); + offset_transObj += sizeof(int)*2; + munmap(p_map,sizeof(int)*2); + if(type == -1) { + // sourecorenum has terminated + ++offset_transObj; + return; + } + size = classsize[type]; + p_map = (void*)mmap(NULL,size,PROT_READ,MAP_SHARED,fd,offset_transObj); + struct ___Object___ * newobj=RUNMALLOC(size); + memcpy(newobj, p_map, size); + ++offset_transObj; + enqueueObject(newobj,NULL,0); +#endif + int numofcore = pthread_getspecific(key); + // use POSIX message queue to transfer object + int msglen = 0; + struct mq_attr mqattr; + mq_getattr(mqd[numofcore], &mqattr); + void * msgptr =RUNMALLOC(mqattr.mq_msgsize); + msglen=mq_receive(mqd[numofcore], msgptr, mqattr.mq_msgsize, NULL); // receive the object into the queue + if(-1 == msglen) { + // no msg + free(msgptr); + return 1; + } + //printf("msg: %s\n",msgptr); + if(((int*)msgptr)[0] == -1) { + // StallMsg + int* tmpptr = (int*)msgptr; + int index = tmpptr[1]; + corestatus[index] = 0; + numsendobjs[index] = tmpptr[2]; + numreceiveobjs[index] = ((int *)(msgptr + sizeof(int) * 3 + sizeof(void *)))[0]; + printf("index: %d, sendobjs: %d, reveiveobjs: %d\n", index, numsendobjs[index], numreceiveobjs[index]); + free(msgptr); + return 2; + } /*else if(((int*)msgptr)[0] == -2) { + // terminate msg + return 3; + } */else { + // an object + if(numofcore == STARTUPCORE) { + ++(numreceiveobjs[numofcore]); + } else { + ++(thread_data_array[numofcore].numreceiveobjs); + } + struct ___Object___ * newobj=RUNMALLOC(msglen); + memcpy(newobj, msgptr, msglen); + free(msgptr); + enqueueObject(newobj, NULL, 0); + return 0; + } +#endif +} + +int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *prevptr, struct ___Object___ *ptr, int * enterflags, int numenterflags) { + void * taskpointerarray[MAXTASKPARAMS]; + int j; + int numparams=parameter->task->numParameters; + int numiterators=parameter->task->numTotal-1; + int retval=1; + int addnormal=1; + int adderror=1; + + struct taskdescriptor * task=parameter->task; + + ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper + + /* Add enqueued object to parameter vector */ + taskpointerarray[parameter->slot]=ptr; + + /* Reset iterators */ + for(j=0;jiterators[j]); + } + + /* Find initial state */ + for(j=0;jiterators[j], taskpointerarray OPTARG(failed))) + toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)); + else if (j>0) { + /* Need to backtrack */ + toiReset(¶meter->iterators[j]); + j--; + goto backtrackinit; + } else { + /* Nothing to enqueue */ + return retval; + } + } + + + while(1) { + /* Enqueue current state */ + int launch = 0; + struct taskparamdescriptor *tpd=RUNMALLOC(sizeof(struct taskparamdescriptor)); + tpd->task=task; + tpd->numParameters=numiterators+1; + tpd->parameterArray=RUNMALLOC(sizeof(void *)*(numiterators+1)); + for(j=0;j<=numiterators;j++){ + tpd->parameterArray[j]=taskpointerarray[j];//store the actual parameters + } + /* Enqueue task */ + if ((!gencontains(failedtasks, tpd)&&!gencontains(activetasks,tpd))) { + genputtable(activetasks, tpd, tpd); + } else { + RUNFREE(tpd->parameterArray); + RUNFREE(tpd); + } + + /* This loop iterates to the next parameter combination */ + if (numiterators==0) + return retval; + + for(j=numiterators-1; jiterators[j], taskpointerarray OPTARG(failed))) + toiNext(¶meter->iterators[j], taskpointerarray OPTARG(failed)); + else if (j>0) { + /* Need to backtrack */ + toiReset(¶meter->iterators[j]); + j--; + goto backtrackinc; + } else { + /* Nothing more to enqueue */ + return retval; + } + } + } + return retval; +} + +/* Handler for signals. The signals catch null pointer errors and + arithmatic errors. */ + +void myhandler(int sig, siginfo_t *info, void *uap) { + sigset_t toclear; +#ifdef DEBUG + printf("sig=%d\n",sig); + printf("signal\n"); +#endif + sigemptyset(&toclear); + sigaddset(&toclear, sig); + sigprocmask(SIG_UNBLOCK, &toclear,NULL); + longjmp(error_handler,1); +} + +fd_set readfds; +int maxreadfd; +struct RuntimeHash *fdtoobject; + +void addreadfd(int fd) { + if (fd>=maxreadfd) + maxreadfd=fd+1; + FD_SET(fd, &readfds); +} + +void removereadfd(int fd) { + FD_CLR(fd, &readfds); + if (maxreadfd==(fd+1)) { + maxreadfd--; + while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds)) + maxreadfd--; + } +} + +#ifdef PRECISE_GC +#define OFFSET 2 +#else +#define OFFSET 0 +#endif + +void executetasks() { + void * taskpointerarray[MAXTASKPARAMS+OFFSET]; + + /* Set up signal handlers */ + struct sigaction sig; + sig.sa_sigaction=&myhandler; + sig.sa_flags=SA_SIGINFO; + sigemptyset(&sig.sa_mask); + + /* Catch bus errors, segmentation faults, and floating point exceptions*/ + sigaction(SIGBUS,&sig,0); + sigaction(SIGSEGV,&sig,0); + sigaction(SIGFPE,&sig,0); + sigaction(SIGPIPE,&sig,0); + + /* Zero fd set */ + FD_ZERO(&readfds); + maxreadfd=0; + fdtoobject=allocateRuntimeHash(100); + + /* Map first block of memory to protected, anonymous page */ + mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0); + + newtask: + while((hashsize(activetasks)>0)||(maxreadfd>0)) { + + /* Check if any filedescriptors have IO pending */ + if (maxreadfd>0) { + int i; + struct timeval timeout={0,0}; + fd_set tmpreadfds; + int numselect; + tmpreadfds=readfds; + numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout); + if (numselect>0) { + /* Process ready fd's */ + int fd; + for(fd=0;fd0) { + int i; + currtpd=(struct taskparamdescriptor *) getfirstkey(activetasks); + genfreekey(activetasks, currtpd); + + /* Check if this task has failed, allow a task that contains optional objects to fire */ + if (gencontains(failedtasks, currtpd)) { + // Free up task parameter descriptor + RUNFREE(currtpd->parameterArray); + RUNFREE(currtpd); + goto newtask; + } + int numparams=currtpd->task->numParameters; + int numtotal=currtpd->task->numTotal; + + /* Make sure that the parameters are still in the queues */ + for(i=0;iparameterArray[i]; + struct parameterdescriptor * pd=currtpd->task->descriptorarray[i]; + struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue; + int j; + /* Check that object is still in queue */ + { + if (!ObjectHashcontainskey(pw->objectset, (int) parameter)) { + RUNFREE(currtpd->parameterArray); + RUNFREE(currtpd); + goto newtask; + } + } + parameterpresent: + ; + /* Check that object still has necessary tags */ + for(j=0;jnumbertags;j++) { + int slotid=pd->tagarray[2*j]+numparams; + struct ___TagDescriptor___ *tagd=currtpd->parameterArray[slotid]; + if (!containstag(parameter, tagd)) { + RUNFREE(currtpd->parameterArray); + RUNFREE(currtpd); + goto newtask; + } + } + + taskpointerarray[i+OFFSET]=parameter; + } + /* Copy the tags */ + for(;iparameterArray[i]; + } + + { + /* Checkpoint the state */ + forward=allocateRuntimeHash(100); + reverse=allocateRuntimeHash(100); + //void ** checkpoint=makecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, forward, reverse); + int x; + if (x=setjmp(error_handler)) { + int counter; + /* Recover */ + +#ifdef DEBUG + printf("Fatal Error=%d, Recovering!\n",x); +#endif + /* + genputtable(failedtasks,currtpd,currtpd); + //restorecheckpoint(currtpd->task->numParameters, currtpd->parameterArray, checkpoint, forward, reverse); + + freeRuntimeHash(forward); + freeRuntimeHash(reverse); + freemalloc(); + forward=NULL; + reverse=NULL; + */ + fflush(stdout); + exit(-1); + } else { + /*if (injectfailures) { + if ((((double)random())/RAND_MAX)task->name); + longjmp(error_handler,10); + } + }*/ + /* Actually call task */ +#ifdef PRECISE_GC + ((int *)taskpointerarray)[0]=currtpd->numParameters; + taskpointerarray[1]=NULL; +#endif + if(debugtask){ + printf("ENTER %s count=%d\n",currtpd->task->name, (instaccum-instructioncount)); + ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray); + printf("EXIT %s count=%d\n",currtpd->task->name, (instaccum-instructioncount)); + } else + ((void (*) (void **)) currtpd->task->taskptr)(taskpointerarray); + + freeRuntimeHash(forward); + freeRuntimeHash(reverse); + freemalloc(); + // Free up task parameter descriptor + RUNFREE(currtpd->parameterArray); + RUNFREE(currtpd); + forward=NULL; + reverse=NULL; + } + } + } + } +} + +/* This function processes an objects tags */ +void processtags(struct parameterdescriptor *pd, int index, struct parameterwrapper *parameter, int * iteratorcount, int *statusarray, int numparams) { + int i; + + for(i=0;inumbertags;i++) { + int slotid=pd->tagarray[2*i]; + int tagid=pd->tagarray[2*i+1]; + + if (statusarray[slotid+numparams]==0) { + parameter->iterators[*iteratorcount].istag=1; + parameter->iterators[*iteratorcount].tagid=tagid; + parameter->iterators[*iteratorcount].slot=slotid+numparams; + parameter->iterators[*iteratorcount].tagobjectslot=index; + statusarray[slotid+numparams]=1; + (*iteratorcount)++; + } + } +} + + +void processobject(struct parameterwrapper *parameter, int index, struct parameterdescriptor *pd, int *iteratorcount, int * statusarray, int numparams) { + int i; + int tagcount=0; + struct ObjectHash * objectset=((struct parameterwrapper *)pd->queue)->objectset; + + parameter->iterators[*iteratorcount].istag=0; + parameter->iterators[*iteratorcount].slot=index; + parameter->iterators[*iteratorcount].objectset=objectset; + statusarray[index]=1; + + for(i=0;inumbertags;i++) { + int slotid=pd->tagarray[2*i]; + int tagid=pd->tagarray[2*i+1]; + if (statusarray[slotid+numparams]!=0) { + /* This tag has already been enqueued, use it to narrow search */ + parameter->iterators[*iteratorcount].tagbindings[tagcount]=slotid+numparams; + tagcount++; + } + } + parameter->iterators[*iteratorcount].numtags=tagcount; + + (*iteratorcount)++; +} + +/* This function builds the iterators for a task & parameter */ + +void builditerators(struct taskdescriptor * task, int index, struct parameterwrapper * parameter) { + int statusarray[MAXTASKPARAMS]; + int i; + int numparams=task->numParameters; + int iteratorcount=0; + for(i=0;idescriptorarray[index], index, parameter, & iteratorcount, statusarray, numparams); + + while(1) { + loopstart: + /* Check for objects with existing tags */ + for(i=0;idescriptorarray[i]; + int j; + for(j=0;jnumbertags;j++) { + int slotid=pd->tagarray[2*j]; + if(statusarray[slotid+numparams]!=0) { + processobject(parameter, i, pd, &iteratorcount, statusarray, numparams); + processtags(pd, i, parameter, &iteratorcount, statusarray, numparams); + goto loopstart; + } + } + } + } + + /* Next do objects w/ unbound tags*/ + + for(i=0;idescriptorarray[i]; + if (pd->numbertags>0) { + processobject(parameter, i, pd, &iteratorcount, statusarray, numparams); + processtags(pd, i, parameter, &iteratorcount, statusarray, numparams); + goto loopstart; + } + } + } + + /* Nothing with a tag enqueued */ + + for(i=0;idescriptorarray[i]; + processobject(parameter, i, pd, &iteratorcount, statusarray, numparams); + processtags(pd, i, parameter, &iteratorcount, statusarray, numparams); + goto loopstart; + } + } + + /* Nothing left */ + return; + } +} + + void printdebug() { + int i; + int j; +#ifdef THREADSIMULATE + int numofcore = pthread_getspecific(key); + for(i=0;iname); + for(j=0;jnumParameters;j++) { + struct parameterdescriptor *param=task->descriptorarray[j]; + struct parameterwrapper *parameter=param->queue; + struct ObjectHash * set=parameter->objectset; + struct ObjectIterator objit; + printf(" Parameter %d\n", j); + ObjectHashiterator(set, &objit); + while(ObjhasNext(&objit)) { + struct ___Object___ * obj=(struct ___Object___ *)Objkey(&objit); + struct ___Object___ * tagptr=obj->___tags___; + int nonfailed=Objdata4(&objit); + int numflags=Objdata3(&objit); + int flags=Objdata2(&objit); + Objnext(&objit); + printf(" Contains %lx\n", obj); + printf(" flag=%d\n", obj->flag); + if (tagptr==NULL) { + } else if (tagptr->type==TAGTYPE) { + printf(" tag=%lx\n",tagptr); + } else { + int tagindex=0; + struct ArrayObject *ao=(struct ArrayObject *)tagptr; + for(;tagindex___cachedCode___;tagindex++) { + printf(" tag=%lx\n",ARRAYGET(ao, struct ___TagDescriptor___*, tagindex)); + } + } + } + } + } + } + + +/* This function processes the task information to create queues for + each parameter type. */ + +void processtasks() { + int i; +#ifdef THREADSIMULATE + int numofcore = pthread_getspecific(key); + for(i=0;inumParameters;j++) { + struct parameterdescriptor *param=task->descriptorarray[j]; + struct parameterwrapper *parameter=param->queue; + parameter->objectset=allocateObjectHash(10); + parameter->task=task; + builditerators(task, j, parameter); + } + } +} + +void toiReset(struct tagobjectiterator * it) { + if (it->istag) { + it->tagobjindex=0; + } else if (it->numtags>0) { + it->tagobjindex=0; + } else { + ObjectHashiterator(it->objectset, &it->it); + } +} + +int toiHasNext(struct tagobjectiterator *it, void ** objectarray OPTARG(int * failed)) { + if (it->istag) { + /* Iterate tag */ + /* Get object with tags */ + struct ___Object___ *obj=objectarray[it->tagobjectslot]; + struct ___Object___ *tagptr=obj->___tags___; + if (tagptr->type==TAGTYPE) { + if ((it->tagobjindex==0)&& /* First object */ + (it->tagid==((struct ___TagDescriptor___ *)tagptr)->flag)) /* Right tag type */ + return 1; + else + return 0; + } else { + struct ArrayObject *ao=(struct ArrayObject *) tagptr; + int tagindex=it->tagobjindex; + for(;tagindex___cachedCode___;tagindex++) { + struct ___TagDescriptor___ *td=ARRAYGET(ao, struct ___TagDescriptor___ *, tagindex); + if (td->flag==it->tagid) { + it->tagobjindex=tagindex; /* Found right type of tag */ + return 1; + } + } + return 0; + } + } else if (it->numtags>0) { + /* Use tags to locate appropriate objects */ + struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]]; + struct ___Object___ *objptr=tag->flagptr; + int i; + if (objptr->type!=OBJECTARRAYTYPE) { + if (it->tagobjindex>0) + return 0; + if (!ObjectHashcontainskey(it->objectset, (int) objptr)) + return 0; + for(i=1;inumtags;i++) { + struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]]; + if (!containstag(objptr,tag2)) + return 0; + } + return 1; + } else { + struct ArrayObject *ao=(struct ArrayObject *) objptr; + int tagindex; + int i; + for(tagindex=it->tagobjindex;tagindex___cachedCode___;tagindex++) { + struct ___Object___ *objptr=ARRAYGET(ao, struct ___Object___*, tagindex); + if (!ObjectHashcontainskey(it->objectset, (int) objptr)) + continue; + for(i=1;inumtags;i++) { + struct ___TagDescriptor___ *tag2=objectarray[it->tagbindings[i]]; + if (!containstag(objptr,tag2)) + goto nexttag; + } + it->tagobjindex=tagindex; + return 1; + nexttag: + ; + } + it->tagobjindex=tagindex; + return 0; + } + } else { + return ObjhasNext(&it->it); + } +} + +int containstag(struct ___Object___ *ptr, struct ___TagDescriptor___ *tag) { + int j; + struct ___Object___ * objptr=tag->flagptr; + if (objptr->type==OBJECTARRAYTYPE) { + struct ArrayObject *ao=(struct ArrayObject *)objptr; + for(j=0;j___cachedCode___;j++) { + if (ptr==ARRAYGET(ao, struct ___Object___*, j)) + return 1; + } + return 0; + } else + return objptr==ptr; +} + +void toiNext(struct tagobjectiterator *it , void ** objectarray OPTARG(int * failed)) { + /* hasNext has all of the intelligence */ + if(it->istag) { + /* Iterate tag */ + /* Get object with tags */ + struct ___Object___ *obj=objectarray[it->tagobjectslot]; + struct ___Object___ *tagptr=obj->___tags___; + if (tagptr->type==TAGTYPE) { + it->tagobjindex++; + objectarray[it->slot]=tagptr; + } else { + struct ArrayObject *ao=(struct ArrayObject *) tagptr; + objectarray[it->slot]=ARRAYGET(ao, struct ___TagDescriptor___ *, it->tagobjindex++); + } + } else if (it->numtags>0) { + /* Use tags to locate appropriate objects */ + struct ___TagDescriptor___ *tag=objectarray[it->tagbindings[0]]; + struct ___Object___ *objptr=tag->flagptr; + if (objptr->type!=OBJECTARRAYTYPE) { + it->tagobjindex++; + objectarray[it->slot]=objptr; + } else { + struct ArrayObject *ao=(struct ArrayObject *) objptr; + objectarray[it->slot]=ARRAYGET(ao, struct ___Object___ *, it->tagobjindex++); + } + } else { + /* Iterate object */ + objectarray[it->slot]=(void *)Objkey(&it->it); + Objnext(&it->it); + } +} +#endif diff --git a/Robust/src/Runtime/socket.c b/Robust/src/Runtime/socket.c index e9ce2ffe..278ad182 100644 --- a/Robust/src/Runtime/socket.c +++ b/Robust/src/Runtime/socket.c @@ -114,7 +114,8 @@ int CALL12(___Socket______nativeBind_____AR_B_I, int ___port___, struct ArrayOb #endif } -struct ArrayObject * CALL01(___InetAddress______getHostByName_____AR_B, struct ___ArrayObject___ * ___hostname___) { +struct ArrayObject * CALL01(___InetAddress______getHostByName_____AR_B, struct ArrayObject * ___hostname___) { +//struct ArrayObject * CALL01(___InetAddress______getHostByName_____AR_B, struct ___ArrayObject___ * ___hostname___) { int length=VAR(___hostname___)->___length___; int i,j,n; char * str=malloc(length+1); @@ -293,8 +294,10 @@ int CALL02(___ServerSocket______nativeaccept____L___Socket___,struct ___ServerSo RuntimeHashadd(fdtoobject, newfd, (int) VAR(___s___)); addreadfd(newfd); #ifdef MULTICORE - flagorand(VAR(___this___),0,0xFFFFFFFE,objq4socketobj[corenum],numqueues4socketobj[corenum]); - enqueueObject(VAR(___this___), objq4socketobj[corenum], numqueues4socketobj[corenum]); + flagorand(VAR(___this___),0,0xFFFFFFFE,NULL,0); + enqueueObject(VAR(___this___), NULL, 0); + //flagorand(VAR(___this___),0,0xFFFFFFFE,objq4socketobj[corenum],numqueues4socketobj[corenum]); + //enqueueObject(VAR(___this___), objq4socketobj[corenum], numqueues4socketobj[corenum]); #else flagorand(VAR(___this___),0,0xFFFFFFFE); enqueueObject(VAR(___this___)); @@ -363,8 +366,10 @@ int CALL02(___Socket______nativeRead_____AR_B, struct ___Socket___ * ___this___, } #ifdef TASK #ifdef MULTICORE - flagorand(VAR(___this___),0,0xFFFFFFFE,objq4socketobj[corenum],numqueues4socketobj[corenum]); - enqueueObject(VAR(___this___),objq4socketobj[corenum],numqueues4socketobj[corenum]); + flagorand(VAR(___this___),0,0xFFFFFFFE,NULL,0); + enqueueObject(VAR(___this___), NULL, 0); + //flagorand(VAR(___this___),0,0xFFFFFFFE,objq4socketobj[corenum],numqueues4socketobj[corenum]); + //enqueueObject(VAR(___this___),objq4socketobj[corenum],numqueues4socketobj[corenum]); #else flagorand(VAR(___this___),0,0xFFFFFFFE); enqueueObject(VAR(___this___)); @@ -381,8 +386,10 @@ void CALL01(___Socket______nativeClose____, struct ___Socket___ * ___this___) { RuntimeHashremove(fdtoobject, fd, data); removereadfd(fd); #ifdef MULTICORE - flagorand(VAR(___this___),0,0xFFFFFFFE,objq4socketobj[corenum],numqueues4socketobj[corenum]); - enqueueObject(VAR(___this___),objq4socketobj[corenum],numqueues4socketobj[corenum]); + flagorand(VAR(___this___),0,0xFFFFFFFE,NULL,0); + enqueueObject(VAR(___this___), NULL, 0); + //flagorand(VAR(___this___),0,0xFFFFFFFE,objq4socketobj[corenum],numqueues4socketobj[corenum]); + //enqueueObject(VAR(___this___),objq4socketobj[corenum],numqueues4socketobj[corenum]); #else flagorand(VAR(___this___),0,0xFFFFFFFE); enqueueObject(VAR(___this___)); diff --git a/Robust/src/Runtime/task.c b/Robust/src/Runtime/task.c index ac045dad..142f076d 100644 --- a/Robust/src/Runtime/task.c +++ b/Robust/src/Runtime/task.c @@ -22,9 +22,7 @@ extern int instaccum; #endif struct genhashtable * activetasks; -#ifndef MULTICORE struct parameterwrapper * objectqueues[NUMCLASSES]; -#endif struct genhashtable * failedtasks; struct taskparamdescriptor * currtpd; struct RuntimeHash * forward; @@ -80,13 +78,8 @@ void createstartupobject(int argc, char ** argv) { } /* Set initialized flag for startup object */ -#ifdef MULTICORE - flagorand(startupobject,1,0xFFFFFFFF,NULL,0); - enqueueObject(startupobject, objq4startupobj[corenum], numqueues4startupobj[corenum]); -#else flagorand(startupobject,1,0xFFFFFFFF); enqueueObject(startupobject); -#endif } int hashCodetpd(struct taskparamdescriptor *ftd) { @@ -296,11 +289,7 @@ struct ___TagDescriptor___ * allocate_tag(int index) { /* This function updates the flag for object ptr. It or's the flag with the or mask and and's it with the andmask. */ -#ifdef MULTICORE -void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length); -#else void flagbody(struct ___Object___ *ptr, int flag); -#endif #ifdef OPTIONAL void enqueueoptional(struct ___Object___ * currobj, int numfailedfses, int * failedfses, struct taskdescriptor * task, int index); #endif @@ -309,11 +298,7 @@ void enqueueoptional(struct ___Object___ * currobj, int numfailedfses, int * fai return (*val1)-(*val2); } -#ifdef MULTICORE -void flagorand(void * ptr, int ormask, int andmask, struct parameterwrapper ** queues, int length) { -#else void flagorand(void * ptr, int ormask, int andmask) { -#endif #ifdef OPTIONAL struct ___Object___ * obj = (struct ___Object___ *)ptr; if(obj->numfses){/*store the information about fses*/ @@ -336,11 +321,7 @@ void flagorand(void * ptr, int ormask, int andmask) { int oldflag=((int *)ptr)[1]; int flag=ormask|oldflag; flag&=andmask; -#ifdef MULTICORE - flagbody(ptr, flag, queues, length); -#else flagbody(ptr, flag); -#endif } } @@ -370,11 +351,7 @@ bool intflagorand(void * ptr, int ormask, int andmask) { if (flag==oldflag) /* Don't do anything */ return false; else { -#ifdef MULTICORE - flagbody(ptr, flag, NULL, 0); -#else flagbody(ptr, flag); -#endif return true; } } @@ -384,67 +361,32 @@ void flagorandinit(void * ptr, int ormask, int andmask) { int oldflag=((int *)ptr)[1]; int flag=ormask|oldflag; flag&=andmask; -#ifdef MULTICORE - flagbody(ptr,flag,NULL,0); -#else flagbody(ptr,flag); -#endif } -#ifdef MULTICORE -void flagbody(struct ___Object___ *ptr, int flag, struct parameterwrapper ** queues, int length) { -#else void flagbody(struct ___Object___ *ptr, int flag) { -#endif -#ifdef MULTICORE - struct parameterwrapper * flagptr = NULL; - int i = 0; -#else struct parameterwrapper *flagptr=(struct parameterwrapper *)ptr->flagptr; -#endif ptr->flag=flag; /*Remove object from all queues */ -#ifdef MULTICORE - for(i = 0; i < length; ++i) { - flagptr = queues[i]; -#else while(flagptr!=NULL) { -#endif -#ifdef MULTICORE - int next; -#else struct parameterwrapper *next; -#endif int UNUSED, UNUSED2; int * enterflags; ObjectHashget(flagptr->objectset, (int) ptr, (int *) &next, (int *) &enterflags, &UNUSED, &UNUSED2); ObjectHashremove(flagptr->objectset, (int)ptr); if (enterflags!=NULL) free(enterflags); -#ifdef MULTICORE - ; -#else flagptr=next; -#endif } } -#ifdef MULTICORE - void enqueueObject(void * vptr, struct parameterwrapper ** queues, int length) { -#else void enqueueObject(void *vptr) { -#endif struct ___Object___ *ptr = (struct ___Object___ *)vptr; { struct QueueItem *tmpptr; -#ifdef MULTICORE - struct parameterwrapper * parameter=NULL; - int j; -#else struct parameterwrapper * parameter=objectqueues[ptr->type]; -#endif int i; struct parameterwrapper * prevptr=NULL; struct ___Object___ *tagptr=ptr->___tags___; @@ -452,12 +394,7 @@ void flagbody(struct ___Object___ *ptr, int flag) { /* Outer loop iterates through all parameter queues an object of this type could be in. */ -#ifdef MULTICORE - for(j = 0; j < length; ++j) { - parameter = queues[j]; -#else while(parameter!=NULL) { -#endif /* Check tags */ if (parameter->numbertags>0) { if (tagptr==NULL) @@ -498,52 +435,12 @@ void flagbody(struct ___Object___ *ptr, int flag) { } } nextloop: -#ifdef MULTICORE - ; -#else parameter=parameter->next; -#endif } -#ifdef MULTICORE - /*if(prevptr != NULL) { - ptr->flagptr=prevptr->arrayindex; - } else { - ptr->flagptr = 0; - }*/ -#else ptr->flagptr=prevptr; -#endif } } -#ifdef MULTICORE - -void transferObject(void * obj, int targetcore) { - int type=((int *)obj)[0]; - // if (type___localcopy___=newobj; - //} else { - /* We have an array */ - /*struct ArrayObject *ao=(struct ArrayObject *)obj; - int elementsize=classsize[type]; - int length=ao->___length___; - int size=sizeof(struct ArrayObject)+length*elementsize; - struct ___Object___ * newobj=FREEMALLOC(size); - memcpy(newobj, obj, size); - obj->___localcopy___=newobj; - }*/ -} - -#endif - #ifdef OPTIONAL int checktags(struct ___Object___ * currobj, struct fsanalysiswrapper * fswrapper) { @@ -987,15 +884,7 @@ int enqueuetasks(struct parameterwrapper *parameter, struct parameterwrapper *pr retval=0; } else { #endif -#ifdef MULTICORE - /*int arrayindex = 0; - if(prevptr != NULL) { - arrayindex = prevptr->arrayindex; - }*/ - ObjectHashadd(parameter->objectset, (int) ptr, 0, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper -#else ObjectHashadd(parameter->objectset, (int) ptr, (int) prevptr, (int) enterflags, numenterflags, enterflags==NULL);//this add the object to parameterwrapper -#endif #ifdef OPTIONAL } #endif @@ -1180,11 +1069,7 @@ void executetasks() { // printf("Setting fd %d\n",fd); if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) { if(intflagorand(objptr,1,0xFFFFFFFF)) { /* Set the first flag to 1 */ -#ifdef MULTICORE - enqueueObject(objptr, NULL, 0); -#else enqueueObject(objptr); -#endif } } } @@ -1468,13 +1353,8 @@ void builditerators(struct taskdescriptor * task, int index, struct parameterwra void printdebug() { int i; int j; -#ifdef MULTICORE - for(i=0;iname); for(j=0;jnumParameters;j++) { struct parameterdescriptor *param=task->descriptorarray[j]; @@ -1518,16 +1398,10 @@ void builditerators(struct taskdescriptor * task, int index, struct parameterwra void processtasks() { int i; -#ifdef MULTICORE - for(i=0;inumParameters;j++) { struct parameterdescriptor *param=task->descriptorarray[j]; struct parameterwrapper * parameter=RUNMALLOC(sizeof(struct parameterwrapper)); @@ -1546,16 +1420,11 @@ void processtasks() { ptr=&((*ptr)->next); (*ptr)=parameter; } -#endif /* Build iterators for parameters */ for(j=0;jnumParameters;j++) { struct parameterdescriptor *param=task->descriptorarray[j]; struct parameterwrapper *parameter=param->queue; -#ifdef MULTICORE - parameter->objectset=allocateObjectHash(10); - parameter->task=task; -#endif builditerators(task, j, parameter); } } diff --git a/Robust/src/buildscript b/Robust/src/buildscript index 4438f199..c03b5802 100755 --- a/Robust/src/buildscript +++ b/Robust/src/buildscript @@ -16,7 +16,9 @@ echo -taskstate do task state analysis echo -tagstate do tag state analysis echo -scheduling do task scheduling echo -multicore generate multi-core version binary -echo "-raw generate raw version binary (should be used with -multicore)" +echo "-numcore set the number of cores (should be used together with -multicore), defaultly set as 1" +echo "-raw generate raw version binary (should be used together with -multicore)" +echo -threadsimulate generate multi-thread simulate version binary echo -optional enable optional echo -debug generate debug symbols echo -prefetch do prefetch analysis @@ -43,6 +45,7 @@ CHECKFLAG=false RECOVERFLAG=false MULTICOREFLAG=false RAWFLAG=false +THREADSIMULATEFLAG=false; USEDMALLOC=false THREADFLAG=false SPECDIR=`pwd` @@ -118,10 +121,17 @@ elif [[ $1 = '-multicore' ]] then MULTICOREFLAG=true JAVAOPTS="$JAVAOPTS -multicore" +elif [[ $1 = '-numcore' ]] +then +JAVAOPTS="$JAVAOPTS -numcore $2" +shift elif [[ $1 = '-raw' ]] then RAWFLAG=true #JAVAOPTS="$JAVAOPTS -raw" +elif [[ $1 = '-threadsimulate' ]] +then +THREADSIMULATEFLAG=true elif [[ $1 = '-optional' ]] then JAVAOPTS="$JAVAOPTS -optional" @@ -194,12 +204,21 @@ fi # Build bristlecone/java sources +if $MULTICOREFLAG +then +if ! java $JAVAFORWARDOPTS -classpath $ROBUSTROOT/../cup/:$ROBUSTROOT Main.Main -classlibrary \ +$ROBUSTROOT/ClassLibrary/ -dir $BUILDDIR \ +$JAVAOPTS $SRCFILES +then exit $? +fi +else #if ! java -Xms5m -Xmx100m $JAVAFORWARDOPTS -cp $ROBUSTROOT/../cup/:$ROBUSTROOT Main.Main -classlibrary \ if ! java $JAVAFORWARDOPTS -classpath $ROBUSTROOT/../cup/:$ROBUSTROOT Main.Main -classlibrary \ $ROBUSTROOT/ClassLibrary/ -dir $BUILDDIR -precise \ $JAVAOPTS $SRCFILES then exit $? fi +fi # Build all of the consistency specs @@ -244,7 +263,14 @@ cd $CURDIR INCLUDES="$INCLUDES -I$ROBUSTROOT/Runtime -I. -IRuntime/include \ -I$BUILDDIR" -FILES="$ROBUSTROOT/Runtime/runtime.c $ROBUSTROOT/Runtime/task.c \ +if $MULTICOREFLAG +then +RUNTIMEFILE="$ROBUSTROOT/Runtime/multicoreruntime.c $ROBUSTROOT/Runtime/multicoretask.c" +else +RUNTIMEFILE="$ROBUSTROOT/Runtime/runtime.c $ROBUSTROOT/Runtime/task.c" +fi + +FILES="$RUNTIMEFILE \ $ROBUSTROOT/Runtime/file.c $ROBUSTROOT/Runtime/Queue.c \ $ROBUSTROOT/Runtime/SimpleHash.c $ROBUSTROOT/Runtime/option.c \ $ROBUSTROOT/Runtime/ObjectHash.c \ @@ -265,11 +291,16 @@ if $MULTICOREFLAG then EXTRAOPTIONS="$EXTRAOPTIONS -DMULTICORE" fi +FILES="$FILES tmpbuilddirectory/taskdefs.c $ROBUSTROOT/Runtime/checkpoint.c" if $RAWFLAG then EXTRAOPTIONS="$EXTRAOPTIONS -DRAW" fi -FILES="$FILES tmpbuilddirectory/taskdefs.c $ROBUSTROOT/Runtime/checkpoint.c" +if $THREADSIMULATEFLAG +then +# -lpthread for pthread functions, -lrt for message queue functions +EXTRAOPTIONS="$EXTRAOPTIONS -DTHREADSIMULATE -lpthread -lrt" +fi fi if $OPTIONALFLAG @@ -294,8 +325,14 @@ then EXTRAOPTIONS="$EXTRAOPTIONS -ldmalloc -DDMALLOC" fi +if $MULTICOREFLAG +then +gcc $INCLUDES $EXTRAOPTIONS \ +tmpbuilddirectory/methods.c $FILES -lm -o $MAINFILE.bin +else gcc $INCLUDES $EXTRAOPTIONS -DPRECISE_GC \ tmpbuilddirectory/methods.c $FILES -lm -o $MAINFILE.bin +fi exit -- 2.34.1