bd467152603f92e761196b66df674c11bc7b8e76
[IRC.git] / Robust / src / Runtime / STM / commit.c
1 #include<tm.h>
2 #ifdef DELAYCOMP
3 #include<delaycomp.h>
4 #endif
5
/* Build-option wrappers.  Each *WRAP(x) macro expands to its argument
 * when the corresponding option is defined and to nothing otherwise,
 * so optional statements can be written inline where they are used. */
/* TRANSWRAP(x): x is kept only when TRANSSTATS is defined. */
6 #ifdef TRANSSTATS
7 #define TRANSWRAP(x) x
8 #else
9 #define TRANSWRAP(x)
10 #endif
11
/* STMWRAP(x): x is kept only when STMSTATS is defined. */
12 #ifdef STMSTATS
13 #define STMWRAP(x) x
14 #else
15 #define STMWRAP(x)
16 #endif
17
/* STATFREE / STATALLOC / STATASSIGN: with STMSTATS they free, malloc or
 * assign oidrdage; otherwise they are empty.  Each expansion already
 * ends in ';'.  STATALLOC reads a variable named size and STATASSIGN a
 * variable named rdage, both of which must exist where the macro is
 * expanded. */
18 #ifdef STMSTATS
19 #define STATFREE free(oidrdage);
20 #define STATALLOC oidrdage=malloc(size);
21 #define STATASSIGN oidrdage=rdage;
22 #else
23 #define STATFREE
24 #define STATALLOC
25 #define STATASSIGN
26 #endif
27
/* ARRAYDELAYWRAP(x) keeps x when both STMARRAY and DELAYCOMP are
 * defined.  ARRAYDELAYWRAP1(x) expands to ",x" in that configuration,
 * i.e. it appends one extra argument to an argument list. */
28 #if defined(STMARRAY)&&defined(DELAYCOMP)
29 #define ARRAYDELAYWRAP(x) x
30 #define ARRAYDELAYWRAP1(x) ,x
31 #else
32 #define ARRAYDELAYWRAP(x)
33 #define ARRAYDELAYWRAP1(x)
34 #endif
35
/* DUALVIEWWRAP(x): x is kept only when DUALVIEW is defined. */
36 #ifdef DUALVIEW
37 #define DUALVIEWWRAP(x) x
38 #else
39 #define DUALVIEWWRAP(x)
40 #endif
41
42 /* ================================================================
43  * transCommit
44  * - This function initiates the transaction commit process
45  * - goes through the transaction cache and decides
46  * - a final response
47  * ================================================================
48  */
49
50 #ifdef DELAYCOMP
51 int transCommit(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
52 #else
53 int transCommit() {
54 #endif
55 #ifdef SANDBOX
56   abortenabled=0;
57 #endif
58   int softaborted=0;
59   do {
60     /* Look through all the objects in the transaction hash table */
61     int finalResponse;
62 #ifdef DELAYCOMP
63     if (c_numelements<(c_size>>3))
64       finalResponse=alttraverseCache(commitmethod, primitives, locals, params);
65     else
66       finalResponse=traverseCache(commitmethod, primitives, locals, params);
67 #else
68     if (c_numelements<(c_size>>3))
69       finalResponse=alttraverseCache();
70     else
71       finalResponse=traverseCache();
72 #endif
73     if(finalResponse == TRANS_ABORT) {
74       TRANSWRAP(numTransAbort++;if (softaborted) nSoftAbortAbort++;);
75       freenewobjs();
76       STMWRAP(freelockedobjs(););
77       objstrReset();
78       t_chashreset();
79 #ifdef READSET
80       rd_t_chashreset();
81 #endif
82 #ifdef DELAYCOMP
83       dc_t_chashreset();
84       ptrstack.count=0;
85       primstack.count=0;
86       branchstack.count=0;
87 #if defined(STMARRAY)&&!defined(DUALVIEW)
88       arraystack.count=0;
89 #endif
90 #endif
91 #ifdef SANDBOX
92       abortenabled=1;
93 #endif
94
95       return TRANS_ABORT;
96     }
97     if(finalResponse == TRANS_COMMIT) {
98       TRANSWRAP(numTransCommit++;if (softaborted) nSoftAbortCommit++;);
99       freenewobjs();
100       STMWRAP(freelockedobjs(););
101       objstrReset();
102       t_chashreset();
103 #ifdef READSET
104       rd_t_chashreset();
105 #endif
106 #ifdef DELAYCOMP
107       dc_t_chashreset();
108       ptrstack.count=0;
109       primstack.count=0;
110       branchstack.count=0;
111 #if defined(STMARRAY)&&!defined(DUALVIEW)
112       arraystack.count=0;
113 #endif
114 #endif
115       return 0;
116     }
117
118     /* wait a random amount of time before retrying to commit transaction*/
119     if(finalResponse == TRANS_SOFT_ABORT) {
120       TRANSWRAP(nSoftAbort++;);
121       softaborted++;
122 #ifdef SOFTABORT
123       if (softaborted>1) {
124 #else
125       if (1) {
126 #endif
127         //retry if too many soft aborts
128         freenewobjs();
129         STMWRAP(freelockedobjs(););
130         objstrReset();
131         t_chashreset();
132 #ifdef READSET
133         rd_t_chashreset();
134 #endif
135 #ifdef DELAYCOMP
136         dc_t_chashreset();
137         ptrstack.count=0;
138         primstack.count=0;
139         branchstack.count=0;
140 #if defined(STMARRAY)&&!defined(DUALVIEW)
141       arraystack.count=0;
142 #endif
143 #endif
144         return TRANS_ABORT;
145       }
146     } else {
147       printf("Error: in %s() Unknown outcome", __func__);
148       exit(-1);
149     }
150   } while (1);
151 }
152
/* freearrays: releases the bookkeeping arrays when they were obtained
 * from malloc.  The >=200 tests are the complement of the <200 tests in
 * allocarrays, which uses stack storage below that size.
 * - DELAYCOMP variant: the read-side arrays depend on c_numelements and
 *   the write-side ones (oidwrlocked plus STMARRAYDELAYFREE) on
 *   t_numelements.  t_numelements is not declared here; the DELAYCOMP
 *   allocarrays macro declares it, so both must expand in one scope.
 * - Other variant: everything depends on c_numelements.
 * The expansion is one or two bare if statements, not a single
 * statement, so it must not be used as an unbraced if/else body. */
153 #ifdef DELAYCOMP
154 #define freearrays if (c_numelements>=200) { \
155     STATFREE;                                \
156     STMARRAYFREE;                            \
157     free(oidrdlocked);                       \
158     free(oidrdversion);                      \
159   }                                          \
160   if (t_numelements>=200) {                  \
161     free(oidwrlocked);                       \
162     STMARRAYDELAYFREE;                       \
163   }
164 #else
165 #define freearrays   if (c_numelements>=200) { \
166     STATFREE;                                  \
167     STMARRAYFREE;                              \
168     free(oidrdlocked);                         \
169     free(oidrdversion);                        \
170     free(oidwrlocked);                         \
171   }
172 #endif
173
/* allocarrays: points the oid* bookkeeping pointers either at the
 * caller's fixed-size locals (fewer than 200 entries) or at malloc'd
 * storage, then sets dirwrlocked to oidwrlocked->array.
 * - DELAYCOMP variant: declares int t_numelements (c_numelements plus
 *   dc_c_numelements), so it can appear only once per scope and only
 *   where a declaration is allowed.  The write-side storage is sized
 *   from t_numelements, the read-side storage from c_numelements.
 * - Other variant: all storage is sized from c_numelements.
 * The malloc results are not checked here.
 * NOTE(review): the extra 2*sizeof(INTPTR) presumably covers the fields
 * of struct garbagelist that precede array -- confirm against the
 * struct definition. */
174 #ifdef DELAYCOMP
175 #define allocarrays int t_numelements=c_numelements+dc_c_numelements;   \
176   if (t_numelements<200) {                                              \
177     oidwrlocked=(struct garbagelist *) &wrlocked;                       \
178     STMARRAYDELAYASSIGN;                                                \
179   } else {                                                              \
180     oidwrlocked=malloc(2*sizeof(INTPTR)+t_numelements*(sizeof(void *))); \
181     STMARRAYDELAYALLOC;                                                 \
182   }                                                                     \
183   if (c_numelements<200) {                                              \
184     oidrdlocked=rdlocked;                                               \
185     oidrdversion=rdversion;                                             \
186     STATASSIGN;                                                         \
187     STMARRAYASSIGN;                                                     \
188   } else {                                                              \
189     int size=c_numelements*sizeof(void*);                               \
190     oidrdlocked=malloc(size);                                           \
191     oidrdversion=malloc(size);                                          \
192     STATALLOC;                                                          \
193     STMARRAYALLOC;                                                      \
194   }                                                                     \
195   dirwrlocked=oidwrlocked->array;
196 #else
197 #define allocarrays if (c_numelements<200) {      \
198     oidrdlocked=rdlocked;                         \
199     oidrdversion=rdversion;                       \
200     oidwrlocked=(struct garbagelist *) &wrlocked; \
201     STATASSIGN;                                   \
202     STMARRAYASSIGN;                               \
203   } else {                                        \
204     int size=c_numelements*sizeof(void*);         \
205     oidrdlocked=malloc(size);                     \
206     oidrdversion=malloc(size);                    \
207     oidwrlocked=malloc(size+2*sizeof(INTPTR));    \
208     STATALLOC;                                    \
209     STMARRAYALLOC;                                \
210   }                                               \
211   dirwrlocked=oidwrlocked->array;
212 #endif
213
/* ABORTSTAT1..4: abort statistics, compiled in only with STMSTATS and
 * empty otherwise.  None of them declares anything; they all use names
 * that must exist where they are expanded (header, headeraddr, i, size,
 * curr, ObjSeqId, objtypetraverse, softabort and the oidrd* arrays).
 * ABORTSTAT1..3 end in an unbraced "if (...) softabort=0;". */
/* ABORTSTAT1: bumps header->abortCount and the per-type counters in
 * typesCausingAbort, takes ObjSeqId from headeraddr->accessCount, and
 * clears softabort when getTotalAbortCount returns nonzero. */
214 #ifdef STMSTATS
215 #define ABORTSTAT1 header->abortCount++;                                \
216   ObjSeqId = headeraddr->accessCount;                                   \
217   (typesCausingAbort[TYPE(header)]).numabort++;                         \
218   (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;           \
219   (typesCausingAbort[TYPE(header)]).numtrans+=1;                        \
220   objtypetraverse[TYPE(header)]=1;                                      \
221   if(getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
222     softabort=0;
/* ABORTSTAT2: same counters, but ObjSeqId comes from oidrdage[i] and
 * the check is done by getReadAbortCount. */
223 #define ABORTSTAT2                                                      \
224   ObjSeqId = oidrdage[i];                                               \
225   header->abortCount++;                                                 \
226   (typesCausingAbort[TYPE(header)]).numabort++;                         \
227   (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;           \
228   (typesCausingAbort[TYPE(header)]).numtrans+=1;                        \
229   objtypetraverse[TYPE(header)]=1;                                      \
230   if (getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
231     softabort=0;
/* ABORTSTAT3: like ABORTSTAT1 but calls getTotalAbortCount2, which is
 * given curr->next and no index/size pair. */
232 #define ABORTSTAT3                                                      \
233   header->abortCount++;                                                 \
234   ObjSeqId = headeraddr->accessCount;                                   \
235   (typesCausingAbort[TYPE(header)]).numabort++;                         \
236   (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;           \
237   (typesCausingAbort[TYPE(header)]).numtrans+=1;                        \
238   objtypetraverse[TYPE(header)]=1;                                      \
239   if (getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
240     softabort=0;
/* ABORTSTAT4: bumps header->abortCount and calls getReadAbortCount, but
 * neither updates typesCausingAbort nor looks at the return value, so
 * softabort is left untouched. */
241 #define ABORTSTAT4 ObjSeqId = oidrdage[i];                              \
242   header->abortCount++;                                                 \
243   getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
244 #else
245 #define ABORTSTAT1
246 #define ABORTSTAT2
247 #define ABORTSTAT3
248 #define ABORTSTAT4
249 #endif
250
/* DELAYWRAP(x) keeps x only when DELAYCOMP is defined.  NUMWRTOTAL
 * names the write-lock count to use: numoidwrtotal with DELAYCOMP,
 * numoidwrlocked without it. */
251 #ifdef DELAYCOMP
252 #define DELAYWRAP(x) x
253 #define NUMWRTOTAL numoidwrtotal
254 #else
255 #define DELAYWRAP(x)
256 #define NUMWRTOTAL numoidwrlocked
257 #endif
258
/* STMARRAY support.  The matching #else branch further down defines
 * all of these macros as empty. */
259 #if defined(STMARRAY)
/* Free / malloc / assign for oidrdlockedarray.  STMARRAYALLOC reads a
 * variable named size and STMARRAYASSIGN one named rdlockedarray; both
 * must exist where the macro is expanded.  Each expansion already ends
 * in ';'. */
260 #define STMARRAYFREE free(oidrdlockedarray);
261 #define STMARRAYALLOC oidrdlockedarray=malloc(size);
262 #define STMARRAYASSIGN oidrdlockedarray=rdlockedarray;
263
264 //allocation statements for dirwrindex
/* STMARRAYDELAYALLOC sizes dirwrindex from t_numelements ints;
 * STMARRAYDELAYASSIGN points it at a variable named wrindex. */
265 #define STMARRAYDELAYFREE free(dirwrindex);
266 #define STMARRAYDELAYALLOC dirwrindex=malloc(t_numelements*sizeof(int));
267 #define STMARRAYDELAYASSIGN dirwrindex=wrindex;
268
/* ARRAYDEFINES: declares the counter, the 200-entry fixed buffer and
 * the working pointer used for the arrays-to-read-check list. */
269 #define ARRAYDEFINES int numoidrdlockedarray=0; \
270   void * rdlockedarray[200];                    \
271   void ** oidrdlockedarray;
272
/* ABORT: calls transAbortProcess for the first numoidwrlocked entries
 * of oidwrlocked (with STMARRAY&&DELAYCOMP two extra arguments, NULL
 * and numoidwrlocked, are appended), runs freearrays, then returns
 * TRANS_SOFT_ABORT if softabort is set and TRANS_ABORT otherwise.  The
 * expansion returns from the enclosing function on every path. */
273 #define ABORT                                   \
274   transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked)); \
275   freearrays;                                                           \
276   if (softabort)                                                        \
277     return TRANS_SOFT_ABORT;                                            \
278   else                                                                  \
279     return TRANS_ABORT;
280   
/* ABORTREAD: same as ABORT, but the count passed is numoidwrtotal and
 * the first extra argument is dirwrindex instead of NULL. */
281 #define ABORTREAD                                                       \
282   transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
283   freearrays;                                                           \
284   if (softabort)                                                        \
285     return TRANS_SOFT_ABORT;                                            \
286   else                                                                  \
287     return TRANS_ABORT;
288
289
/* ARRAYABORT: starting from the current j and counting down to
 * lowoffset, calls write_unlock on the mainao element lock of every
 * index whose status in transao is STMDIRTY, then performs ABORT.
 * Uses j, lowoffset, status, lockptr, transao and mainao from the
 * expansion site. */
290 #define ARRAYABORT                                                      \
291   for(;j>=lowoffset;j--) {                                              \
292     GETLOCKVAL(status, transao, j);                                     \
293     if  (status==STMDIRTY) {                                            \
294       GETLOCKPTR(lockptr, mainao,j);                                    \
295       write_unlock(lockptr);                                            \
296     }                                                                   \
297   }                                                                     \
298   ABORT
299
/* DUALVIEW-only object lock helpers; the #else branch defines all four
 * as empty.  In each of them the lock is the lock field of the
 * objheader_t stored immediately before the object ([-1]). */
300 #ifdef DUALVIEW
301 /* Try to grab object lock...If we get it, check object access
302    version and abort on mismatch */
303   
/* DVGETLOCK: only while addwrobject is still 0, tries rwread_trylock on
 * mainao's header lock and performs ABORT if that fails. */
304 #define DVGETLOCK if (!addwrobject) {                                   \
305     unsigned int * objlock=&(&((objheader_t *)mainao)[-1])->lock;       \
306     if(unlikely(!rwread_trylock(objlock))) {                            \
307       ABORT;                                                            \
308     }                                                                   \
309 }
310
/* DVRELEASELOCK(x): calls rwread_unlock on x's header lock. */
311 #define DVRELEASELOCK(x) {                                              \
312     unsigned int * objlock=&(&((objheader_t *)x)[-1])->lock;            \
313     rwread_unlock(objlock);                                             \
314   }
315
316 /*not finished...if we can't get the lock, it is okay if it is in our
317     access set*/
318
/* DVCHECKLOCK(x): performs ABORTREAD when the test on objlock holds and
 * dc_t_chashSearch(x) returns 0.  It declares objlock without opening a
 * block, so it can be expanded only once per scope.
 * NOTE(review): the test compares the pointer objlock itself with 0,
 * not the value *objlock.  objlock is the address of a header field, so
 * the branch looks unreachable as written -- confirm the intended
 * condition (and its signedness) before relying on this check. */
319 #define DVCHECKLOCK(x)                                                  \
320   unsigned int * objlock=&(&((objheader_t *)x)[-1])->lock;              \
321   if (objlock<=0) {                                                     \
322     if (!dc_t_chashSearch(x)) {                                         \
323       ABORTREAD;                                                        \
324     }                                                                   \
325   }
326
/* DVSETINDEX: stores -1 in dirwrindex at position numoidwrlocked. */
327 #define DVSETINDEX                                                      \
328   dirwrindex[numoidwrlocked]=-1;
329
330 #else
331 #define DVSETINDEX
332 #define DVGETLOCK
333 #define DVCHECKLOCK(x)
334 #define DVRELEASELOCK(x)
335 #endif
336
337
/* READCHECK: with DELAYCOMP and without DUALVIEW this is an "else if"
 * arm, meant to be placed between an if block and its else (READARRAYS
 * does so).  When dc_t_chashSearchArray(mainao,j) is true it issues
 * CFENCE, compares the version of element j in transao and mainao, and
 * on a mismatch calls transAbortProcess, runs freearrays and returns
 * TRANS_ABORT.  In every other configuration it is empty. */
338 #if defined(DELAYCOMP)&&!defined(DUALVIEW)
339 #define READCHECK                                                       \
340   else if (dc_t_chashSearchArray(mainao,j)) {                           \
341     CFENCE;                                                             \
342     unsigned int localversion;                                          \
343     unsigned int remoteversion;                                         \
344     GETVERSIONVAL(localversion, transao, j);                            \
345     GETVERSIONVAL(remoteversion, mainao, j);                            \
346     if (localversion != remoteversion) {                                \
347       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
348       freearrays;                                                       \
349       return TRANS_ABORT;                                               \
350     }                                                                   \
351   }
352 #else
353 #define READCHECK
354 #endif
355
356   /* This code checks arrays to try to lock them if dirty or check
357      later if they were only read. */
358
/* PROCESSARRAY: reads the first int of cachedobj as the type.  When it
 * is >= NUMCLASSES, cachedobj is handled as the transaction's copy
 * (transao) and objptr as the main copy (mainao) of an ArrayObject.
 * The macro ends in a dangling "else", so the statement that follows
 * it at the expansion site becomes the branch for all other types.
 *
 * For every index j in [lowindex, highindex] of transao:
 * - status STMDIRTY: DVGETLOCK, then write_trylock on the mainao
 *   element lock.
 *     - lock taken and versions equal: addwrobject is set.
 *     - lock taken but versions differ: DVRELEASELOCK, then ARRAYABORT
 *       starting at j, so the lock just taken is released too.
 *     - lock not taken: j is decremented first so that ARRAYABORT does
 *       not unlock the element that was never locked.
 * - status STMCLEAN: addrdobject is set.
 * Afterwards objptr is appended to dirwrlocked when addwrobject is set
 * and to oidrdlockedarray when addrdobject is set.
 * NOTE(review): the value stored in oidrdlockedarray is objptr, which
 * this macro uses as mainao, whereas READARRAYS casts each entry to
 * transao and follows ___objlocation___ to reach mainao -- confirm
 * which copy is meant to be stored. */
359 #define PROCESSARRAY                                                    \
360   int type=((int *)cachedobj)[0];                                       \
361   if (type>=NUMCLASSES) {                                               \
362     struct ArrayObject *transao=(struct ArrayObject *) cachedobj;       \
363     struct ArrayObject *mainao=(struct ArrayObject *) objptr;           \
364     int lowoffset=(transao->lowindex);                                  \
365     int highoffset=(transao->highindex);                                \
366     int j;                                                              \
367     int addwrobject=0, addrdobject=0;                                   \
368     for(j=lowoffset; j<=highoffset;j++) {                               \
369       unsigned int status;                                              \
370       GETLOCKVAL(status, transao, j);                                   \
371       if (status==STMDIRTY) {                                           \
372         DVGETLOCK;                                                      \
373         unsigned int * lockptr;                                         \
374         GETLOCKPTR(lockptr, mainao,j);                                  \
375         if (likely(write_trylock(lockptr))) {                           \
376           unsigned int localversion;                                    \
377           unsigned int remoteversion;                                   \
378           GETVERSIONVAL(localversion, transao, j);                      \
379           GETVERSIONVAL(remoteversion, mainao, j);                      \
380           if (likely(localversion == remoteversion)) {                  \
381             addwrobject=1;                                              \
382           } else {                                                      \
383             DVRELEASELOCK(mainao);                                      \
384             ARRAYABORT;                                                 \
385           }                                                             \
386         } else {                                                        \
387           j--;                                                          \
388           DVRELEASELOCK(mainao);                                        \
389           ARRAYABORT;                                                   \
390         }                                                               \
391       } else if (status==STMCLEAN) {                                    \
392         addrdobject=1;                                                  \
393       }                                                                 \
394     }                                                                   \
395     if (addwrobject) {                                                  \
396       DVSETINDEX                                                        \
397       dirwrlocked[numoidwrlocked++] = objptr;                           \
398     }                                                                   \
399     if (addrdobject) {                                                  \
400       oidrdlockedarray[numoidrdlockedarray++]=objptr;                   \
401     }                                                                   \
402   } else
403
404
405   /** This code allows us to skip the expensive checks in some cases */
406
/* Both macros are empty unless DUALVIEW is defined.
 * QUICKCHECK: executes "continue" -- so it must be expanded inside a
 * loop -- when mainao's header lock equals RW_LOCK_BIAS and the header
 * versions of transao and mainao are equal.
 * ARRAYCHECK: performs ABORT when the arrayversion fields of transao
 * and mainao differ. */
407 #ifdef DUALVIEW 
408 #define QUICKCHECK {                                                    \
409     objheader_t * head=&((objheader_t *)mainao)[-1];                    \
410     if (head->lock==RW_LOCK_BIAS&&                                      \
411         ((objheader_t *)transao)[-1].version==head->version)            \
412       continue;                                                         \
413   }
414 #define ARRAYCHECK                                      \
415   if (transao->arrayversion!=mainao->arrayversion) {    \
416     ABORT;}
417 #else
418 #define QUICKCHECK
419 #define ARRAYCHECK
420 #endif
421   
/* READARRAYS: validates the arrays recorded in oidrdlockedarray.  Uses
 * i, softabort and the write-lock bookkeeping from the expansion site.
 * For each entry (taken as transao, with mainao read from its
 * ___objlocation___ field) it runs ARRAYCHECK, QUICKCHECK and
 * DVCHECKLOCK, then visits every index in [lowindex, highindex] whose
 * lock value in transao is STMCLEAN:
 * - mainao lock value > 0: after CFENCE the element versions are
 *   compared; a mismatch aborts with TRANS_ABORT.
 * - otherwise the optional READCHECK arm is tried (DELAYCOMP without
 *   DUALVIEW only).
 * - otherwise softabort is set when the versions are still equal, and
 *   the transaction is aborted, returning TRANS_SOFT_ABORT if softabort
 *   is set and TRANS_ABORT if not.
 * NOTE(review): a lock value > 0 presumably means the element is not
 * write-locked -- confirm against GETLOCKVAL/write_trylock. */
422 #define READARRAYS                                                      \
423   for(i=0; i<numoidrdlockedarray; i++) {                                \
424     struct ArrayObject * transao=(struct ArrayObject *) oidrdlockedarray[i]; \
425     struct ArrayObject * mainao=(struct ArrayObject *) transao->___objlocation___; \
426     ARRAYCHECK;                                                         \
427     QUICKCHECK;                                                         \
428     DVCHECKLOCK(mainao);                                                \
429     int lowoffset=(transao->lowindex);                                  \
430     int highoffset=(transao->highindex);                                \
431     int j;                                                              \
432     for(j=lowoffset; j<=highoffset;j++) {                               \
433       unsigned int locallock;GETLOCKVAL(locallock,transao,j);           \
434       if (locallock==STMCLEAN) {                                        \
435         /* do read check */                                             \
436         unsigned int mainlock; GETLOCKVAL(mainlock, mainao, j);         \
437         if (mainlock>0) {                                               \
438           CFENCE;                                                       \
439           unsigned int localversion;                                    \
440           unsigned int remoteversion;                                   \
441           GETVERSIONVAL(localversion, transao, j);                      \
442           GETVERSIONVAL(remoteversion, mainao, j);                      \
443           if (localversion != remoteversion) {                          \
444             transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
445             freearrays;                                                 \
446             return TRANS_ABORT;                                         \
447           }                                                             \
448         }                                                               \
449         READCHECK                                                       \
450         else {                                                          \
451           unsigned int localversion;                                    \
452           unsigned int remoteversion;                                   \
453           GETVERSIONVAL(localversion, transao, j);                      \
454           GETVERSIONVAL(remoteversion, mainao, j);                      \
455           if (localversion==remoteversion)                              \
456             softabort=1;                                                \
457           transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
458           freearrays;                                                   \
459           if (softabort)                                                \
460             return TRANS_SOFT_ABORT;                                    \
461           else                                                          \
462             return TRANS_ABORT;                                         \
463         }                                                               \
464       }                                                                 \
465     }                                                                   \
466   }
/* Without STMARRAY every array-related macro expands to nothing, so
 * code that uses them compiles unchanged. */
467 #else
468 #define ARRAYDEFINES
469 #define PROCESSARRAY
470 #define READARRAYS
471 #define STMARRAYFREE
472 #define STMARRAYALLOC
473 #define STMARRAYASSIGN
474 #define STMARRAYDELAYFREE
475 #define STMARRAYDELAYALLOC
476 #define STMARRAYDELAYASSIGN
477 #endif
478
/* ARRAYLOCK: array-specific front end of the per-entry locking done in
 * ACCESSLOCKS (DELAYCOMP only).  Both non-empty variants end in a
 * dangling "else", so the code that follows the macro handles every
 * entry the variant does not treat as an array access.  They use
 * dc_curr, objptr, header, numoidwrtotal and the dir* arrays from the
 * expansion site. */
479 #ifdef DELAYCOMP
/* STMARRAY without DUALVIEW: applies when dc_curr->intkey is not -1.
 * Tries write_trylock on element intkey of objptr and on success
 * records objptr and intkey.  On failure it searches the c_table chain
 * for objptr:
 * - found with the cached copy's lock value for intkey equal to
 *   STMDIRTY: leaves the search without aborting;
 * - found with any other lock value, or not found at all: calls
 *   transAbortProcess, ABORTSTAT1 and freearrays, then returns
 *   TRANS_SOFT_ABORT or TRANS_ABORT depending on softabort.
 * NOTE(review): the STMDIRTY case presumably means this transaction
 * already holds the element lock -- confirm. */
480 #if defined(STMARRAY)&&!defined(DUALVIEW)
481 #define ARRAYLOCK                                                       \
482   int intkey=dc_curr->intkey;                                           \
483   if (intkey!=-1) {                                                     \
484     unsigned int *lockptr;                                              \
485     GETLOCKPTR(lockptr, objptr, intkey);                                \
486     if (likely(write_trylock(lockptr))) {                               \
487       /*have lock on element */                                         \
488       dirwrlocked[numoidwrtotal]=objptr;                                \
489       dirwrindex[numoidwrtotal++]=intkey;                               \
490     } else {                                                            \
491       chashlistnode_t *node = &c_table[(((unsigned INTPTR)objptr) & c_mask)>>4]; \
492       do {                                                              \
493         if(node->key == objptr) {                                       \
494           unsigned int lockval;                                         \
495           GETLOCKVAL(lockval, ((struct ArrayObject *)node->val), intkey); \
496           if (lockval!=STMDIRTY) {                                      \
497             /*have to abort to avoid deadlock*/                         \
498             transAbortProcess(oidwrlocked, numoidwrtotal, dirwrindex, numoidwrlocked); \
499             ABORTSTAT1;                                                 \
500             freearrays;                                                 \
501             if (softabort)                                              \
502               return TRANS_SOFT_ABORT;                                  \
503             else                                                        \
504               return TRANS_ABORT;                                       \
505           }                                                             \
506           break;                                                        \
507         }                                                               \
508         node = node->next;                                              \
509         if(node==NULL) {                                                \
510           transAbortProcess(oidwrlocked, numoidwrtotal, dirwrindex, numoidwrlocked); \
511           ABORTSTAT1;                                                   \
512           freearrays;                                                   \
513           if (softabort)                                                \
514             return TRANS_SOFT_ABORT;                                    \
515           else                                                          \
516             return TRANS_ABORT;                                         \
517         }                                                               \
518        } while(1);                                                      \
519     }                                                                   \
520   } else
521
/* STMARRAY with DUALVIEW: applies when the type field of objptr is
 * >= NUMCLASSES.  Tries rwwrite_trylock on the header lock and records
 * the object with dirwrindex 0 on success.  On failure it searches the
 * c_table chain; if the cached copy's header is DIRTY and
 * rwconvert_trylock succeeds, the object is recorded with dirwrindex 1
 * and control jumps to the nextloop label, which ACCESSLOCKS provides.
 * In every other case ABORTREAD is performed. */
522 #elif defined(STMARRAY)&&defined(DUALVIEW)
523 #define ARRAYLOCK                                                       \
524   if (((struct ___Object___ *)objptr)->type>=NUMCLASSES) {              \
525     if (likely(rwwrite_trylock(&header->lock))) {                       \
526       dirwrindex[numoidwrtotal]=0;                                      \
527       dirwrlocked[numoidwrtotal++] = objptr;                            \
528     } else {                                                            \
529       chashlistnode_t *node = &c_table[(((unsigned INTPTR)objptr) & c_mask)>>4]; \
530       do {                                                              \
531         if(node->key == objptr) {                                       \
532           objheader_t * headeraddr=&((objheader_t *) node->val)[-1];    \
533           if(STATUS(headeraddr) & DIRTY) {                              \
534             if (likely(rwconvert_trylock(&header->lock))) {             \
535               dirwrindex[numoidwrtotal]=1;                              \
536               dirwrlocked[numoidwrtotal++] = objptr;                    \
537               goto nextloop;                                            \
538             }                                                           \
539           }                                                             \
540           break;                                                        \
541         }                                                               \
542         node = node->next;                                              \
543       } while(node != NULL);                                            \
544       ABORTREAD;                                                        \
545     }                                                                   \
546   } else
547 #else
548 #define ARRAYLOCK
549 #endif
550
551
/* ACCESSLOCKS (DELAYCOMP only, empty otherwise): takes write locks for
 * every entry on dc_c_list.
 * - Declares numoidwrtotal, initialised from numoidwrlocked, and
 *   dc_curr; it also defines the label nextloop, so it can be expanded
 *   at most once per function.
 * - For each entry, ARRAYLOCK gets the first chance; otherwise
 *   write_trylock is tried on the object's header lock and on success
 *   the object is appended to dirwrlocked (with dirwrindex -1 when
 *   STMARRAY&&DELAYCOMP).
 * - If the lock cannot be taken, the c_table chain is searched for the
 *   object.  A cached copy whose header status has DIRTY set skips to
 *   the next entry; anything else aborts via transAbortProcess,
 *   ABORTSTAT1 and freearrays and returns TRANS_SOFT_ABORT or
 *   TRANS_ABORT depending on softabort.
 * NOTE(review): ABORTSTAT1 reads headeraddr, i, size and curr.  The
 * headeraddr declared in the search loop is out of scope where
 * ABORTSTAT1 is expanded, so with STMSTATS those names must come from
 * the function that expands ACCESSLOCKS -- confirm. */
552 #define ACCESSLOCKS                                                     \
553   unsigned int numoidwrtotal=numoidwrlocked;                            \
554   dchashlistnode_t *dc_curr = dc_c_list;                                \
555   /* Inner loop to traverse the linked list of the cache lookupTable */ \
556   while(likely(dc_curr != NULL)) {                                      \
557     /*if the first bin in hash table is empty   */                      \
558     void *objptr=dc_curr->key;                                          \
559     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t)); \
560     ARRAYLOCK                                                           \
561     if(likely(write_trylock(&header->lock))) { /*can aquire write lock*/ \
562       ARRAYDELAYWRAP(dirwrindex[numoidwrtotal]=-1;)                     \
563       dirwrlocked[numoidwrtotal++] = objptr;                            \
564     } else {                                                            \
565       /* maybe we already have lock*/                                   \
566       chashlistnode_t *node = &c_table[(((unsigned INTPTR)objptr) & c_mask)>>4]; \
567                                                                         \
568       do {                                                              \
569         if(node->key == objptr) {                                       \
570           objheader_t * headeraddr=&((objheader_t *) node->val)[-1];    \
571           if(STATUS(headeraddr) & DIRTY) {                              \
572             goto nextloop;                                              \
573           } else                                                        \
574             break;                                                      \
575         }                                                               \
576         node = node->next;                                              \
577       } while(node != NULL);                                            \
578                                                                         \
579       /*have to abort to avoid deadlock */                              \
580       transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
581       ABORTSTAT1;                                                       \
582       freearrays;                                                       \
583       if (softabort)                                                    \
584         return TRANS_SOFT_ABORT;                                        \
585       else                                                              \
586         return TRANS_ABORT;                                             \
587     }                                                                   \
588   nextloop:                                                             \
589     dc_curr = dc_curr->lnext;                                           \
590   }                                                     
591 #else
592 #define ACCESSLOCKS
593 #endif
594
#ifdef DELAYCOMP
/* lwreset
 * - Walks dc_c_list from its head.  For every node that comes before
 *   dc_curr, the lock in the header of the node's object is released:
 *   rwwrite_unlock when the object's type is >= NUMCLASSES,
 *   write_unlock otherwise.  dc_curr itself and all nodes after it are
 *   left locked as they are.
 * - Every visited node that is stored inside dc_c_table has its key
 *   and next fields cleared.
 * - All dcliststruct_t blocks except the last one are freed, and the
 *   delayed-computation counters, list head and the pointer, primitive
 *   and branch stacks are reset.
 */
void lwreset(dchashlistnode_t *dc_curr) {
  dchashlistnode_t *tablebegin = dc_c_table;
  dchashlistnode_t *tableend = tablebegin + dc_c_size;
  dchashlistnode_t *node;
  dchashlistnode_t *following;
  int unlocking = 1;

  for (node = dc_c_list; node != NULL; node = following) {
    /* Fetch the successor first; the node may be cleared below. */
    following = node->lnext;

    if (unlocking) {
      if (node == dc_curr) {
        /* Reached the stop marker: nothing from here on is unlocked. */
        unlocking = 0;
      } else {
        struct ___Object___ *obj = node->key;
        objheader_t *hdr = ((objheader_t *)obj) - 1;
        if (obj->type < NUMCLASSES) {
          write_unlock(&hdr->lock);
        } else {
          rwwrite_unlock(&hdr->lock);
        }
      }
    }

    /* Only nodes that live in the table itself are zeroed in place. */
    if (tablebegin <= node && node < tableend) {
      node->key = NULL;
      node->next = NULL;
    }
  }

  /* Drop every overflow block but the last and reuse that one. */
  for (;;) {
    dcliststruct_t *rest = dc_c_structs->next;
    if (rest == NULL)
      break;
    free(dc_c_structs);
    dc_c_structs = rest;
  }
  dc_c_structs->num = 0;

  dc_c_numelements = 0;
  dc_c_list = NULL;
  ptrstack.count = 0;
  primstack.count = 0;
  branchstack.count = 0;
}
#endif
636
637 /* ==================================================
638  * traverseCache
639  * - goes through the transaction cache and
640  * - decides if a transaction should commit or abort
641  * ==================================================
642  */
643
644 #ifdef DELAYCOMP
645 int traverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
646 #else
647 int traverseCache() {
648 #endif
649   /* Create info to keep track of objects that can be locked */
650   int numoidrdlocked=0;
651   int numoidwrlocked=0;
652   void * rdlocked[200];
653   int rdversion[200];
654   struct fixedlist wrlocked;
655   int softabort=0;
656   int i;
657   void ** oidrdlocked;
658   int * oidrdversion;
659   ARRAYDEFINES;
660   STMWRAP(int rdage[200];int * oidrdage;int ObjSeqId;int objtypetraverse[TOTALNUMCLASSANDARRAY];);
661   struct garbagelist * oidwrlocked;
662   void ** dirwrlocked;
663 #if defined(STMARRAY)&&defined(DELAYCOMP)
664   int wrindex[200];
665   int * dirwrindex;
666 #endif
667   allocarrays;
668
669   STMWRAP(for(i=0; i<TOTALNUMCLASSANDARRAY; i++) objtypetraverse[i] = 0;);
670
671   chashlistnode_t *ptr = c_table;
672   /* Represents number of bins in the chash table */
673   unsigned int size = c_size;
674   for(i = 0; i<size; i++) {
675     chashlistnode_t *curr = &ptr[i];
676     /* Inner loop to traverse the linked list of the cache lookupTable */
677     while(curr != NULL) {
678       //if the first bin in hash table is empty
679       if(curr->key == NULL)
680         break;
681       objheader_t * cachedobj=curr->val;
682       objheader_t * headeraddr=&((objheader_t *) cachedobj)[-1]; //cached object
683       void * objptr=curr->key;
684       objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t)); //real object
685       unsigned int version = headeraddr->version;
686
687       if(STATUS(headeraddr) & DIRTY) {
688         PROCESSARRAY
689         /* Read from the main heap  and compare versions */
690         if(likely(write_trylock(&header->lock))) { //can aquire write lock
691           if (likely(version == header->version)) { /* versions match */
692             /* Keep track of objects locked */
693             dirwrlocked[numoidwrlocked++] = objptr;
694           } else {
695             dirwrlocked[numoidwrlocked++] = objptr;
696             transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
697             ABORTSTAT1;
698             freearrays;
699             if (softabort)
700               return TRANS_SOFT_ABORT;
701             else 
702               return TRANS_ABORT;
703           }
704         } else {
705           if(version == header->version) {
706             /* versions match */
707             softabort=1;
708           }
709           transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
710           ABORTSTAT1;
711           freearrays;
712           if (softabort)
713             return TRANS_SOFT_ABORT;
714           else 
715             return TRANS_ABORT;
716       
717         }
718       } else {
719         STMWRAP(oidrdage[numoidrdlocked]=headeraddr->accessCount;);
720         oidrdversion[numoidrdlocked]=version;
721         oidrdlocked[numoidrdlocked++]=header;
722       }
723       curr = curr->next;
724     }
725   } //end of for
726   
727   ACCESSLOCKS;
728
729   //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
730   READARRAYS;
731
732   for(i=0; i<numoidrdlocked; i++) {
733     /* Read from the main heap  and compare versions */
734     objheader_t *header=oidrdlocked[i];
735     unsigned int version=oidrdversion[i];
736 #if defined(STMARRAY)&&defined(DUALVIEW)
737     unsigned int isobject=((struct ___Object___ *)&header[1])->type<NUMCLASSES;
738     if(likely((isobject&&header->lock>0)||(!isobject&&header->lock==RW_LOCK_BIAS))) {
739 #else      
740     if(likely(header->lock>0)) { //not write locked
741 #endif
742       CFENCE;
743       if(version != header->version) { /* versions do not match */
744         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
745         ABORTSTAT2;
746         freearrays;
747         return TRANS_ABORT;
748       }
749 #if DELAYCOMP
750     } else if (dc_t_chashSearch(((char *)header)+sizeof(objheader_t))) {
751       //couldn't get lock because we already have it
752       //check if it is the right version number
753       if (version!=header->version) {
754         transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
755         ABORTSTAT2;
756         freearrays;
757         return TRANS_ABORT;
758       }
759 #endif
760     } else { /* cannot aquire lock */
761       //do increment as we didn't get lock
762       if(version == header->version) {
763         softabort=1;
764       }
765       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
766       ABORTSTAT2;
767       freearrays;
768       if (softabort)
769         return TRANS_SOFT_ABORT;
770       else 
771         return TRANS_ABORT;
772     }
773   }
774
775 #ifdef READSET
776   //need to validate auxilary readset
777   rdchashlistnode_t *rd_curr = rd_c_list;
778   /* Inner loop to traverse the linked list of the cache lookupTable */
779   while(likely(rd_curr != NULL)) {
780     //if the first bin in hash table is empty
781     unsigned int version=rd_curr->version;
782     struct ___Object___ * objptr=rd_curr->key;
783     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
784 #if defined(STMARRAY)&&defined(DUALVIEW)
785     unsigned int isobject=objptr->type<NUMCLASSES;
786     if(likely((isobject&&header->lock>0)||(!isobject&&header->lock==RW_LOCK_BIAS))) {
787 #else      
788     if(likely(header->lock>0)) { //not write locked
789 #endif
790       //object is not write locked
791       if (unlikely(version!=header->version)) {
792         //have to abort
793         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
794         STMWRAP((typesCausingAbort[TYPE(header)])++;);
795         freearrays;
796         if (softabort)
797           return TRANS_SOFT_ABORT;
798         else
799           return TRANS_ABORT;
800       }
801     } else {
802       //maybe we already have lock
803       if (likely(version==header->version)) {
804         void * key=rd_curr->key;
805 #ifdef DELAYCOMP
806         //check to see if it is in the delaycomp table
807         {
808           dchashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
809           do {
810             if(node->key == key) {
811               goto nextloopread;
812             }
813             node = node->next;
814           } while(node != NULL);
815         }
816 #endif
817         //check normal table
818 #ifdef STMARRAY
819       if (likely(objptr->type<NUMCLASSES||header->lock==(RW_LOCK_BIAS-1))) {    
820 #else
821         {
822 #endif
823           chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
824           do {
825             if(node->key == key) {
826               objheader_t * headeraddr=&((objheader_t *) node->val)[-1];          
827               if(STATUS(headeraddr) & DIRTY) {
828                 goto nextloopread;
829               }
830             }
831             node = node->next;
832           } while(node != NULL);
833         }
834       }
835       //have to abort to avoid deadlock
836       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
837       STMWRAP((typesCausingAbort[TYPE(header)])++;);
838       freearrays;
839       if (softabort)
840         return TRANS_SOFT_ABORT;
841       else
842         return TRANS_ABORT;
843     }
844   nextloopread:
845     rd_curr = rd_curr->lnext;
846   }
847 #endif
848   
849   /* Decide the final response */
850 #ifdef DELAYCOMP
851   transCommitProcess(oidwrlocked ARRAYDELAYWRAP1(dirwrindex), numoidwrlocked, numoidwrtotal, commitmethod, primitives, locals, params);
852 #else
853   transCommitProcess(oidwrlocked, numoidwrlocked);
854 #endif
855   freearrays;
856   return TRANS_COMMIT;
857 }
858
859 /* ==================================================
860  * alttraverseCache
861  * - goes through the transaction cache and
862  * - decides if a transaction should commit or abort
863  * ==================================================
864  */
865
866 #ifdef DELAYCOMP
867 int alttraverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
868 #else
869 int alttraverseCache() {
870 #endif
871   /* Create info to keep track of objects that can be locked */
872   int numoidrdlocked=0;
873   int numoidwrlocked=0;
874   void * rdlocked[200];
875   int rdversion[200];
876   struct fixedlist wrlocked;
877   int softabort=0;
878   int i;
879   void ** oidrdlocked;
880   int * oidrdversion;
881   STMWRAP(int rdage[200];int * oidrdage;int ObjSeqId;int objtypetraverse[TOTALNUMCLASSANDARRAY];);
882   ARRAYDEFINES;
883   struct garbagelist * oidwrlocked;
884   void ** dirwrlocked;
885 #if defined(STMARRAY)&&defined(DELAYCOMP)
886   int wrindex[200];
887   int * dirwrindex;
888 #endif
889   allocarrays;
890
891   STMWRAP(for(i=0; i<TOTALNUMCLASSANDARRAY; i++) objtypetraverse[i] = 0;);
892
893   chashlistnode_t *curr = c_list;
894   /* Inner loop to traverse the linked list of the cache lookupTable */
895   while(likely(curr != NULL)) {
896     //if the first bin in hash table is empty
897       objheader_t * cachedobj=curr->val;
898     objheader_t * headeraddr=&((objheader_t *) cachedobj)[-1];
899     void *objptr=curr->key;
900     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
901     unsigned int version = headeraddr->version;
902
903     if(STATUS(headeraddr) & DIRTY) {
904       PROCESSARRAY
905       /* Read from the main heap  and compare versions */
906       if(likely(write_trylock(&header->lock))) { //can aquire write lock
907         if (likely(version == header->version)) { /* versions match */
908           /* Keep track of objects locked */
909           dirwrlocked[numoidwrlocked++] = objptr;
910         } else {
911           dirwrlocked[numoidwrlocked++] = objptr;
912           transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
913           ABORTSTAT3;
914           freearrays;
915           return TRANS_ABORT;
916         }
917       } else { /* cannot aquire lock */
918         if(version == header->version) {
919           /* versions match */
920           softabort=1;
921         }
922         transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
923         ABORTSTAT3;
924         freearrays;
925         if (softabort)
926           return TRANS_SOFT_ABORT;
927         else 
928           return TRANS_ABORT;
929       }
930     } else {
931       /* Read from the main heap  and compare versions */
932       oidrdversion[numoidrdlocked]=version;
933       oidrdlocked[numoidrdlocked++] = header;
934     }
935     curr = curr->lnext;
936   }
937
938   ACCESSLOCKS;
939
940   //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
941   READARRAYS;
942
943   for(i=0; i<numoidrdlocked; i++) {
944     objheader_t * header=oidrdlocked[i];
945     unsigned int version=oidrdversion[i];
946 #if defined(STMARRAY)&&defined(DUALVIEW)
947     unsigned int isobject=((struct ___Object___ *)&header[1])->type<NUMCLASSES;
948     if(likely((isobject&&header->lock>0)||(!isobject&&header->lock==RW_LOCK_BIAS))) {
949 #else      
950     if(likely(header->lock>0)) { //not write locked
951 #endif
952       CFENCE;
953       if(unlikely(version != header->version)) {
954         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
955         ABORTSTAT2;
956         freearrays;
957         return TRANS_ABORT;
958       }
959 #ifdef DELAYCOMP
960     } else if (dc_t_chashSearch(((char *)header)+sizeof(objheader_t))) {
961       //couldn't get lock because we already have it
962       //check if it is the right version number
963       if (version!=header->version) {
964         transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
965         ABORTSTAT4;
966         freearrays;
967         return TRANS_ABORT;
968       }
969 #endif
970     } else { /* cannot aquire lock */
971       if(version == header->version) {
972         softabort=1;
973       }
974       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
975       ABORTSTAT2;
976       freearrays;
977       if (softabort)
978         return TRANS_SOFT_ABORT;
979       else 
980         return TRANS_ABORT;
981     }
982   }
983
984 #ifdef READSET
985   //need to validate auxilary readset
986   rdchashlistnode_t *rd_curr = rd_c_list;
987   /* Inner loop to traverse the linked list of the cache lookupTable */
988   while(likely(rd_curr != NULL)) {
989     //if the first bin in hash table is empty
990     int version=rd_curr->version;
991     struct ___Object___ * objptr=rd_curr->key;
992     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
993 #if defined(STMARRAY)&&defined(DUALVIEW)
994     unsigned int isobject=objptr->type<NUMCLASSES;
995     if(likely((isobject&&header->lock>0)||(!isobject&&header->lock==RW_LOCK_BIAS))) {
996 #else      
997     if(likely(header->lock>0)) { //not write locked
998 #endif
999       if (unlikely(version!=header->version)) {
1000         //have to abort
1001         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
1002         STMWRAP((typesCausingAbort[TYPE(header)])++;);
1003         freearrays;
1004         if (softabort)
1005           return TRANS_SOFT_ABORT;
1006         else
1007           return TRANS_ABORT;   
1008       }
1009     } else {
1010       if (version==header->version) {
1011         void * key=rd_curr->key;
1012 #ifdef DELAYCOMP
1013         //check to see if it is in the delaycomp table
1014         {
1015           dchashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
1016           do {
1017             if(node->key == key)
1018               goto nextloopread;
1019             node = node->next;
1020           } while(node != NULL);
1021         }
1022 #endif
1023         //check normal table
1024 #ifdef STMARRAY
1025         if (likely(objptr->type<NUMCLASSES||header->lock==(RW_LOCK_BIAS-1))) {  
1026 #else
1027           {
1028 #endif
1029           chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
1030           do {
1031             if(node->key == key) {
1032               objheader_t * headeraddr=&((objheader_t *) node->val)[-1];          
1033               if(STATUS(headeraddr) & DIRTY) {
1034                 goto nextloopread;
1035               }
1036             }
1037             node = node->next;
1038           } while(node != NULL);
1039         }
1040       }
1041       //have to abort to avoid deadlock
1042       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
1043       STMWRAP((typesCausingAbort[TYPE(header)])++;);
1044       freearrays;
1045       if (softabort)
1046         return TRANS_SOFT_ABORT;
1047       else
1048         return TRANS_ABORT;
1049     }
1050   nextloopread:
1051     rd_curr = rd_curr->lnext;
1052   }
1053 #endif
1054
1055   /* Decide the final response */
1056 #ifdef DELAYCOMP
1057   transCommitProcess(oidwrlocked ARRAYDELAYWRAP1(dirwrindex), numoidwrlocked, numoidwrtotal, commitmethod, primitives, locals, params);
1058 #else
1059   transCommitProcess(oidwrlocked, numoidwrlocked);
1060 #endif
1061   freearrays;
1062   return TRANS_COMMIT;
1063 }
1064
1065 /* ==================================
1066  * transAbortProcess
1067  *
1068  * =================================
1069  */
1070
/* NOTE(review): not referenced in the visible part of this file;
   presumably a logging switch read elsewhere -- confirm. */
int logflag = 1;
1072  
1073 #if defined(STMARRAY)&&defined(DELAYCOMP)
1074 void transAbortProcess(struct garbagelist *oidwrlocked, int numoidwrtotal, int * dirwrindex, int numoidwrlocked) {
1075 #else
1076 void transAbortProcess(struct garbagelist *oidwrlocked, int numoidwrlocked) {
1077 #endif
1078   int i;
1079   objheader_t *header;
1080   /* Release read locks */
1081   void ** dirwrlocked=oidwrlocked->array;
1082   /* Release write locks */
1083   for(i=numoidwrlocked-1; i>=0; i--) {
1084     /* Read from the main heap */
1085     struct ___Object___ * dst=dirwrlocked[i];
1086     header = &((objheader_t *)dst)[-1];
1087 #ifdef STMARRAY
1088     int type=dst->type;
1089     if (type>=NUMCLASSES) {
1090       //have array, do unlocking of bins
1091       struct ArrayObject *src=(struct ArrayObject *)t_chashSearch(dst);
1092       int lowoffset=(src->lowindex);
1093       int highoffset=(src->highindex);
1094       int j;
1095       int addwrobject=0, addrdobject=0;
1096       for(j=lowoffset; j<=highoffset;j++) {
1097         unsigned int status;
1098         GETLOCKVAL(status, src, j);
1099         if (status==STMDIRTY) {
1100           unsigned int *lockptr;
1101           GETLOCKPTR(lockptr, ((struct ArrayObject *)dst), j);
1102           write_unlock(lockptr);
1103         }
1104       }
1105 #ifdef DUALVIEW
1106       //release object array lock
1107       rwread_unlock(&header->lock);
1108 #endif
1109     } else
1110 #endif
1111       write_unlock(&header->lock);
1112   }
1113 #if defined(STMARRAY)&&defined(DELAYCOMP)&&!defined(DUALVIEW)
1114   //release access locks
1115   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1116     struct ___Object___ * dst=dirwrlocked[i];
1117     header = &((objheader_t *)dst)[-1];
1118     int wrindex=dirwrindex[i];
1119     if (wrindex==-1) {
1120       //normal object
1121       write_unlock(&header->lock);
1122     } else {
1123       //array element
1124       unsigned int *intptr;
1125       GETLOCKPTR(intptr, ((struct ArrayObject *)dst), wrindex);
1126       write_unlock(intptr);
1127     }
1128   }
1129 #endif
1130 #if defined(STMARRAY)&&defined(DELAYCOMP)&&defined(DUALVIEW)
1131   //release access locks
1132   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1133     struct ___Object___ * dst=dirwrlocked[i];
1134     header = &((objheader_t *)dst)[-1];
1135     int wrindex=dirwrindex[i];
1136     if (wrindex==-1) {
1137       //normal object
1138       write_unlock(&header->lock);
1139     } else if (wrindex==0) {
1140       //array element
1141       rwwrite_unlock(&header->lock);
1142     } else {
1143       rwconvert_unlock(&header->lock);
1144     }
1145   }
1146 #endif
1147 #ifdef STMSTATS
1148   /* clear trec and then release objects locked */
1149   struct objlist *ptr=lockedobjs;
1150   while(ptr!=NULL) {
1151     int max=ptr->offset;
1152     for(i=max-1; i>=0; i--) {
1153       header = (objheader_t *)ptr->objs[i];
1154       header->trec = NULL;
1155       pthread_mutex_unlock(header->objlock);
1156     }
1157     ptr=ptr->next;
1158   }
1159 #endif
1160 }
1161
1162 /* ==================================
1163  * transCommitProcess
1164  *
1165  * =================================
1166  */
1167 #ifdef DELAYCOMP
1168 void transCommitProcess(struct garbagelist * oidwrlocked ARRAYDELAYWRAP1(int * dirwrindex), int numoidwrlocked, int numoidwrtotal, void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
1169 #else
1170 void transCommitProcess(struct garbagelist * oidwrlocked, int numoidwrlocked) {
1171 #endif
1172   objheader_t *header;
1173   void *ptrcreate;
1174   int i;
1175   struct objlist *ptr=newobjs;
1176   void **dirwrlocked=oidwrlocked->array;
1177   while(ptr!=NULL) {
1178     int max=ptr->offset;
1179     for(i=0; i<max; i++) {
1180       //clear the new flag
1181       ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
1182     }
1183     ptr=ptr->next;
1184   }
1185
1186   /* Copy from transaction cache -> main object store */
1187   for (i = numoidwrlocked-1; i >=0; i--) {
1188     /* Read from the main heap */
1189     header = &((objheader_t *)dirwrlocked[i])[-1];
1190     int tmpsize;
1191     GETSIZE(tmpsize, header);
1192     struct ___Object___ *dst=(struct ___Object___*)dirwrlocked[i];
1193     struct ___Object___ *src=t_chashSearch(dst);
1194     dst->___cachedCode___=src->___cachedCode___;
1195     dst->___cachedHash___=src->___cachedHash___;
1196 #ifdef STMARRAY
1197     int type=dst->type;
1198     if (type>=NUMCLASSES) {
1199       //have array, do copying of bins
1200       int lowoffset=(((struct ArrayObject *)src)->lowindex);
1201       int highoffset=(((struct ArrayObject *)src)->highindex);
1202       int j;
1203       int addwrobject=0, addrdobject=0;
1204       int elementsize=classsize[type];
1205       int baseoffset=(lowoffset<<INDEXSHIFT)+sizeof(int)+((int)&(((struct ArrayObject *)0)->___length___));
1206       char *dstptr=((char *)dst)+baseoffset;
1207       char *srcptr=((char *)src)+baseoffset;
1208       for(j=lowoffset; j<=highoffset;j++, srcptr+=INDEXLENGTH,dstptr+=INDEXLENGTH) {
1209         unsigned int status;
1210         GETLOCKVAL(status, ((struct ArrayObject *)src), j);
1211         if (status==STMDIRTY) {
1212           A_memcpy(dstptr, srcptr, INDEXLENGTH);
1213         }
1214       }
1215     } else
1216 #endif 
1217       A_memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1218   }
1219   CFENCE;
1220
1221 #ifdef DELAYCOMP
1222   //  call commit method
1223   ptrstack.maxcount=0;
1224   primstack.count=0;
1225   branchstack.count=0;
1226 #if defined(STMARRAY)&&!defined(DUALVIEW)
1227   arraystack.maxcount=0;
1228 #endif
1229   //splice oidwrlocked in
1230   oidwrlocked->size=numoidwrtotal;
1231   oidwrlocked->next=params;
1232   ((struct garbagelist *)locals)->next=oidwrlocked;
1233   if (commitmethod!=NULL)
1234     commitmethod(params, locals, primitives);
1235   ((struct garbagelist *)locals)->next=params;
1236 #endif
1237
1238   /* Release write locks */
1239 #if defined(STMARRAY)&&defined(DELAYCOMP)
1240   for(i=numoidwrlocked-1; i>=0; i--) {
1241 #else
1242   for(i=NUMWRTOTAL-1; i>=0; i--) {
1243 #endif
1244     struct ___Object___ * dst=dirwrlocked[i];
1245     header = &((objheader_t *)dst)[-1];
1246 #ifdef STMARRAY
1247     int type=dst->type;
1248     if (type>=NUMCLASSES) {
1249       //have array, do unlocking of bins
1250       struct ArrayObject *src=(struct ArrayObject *)t_chashSearch(dst);
1251       int lowoffset=(src->lowindex);
1252       int highoffset=(src->highindex);
1253       int j;
1254       int addwrobject=0, addrdobject=0;
1255       for(j=lowoffset; j<=highoffset;j++) {
1256         unsigned int status;
1257         GETLOCKVAL(status, src, j);
1258         if (status==STMDIRTY) {
1259           unsigned int *intptr;
1260           GETVERSIONPTR(intptr, ((struct ArrayObject *)dst), j);
1261           (*intptr)++;
1262           GETLOCKPTR(intptr, ((struct ArrayObject *)dst), j);
1263           write_unlock(intptr);
1264         }
1265       }
1266       atomic_inc(&header->version);
1267 #ifdef DUALVIEW
1268       rwread_unlock(&header->lock);
1269 #endif
1270     } else
1271 #endif
1272     {
1273       header->version++;
1274       write_unlock(&header->lock);
1275     }
1276   }
1277 #if defined(STMARRAY)&&defined(DELAYCOMP)&&defined(DUALVIEW)
1278   //release access locks
1279   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1280     struct ___Object___ * dst=dirwrlocked[i];
1281     int wrlock=dirwrindex[i];
1282     header = &((objheader_t *)dst)[-1];
1283     if (wrlock==-1) {
1284       header->version++;
1285       write_unlock(&header->lock);
1286     } else if (wrlock==0) {
1287       header->version++;
1288       rwwrite_unlock(&header->lock);
1289     } else {
1290       header->version++;
1291 #ifdef DUALVIEW
1292       ((struct ArrayObject*)dst)->arrayversion++;
1293 #endif
1294       rwconvert_unlock(&header->lock);
1295     }
1296   }
1297 #endif
1298 #if defined(STMARRAY)&&defined(DELAYCOMP)&&!defined(DUALVIEW)
1299   //release access locks
1300   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1301     struct ___Object___ * dst=dirwrlocked[i];
1302     header = &((objheader_t *)dst)[-1];
1303     int wrindex=dirwrindex[i];
1304     if (wrindex==-1) {
1305       //normal object
1306       header->version++;
1307       write_unlock(&header->lock);
1308     } else {
1309       //array element
1310       unsigned int *intptr;
1311       atomic_inc(&header->version);
1312       GETVERSIONPTR(intptr, ((struct ArrayObject *)dst), wrindex);
1313       (*intptr)++;
1314       GETLOCKPTR(intptr, ((struct ArrayObject *)dst), wrindex);
1315       write_unlock(intptr);
1316     }
1317   }
1318 #endif
1319 #ifdef STMSTATS
1320   /* clear trec and then release objects locked */
1321   ptr=lockedobjs;
1322   while(ptr!=NULL) {
1323     int max=ptr->offset;
1324     for(i=max-1; i>=0; i--) {
1325       header = (objheader_t *)ptr->objs[i];
1326       header->trec = NULL;
1327       pthread_mutex_unlock(header->objlock);
1328     }
1329     ptr=ptr->next;
1330   }
1331 #endif
1332 }