changes
authorbdemsky <bdemsky>
Mon, 19 Apr 2010 20:06:29 +0000 (20:06 +0000)
committerbdemsky <bdemsky>
Mon, 19 Apr 2010 20:06:29 +0000 (20:06 +0000)
Robust/src/Benchmarks/Recovery/Spider/recovery/QueryTask.java
Robust/src/Benchmarks/Recovery/Spider/recovery/Spider.java
Robust/src/Benchmarks/Recovery/Spider/recovery/Task.java
Robust/src/Benchmarks/Recovery/Spider/recovery/TaskSet.java
Robust/src/Benchmarks/Recovery/Spider/recovery/Worker.java

index 4992deeea77df64b876db91803f48c5ad82b5076..79011c4d62a75cd37810b9792704316cff408deb 100644 (file)
@@ -27,6 +27,7 @@ public class QueryTask extends Task {
     int ldepth;
 
     atomic {
+      System.out.println("trans 2");
       max = this.maxDepth;
       maxSearch = this.maxSearchDepth;
       ldepth=this.depth;
@@ -39,6 +40,7 @@ public class QueryTask extends Task {
       String title;
       
       atomic {
+      System.out.println("trans 3");
        hostname = new String(GlobalString.toLocalCharArray(getHostName()));
        path = new String(GlobalString.toLocalCharArray(getPath()));
        
@@ -65,6 +67,7 @@ public class QueryTask extends Task {
       
       if ((title = grabTitle(lq)) != null) {
        atomic {
+      System.out.println("trans 4");
          //commits everything...either works or fails
          gTitle = global new GlobalString(title);
          processPage(lq);
@@ -74,6 +77,7 @@ public class QueryTask extends Task {
       s.close();
     } else {
       atomic {
+      System.out.println("trans 5");
        dequeueTask();
       }
     }
index 00a2d8f6c5f0dd46b3395abeb9cd7d107d982859..5a29018643710d3669817a7ba85dca0ab3f23858 100644 (file)
@@ -39,7 +39,10 @@ public class Spider {
       //set up workers
       ts=global new TaskSet(NUM_THREADS);
       for (i = 0; i < NUM_THREADS; i++) {
-       ts.threads[i] = global new Worker(ts,i);
+        ts.threads[i] = global new Worker(ts,i,(NUM_THREADS/2));
+      }
+      for (i = 0; i < NUM_THREADS/2; i++) {
+        ts.todo[i] = global new GlobalQueue();
       }
     }
 
@@ -50,7 +53,7 @@ public class Spider {
       DistributedHashMap results = global new DistributedHashMap(100, 100, 0.75f);
       DistributedLinkedList results_list = global new DistributedLinkedList();
       QueryTask firstquery = global new QueryTask(visitedList, maxDepth, maxSearchDepth, results, results_list, firstmachine, firstpage, 0);
-      ts.todo.push(firstquery);
+      ts.todo[0].push(firstquery);
     }
 
     System.printString("Finished to create Objects\n");
index 1398c1c67ed26f6ba72cd9b16b9eadef2410e98f..c45c25e866fdf434667f0a620c44c35306a50abc 100644 (file)
@@ -1,16 +1,19 @@
 public class Task {
   //Current worker thread
   Worker w;
+  int queueid;
   public Task() {}
   public void execute();
-  public void setWorker(Worker w) {
+  public void setWorker(Worker w, int queueid) {
     this.w = w;
+    this.queueid = queueid;
   }
   public void dequeueTask() {
     w.workingtask=null;
   }
   public void enqueueTask(Task t) {
-    w.tasks.todo.push(t);
+    //System.out.println("queueid= " + queueid);
+    w.tasks.todo[queueid].push(t);
   }
   public native void execution();
 }
index c88182d4afd64b60ae17becad3f13744a2f4de7a..1904a89ec89ea7cca2a3effa7f5d964a54c18a0d 100644 (file)
@@ -2,11 +2,11 @@ public class TaskSet {
   public TaskSet(int nt) {
     numthreads=nt;
     threads=global new Worker[nt];
-    todo=global new GlobalQueue();
+    todo=global new GlobalQueue[(nt/2)];
   }
 
   //Tasks to be executed
-  GlobalQueue todo;
+  GlobalQueue[] todo;
   //Vector of worker threads
   Worker threads[];
   int numthreads;
index 92d026fc2506f9069ccc17d6826241639bf3b4fe..0c8991e447bda10bd59ef218eb150c868dbe13de 100644 (file)
@@ -1,13 +1,13 @@
 public class Worker extends Thread {
-  Object[] currentWorkList;
-  int mid;
+  int id;
   TaskSet tasks;
   Task workingtask;
+  int numQueue;
 
-  Worker(TaskSet tasks, int mid) {
+  Worker(TaskSet tasks, int id, int numQueue) {
     this.tasks = tasks;
-    this.currentWorkList = currentWorkList;
-    mid = mid;
+    this.id = id;
+    this.numQueue = 3; // Correct this 3 should be hash defined
   }
   
   public void run() {
@@ -17,29 +17,48 @@ public class Worker extends Thread {
     while(notdone) {
       Task t=null;
       atomic {
-        if (!tasks.todo.isEmpty()) {
+        System.out.println("Transacion 1");
+        int qindex = (id%numQueue);
+        //System.out.println("id= " + id + " numQueue= " + numQueue);
+        if (!tasks.todo[qindex].isEmpty()) {
           //grab segment from todo list
-          t=workingtask=(Task) tasks.todo.pop();
+          t=workingtask=(Task) tasks.todo[qindex].pop();
           if(t!=null)
-            t.setWorker(this);
+            t.setWorker(this, qindex);
         } else {
-          //steal work from dead threads
-          Worker[] threads=tasks.threads;
-         boolean shouldexit=true;
-          for(int i=0;i<threads.length;i++) {
-            Worker w=(Worker)threads[i];
-           if (w.workingtask!=null)
-             shouldexit=false;
-            if (w.getStatus(i)==-1&&w.workingtask!=null) {
-              //steal work from this thread
-              t=workingtask=w.workingtask;
-              w.workingtask=null;
-             t.setWorker(this);
-              break;
+          int newqindex = qindex;
+          boolean skipvisit = false;
+          for(int queuecount=1;queuecount < numQueue;queuecount++) {
+            newqindex = ((newqindex+1)%numQueue);
+            if (!tasks.todo[newqindex].isEmpty()) {
+              //grab segment from another todo list
+              t=workingtask=(Task) tasks.todo[newqindex].pop();
+              if(t!=null) {
+                t.setWorker(this, qindex);
+                skipvisit = true;
+                break;
+              }
             }
           }
-         if (shouldexit)
-           notdone=false;
+          if(!skipvisit) {
+            //steal work from dead threads
+            Worker[] threads=tasks.threads;
+            boolean shouldexit=true;
+            for(int i=0;i<threads.length;i++) {
+              Worker w=(Worker)threads[i];
+              if (w.workingtask!=null)
+                shouldexit=false;
+              if (w.getStatus(i)==-1&&w.workingtask!=null) {
+                //steal work from this thread
+                t=workingtask=w.workingtask;
+                w.workingtask=null;
+                t.setWorker(this, qindex);
+                break;
+              }
+            }
+            if (shouldexit)
+              notdone=false;
+          }
         }
       }
       if (t!=null) {
@@ -47,7 +66,7 @@ public class Worker extends Thread {
         continue;
       } else if (notdone) {
         //System.out.println("Not done");
-       sleep(500000);
+        sleep(10000);
       }
     }
     fi = System.currentTimeMillis();