2 * To change this template, choose Tools | Templates
3 * and open the template in the editor.
5 package TransactionalIO.core;
7 import TransactionalIO.exceptions.AbortedException;
8 import TransactionalIO.benchmarks.benchmark;
9 import TransactionalIO.benchmarks.customhandler;
10 import TransactionalIO.benchmarks.customhandler;
11 import TransactionalIO.interfaces.BlockAccessModesEnum;
12 import TransactionalIO.interfaces.ContentionManager;
13 import TransactionalIO.interfaces.TransactionStatu;
14 //import dstm2.file.managers.BaseManager;
15 import java.awt.event.ActionListener;
16 import java.beans.EventHandler;
17 import java.beans.PropertyChangeEvent;
18 import java.beans.PropertyChangeListener;
19 import java.beans.PropertyChangeSupport;
20 import java.io.FileDescriptor;
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
27 import java.util.TreeMap;
28 import java.util.Vector;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31 import java.util.concurrent.locks.ReentrantReadWriteLock;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
/**
 * File-side transaction object implementing {@link TransactionStatu}.
 * It carries a status, the files and blocks the transaction accessed, its buffered
 * writes, and the offset/block locks it has recorded as held.
 */
39 public class ExtendedTransaction implements TransactionStatu {
// Native write entry point, called only through invokeNativepwrite in the visible code.
// Presumably a pwrite(2)-style positional write - TODO confirm against the native library.
41 private native int nativepwrite(byte buff[], long offset, int size, FileDescriptor fd);
// NOTE(review): the native library is loaded from a hard-coded, machine-specific absolute path.
// Loading fails on any host that lacks this exact file; consider System.loadLibrary or a configurable path.
45 System.load("/home/navid/libkooni.so");
// NOTE(review): no use of this flag is visible in the surrounding code - confirm it is still needed.
47 private boolean flag = true;
// Companion TransactionStatu; abort() forwards the abort to it when it is non-null and not yet aborted.
48 public TransactionStatu memorystate;
// Delivers "status" property-change events to registered listeners (fired from setStatus).
49 private PropertyChangeSupport changes = new PropertyChangeSupport(this);
// NOTE(review): raw, public TreeMap; its key/value contract is not visible here.
52 public TreeMap msg = new TreeMap();
// Public counters. NOTE(review): not updated in the visible code - presumably maintained by callers.
53 public int numberofwrites;
54 public int numberofreads;
// Constants of Status: the three states a transaction can be in.
58 ABORTED, ACTIVE, COMMITTED
// Exposed through isWritesmerged/setWritesmerged; starts out true.
60 private boolean writesmerged = true;
61 //private Vector<ReentrantLock> heldoffsetlocks;
// Offset locks recorded by lockOffsets().
62 private Vector heldoffsetlocks;
63 //private Vector<ReentrantLock> heldblocklocks;
// Block locks recorded by lockBlock().
64 private Vector heldblocklocks;
65 //private HashMap<INode, Vector<TransactionalFile>> AccessedFiles;
// INode -> Vector of TransactionalFile (filled by addFile).
66 private HashMap AccessedFiles;
67 //private HashMap<INode, HashMap<Integer, BlockAccessModesEnum> > accessedBlocks;
// INode -> map of block index to BlockAccessModesEnum; prepareCommit stores a TreeMap as the inner map.
68 private HashMap accessedBlocks;
69 //private HashMap<TransactionalFile, TransactionLocalFileAttributes> LocaltoGlobalMappings;
// TransactionalFile -> TransactionLocalFileAttributes (filled by addFile).
70 private HashMap GlobaltoLocalMappings;
// INode -> Boolean; addFile stores Boolean.TRUE for every inode it registers.
71 public HashMap merge_for_writes_done;
// INode -> Vector of WriteOperations, as read by prepareCommit and commitChanges.
72 private HashMap writeBuffer;
// Per-transaction contention manager; only set through setContentionmanager in the visible code.
73 private ContentionManager contentionmanager;
// Current state; volatile so that reads outside synchronized blocks see the latest write.
74 private volatile Status status;
/**
 * Creates a transaction in the ACTIVE state with empty lock lists and empty
 * bookkeeping maps. No contention manager is installed and no status listener
 * is registered here (both calls are commented out below).
 */
77 public ExtendedTransaction() {
79 // id = Integer.valueOf(Thread.currentThread().getName().substring(7));
80 heldblocklocks = new Vector();
81 heldoffsetlocks = new Vector();
82 AccessedFiles = new HashMap();
83 GlobaltoLocalMappings = new HashMap/*<TransactionalFile, TransactionLocalFileAttributes >*/();
84 writeBuffer = new HashMap();
85 status = Status.ACTIVE;
86 accessedBlocks = new HashMap();
87 merge_for_writes_done = new HashMap();
89 // setContentionmanager(new BaseManager());
90 // beginTransaction();
/**
 * Creates a transaction paired with a companion {@link TransactionStatu}.
 *
 * @param memorystate companion state object; stored in the public {@code memorystate} field
 */
// NOTE(review): confirm that the collections and the status are initialised for this
// constructor too (e.g. by delegating to this()) - only the assignment is visible here.
94 public ExtendedTransaction(TransactionStatu memorystate) {
97 this.memorystate = memorystate;
/**
 * Forwards a buffer to the native write routine using the file descriptor of {@code file}.
 *
 * @param buff   bytes to write
 * @param offset value passed through unchanged as the native call's offset argument
 * @param size   NOTE(review): currently ignored - the call below passes {@code buff.length} instead
 * @param file   file whose descriptor is obtained with {@code getFD()}
 * @return the value returned by {@code nativepwrite}; the value returned after an
 *         IOException is not visible here - TODO confirm
 */
100 private int invokeNativepwrite(byte buff[], long offset, int size, RandomAccessFile file) {
102 return nativepwrite(buff, offset, buff.length, file.getFD());
103 } catch (IOException ex) {
// NOTE(review): the logger is named after TransactionalFile, not this class - looks like a copy/paste leftover.
105 Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
/**
 * Registers a new {@code customhandler}, constructed with {@code Status.ABORTED},
 * as a listener for "status" property changes on this transaction.
 */
// NOTE(review): every call adds another listener; nothing visible here removes it.
111 public void beginTransaction() {
112 this.addPropertyChangeListener(new customhandler(Status.ABORTED));
/**
 * Marks this transaction ABORTED and forwards the abort to the companion
 * {@code memorystate} when one is set and it is not already aborted.
 * The whole operation runs while holding this object's monitor.
 */
115 public void abort() {
116 synchronized (this) {
// The field is assigned directly, so no "status" property-change event is fired on this path.
117 this.status = Status.ABORTED;
// Presumably the isAborted() check stops the two sides from aborting each other endlessly - TODO confirm.
118 if (this.memorystate != null && !(this.memorystate).isAborted()) {
119 this.memorystate.abortThisSystem();
// Status accessor used by isActive(), isAborted() and the commit-time loops.
// Presumably returns the volatile status field - TODO confirm.
124 public Status getStatus() {
/** @return true when {@link #getStatus()} is {@code Status.ACTIVE} */
128 public boolean isActive() {
129 return this.getStatus() == Status.ACTIVE;
/** @return true when {@link #getStatus()} is {@code Status.ABORTED} */
132 public boolean isAborted() {
133 return this.getStatus() == Status.ABORTED;
/** @return the contention manager stored on this transaction; null until one is set */
136 public ContentionManager getContentionmanager() {
137 return contentionmanager;
/** @param contentionmanager contention manager to store on this transaction */
140 public void setContentionmanager(ContentionManager contentionmanager) {
141 this.contentionmanager = contentionmanager;
// Accessor used by prepareCommit() and commitChanges() to obtain the buffered writes.
// Presumably returns the writeBuffer field itself (no copy) - TODO confirm.
144 public HashMap getWriteBuffer() {
/** @return the live INode -> Vector map of accessed files (not a copy) */
148 public HashMap getAccessedFiles() {
149 return AccessedFiles;
// Boolean accessor paired with setWritesmerged(); presumably returns the writesmerged field - TODO confirm.
152 public boolean isWritesmerged() {
/** @param writesmerged new value of the {@code writesmerged} flag */
156 public void setWritesmerged(boolean writesmerged) {
157 this.writesmerged = writesmerged;
/** @return the live TransactionalFile -> TransactionLocalFileAttributes map (not a copy) */
160 public HashMap getGlobaltoLocalMappings() {
161 return GlobaltoLocalMappings;
/** @return the live INode -> per-block access-mode map (not a copy) */
164 public HashMap getAccessedBlocks() {
165 return accessedBlocks;
/** @return the block contention manager obtained from {@code ManagerRepository}, not the per-transaction field */
168 public ContentionManager getBlockContentionManager() {
169 return ManagerRepository.getBlockcm();
/** @return the offset contention manager obtained from {@code ManagerRepository}, not the per-transaction field */
172 public ContentionManager getOffsetContentionManager() {
173 return ManagerRepository.getOffsetcm();
// Returns a new TreeMap holding the entries of hmap, ordered by the natural ordering of its keys.
// TreeMap's copy constructor needs mutually comparable keys; callers here pass a map keyed by INode.
// The caller's map is not modified.
176 public TreeMap getSortedFileAccessMap(HashMap hmap) {
177 /*TreeMap sortedMap = new TreeMap(hmap);
179 return new TreeMap(hmap);
/**
 * Notifies "status" listeners of a change from the current status to {@code st}.
 *
 * @param st the new status reported to listeners
 */
// NOTE(review): only the read of the old value and the event firing are visible here;
// presumably the status field is assigned in between - TODO confirm.
182 public void setStatus(Status st) {
183 Status oldst = getStatus();
// PropertyChangeSupport delivers no event when the old and new values are equal and non-null.
185 this.changes.firePropertyChange("status", oldst, st);
/**
 * Registers a transactional file with this transaction.
 * Creates fresh local attributes from {@code offsetnumber}, makes sure the file's
 * inode has a Vector in {@code AccessedFiles}, maps the file to its local attributes
 * and sets the inode's entry in {@code merge_for_writes_done} to TRUE.
 *
 * @param tf           file being registered; its inode is the key for the per-inode maps
 * @param offsetnumber offset passed to the TransactionLocalFileAttributes constructor
 */
188 public void addFile(TransactionalFile tf, long offsetnumber/*, TransactionLocalFileAttributes tmp*/) {
190 TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(offsetnumber/*, tf.getInodestate().commitedfilesize.get()*/);
// Reuse the inode's existing Vector, or create and register a new one.
193 if (AccessedFiles.containsKey(tf.getInode())) {
194 dummy = (Vector) AccessedFiles.get(tf.getInode());
196 dummy = new Vector();
197 AccessedFiles.put(tf.getInode(), dummy);
// NOTE(review): presumably tf is appended to dummy before this point - the add is not visible here.
200 GlobaltoLocalMappings.put(tf, tmp);
201 merge_for_writes_done.put(tf.getInode(), Boolean.TRUE);
/**
 * Acquires the offset lock of every accessed TransactionalFile and records each lock
 * in {@code heldoffsetlocks}. Inodes are visited in TreeMap key order and each inode's
 * files in {@code Collections.sort} order; both loops stop once the status is no longer ACTIVE.
 *
 * @return prepareCommit() treats a false result as an abort; presumably false is
 *         returned when the status is no longer ACTIVE - TODO confirm
 */
// Presumably the fixed acquisition order exists to avoid deadlock between committers - TODO confirm.
204 public boolean lockOffsets() { /// Locking offsets for File Descriptors
207 TreeMap hm = getSortedFileAccessMap(AccessedFiles);
208 Iterator iter = hm.keySet().iterator();
210 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
211 INode key = (INode) iter.next();
212 Vector vec = (Vector) AccessedFiles.get(key);
// Sorts the shared Vector in place; its elements must be mutually comparable.
213 Collections.sort(vec);
214 Iterator it = vec.iterator();
215 while (it.hasNext() && this.getStatus() == Status.ACTIVE) {
216 TransactionalFile value = (TransactionalFile) it.next();
// lock() is a blocking acquire; the lock is recorded only after it has been obtained.
217 value.offsetlock.lock();
218 heldoffsetlocks.add(value.offsetlock);
223 if (this.getStatus() != Status.ACTIVE) {
/**
 * Selects the read or write side of the block's lock from {@code mode} and records it
 * in {@code heldblocklocks}.
 *
 * @param block block whose {@code getLock()} supplies the read and write locks
 * @param mode  READ selects the read lock; presumably every other mode selects the
 *              write lock - TODO confirm
 * @return not visible here - TODO confirm
 */
// NOTE(review): the actual lock acquisition is not visible here - confirm it happens before the add.
229 public boolean lockBlock(BlockDataStructure block, BlockAccessModesEnum mode/*, GlobalINodeState adapter, BlockAccessModesEnum mode, int expvalue, INode inode, TransactionLocalFileAttributes tf*/) {
232 if (mode == BlockAccessModesEnum.READ) {
233 lock = block.getLock().readLock();
236 lock = block.getLock().writeLock();
240 heldblocklocks.add(lock);
/**
 * First commit phase: takes the offset locks, resolves buffered writes whose absolute
 * offset was unknown, marks every block those writes touch, then locks all accessed blocks.
 *
 * @throws AbortedException if the transaction is not ACTIVE on entry, if
 *         {@code lockOffsets()} returns false, or if it is no longer ACTIVE after block locking
 */
// NOTE(review): no lock release is visible on the exception paths; presumably the caller
// invokes unlockAllLocks() - TODO confirm.
245 public void prepareCommit() {
246 if (this.status != Status.ACTIVE) {
247 throw new AbortedException();
250 if (!lockOffsets()) {
251 throw new AbortedException();
255 ///////////////////////////
// Pass 1: walk the buffered writes per inode, in sorted order.
258 Map hm = getWriteBuffer();
260 Iterator iter = hm.keySet().iterator();
261 WriteOperations value;
262 Vector vec = new Vector();
263 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE) && ok) {
264 INode key = (INode) iter.next();
265 vec = (Vector) hm.get(key);
266 Collections.sort(vec);
267 Iterator it = vec.iterator();
268 while (it.hasNext()) {
270 value = (WriteOperations) it.next();
271 if (value.isUnknownoffset()) {
// Rebase the range: subtract the copied local offset and add the file's committed offset.
275 start = value.getRange().getStart() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
276 end = value.getRange().getEnd() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
// The owning attributes' local offset gets the same rebasing once; the flag is then cleared.
277 if (value.getBelongingto().isUnknown_inital_offset_for_write()) {
278 value.getBelongingto().setLocaloffset(value.getBelongingto().getLocaloffset() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber());
279 value.getBelongingto().setUnknown_inital_offset_for_write(false);
// Block index range of the write: rebased start plus the range's length (end - start).
282 int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(start);
283 int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(start, value.getRange().getEnd() - value.getRange().getStart());
// Fetch the inode's block-mode map, creating and registering an empty one if absent.
286 if (this.getAccessedBlocks().get(key) != null) {
287 sset = (TreeMap) this.getAccessedBlocks().get(key);
289 sset = new TreeMap();
290 this.getAccessedBlocks().put(key, sset);
// An entry that exists and is not already WRITE becomes READ_WRITE.
// Presumably a missing entry becomes WRITE - TODO confirm.
294 for (int i = startblock; i <= targetblock; i++) {
295 if (sset.containsKey(Integer.valueOf(i))) {
296 if (sset.get(Integer.valueOf(i)) != BlockAccessModesEnum.WRITE) {
297 sset.put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE);
300 sset.put(Integer.valueOf(i), BlockAccessModesEnum.WRITE);
// Store the rebased bounds back into the write's range.
304 value.getRange().setStart(start);
305 value.getRange().setEnd(end);
// Pass 2: lock every accessed block of every inode, stopping once the status leaves ACTIVE.
312 Iterator it = this.getAccessedBlocks().keySet().iterator();
// NOTE(review): this array is never used in the visible code - candidate for removal.
313 BlockDataStructure[] blocks = new BlockDataStructure[100];
314 if (this.getStatus() == Status.ACTIVE)
315 while (it.hasNext() /*&& (this.getStatus() == Status.ACTIVE)*/) {
316 INode inode = (INode) it.next();
317 GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(inode);
318 TreeMap vec2 = (TreeMap) this.getAccessedBlocks().get(inode);
319 Iterator iter2 = vec2.keySet().iterator();
321 while (iter2.hasNext() && this.getStatus() == Status.ACTIVE) {
322 Integer num = (Integer) iter2.next();
323 BlockDataStructure blockobj = inodestate.getBlockDataStructure(num);
324 this.lockBlock(blockobj, (BlockAccessModesEnum) vec2.get(num));
329 if (this.getStatus() != Status.ACTIVE) {
331 throw new AbortedException();
/**
 * Second commit phase: writes every buffered write operation through
 * {@code invokeNativepwrite} at the start of its range, then publishes each file's
 * transaction-local offset as its committed offset.
 */
337 public void commitChanges() {
339 Map hm = getWriteBuffer();
340 Iterator iter = hm.keySet().iterator();
342 WriteOperations writeop;
// The per-inode loop stops early if the status is no longer ACTIVE.
344 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
345 INode key = (INode) iter.next();
347 vec = (Vector) hm.get(key);
348 Collections.sort(vec);
350 while (it.hasNext()) {
352 writeop = (WriteOperations) it.next();
// NOTE(review): this Byte[] allocation is discarded - data is reassigned from getData() two lines below.
353 Byte[] data = new Byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
354 byte[] bytedata = new byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
355 data = (Byte[]) writeop.getData();
// Unbox into a primitive array for the native call.
// NOTE(review): bytedata is sized from the range but indexed by data.length; this overruns if
// getData() is longer than the range - confirm the two always match.
357 for (int i = 0; i < data.length; i++) {
358 bytedata[i] = data[i];
360 invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnertransactionalFile().file);
// Copy each file's transaction-local offset into its committed offset.
364 Iterator k = GlobaltoLocalMappings.keySet().iterator();
365 while (k.hasNext()) {
366 TransactionalFile trf = (TransactionalFile) (k.next());
367 trf.getCommitedoffset().setOffsetnumber(((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getLocaloffset());
/**
 * Walks the recorded block locks and then the recorded offset locks.
 */
// NOTE(review): the unlock calls are not visible here - presumably each lock is released
// inside its loop; TODO confirm. Both clear() calls are commented out, so the lists keep
// their entries and a second call would visit the same locks again.
371 public void unlockAllLocks() {
372 Iterator it = heldblocklocks.iterator();
374 while (it.hasNext()) {
376 Lock lock = (Lock) it.next();
379 // heldblocklocks.clear();
381 it = heldoffsetlocks.iterator();
382 while (it.hasNext()) {
383 ReentrantLock lock = (ReentrantLock) it.next();
386 // heldoffsetlocks.clear();
/**
 * Visits the reader transactions registered on this transaction's files and blocks,
 * then clears those reader lists. For each accessed inode (in sorted key order) it
 * handles the offset readers of every file, and then the readers of every accessed
 * block whose mode is not READ.
 */
// NOTE(review): the action applied to each reader is not visible here; going by the
// method name it is presumably an abort - TODO confirm.
389 public void abortAllReaders() {
390 TreeMap hm = getSortedFileAccessMap(AccessedFiles);
392 Iterator iter = hm.keySet().iterator();
393 TransactionalFile value;
394 while (iter.hasNext()) {
395 INode key = (INode) iter.next();
396 Vector vec = (Vector) AccessedFiles.get(key);
397 Iterator it = vec.iterator();
398 while (it.hasNext()) {
400 value = (TransactionalFile) it.next();
401 Iterator it2 = value.getCommitedoffset().getOffsetReaders().iterator(); // for visible readers strategy
403 while (it2.hasNext()) {
404 ExtendedTransaction tr = (ExtendedTransaction) it2.next();
// The file's offset-reader list is emptied after the loop over its readers.
409 value.getCommitedoffset().getOffsetReaders().clear();
// Use the inode's block-mode map, or an empty map when none is recorded for it.
413 if (accessedBlocks.get(key) != null) {
414 vec2 = (TreeMap) accessedBlocks.get(key);
416 vec2 = new TreeMap();
419 GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(key);
420 Iterator it2 = vec2.keySet().iterator();
422 while (it2.hasNext()) {
424 Integer num = (Integer) it2.next();
// Blocks this transaction only read are skipped.
425 if (vec2.get(num) != BlockAccessModesEnum.READ) {
426 BlockDataStructure blockobj = (BlockDataStructure) inodestate.getBlockDataStructure(num);
428 Iterator it4 = blockobj.getReaders().iterator(); // from here for visible readers strategy
430 while (it4.hasNext()) {
432 ExtendedTransaction tr = (ExtendedTransaction) it4.next();
437 blockobj.getReaders().clear();
/**
 * Registers a listener for changes of the "status" property only.
 *
 * @param listener listener to add
 */
445 public void addPropertyChangeListener(PropertyChangeListener listener) {
446 this.changes.addPropertyChangeListener("status", listener);
/**
 * Removes a listener previously registered for the "status" property.
 *
 * @param listener listener to remove
 */
449 public void removePropertyChangeListener(PropertyChangeListener listener) {
450 this.changes.removePropertyChangeListener("status", listener);
// Counterpart of setOtherSystem(); presumably returns the memorystate field - TODO confirm.
453 public TransactionStatu getOtherSystem() {
/** @param othersystem companion state object; stored in {@code memorystate}, which abort() notifies */
457 public void setOtherSystem(TransactionStatu othersystem) {
458 memorystate = othersystem;
/** @return the live Vector of recorded block locks (not a copy) */
461 public Vector getHeldblocklocks() {
462 return heldblocklocks;
/** @param heldblocklocks Vector that replaces the recorded block locks; stored by reference */
465 public void setHeldblocklocks(Vector heldblocklocks) {
466 this.heldblocklocks = heldblocklocks;
/** @return the live Vector of recorded offset locks (not a copy) */
469 public Vector getHeldoffsetlocks() {
470 return heldoffsetlocks;
/** @param heldoffsetlocks Vector that replaces the recorded offset locks; stored by reference */
473 public void setHeldoffsetlocks(Vector heldoffsetlocks) {
474 this.heldoffsetlocks = heldoffsetlocks;
// Same method name that abort() invokes on the companion memorystate object.
// Presumably aborts this transaction when called from the other side - TODO confirm.
477 public void abortThisSystem() {
/** Tests whether the status field is {@code Status.COMMITTED}. */
// NOTE(review): presumably the branches just return true/false; if so this can be a single
// return of the comparison - TODO confirm.
481 public boolean isCommitted() {
482 if (this.status == Status.COMMITTED) {
// Try-lock variant of lockBlock for the "visible readers" strategy: while ACTIVE it attempts
// tryLock(), records the lock, takes ownership of the block under the adapter's monitor, and
// calls the block contention manager to resolve the conflict with the block's owner.
// NOTE(review): this looks like a disabled alternative implementation - `lock` is not declared
// in the visible lines, Thread.onAbortOnce is not a java.lang.Thread method, and the signature
// duplicates the variant that follows. Confirm it sits inside a block comment, or remove it.
490 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // from here for visible readers strategy
491 while (this.getStatus() == Status.ACTIVE) {
492 if (lock.tryLock()) {
493 Thread.onAbortOnce(new Runnable() {
500 heldblocklocks.add(lock);
502 synchronized (adapter) {
503 block.setOwner(this);
504 // Iterator it = block.getReaders().iterator();
505 // while (it.hasNext())
507 // ExtendedTransaction tr = (ExtendedTransaction) it.next();
514 getBlockContentionManager().resolveConflict(this, block.getOwner());
// Try-lock variant of lockBlock for the "versioning" strategy: for modes other than WRITE it
// compares the block's version with expvalue after taking the lock.
// NOTE(review): this looks like a disabled alternative implementation - same signature as the
// variant above, `lock` is not declared in the visible lines, it calls getContentionManager()
// while the visible getter is named getContentionmanager(), and a stray `egy` token follows
// the brace on the mode check (presumably the tail of a wrapped "strategy" comment).
519 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // versioning strat
520 while (this.getStatus() == Status.ACTIVE) {
521 if (lock.tryLock()) {
522 Thread.onAbortOnce(new Runnable() {
529 heldblocklocks.add(lock);
530 if (mode != BlockAccessModesEnum.WRITE) { egy
531 if (block.getVersion().get() != expvalue) {
536 synchronized (adapter) {
537 block.setOwner(this);
542 getContentionManager().resolveConflict(this, block.getOwner());
548 //expvalue = ((Integer) value.getBlockversions().get(it)).intValue(); //for versioning strategy
549 /*if (!(value.isValidatelocaloffset())) {
550 if (((BlockAccessModesEnum) (value.getAccesedblocks().get(blockno))) != BlockAccessModesEnum.WRITE) { //versioning strategy
552 /if (blockobj.getVersion().get() == expvalue) {
554 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
565 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
575 throw new AbortedException();