19 #define STATFREE free(oidrdage);
20 #define STATALLOC oidrdage=malloc(size);
21 #define STATASSIGN oidrdage=rdage;
28 /* ================================================================
30 * - This function initiates the transaction commit process
31 * - goes through the transaction cache and decides
33 * ================================================================
36 int transCommit(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
43 TRANSWRAP(numTransCommit++;);
46 /* Look through all the objects in the transaction hash table */
49 if (c_numelements<(c_size>>3))
50 finalResponse=alttraverseCache(commitmethod, primitives, locals, params);
52 finalResponse=traverseCache(commitmethod, primitives, locals, params);
54 if (c_numelements<(c_size>>3))
55 finalResponse=alttraverseCache();
57 finalResponse=traverseCache();
59 if(finalResponse == TRANS_ABORT) {
60 TRANSWRAP(numTransAbort++;if (softaborted) nSoftAbortAbort++;);
62 STMWRAP(freelockedobjs(););
79 if(finalResponse == TRANS_COMMIT) {
80 TRANSWRAP(numTransCommit++;if (softaborted) nSoftAbortCommit++;);
82 STMWRAP(freelockedobjs(););
97 /* wait a random amount of time before retrying to commit transaction*/
98 if(finalResponse == TRANS_SOFT_ABORT) {
99 TRANSWRAP(nSoftAbort++;);
106 //retry if too many soft aborts
108 STMWRAP(freelockedobjs(););
123 printf("Error: in %s() Unknown outcome", __func__);
130 #define freearrays if (c_numelements>=200) { \
134 free(oidrdversion); \
136 if (t_numelements>=200) { \
140 #define freearrays if (c_numelements>=200) { \
144 free(oidrdversion); \
150 #define allocarrays int t_numelements=c_numelements+dc_c_numelements; \
151 if (t_numelements<200) { \
152 oidwrlocked=(struct garbagelist *) &wrlocked; \
154 oidwrlocked=malloc(2*sizeof(INTPTR)+t_numelements*sizeof(void *)); \
156 if (c_numelements<200) { \
157 oidrdlocked=rdlocked; \
158 oidrdversion=rdversion; \
162 int size=c_numelements*sizeof(void*); \
163 oidrdlocked=malloc(size); \
164 oidrdversion=malloc(size); \
168 dirwrlocked=oidwrlocked->array;
170 #define allocarrays if (c_numelements<200) { \
171 oidrdlocked=rdlocked; \
172 oidrdversion=rdversion; \
173 oidwrlocked=(struct garbagelist *) &wrlocked; \
177 int size=c_numelements*sizeof(void*); \
178 oidrdlocked=malloc(size); \
179 oidrdversion=malloc(size); \
180 oidwrlocked=malloc(size+2*sizeof(INTPTR)); \
184 dirwrlocked=oidwrlocked->array;
188 #define ABORTSTAT1 header->abortCount++; \
189 ObjSeqId = headeraddr->accessCount; \
190 (typesCausingAbort[TYPE(header)]).numabort++; \
191 (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements; \
192 (typesCausingAbort[TYPE(header)]).numtrans+=1; \
193 objtypetraverse[TYPE(header)]=1; \
194 if(getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
197 ObjSeqId = oidrdage[i];\
198 header->abortCount++; \
199 (typesCausingAbort[TYPE(header)]).numabort++; \
200 (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements; \
201 (typesCausingAbort[TYPE(header)]).numtrans+=1; \
202 objtypetraverse[TYPE(header)]=1; \
203 if (getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
206 header->abortCount++; \
207 ObjSeqId = headeraddr->accessCount; \
208 (typesCausingAbort[TYPE(header)]).numabort++; \
209 (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements; \
210 (typesCausingAbort[TYPE(header)]).numtrans+=1; \
211 objtypetraverse[TYPE(header)]=1; \
212 if (getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse)) \
214 #define ABORTSTAT4 ObjSeqId = oidrdage[i]; \
215 header->abortCount++; \
216 getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
225 #define DELAYWRAP(x) x
226 #define NUMWRTOTAL numoidwrtotal
229 #define NUMWRTOTAL numoidwrlocked
233 #define STMARRAYFREE free(oidrdlockedarray);
234 #define STMARRAYALLOC oidrdlockedarray=malloc(size);
235 #define STMARRAYASSIGN oidrdlockedarray=rdlockedarray;
237 #define ARRAYDEFINES int numoidrdlockedarray=0; \
238 void * rdlockedarray[200]; \
239 void ** oidrdlockedarray;
242 for(;j>=lowoffset;j--) { \
243 GETLOCKVAL(status, transao, j); \
244 if (status==STMDIRTY) { \
245 GETLOCKPTR(lockptr, mainao,j); \
246 write_unlock(lockptr); \
249 transAbortProcess(oidwrlocked, numoidwrlocked); \
252 return TRANS_SOFT_ABORT; \
257 #define PROCESSARRAY \
258 int type=((int *)cachedobj)[0]; \
259 if (type>=NUMCLASSES) { \
260 struct ArrayObject *transao=(struct ArrayObject *) cachedobj; \
261 struct ArrayObject *mainao=(struct ArrayObject *) objptr; \
262 int lowoffset=(transao->lowindex)>>INDEXSHIFT; \
263 int highoffset=(transao->highindex)>>INDEXSHIFT; \
265 int addwrobject=0, addrdobject=0; \
266 for(j=lowoffset; j<=highoffset;j++) { \
267 unsigned int status; \
268 GETLOCKVAL(status, transao, j); \
269 if (status==STMDIRTY) { \
270 unsigned int * lockptr; \
271 GETLOCKPTR(lockptr, mainao,j); \
272 if (write_trylock(lockptr)) { \
273 unsigned int localversion; \
274 unsigned int remoteversion; \
275 GETVERSIONVAL(localversion, transao, j); \
276 GETVERSIONVAL(remoteversion, mainao, j); \
277 if (localversion == remoteversion) { \
286 } else if (status==STMCLEAN) { \
291 dirwrlocked[numoidwrlocked++] = objptr; \
294 rdlockedarray[numoidrdlockedarray++]=objptr; \
299 for(i=0; i<numoidrdlockedarray; i++) { \
300 objheader_t * transheader=oidrdlockedarray[i]; \
301 struct ArrayObject * transao=(struct ArrayObject *)&transheader[1]; \
302 objheader_t * mainheader=OID(transheader); \
303 struct ArrayObject * mainao=(struct ArrayObject *)&transheader[1]; \
304 int lowoffset=(transao->lowindex)>>INDEXSHIFT; \
305 int highoffset=(transao->highindex)>>INDEXSHIFT; \
307 for(j=lowoffset; j<=highoffset;j++) { \
308 unsigned int locallock;GETLOCKVAL(locallock,transao,j); \
309 if (locallock==STMCLEAN) { \
310 /* do read check */ \
311 unsigned int mainlock; GETLOCKVAL(mainlock, mainao, j); \
314 unsigned int localversion; \
315 unsigned int remoteversion; \
316 GETVERSIONVAL(localversion, transao, j); \
317 GETVERSIONVAL(remoteversion, mainao, j); \
318 if (localversion != remoteversion) { \
319 transAbortProcess(oidwrlocked, NUMWRTOTAL); \
321 return TRANS_ABORT; \
332 #define STMARRAYALLOC
333 #define STMARRAYASSIGN
336 /* ==================================================
338 * - goes through the transaction cache and
339 * - decides if a transaction should commit or abort
340 * ==================================================
344 int traverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
346 int traverseCache() {
348 /* Create info to keep track of objects that can be locked */
349 int numoidrdlocked=0;
350 int numoidwrlocked=0;
351 void * rdlocked[200];
353 struct fixedlist wrlocked;
359 STMWRAP(int rdage[200];int * oidrdage;int ObjSeqId;int objtypetraverse[TOTALNUMCLASSANDARRAY];);
360 struct garbagelist * oidwrlocked;
364 STMWRAP(for(i=0; i<TOTALNUMCLASSANDARRAY; i++) objtypetraverse[i] = 0;);
366 chashlistnode_t *ptr = c_table;
367 /* Represents number of bins in the chash table */
368 unsigned int size = c_size;
369 for(i = 0; i<size; i++) {
370 chashlistnode_t *curr = &ptr[i];
371 /* Inner loop to traverse the linked list of the cache lookupTable */
372 while(curr != NULL) {
373 //if the first bin in hash table is empty
374 if(curr->key == NULL)
376 objheader_t * cachedobj=curr->val;
377 objheader_t * headeraddr=&((objheader_t *) cachedobj)[-1]; //cached object
378 void * objptr=curr->key;
379 objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t)); //real object
380 unsigned int version = headeraddr->version;
384 if(STATUS(headeraddr) & DIRTY) {
385 /* Read from the main heap and compare versions */
386 if(write_trylock(&header->lock)) { //can aquire write lock
387 if (version == header->version) { /* versions match */
388 /* Keep track of objects locked */
389 dirwrlocked[numoidwrlocked++] = objptr;
391 dirwrlocked[numoidwrlocked++] = objptr;
392 transAbortProcess(oidwrlocked, numoidwrlocked);
396 return TRANS_SOFT_ABORT;
401 if(version == header->version) {
405 transAbortProcess(oidwrlocked, numoidwrlocked);
409 return TRANS_SOFT_ABORT;
415 STMWRAP(oidrdage[numoidrdlocked]=headeraddr->accessCount;);
416 oidrdversion[numoidrdlocked]=version;
417 oidrdlocked[numoidrdlocked++]=header;
424 //acquire access set locks
425 unsigned int numoidwrtotal=numoidwrlocked;
427 chashlistnode_t *dc_curr = dc_c_list;
428 /* Inner loop to traverse the linked list of the cache lookupTable */
429 while(likely(dc_curr != NULL)) {
430 //if the first bin in hash table is empty
431 objheader_t * headeraddr=&((objheader_t *) dc_curr->val)[-1];
432 void *objptr=dc_curr->key;
433 objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
434 if(write_trylock(&header->lock)) { //can aquire write lock
435 dirwrlocked[numoidwrtotal++] = objptr;
437 //maybe we already have lock
438 void * key=dc_curr->key;
439 chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
442 if(node->key == key) {
443 objheader_t * headeraddr=&((objheader_t *) node->val)[-1];
444 if(STATUS(headeraddr) & DIRTY) {
450 } while(node != NULL);
452 //have to abort to avoid deadlock
453 transAbortProcess(oidwrlocked, numoidwrtotal);
457 return TRANS_SOFT_ABORT;
462 dc_curr = dc_curr->lnext;
466 //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
470 for(i=0; i<numoidrdlocked; i++) {
471 /* Read from the main heap and compare versions */
472 objheader_t *header=oidrdlocked[i];
473 unsigned int version=oidrdversion[i];
474 if(header->lock>0) { //not write locked
476 if(version != header->version) { /* versions do not match */
477 transAbortProcess(oidwrlocked, NUMWRTOTAL);
483 } else if (dc_t_chashSearch(((char *)header)+sizeof(objheader_t))!=NULL) {
484 //couldn't get lock because we already have it
485 //check if it is the right version number
486 if (version!=header->version) {
487 transAbortProcess(oidwrlocked, numoidwrtotal);
493 } else { /* cannot aquire lock */
494 //do increment as we didn't get lock
495 if(version == header->version) {
498 transAbortProcess(oidwrlocked, NUMWRTOTAL);
502 return TRANS_SOFT_ABORT;
509 //need to validate auxilary readset
510 rdchashlistnode_t *rd_curr = rd_c_list;
511 /* Inner loop to traverse the linked list of the cache lookupTable */
512 while(likely(rd_curr != NULL)) {
513 //if the first bin in hash table is empty
514 unsigned int version=rd_curr->version;
515 objheader_t *header=(objheader_t *)(((char *)rd_curr->key)-sizeof(objheader_t));
516 if(header->lock>0) { //object is not locked
517 if (version!=header->version) {
519 transAbortProcess(oidwrlocked, NUMWRTOTAL);
520 STMWRAP((typesCausingAbort[TYPE(header)])++;);
523 return TRANS_SOFT_ABORT;
528 //maybe we already have lock
529 if (version==header->version) {
530 void * key=rd_curr->key;
532 //check to see if it is in the delaycomp table
534 chashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
539 } while(node != NULL);
544 chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
546 if(node->key == key) {
547 objheader_t * headeraddr=&((objheader_t *) node->val)[-1];
548 if(STATUS(headeraddr) & DIRTY) {
553 } while(node != NULL);
556 //have to abort to avoid deadlock
557 transAbortProcess(oidwrlocked, NUMWRTOTAL);
558 STMWRAP((typesCausingAbort[TYPE(header)])++;);
561 return TRANS_SOFT_ABORT;
566 rd_curr = rd_curr->lnext;
570 /* Decide the final response */
572 transCommitProcess(oidwrlocked, numoidwrlocked, numoidwrtotal, commitmethod, primitives, locals, params);
574 transCommitProcess(oidwrlocked, numoidwrlocked);
580 /* ==================================================
582 * - goes through the transaction cache and
583 * - decides if a transaction should commit or abort
584 * ==================================================
588 int alttraverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
590 int alttraverseCache() {
592 /* Create info to keep track of objects that can be locked */
593 int numoidrdlocked=0;
594 int numoidwrlocked=0;
595 void * rdlocked[200];
597 struct fixedlist wrlocked;
602 STMWRAP(int rdage[200];int * oidrdage;int ObjSeqId;int objtypetraverse[TOTALNUMCLASSANDARRAY];);
604 struct garbagelist * oidwrlocked;
608 STMWRAP(for(i=0; i<TOTALNUMCLASSANDARRAY; i++) objtypetraverse[i] = 0;);
610 chashlistnode_t *curr = c_list;
611 /* Inner loop to traverse the linked list of the cache lookupTable */
612 while(likely(curr != NULL)) {
613 //if the first bin in hash table is empty
614 objheader_t * cachedobj=curr->val;
615 objheader_t * headeraddr=&((objheader_t *) cachedobj)[-1];
616 void *objptr=curr->key;
617 objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
618 unsigned int version = headeraddr->version;
622 if(STATUS(headeraddr) & DIRTY) {
623 /* Read from the main heap and compare versions */
624 if(likely(write_trylock(&header->lock))) { //can aquire write lock
625 if (likely(version == header->version)) { /* versions match */
626 /* Keep track of objects locked */
627 dirwrlocked[numoidwrlocked++] = objptr;
629 dirwrlocked[numoidwrlocked++] = objptr;
630 transAbortProcess(oidwrlocked, numoidwrlocked);
635 } else { /* cannot aquire lock */
636 if(version == header->version) {
640 transAbortProcess(oidwrlocked, numoidwrlocked);
644 return TRANS_SOFT_ABORT;
649 /* Read from the main heap and compare versions */
650 oidrdversion[numoidrdlocked]=version;
651 oidrdlocked[numoidrdlocked++] = header;
657 //acquire other locks
658 unsigned int numoidwrtotal=numoidwrlocked;
659 chashlistnode_t *dc_curr = dc_c_list;
660 /* Inner loop to traverse the linked list of the cache lookupTable */
661 while(likely(dc_curr != NULL)) {
662 //if the first bin in hash table is empty
663 objheader_t * headeraddr=&((objheader_t *) dc_curr->val)[-1];
664 void *objptr=dc_curr->key;
665 objheader_t *header=(objheader_t *)(((char *)objptr)-sizeof(objheader_t));
666 if(write_trylock(&header->lock)) { //can aquire write lock
667 dirwrlocked[numoidwrtotal++] = objptr;
669 //maybe we already have lock
670 void * key=dc_curr->key;
671 chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
674 if(node->key == key) {
675 objheader_t * headeraddr=&((objheader_t *) node->val)[-1];
676 if(STATUS(headeraddr) & DIRTY) {
681 } while(node != NULL);
683 //have to abort to avoid deadlock
684 transAbortProcess(oidwrlocked, numoidwrtotal);
688 return TRANS_SOFT_ABORT;
693 dc_curr = dc_curr->lnext;
697 //THIS IS THE SERIALIZATION END POINT (START POINT IS END OF EXECUTION)*****
701 for(i=0; i<numoidrdlocked; i++) {
702 objheader_t * header=oidrdlocked[i];
703 unsigned int version=oidrdversion[i];
706 if(version != header->version) {
707 transAbortProcess(oidwrlocked, NUMWRTOTAL);
713 } else if (dc_t_chashSearch(((char *)header)+sizeof(objheader_t))!=NULL) {
714 //couldn't get lock because we already have it
715 //check if it is the right version number
716 if (version!=header->version) {
717 transAbortProcess(oidwrlocked, numoidwrtotal);
723 } else { /* cannot aquire lock */
724 if(version == header->version) {
727 transAbortProcess(oidwrlocked, NUMWRTOTAL);
731 return TRANS_SOFT_ABORT;
738 //need to validate auxilary readset
739 rdchashlistnode_t *rd_curr = rd_c_list;
740 /* Inner loop to traverse the linked list of the cache lookupTable */
741 while(likely(rd_curr != NULL)) {
742 //if the first bin in hash table is empty
743 int version=rd_curr->version;
744 objheader_t *header=(objheader_t *)(((char *)rd_curr->key)-sizeof(objheader_t));
745 if(header->lock>0) { //object is not locked
746 if (version!=header->version) {
748 transAbortProcess(oidwrlocked, NUMWRTOTAL);
749 STMWRAP((typesCausingAbort[TYPE(header)])++;);
752 return TRANS_SOFT_ABORT;
757 if (version==header->version) {
758 void * key=rd_curr->key;
760 //check to see if it is in the delaycomp table
762 chashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
767 } while(node != NULL);
772 chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
774 if(node->key == key) {
775 objheader_t * headeraddr=&((objheader_t *) node->val)[-1];
776 if(STATUS(headeraddr) & DIRTY) {
781 } while(node != NULL);
784 //have to abort to avoid deadlock
785 transAbortProcess(oidwrlocked, NUMWRTOTAL);
786 STMWRAP((typesCausingAbort[TYPE(header)])++;);
789 return TRANS_SOFT_ABORT;
794 rd_curr = rd_curr->lnext;
798 /* Decide the final response */
800 transCommitProcess(oidwrlocked, numoidwrlocked, numoidwrtotal, commitmethod, primitives, locals, params);
802 transCommitProcess(oidwrlocked, numoidwrlocked);
808 /* ==================================
811 * =================================
816 void transAbortProcess(struct garbagelist *oidwrlocked, int numoidwrlocked) {
819 /* Release read locks */
820 void ** dirwrlocked=oidwrlocked->array;
821 /* Release write locks */
822 for(i=numoidwrlocked-1; i>=0; i--) {
823 /* Read from the main heap */
824 struct ___Object___ * dst=dirwrlocked[i];
825 header = &((objheader_t *)dst)[-1];
828 if (type>=NUMCLASSES) {
829 //have array, do unlocking of bins
830 struct ArrayObject *src=(struct ArrayObject *)t_chashSearch(dst);
831 int lowoffset=(src->lowindex)>>INDEXSHIFT;
832 int highoffset=(src->highindex)>>INDEXSHIFT;
834 int addwrobject=0, addrdobject=0;
835 for(j=lowoffset; j<=highoffset;j++) {
837 GETLOCKVAL(status, src, j);
838 if (status==STMDIRTY) {
839 unsigned int *lockptr;
840 GETLOCKPTR(lockptr, ((struct ArrayObject *)dst), j);
841 write_unlock(lockptr);
846 write_unlock(&header->lock);
849 /* clear trec and then release objects locked */
850 struct objlist *ptr=lockedobjs;
853 for(i=max-1; i>=0; i--) {
854 header = (objheader_t *)ptr->objs[i];
856 pthread_mutex_unlock(header->objlock);
863 /* ==================================
866 * =================================
869 void transCommitProcess(struct garbagelist * oidwrlocked, int numoidwrlocked, int numoidwrtotal, void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
871 void transCommitProcess(struct garbagelist * oidwrlocked, int numoidwrlocked) {
876 struct objlist *ptr=newobjs;
877 void **dirwrlocked=oidwrlocked->array;
880 for(i=0; i<max; i++) {
882 ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
887 /* Copy from transaction cache -> main object store */
888 for (i = numoidwrlocked-1; i >=0; i--) {
889 /* Read from the main heap */
890 header = &((objheader_t *)dirwrlocked[i])[-1];
892 GETSIZE(tmpsize, header);
893 struct ___Object___ *dst=(struct ___Object___*)dirwrlocked[i];
894 struct ___Object___ *src=t_chashSearch(dst);
895 dst->___cachedCode___=src->___cachedCode___;
896 dst->___cachedHash___=src->___cachedHash___;
899 if (type>=NUMCLASSES) {
900 //have array, do copying of bins
901 int lowoffset=(((struct ArrayObject *)src)->lowindex)>>INDEXSHIFT;
902 int highoffset=(((struct ArrayObject *)src)->highindex)>>INDEXSHIFT;
904 int addwrobject=0, addrdobject=0;
905 int elementsize=classsize[type];
906 int baseoffset=(lowoffset<<INDEXSHIFT)+sizeof(int)+((int)&(((struct ArrayObject *)0)->___length___));
907 char *dstptr=((char *)dst)+baseoffset;
908 char *srcptr=((char *)src)+baseoffset;
909 for(j=lowoffset; j<=highoffset;j++, srcptr+=INDEXLENGTH,dstptr+=INDEXLENGTH) {
911 GETLOCKVAL(status, ((struct ArrayObject *)src), j);
912 if (status==STMDIRTY) {
913 A_memcpy(dstptr, srcptr, INDEXLENGTH);
918 A_memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
923 // call commit method
927 //splice oidwrlocked in
928 oidwrlocked->size=numoidwrtotal;
929 oidwrlocked->next=params;
930 ((struct garbagelist *)locals)->next=oidwrlocked;
931 commitmethod(params, locals, primitives);
932 ((struct garbagelist *)locals)->next=params;
935 /* Release write locks */
936 for(i=NUMWRTOTAL-1; i>=0; i--) {
937 struct ___Object___ * dst=dirwrlocked[i];
938 header = &((objheader_t *)dst)[-1];
941 if (type>=NUMCLASSES) {
942 //have array, do unlocking of bins
943 struct ArrayObject *src=(struct ArrayObject *)t_chashSearch(dst);
944 int lowoffset=(src->lowindex)>>INDEXSHIFT;
945 int highoffset=(src->highindex)>>INDEXSHIFT;
947 int addwrobject=0, addrdobject=0;
948 for(j=lowoffset; j<=highoffset;j++) {
950 GETLOCKVAL(status, src, j);
951 if (status==STMDIRTY) {
952 unsigned int *intptr;
953 GETVERSIONPTR(intptr, ((struct ArrayObject *)dst), j);
955 GETLOCKPTR(intptr, ((struct ArrayObject *)dst), j);
956 write_unlock(intptr);
963 write_unlock(&header->lock);
968 /* clear trec and then release objects locked */
972 for(i=max-1; i>=0; i--) {
973 header = (objheader_t *)ptr->objs[i];
975 pthread_mutex_unlock(header->objlock);