add likely/unlikely branch hints
[IRC.git] / Robust / src / Runtime / STM / commit.c
1 #include<tm.h>
2 #ifdef DELAYCOMP
3 #include<delaycomp.h>
4 #endif
5
6 #ifdef TRANSSTATS
7 #define TRANSWRAP(x) x
8 #else
9 #define TRANSWRAP(x)
10 #endif
11
12 #ifdef STMSTATS
13 #define STMWRAP(x) x
14 #else
15 #define STMWRAP(x)
16 #endif
17
18 #ifdef STMSTATS
19 #define STATFREE free(oidrdage);
20 #define STATALLOC oidrdage=malloc(size);
21 #define STATASSIGN oidrdage=rdage;
22 #else
23 #define STATFREE
24 #define STATALLOC
25 #define STATASSIGN
26 #endif
27
28 #if defined(STMARRAY)&&defined(DELAYCOMP)
29 #define ARRAYDELAYWRAP(x) x
30 #define ARRAYDELAYWRAP1(x) ,x
31 #else
32 #define ARRAYDELAYWRAP(x)
33 #define ARRAYDELAYWRAP1(x)
34 #endif
35
36 #ifdef DUALVIEW
37 #define DUALVIEWWRAP(x) x
38 #else
39 #define DUALVIEWWRAP(x)
40 #endif
41
42 /* ================================================================
43  * transCommit
44  * - This function initiates the transaction commit process
45  * - goes through the transaction cache and decides
46  * - a final response
47  * ================================================================
48  */
49
50 #ifdef DELAYCOMP
51 int transCommit(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
52 #else
53 int transCommit() {
54 #endif
55 #ifdef SANDBOX
56   abortenabled=0;
57 #endif
58   TRANSWRAP(numTransCommit++;);
59   int softaborted=0;
60   do {
61     /* Look through all the objects in the transaction hash table */
62     int finalResponse;
63 #ifdef DELAYCOMP
64     if (c_numelements<(c_size>>3))
65       finalResponse=alttraverseCache(commitmethod, primitives, locals, params);
66     else
67       finalResponse=traverseCache(commitmethod, primitives, locals, params);
68 #else
69     if (c_numelements<(c_size>>3))
70       finalResponse=alttraverseCache();
71     else
72       finalResponse=traverseCache();
73 #endif
74     if(finalResponse == TRANS_ABORT) {
75       TRANSWRAP(numTransAbort++;if (softaborted) nSoftAbortAbort++;);
76       freenewobjs();
77       STMWRAP(freelockedobjs(););
78       objstrReset();
79       t_chashreset();
80 #ifdef READSET
81       rd_t_chashreset();
82 #endif
83 #ifdef DELAYCOMP
84       dc_t_chashreset();
85       ptrstack.count=0;
86       primstack.count=0;
87       branchstack.count=0;
88 #if defined(STMARRAY)&&!defined(DUALVIEW)
89       arraystack.count=0;
90 #endif
91 #endif
92 #ifdef SANDBOX
93       abortenabled=1;
94 #endif
95
96       return TRANS_ABORT;
97     }
98     if(finalResponse == TRANS_COMMIT) {
99       TRANSWRAP(numTransCommit++;if (softaborted) nSoftAbortCommit++;);
100       freenewobjs();
101       STMWRAP(freelockedobjs(););
102       objstrReset();
103       t_chashreset();
104 #ifdef READSET
105       rd_t_chashreset();
106 #endif
107 #ifdef DELAYCOMP
108       dc_t_chashreset();
109       ptrstack.count=0;
110       primstack.count=0;
111       branchstack.count=0;
112 #if defined(STMARRAY)&&!defined(DUALVIEW)
113       arraystack.count=0;
114 #endif
115 #endif
116       return 0;
117     }
118
119     /* wait a random amount of time before retrying to commit transaction*/
120     if(finalResponse == TRANS_SOFT_ABORT) {
121       TRANSWRAP(nSoftAbort++;);
122       softaborted++;
123 #ifdef SOFTABORT
124       if (softaborted>1) {
125 #else
126       if (1) {
127 #endif
128         //retry if too many soft aborts
129         freenewobjs();
130         STMWRAP(freelockedobjs(););
131         objstrReset();
132         t_chashreset();
133 #ifdef READSET
134         rd_t_chashreset();
135 #endif
136 #ifdef DELAYCOMP
137         dc_t_chashreset();
138         ptrstack.count=0;
139         primstack.count=0;
140         branchstack.count=0;
141 #if defined(STMARRAY)&&!defined(DUALVIEW)
142       arraystack.count=0;
143 #endif
144 #endif
145         return TRANS_ABORT;
146       }
147     } else {
148       printf("Error: in %s() Unknown outcome", __func__);
149       exit(-1);
150     }
151   } while (1);
152 }
153
154 #ifdef DELAYCOMP
155 #define freearrays if (c_numelements>=200) { \
156     STATFREE;                                \
157     STMARRAYFREE;                            \
158     free(oidrdlocked);                       \
159     free(oidrdversion);                      \
160   }                                          \
161   if (t_numelements>=200) {                  \
162     free(oidwrlocked);                       \
163     STMARRAYDELAYFREE;                       \
164   }
165 #else
166 #define freearrays   if (c_numelements>=200) { \
167     STATFREE;                                  \
168     STMARRAYFREE;                              \
169     free(oidrdlocked);                         \
170     free(oidrdversion);                        \
171     free(oidwrlocked);                         \
172   }
173 #endif
174
175 #ifdef DELAYCOMP
176 #define allocarrays int t_numelements=c_numelements+dc_c_numelements;   \
177   if (t_numelements<200) {                                              \
178     oidwrlocked=(struct garbagelist *) &wrlocked;                       \
179     STMARRAYDELAYASSIGN;                                                \
180   } else {                                                              \
181     oidwrlocked=malloc(2*sizeof(INTPTR)+t_numelements*(sizeof(void *))); \
182     STMARRAYDELAYALLOC;                                                 \
183   }                                                                     \
184   if (c_numelements<200) {                                              \
185     oidrdlocked=rdlocked;                                               \
186     oidrdversion=rdversion;                                             \
187     STATASSIGN;                                                         \
188     STMARRAYASSIGN;                                                     \
189   } else {                                                              \
190     int size=c_numelements*sizeof(void*);                               \
191     oidrdlocked=malloc(size);                                           \
192     oidrdversion=malloc(size);                                          \
193     STATALLOC;                                                          \
194     STMARRAYALLOC;                                                      \
195   }                                                                     \
196   dirwrlocked=oidwrlocked->array;
197 #else
198 #define allocarrays if (c_numelements<200) {      \
199     oidrdlocked=rdlocked;                         \
200     oidrdversion=rdversion;                       \
201     oidwrlocked=(struct garbagelist *) &wrlocked; \
202     STATASSIGN;                                   \
203     STMARRAYASSIGN;                               \
204   } else {                                        \
205     int size=c_numelements*sizeof(void*);         \
206     oidrdlocked=malloc(size);                     \
207     oidrdversion=malloc(size);                    \
208     oidwrlocked=malloc(size+2*sizeof(INTPTR));    \
209     STATALLOC;                                    \
210     STMARRAYALLOC;                                \
211   }                                               \
212   dirwrlocked=oidwrlocked->array;
213 #endif
214
215 #ifdef STMSTATS
216 #define ABORTSTAT1 header->abortCount++;                                \
217   ObjSeqId = headeraddr->accessCount;                                   \
218   (typesCausingAbort[TYPE(header)]).numabort++;                         \
219   (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;           \
220   (typesCausingAbort[TYPE(header)]).numtrans+=1;                        \
221   objtypetraverse[TYPE(header)]=1;                                      \
222   if(getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
223     softabort=0;
224 #define ABORTSTAT2                                                      \
225   ObjSeqId = oidrdage[i];                                               \
226   header->abortCount++;                                                 \
227   (typesCausingAbort[TYPE(header)]).numabort++;                         \
228   (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;           \
229   (typesCausingAbort[TYPE(header)]).numtrans+=1;                        \
230   objtypetraverse[TYPE(header)]=1;                                      \
231   if (getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
232     softabort=0;
233 #define ABORTSTAT3                                                      \
234   header->abortCount++;                                                 \
235   ObjSeqId = headeraddr->accessCount;                                   \
236   (typesCausingAbort[TYPE(header)]).numabort++;                         \
237   (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;           \
238   (typesCausingAbort[TYPE(header)]).numtrans+=1;                        \
239   objtypetraverse[TYPE(header)]=1;                                      \
240   if (getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
241     softabort=0;
242 #define ABORTSTAT4 ObjSeqId = oidrdage[i];                              \
243   header->abortCount++;                                                 \
244   getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
245 #else
246 #define ABORTSTAT1
247 #define ABORTSTAT2
248 #define ABORTSTAT3
249 #define ABORTSTAT4
250 #endif
251
252 #ifdef DELAYCOMP
253 #define DELAYWRAP(x) x
254 #define NUMWRTOTAL numoidwrtotal
255 #else
256 #define DELAYWRAP(x)
257 #define NUMWRTOTAL numoidwrlocked
258 #endif
259
260 #if defined(STMARRAY)
261 #define STMARRAYFREE free(oidrdlockedarray);
262 #define STMARRAYALLOC oidrdlockedarray=malloc(size);
263 #define STMARRAYASSIGN oidrdlockedarray=rdlockedarray;
264
265 //allocation statements for dirwrindex
266 #define STMARRAYDELAYFREE free(dirwrindex);
267 #define STMARRAYDELAYALLOC dirwrindex=malloc(t_numelements*sizeof(int));
268 #define STMARRAYDELAYASSIGN dirwrindex=wrindex;
269
270 #define ARRAYDEFINES int numoidrdlockedarray=0; \
271   void * rdlockedarray[200];                    \
272   void ** oidrdlockedarray;
273
274 #define ABORT                                   \
275   transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked)); \
276   freearrays;                                                           \
277   if (softabort)                                                        \
278     return TRANS_SOFT_ABORT;                                            \
279   else                                                                  \
280     return TRANS_ABORT;
281   
282 #define ABORTREAD                                                       \
283   transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
284   freearrays;                                                           \
285   if (softabort)                                                        \
286     return TRANS_SOFT_ABORT;                                            \
287   else                                                                  \
288     return TRANS_ABORT;
289
290
291 #define ARRAYABORT                                                      \
292   for(;j>=lowoffset;j--) {                                              \
293     GETLOCKVAL(status, transao, j);                                     \
294     if  (status==STMDIRTY) {                                            \
295       GETLOCKPTR(lockptr, mainao,j);                                    \
296       write_unlock(lockptr);                                            \
297     }                                                                   \
298   }                                                                     \
299   ABORT
300
301 #ifdef DUALVIEW
302 #define DVGETLOCK(x) if (!addwrobject) {                                \
303     unsigned int * objlock=&(&((objheader_t *)x)[-1])->lock;            \
304     if(!rwread_trylock(objlock)) {                                      \
305       ABORT;                                                            \
306     }                                                                   \
307 }
308
309 #define DVRELEASELOCK(x) {                                              \
310     unsigned int * objlock=&(&((objheader_t *)x)[-1])->lock;            \
311     rwread_unlock(objlock);                                             \
312   }
313
314 //not finished...if we can't get the lock, it is okay if it is in our access set
315 #define DVCHECKLOCK(x)                                                  \
316   unsigned int * objlock=&(&((objheader_t *)x)[-1])->lock;              \
317   if (objlock<=0) {                                                     \
318     if (dc_t_chashSearch(x)==NULL) {                                    \
319       ABORTREAD;                                                        \
320     }                                                                   \
321   }
322 #else
323 #define DVGETLOCK(x)
324 #define DVCHECKLOCK(x)
325 #define DVRELEASELOCK(x)
326 #endif
327
328 #if defined(DELAYCOMP)&&!defined(DUALVIEW)
329 #define READCHECK                                                       \
330   else if (dc_t_chashSearchArray(mainao,j)) {                           \
331     CFENCE;                                                             \
332     unsigned int localversion;                                          \
333     unsigned int remoteversion;                                         \
334     GETVERSIONVAL(localversion, transao, j);                            \
335     GETVERSIONVAL(remoteversion, mainao, j);                            \
336     if (localversion != remoteversion) {                                \
337       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
338       freearrays;                                                       \
339       return TRANS_ABORT;                                               \
340     }                                                                   \
341   }
342 #else
343 #define READCHECK
344 #endif
345
346 #define PROCESSARRAY                                                    \
347   int type=((int *)cachedobj)[0];                                       \
348   if (type>=NUMCLASSES) {                                               \
349     struct ArrayObject *transao=(struct ArrayObject *) cachedobj;       \
350     struct ArrayObject *mainao=(struct ArrayObject *) objptr;           \
351     int lowoffset=(transao->lowindex);                                  \
352     int highoffset=(transao->highindex);                                \
353     int j;                                                              \
354     int addwrobject=0, addrdobject=0;                                   \
355     for(j=lowoffset; j<=highoffset;j++) {                               \
356       unsigned int status;                                              \
357       GETLOCKVAL(status, transao, j);                                   \
358       if (status==STMDIRTY) {                                           \
359         DVGETLOCK(mainao);                                              \
360         unsigned int * lockptr;                                         \
361         GETLOCKPTR(lockptr, mainao,j);                                  \
362         if (likely(write_trylock(lockptr))) {                           \
363           unsigned int localversion;                                    \
364           unsigned int remoteversion;                                   \
365           GETVERSIONVAL(localversion, transao, j);                      \
366           GETVERSIONVAL(remoteversion, mainao, j);                      \
367           if (likely(localversion == remoteversion)) {                  \
368             addwrobject=1;                                              \
369           } else {                                                      \
370             DVRELEASELOCK(mainao);                                      \
371             ARRAYABORT;                                                 \
372           }                                                             \
373         } else {                                                        \
374           j--;                                                          \
375           DVRELEASELOCK(mainao);                                        \
376           ARRAYABORT;                                                   \
377         }                                                               \
378       } else if (status==STMCLEAN) {                                    \
379         addrdobject=1;                                                  \
380       }                                                                 \
381     }                                                                   \
382     if (addwrobject) {                                                  \
383       dirwrlocked[numoidwrlocked++] = objptr;                           \
384     }                                                                   \
385     if (addrdobject) {                                                  \
386       oidrdlockedarray[numoidrdlockedarray++]=objptr;                   \
387     }                                                                   \
388   } else
389
390
391 #ifdef DUALVIEW 
392 #define QUICKCHECK {                                                    \
393     objheader_t * head=&((objheader_t *)mainao)[-1];                    \
394     if (head->lock==RW_LOCK_BIAS&&                                      \
395         ((objheader_t *)transao)[-1].version==head->version)            \
396       continue;                                                         \
397   }
398 #else
399 #define QUICKCHECK
400 #endif
401
402 #define READARRAYS                                                      \
403   for(i=0; i<numoidrdlockedarray; i++) {                                \
404     struct ArrayObject * transao=(struct ArrayObject *) oidrdlockedarray[i]; \
405     struct ArrayObject * mainao=(struct ArrayObject *) transao->___objlocation___; \
406     QUICKCHECK;                                                         \
407     DVCHECKLOCK(mainao);                                                \
408     int lowoffset=(transao->lowindex);                                  \
409     int highoffset=(transao->highindex);                                \
410     int j;                                                              \
411     for(j=lowoffset; j<=highoffset;j++) {                               \
412       unsigned int locallock;GETLOCKVAL(locallock,transao,j);           \
413       if (locallock==STMCLEAN) {                                        \
414         /* do read check */                                             \
415         unsigned int mainlock; GETLOCKVAL(mainlock, mainao, j);         \
416         if (mainlock>0) {                                               \
417           CFENCE;                                                       \
418           unsigned int localversion;                                    \
419           unsigned int remoteversion;                                   \
420           GETVERSIONVAL(localversion, transao, j);                      \
421           GETVERSIONVAL(remoteversion, mainao, j);                      \
422           if (localversion != remoteversion) {                          \
423             transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
424             freearrays;                                                 \
425             return TRANS_ABORT;                                         \
426           }                                                             \
427         }                                                               \
428         READCHECK                                                       \
429         else {                                                          \
430           unsigned int localversion;                                    \
431           unsigned int remoteversion;                                   \
432           GETVERSIONVAL(localversion, transao, j);                      \
433           GETVERSIONVAL(remoteversion, mainao, j);                      \
434           if (localversion==remoteversion)                              \
435             softabort=1;                                                \
436           transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
437           freearrays;                                                   \
438           if (softabort)                                                \
439             return TRANS_SOFT_ABORT;                                    \
440           else                                                          \
441             return TRANS_ABORT;                                         \
442         }                                                               \
443       }                                                                 \
444     }                                                                   \
445   }
446 #else
447 #define ARRAYDEFINES
448 #define PROCESSARRAY
449 #define READARRAYS
450 #define STMARRAYFREE
451 #define STMARRAYALLOC
452 #define STMARRAYASSIGN
453 #define STMARRAYDELAYFREE
454 #define STMARRAYDELAYALLOC
455 #define STMARRAYDELAYASSIGN
456 #endif
457
458 #ifdef DELAYCOMP
459 #if defined(STMARRAY)&&!defined(DUALVIEW)
460 #define ARRAYLOCK                                                       \
461   int intkey=dc_curr->intkey;                                           \
462   if (intkey!=-1) {                                                     \
463     unsigned int *lockptr;                                              \
464     GETLOCKPTR(lockptr, objptr, intkey);                                \
465     if (likely(write_trylock(lockptr))) {                               \
466       /*have lock on element */                                         \
467       dirwrlocked[numoidwrtotal]=objptr;                                \
468       dirwrindex[numoidwrtotal++]=intkey;                               \
469     } else {                                                            \
470       unsigned int lockval;                                             \
471       GETLOCKVAL(lockval, valptr, intkey);                              \
472       if (lockval!=STMDIRTY) {                                          \
473         /*have to abort to avoid deadlock*/                             \
474         transAbortProcess(oidwrlocked, numoidwrtotal, dirwrindex, numoidwrlocked); \
475         ABORTSTAT1;                                                     \
476         freearrays;                                                     \
477         if (softabort)                                                  \
478           return TRANS_SOFT_ABORT;                                      \
479         else                                                            \
480           return TRANS_ABORT;                                           \
481       }                                                                 \
482     }                                                                   \
483   } else
484
485 #elif defined(STMARRAY)&&defined(DUALVIEW)
486 #define ARRAYLOCK                                                       \
487   if (((struct ___Object___ *)objptr)->type>=NUMCLASSES) {              \
488     if (likely(rwwrite_trylock(&header->lock))) {                       \
489       dirwrindex[numoidwrtotal]=0;                                      \
490       dirwrlocked[numoidwrtotal++] = objptr;                            \
491     } else {                                                            \
492       chashlistnode_t *node = &c_table[(((unsigned INTPTR)objptr) & c_mask)>>4]; \
493       do {                                                              \
494         if(node->key == objptr) {                                       \
495           objheader_t * headeraddr=&((objheader_t *) node->val)[-1];    \
496           if(STATUS(headeraddr) & DIRTY) {                              \
497             if (likely(rwconvert_trylock(&header->lock))) {             \
498               dirwrindex[numoidwrtotal]=1;                              \
499               dirwrlocked[numoidwrtotal++] = objptr;                    \
500               goto nextloop;                                            \
501             }                                                           \
502           }                                                             \
503           break;                                                        \
504         }                                                               \
505         node = node->next;                                              \
506       } while(node != NULL);                                            \
507       ABORTREAD;                                                        \
508     }                                                                   \
509   } else
510 #else
511 #define ARRAYLOCK
512 #endif
513
514
515 #define ACCESSLOCKS                                                     \
516   unsigned int numoidwrtotal=numoidwrlocked;                            \
517   dchashlistnode_t *dc_curr = dc_c_list;                                \
518   /* Inner loop to traverse the linked list of the cache lookupTable */ \
519   while(likely(dc_curr != NULL)) {                                      \
520     /*if the first bin in hash table is empty   */                      \
521     void *valptr=dc_curr->val;                                          \
522     objheader_t * headeraddr=&((objheader_t *) valptr)[-1];             \
523     void *objptr=dc_curr->key;                                          \
524     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t)); \
525     ARRAYLOCK                                                           \
526     if(likely(write_trylock(&header->lock))) { /*can aquire write lock*/ \
527       ARRAYDELAYWRAP(dirwrindex[numoidwrtotal]=-1;)                     \
528       dirwrlocked[numoidwrtotal++] = objptr;                            \
529     } else {                                                            \
530       /* maybe we already have lock*/                                   \
531       chashlistnode_t *node = &c_table[(((unsigned INTPTR)objptr) & c_mask)>>4]; \
532                                                                         \
533       do {                                                              \
534         if(node->key == objptr) {                                       \
535           objheader_t * headeraddr=&((objheader_t *) node->val)[-1];    \
536           if(STATUS(headeraddr) & DIRTY) {                              \
537             goto nextloop;                                              \
538           } else                                                        \
539             break;                                                      \
540         }                                                               \
541         node = node->next;                                              \
542       } while(node != NULL);                                            \
543                                                                         \
544       /*have to abort to avoid deadlock */                              \
545       transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked)); \
546       ABORTSTAT1;                                                       \
547       freearrays;                                                       \
548       if (softabort)                                                    \
549         return TRANS_SOFT_ABORT;                                        \
550       else                                                              \
551         return TRANS_ABORT;                                             \
552     }                                                                   \
553   nextloop:                                                             \
554     dc_curr = dc_curr->lnext;                                           \
555   }                                                     
556 #else
557 #define ACCESSLOCKS
558 #endif
559
560 /* ==================================================
561  * traverseCache
562  * - goes through the transaction cache and
563  * - decides if a transaction should commit or abort
564  * ==================================================
565  */
566
567 #ifdef DELAYCOMP
568 int traverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
569 #else
570 int traverseCache() {
571 #endif
572   /* Create info to keep track of objects that can be locked */
573   int numoidrdlocked=0;
574   int numoidwrlocked=0;
575   void * rdlocked[200];
576   int rdversion[200];
577   struct fixedlist wrlocked;
578   int softabort=0;
579   int i;
580   void ** oidrdlocked;
581   int * oidrdversion;
582   ARRAYDEFINES;
583   STMWRAP(int rdage[200];int * oidrdage;int ObjSeqId;int objtypetraverse[TOTALNUMCLASSANDARRAY];);
584   struct garbagelist * oidwrlocked;
585   void ** dirwrlocked;
586 #if defined(STMARRAY)&&defined(DELAYCOMP)
587   int wrindex[200];
588   int * dirwrindex;
589 #endif
590   allocarrays;
591
592   STMWRAP(for(i=0; i<TOTALNUMCLASSANDARRAY; i++) objtypetraverse[i] = 0;);
593
594   chashlistnode_t *ptr = c_table;
595   /* Represents number of bins in the chash table */
596   unsigned int size = c_size;
597   for(i = 0; i<size; i++) {
598     chashlistnode_t *curr = &ptr[i];
599     /* Inner loop to traverse the linked list of the cache lookupTable */
600     while(curr != NULL) {
601       //if the first bin in hash table is empty
602       if(curr->key == NULL)
603         break;
604       objheader_t * cachedobj=curr->val;
605       objheader_t * headeraddr=&((objheader_t *) cachedobj)[-1]; //cached object
606       void * objptr=curr->key;
607       objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t)); //real object
608       unsigned int version = headeraddr->version;
609
610       if(STATUS(headeraddr) & DIRTY) {
611         PROCESSARRAY
612         /* Read from the main heap  and compare versions */
613         if(likely(write_trylock(&header->lock))) { //can aquire write lock
614           if (likely(version == header->version)) { /* versions match */
615             /* Keep track of objects locked */
616             dirwrlocked[numoidwrlocked++] = objptr;
617           } else {
618             dirwrlocked[numoidwrlocked++] = objptr;
619             transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
620             ABORTSTAT1;
621             freearrays;
622             if (softabort)
623               return TRANS_SOFT_ABORT;
624             else 
625               return TRANS_ABORT;
626           }
627         } else {
628           if(version == header->version) {
629             /* versions match */
630             softabort=1;
631           }
632           transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
633           ABORTSTAT1;
634           freearrays;
635           if (softabort)
636             return TRANS_SOFT_ABORT;
637           else 
638             return TRANS_ABORT;
639       
640         }
641       } else {
642         STMWRAP(oidrdage[numoidrdlocked]=headeraddr->accessCount;);
643         oidrdversion[numoidrdlocked]=version;
644         oidrdlocked[numoidrdlocked++]=header;
645       }
646       curr = curr->next;
647     }
648   } //end of for
649   
650   ACCESSLOCKS;
651
652   //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
653   READARRAYS;
654
655   for(i=0; i<numoidrdlocked; i++) {
656     /* Read from the main heap  and compare versions */
657     objheader_t *header=oidrdlocked[i];
658     unsigned int version=oidrdversion[i];
659     if(header->lock>0) { //not write locked
660       CFENCE;
661       if(version != header->version) { /* versions do not match */
662         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
663         ABORTSTAT2;
664         freearrays;
665         return TRANS_ABORT;
666       }
667 #if DELAYCOMP
668     } else if (dc_t_chashSearch(((char *)header)+sizeof(objheader_t))!=NULL) {
669       //couldn't get lock because we already have it
670       //check if it is the right version number
671       if (version!=header->version) {
672         transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
673         ABORTSTAT2;
674         freearrays;
675         return TRANS_ABORT;
676       }
677 #endif
678     } else { /* cannot aquire lock */
679       //do increment as we didn't get lock
680       if(version == header->version) {
681         softabort=1;
682       }
683       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
684       ABORTSTAT2;
685       freearrays;
686       if (softabort)
687         return TRANS_SOFT_ABORT;
688       else 
689         return TRANS_ABORT;
690     }
691   }
692
693 #ifdef READSET
694   //need to validate auxilary readset
695   rdchashlistnode_t *rd_curr = rd_c_list;
696   /* Inner loop to traverse the linked list of the cache lookupTable */
697   while(likely(rd_curr != NULL)) {
698     //if the first bin in hash table is empty
699     unsigned int version=rd_curr->version;
700     struct ___Object___ * objptr=rd_curr->key;
701     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
702 #ifdef STMARRAY
703     int isobject=objptr->type<NUMCLASSES;
704     if(likely((isobject&&header->lock>0)||(!isobject&&header->lock==RW_LOCK_BIAS))) {
705 #else
706     if(header->lock>0) {
707 #endif
708       //object is not locked
709       if (unlikely(version!=header->version)) {
710         //have to abort
711         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
712         STMWRAP((typesCausingAbort[TYPE(header)])++;);
713         freearrays;
714         if (softabort)
715           return TRANS_SOFT_ABORT;
716         else
717           return TRANS_ABORT;
718       }
719     } else {
720       //maybe we already have lock
721       if (likely(version==header->version)) {
722         void * key=rd_curr->key;
723 #ifdef DELAYCOMP
724         //check to see if it is in the delaycomp table
725         {
726           dchashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
727           do {
728             if(node->key == key) {
729               goto nextloopread;
730             }
731             node = node->next;
732           } while(node != NULL);
733         }
734 #endif
735         //check normal table
736 #ifdef STMARRAY
737       if (likely(isobject||header->lock==(RW_LOCK_BIAS-1))) {
738 #else
739         {
740 #endif
741           chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
742           do {
743             if(node->key == key) {
744               objheader_t * headeraddr=&((objheader_t *) node->val)[-1];          
745               if(STATUS(headeraddr) & DIRTY) {
746                 goto nextloopread;
747               }
748             }
749             node = node->next;
750           } while(node != NULL);
751         }
752       }
753       //have to abort to avoid deadlock
754       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
755       STMWRAP((typesCausingAbort[TYPE(header)])++;);
756       freearrays;
757       if (softabort)
758         return TRANS_SOFT_ABORT;
759       else
760         return TRANS_ABORT;
761     }
762   nextloopread:
763     rd_curr = rd_curr->lnext;
764   }
765 #endif
766   
767   /* Decide the final response */
768 #ifdef DELAYCOMP
769   transCommitProcess(oidwrlocked ARRAYDELAYWRAP1(dirwrindex), numoidwrlocked, numoidwrtotal, commitmethod, primitives, locals, params);
770 #else
771   transCommitProcess(oidwrlocked, numoidwrlocked);
772 #endif
773   freearrays;
774   return TRANS_COMMIT;
775 }
776
777 /* ==================================================
778  * alttraverseCache
779  * - goes through the transaction cache and
780  * - decides if a transaction should commit or abort
781  * ==================================================
782  */
783
784 #ifdef DELAYCOMP
785 int alttraverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
786 #else
787 int alttraverseCache() {
788 #endif
789   /* Create info to keep track of objects that can be locked */
790   int numoidrdlocked=0;
791   int numoidwrlocked=0;
792   void * rdlocked[200];
793   int rdversion[200];
794   struct fixedlist wrlocked;
795   int softabort=0;
796   int i;
797   void ** oidrdlocked;
798   int * oidrdversion;
799   STMWRAP(int rdage[200];int * oidrdage;int ObjSeqId;int objtypetraverse[TOTALNUMCLASSANDARRAY];);
800   ARRAYDEFINES;
801   struct garbagelist * oidwrlocked;
802   void ** dirwrlocked;
803 #if defined(STMARRAY)&&defined(DELAYCOMP)
804   int wrindex[200];
805   int * dirwrindex;
806 #endif
807   allocarrays;
808
809   STMWRAP(for(i=0; i<TOTALNUMCLASSANDARRAY; i++) objtypetraverse[i] = 0;);
810
811   chashlistnode_t *curr = c_list;
812   /* Inner loop to traverse the linked list of the cache lookupTable */
813   while(likely(curr != NULL)) {
814     //if the first bin in hash table is empty
815       objheader_t * cachedobj=curr->val;
816     objheader_t * headeraddr=&((objheader_t *) cachedobj)[-1];
817     void *objptr=curr->key;
818     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
819     unsigned int version = headeraddr->version;
820
821     if(STATUS(headeraddr) & DIRTY) {
822       PROCESSARRAY
823       /* Read from the main heap  and compare versions */
824       if(likely(write_trylock(&header->lock))) { //can aquire write lock
825         if (likely(version == header->version)) { /* versions match */
826           /* Keep track of objects locked */
827           dirwrlocked[numoidwrlocked++] = objptr;
828         } else {
829           dirwrlocked[numoidwrlocked++] = objptr;
830           transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
831           ABORTSTAT3;
832           freearrays;
833           return TRANS_ABORT;
834         }
835       } else { /* cannot aquire lock */
836         if(version == header->version) {
837           /* versions match */
838           softabort=1;
839         }
840         transAbortProcess(oidwrlocked, numoidwrlocked ARRAYDELAYWRAP1(NULL) ARRAYDELAYWRAP1(numoidwrlocked));
841         ABORTSTAT3;
842         freearrays;
843         if (softabort)
844           return TRANS_SOFT_ABORT;
845         else 
846           return TRANS_ABORT;
847       }
848     } else {
849       /* Read from the main heap  and compare versions */
850       oidrdversion[numoidrdlocked]=version;
851       oidrdlocked[numoidrdlocked++] = header;
852     }
853     curr = curr->lnext;
854   }
855
856   ACCESSLOCKS;
857
858   //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
859   READARRAYS;
860
861   for(i=0; i<numoidrdlocked; i++) {
862     objheader_t * header=oidrdlocked[i];
863     unsigned int version=oidrdversion[i];
864     if(likely(header->lock>0)) {
865       CFENCE;
866       if(unlikely(version != header->version)) {
867         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
868         ABORTSTAT2;
869         freearrays;
870         return TRANS_ABORT;
871       }
872 #ifdef DELAYCOMP
873     } else if (dc_t_chashSearch(((char *)header)+sizeof(objheader_t))!=NULL) {
874       //couldn't get lock because we already have it
875       //check if it is the right version number
876       if (version!=header->version) {
877         transAbortProcess(oidwrlocked, numoidwrtotal ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
878         ABORTSTAT4;
879         freearrays;
880         return TRANS_ABORT;
881       }
882 #endif
883     } else { /* cannot aquire lock */
884       if(version == header->version) {
885         softabort=1;
886       }
887       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
888       ABORTSTAT2;
889       freearrays;
890       if (softabort)
891         return TRANS_SOFT_ABORT;
892       else 
893         return TRANS_ABORT;
894     }
895   }
896
897 #ifdef READSET
898   //need to validate auxilary readset
899   rdchashlistnode_t *rd_curr = rd_c_list;
900   /* Inner loop to traverse the linked list of the cache lookupTable */
901   while(likely(rd_curr != NULL)) {
902     //if the first bin in hash table is empty
903     int version=rd_curr->version;
904     struct ___Object___ * objptr=rd_curr->key;
905     objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
906 #ifdef STMARRAY    
907     int isobject=objptr->type<NUMCLASSES;
908     if(likely((isobject&&header->lock>0)||(!isobject&&header->lock==RW_LOCK_BIAS))) {
909 #else
910     if(likely(header->lock>0)) { //object is not locked
911 #endif
912       if (unlikely(version!=header->version)) {
913         //have to abort
914         transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
915         STMWRAP((typesCausingAbort[TYPE(header)])++;);
916         freearrays;
917         if (softabort)
918           return TRANS_SOFT_ABORT;
919         else
920           return TRANS_ABORT;   
921       }
922     } else {
923       if (version==header->version) {
924         void * key=rd_curr->key;
925 #ifdef DELAYCOMP
926         //check to see if it is in the delaycomp table
927         {
928           dchashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
929           do {
930             if(node->key == key)
931               goto nextloopread;
932             node = node->next;
933           } while(node != NULL);
934         }
935 #endif
936         //check normal table
937 #ifdef STMARRAY
938         if (likely(isobject||header->lock==(RW_LOCK_BIAS-1))) { 
939 #else
940           {
941 #endif
942           chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
943           do {
944             if(node->key == key) {
945               objheader_t * headeraddr=&((objheader_t *) node->val)[-1];          
946               if(STATUS(headeraddr) & DIRTY) {
947                 goto nextloopread;
948               }
949             }
950             node = node->next;
951           } while(node != NULL);
952         }
953       }
954       //have to abort to avoid deadlock
955       transAbortProcess(oidwrlocked, NUMWRTOTAL ARRAYDELAYWRAP1(dirwrindex) ARRAYDELAYWRAP1(numoidwrlocked));
956       STMWRAP((typesCausingAbort[TYPE(header)])++;);
957       freearrays;
958       if (softabort)
959         return TRANS_SOFT_ABORT;
960       else
961         return TRANS_ABORT;
962     }
963   nextloopread:
964     rd_curr = rd_curr->lnext;
965   }
966 #endif
967
968   /* Decide the final response */
969 #ifdef DELAYCOMP
970   transCommitProcess(oidwrlocked ARRAYDELAYWRAP1(dirwrindex), numoidwrlocked, numoidwrtotal, commitmethod, primitives, locals, params);
971 #else
972   transCommitProcess(oidwrlocked, numoidwrlocked);
973 #endif
974   freearrays;
975   return TRANS_COMMIT;
976 }
977
978 /* ==================================
979  * transAbortProcess
980  *
981  * =================================
982  */
983
984 int logflag=1;
985  
986 #if defined(STMARRAY)&&defined(DELAYCOMP)
987 void transAbortProcess(struct garbagelist *oidwrlocked, int numoidwrtotal, int * dirwrindex, int numoidwrlocked) {
988 #else
989 void transAbortProcess(struct garbagelist *oidwrlocked, int numoidwrlocked) {
990 #endif
991   int i;
992   objheader_t *header;
993   /* Release read locks */
994   void ** dirwrlocked=oidwrlocked->array;
995   /* Release write locks */
996   for(i=numoidwrlocked-1; i>=0; i--) {
997     /* Read from the main heap */
998     struct ___Object___ * dst=dirwrlocked[i];
999     header = &((objheader_t *)dst)[-1];
1000 #ifdef STMARRAY
1001     int type=dst->type;
1002     if (type>=NUMCLASSES) {
1003       //have array, do unlocking of bins
1004       struct ArrayObject *src=(struct ArrayObject *)t_chashSearch(dst);
1005       int lowoffset=(src->lowindex);
1006       int highoffset=(src->highindex);
1007       int j;
1008       int addwrobject=0, addrdobject=0;
1009       for(j=lowoffset; j<=highoffset;j++) {
1010         unsigned int status;
1011         GETLOCKVAL(status, src, j);
1012         if (status==STMDIRTY) {
1013           unsigned int *lockptr;
1014           GETLOCKPTR(lockptr, ((struct ArrayObject *)dst), j);
1015           write_unlock(lockptr);
1016         }
1017       }
1018 #ifdef DUALVIEW
1019       //release object array lock
1020       rwread_unlock(&header->lock);
1021 #endif
1022     } else
1023 #endif
1024     write_unlock(&header->lock);
1025   }
1026 #if defined(STMARRAY)&&defined(DELAYCOMP)&&!defined(DUALVIEW)
1027   //release access locks
1028   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1029     struct ___Object___ * dst=dirwrlocked[i];
1030     header = &((objheader_t *)dst)[-1];
1031     int wrindex=dirwrindex[i];
1032     if (wrindex==-1) {
1033       //normal object
1034       write_unlock(&header->lock);
1035     } else {
1036       //array element
1037       unsigned int *intptr;
1038       GETLOCKPTR(intptr, ((struct ArrayObject *)dst), wrindex);
1039       write_unlock(intptr);
1040     }
1041   }
1042 #endif
1043 #if defined(STMARRAY)&&defined(DELAYCOMP)&&defined(DUALVIEW)
1044   //release access locks
1045   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1046     struct ___Object___ * dst=dirwrlocked[i];
1047     header = &((objheader_t *)dst)[-1];
1048     int wrindex=dirwrindex[i];
1049     if (wrindex==-1) {
1050       //normal object
1051       write_unlock(&header->lock);
1052     } else if (wrindex==0) {
1053       //array element
1054       rwwrite_unlock(&header->lock);
1055     } else {
1056       rwconvert_unlock(&header->lock);
1057     }
1058   }
1059 #endif
1060 #ifdef STMSTATS
1061   /* clear trec and then release objects locked */
1062   struct objlist *ptr=lockedobjs;
1063   while(ptr!=NULL) {
1064     int max=ptr->offset;
1065     for(i=max-1; i>=0; i--) {
1066       header = (objheader_t *)ptr->objs[i];
1067       header->trec = NULL;
1068       pthread_mutex_unlock(header->objlock);
1069     }
1070     ptr=ptr->next;
1071   }
1072 #endif
1073 }
1074
1075 /* ==================================
1076  * transCommitProcess
1077  *
1078  * =================================
1079  */
1080 #ifdef DELAYCOMP
1081 void transCommitProcess(struct garbagelist * oidwrlocked ARRAYDELAYWRAP1(int * dirwrindex), int numoidwrlocked, int numoidwrtotal, void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
1082 #else
1083 void transCommitProcess(struct garbagelist * oidwrlocked, int numoidwrlocked) {
1084 #endif
1085   objheader_t *header;
1086   void *ptrcreate;
1087   int i;
1088   struct objlist *ptr=newobjs;
1089   void **dirwrlocked=oidwrlocked->array;
1090   while(ptr!=NULL) {
1091     int max=ptr->offset;
1092     for(i=0; i<max; i++) {
1093       //clear the new flag
1094       ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
1095     }
1096     ptr=ptr->next;
1097   }
1098
1099   /* Copy from transaction cache -> main object store */
1100   for (i = numoidwrlocked-1; i >=0; i--) {
1101     /* Read from the main heap */
1102     header = &((objheader_t *)dirwrlocked[i])[-1];
1103     int tmpsize;
1104     GETSIZE(tmpsize, header);
1105     struct ___Object___ *dst=(struct ___Object___*)dirwrlocked[i];
1106     struct ___Object___ *src=t_chashSearch(dst);
1107     dst->___cachedCode___=src->___cachedCode___;
1108     dst->___cachedHash___=src->___cachedHash___;
1109 #ifdef STMARRAY
1110     int type=dst->type;
1111     if (type>=NUMCLASSES) {
1112       //have array, do copying of bins
1113       int lowoffset=(((struct ArrayObject *)src)->lowindex);
1114       int highoffset=(((struct ArrayObject *)src)->highindex);
1115       int j;
1116       int addwrobject=0, addrdobject=0;
1117       int elementsize=classsize[type];
1118       int baseoffset=(lowoffset<<INDEXSHIFT)+sizeof(int)+((int)&(((struct ArrayObject *)0)->___length___));
1119       char *dstptr=((char *)dst)+baseoffset;
1120       char *srcptr=((char *)src)+baseoffset;
1121       for(j=lowoffset; j<=highoffset;j++, srcptr+=INDEXLENGTH,dstptr+=INDEXLENGTH) {
1122         unsigned int status;
1123         GETLOCKVAL(status, ((struct ArrayObject *)src), j);
1124         if (status==STMDIRTY) {
1125           A_memcpy(dstptr, srcptr, INDEXLENGTH);
1126         }
1127       }
1128     } else
1129 #endif 
1130       A_memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1131   }
1132   CFENCE;
1133
1134 #ifdef DELAYCOMP
1135   //  call commit method
1136   ptrstack.maxcount=0;
1137   primstack.count=0;
1138   branchstack.count=0;
1139 #if defined(STMARRAY)&&!defined(DUALVIEW)
1140   arraystack.maxcount=0;
1141 #endif
1142   //splice oidwrlocked in
1143   oidwrlocked->size=numoidwrtotal;
1144   oidwrlocked->next=params;
1145   ((struct garbagelist *)locals)->next=oidwrlocked;
1146   if (commitmethod!=NULL)
1147     commitmethod(params, locals, primitives);
1148   ((struct garbagelist *)locals)->next=params;
1149 #endif
1150
1151   /* Release write locks */
1152 #if defined(STMARRAY)&&defined(DELAYCOMP)
1153   for(i=numoidwrlocked-1; i>=0; i--) {
1154 #else
1155   for(i=NUMWRTOTAL-1; i>=0; i--) {
1156 #endif
1157     struct ___Object___ * dst=dirwrlocked[i];
1158     header = &((objheader_t *)dst)[-1];
1159 #ifdef STMARRAY
1160     int type=dst->type;
1161     if (type>=NUMCLASSES) {
1162       //have array, do unlocking of bins
1163       struct ArrayObject *src=(struct ArrayObject *)t_chashSearch(dst);
1164       int lowoffset=(src->lowindex);
1165       int highoffset=(src->highindex);
1166       int j;
1167       int addwrobject=0, addrdobject=0;
1168       for(j=lowoffset; j<=highoffset;j++) {
1169         unsigned int status;
1170         GETLOCKVAL(status, src, j);
1171         if (status==STMDIRTY) {
1172           unsigned int *intptr;
1173           GETVERSIONPTR(intptr, ((struct ArrayObject *)dst), j);
1174           (*intptr)++;
1175           GETLOCKPTR(intptr, ((struct ArrayObject *)dst), j);
1176           write_unlock(intptr);
1177         }
1178       }
1179       atomic_inc(&header->version);
1180 #ifdef DUALVIEW
1181       rwread_unlock(&header->lock);
1182 #endif
1183     } else
1184 #endif
1185     {
1186       header->version++;
1187       write_unlock(&header->lock);
1188     }
1189   }
1190 #if defined(STMARRAY)&&defined(DELAYCOMP)&&defined(DUALVIEW)
1191   //release access locks
1192   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1193     struct ___Object___ * dst=dirwrlocked[i];
1194     int wrlock=dirwrindex[i];
1195     header = &((objheader_t *)dst)[-1];
1196     if (wrlock==-1) {
1197       header->version++;
1198       write_unlock(&header->lock);
1199     } else if (wrlock==0) {
1200       header->version++;
1201       rwwrite_unlock(&header->lock);
1202     } else {
1203       header->version++;
1204       rwconvert_unlock(&header->lock);
1205     }
1206   }
1207 #endif
1208 #if defined(STMARRAY)&&defined(DELAYCOMP)&&!defined(DUALVIEW)
1209   //release access locks
1210   for(i=numoidwrtotal-1; i>=numoidwrlocked; i--) {
1211     struct ___Object___ * dst=dirwrlocked[i];
1212     header = &((objheader_t *)dst)[-1];
1213     int wrindex=dirwrindex[i];
1214     if (wrindex==-1) {
1215       //normal object
1216       header->version++;
1217       write_unlock(&header->lock);
1218     } else {
1219       //array element
1220       unsigned int *intptr;
1221       atomic_inc(&header->version);
1222       GETVERSIONPTR(intptr, ((struct ArrayObject *)dst), wrindex);
1223       (*intptr)++;
1224       GETLOCKPTR(intptr, ((struct ArrayObject *)dst), wrindex);
1225       write_unlock(intptr);
1226     }
1227   }
1228 #endif
1229 #ifdef STMSTATS
1230   /* clear trec and then release objects locked */
1231   ptr=lockedobjs;
1232   while(ptr!=NULL) {
1233     int max=ptr->offset;
1234     for(i=max-1; i>=0; i--) {
1235       header = (objheader_t *)ptr->objs[i];
1236       header->trec = NULL;
1237       pthread_mutex_unlock(header->objlock);
1238     }
1239     ptr=ptr->next;
1240   }
1241 #endif
1242 }