diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java
index 22cc46c2..64747b99 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java
@@ -14,6 +14,7 @@
 import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult;
 import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
 import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher;
+import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered;
 import com.the_qa_company.qendpoint.core.listener.MultiThreadListener;
 import com.the_qa_company.qendpoint.core.listener.ProgressListener;
 import com.the_qa_company.qendpoint.core.options.HDTOptions;
@@ -187,7 +188,7 @@ public CompressTripleMapper compressDictionary(Iterator<TripleString> iterator)
 				"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true)
 						+ "B with " + ways + "ways and " + workers + " worker(s)");
 
-		AsyncIteratorFetcher<TripleString> source = new AsyncIteratorFetcher<>(iterator);
+		AsyncIteratorFetcherUnordered<TripleString> source = new AsyncIteratorFetcherUnordered<>(iterator);
 
 		profiler.pushSection("section compression");
 		CompressionResult compressionResult;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java
index 78cc3d01..3ecec875 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java
@@ -53,6 +53,7 @@ public class SectionCompressor implements
 KWayMerger.KWayMergerImpl source, MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict,
@@ -253,7 +254,10 @@ public void createChunk(SizeFetcher<TripleString> fetcher, CloseSuppressPath out
 				}
 
 				if (tripleID % 100_000 == 0) {
-					listener.notifyProgress(10, "reading triples " + tripleID);
+					// average read rate since start, at least 1ms elapsed to avoid a division by zero
+					long triplesPerSecond = tripleID * 1000L / Math.max(1L, System.currentTimeMillis() - start);
+
+					listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond);
 				}
 				// too much ram allowed?
 				if (subjects.size() == Integer.MAX_VALUE - 6) {
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java
new file mode 100644
index 00000000..f9073713
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java
@@ -0,0 +1,139 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * Synchronise an iterator between several threads without keeping the order
+ * of the elements, the elements are read from the iterator in batches and
+ * stored in several queues, each thread reads first the queue matching its id
+ *
+ * @param <E> iterator type
+ * @author Håvard M. Ottestad
+ * @author Antoine Willerval
+ */
+public class AsyncIteratorFetcherUnordered<E> extends AsyncIteratorFetcher<E> {
+
+	private static final int CORES = Runtime.getRuntime().availableProcessors();
+
+	/** maximum number of elements added to a queue when it is refilled */
+	public static final int BUFFER = 1024 * 32;
+	private final Iterator<E> iterator;
+	private volatile boolean end;
+	// a queue is replaced by null once it is empty and the iterator has no more elements
+	volatile Queue<E>[] queue = new Queue[CORES * 2];
+
+	{
+		for (int i = 0; i < queue.length; i++) {
+			queue[i] = new ArrayDeque<>(BUFFER);
+		}
+	}
+
+	/**
+	 * @param iterator the iterator to synchronise
+	 */
+	public AsyncIteratorFetcherUnordered(Iterator<E> iterator) {
+		super(iterator);
+		this.iterator = iterator;
+	}
+
+	/**
+	 * @return an element from the iterator or null if no element is left, this
+	 *         method is thread safe
+	 */
+	@Override
+	public E get() {
+
+		int index = (int) (Thread.currentThread().getId() % queue.length);
+
+		Queue<E> es = queue[index];
+		// our queue was discarded, read the elements left in the other queues
+		if (es == null) {
+			for (Queue<E> eQueue : queue) {
+				if (eQueue != null) {
+					synchronized (eQueue) {
+						E poll = eQueue.poll();
+
+						if (poll != null) {
+							return poll;
+						}
+					}
+				}
+			}
+		}
+
+		if (es != null) {
+			// With this approach there is some risk that a queue is filled but
+			// never emptied. Maybe we should look for another queue to read from
+			// before filling our own queue?
+			synchronized (es) {
+				E poll = es.poll();
+
+				if (poll != null) {
+					return poll;
+				}
+
+				// our queue is empty, the iterator is only read while holding the lock on this
+				synchronized (this) {
+					es = queue[index];
+					if (es != null) {
+
+						poll = es.poll();
+						if (poll == null) {
+							if (iterator.hasNext()) {
+								poll = iterator.next();
+								for (int i = 0; i < BUFFER && iterator.hasNext(); i++) {
+									es.add(iterator.next());
+								}
+							}
+
+						}
+
+						// nothing left in the iterator, discard our queue
+						if (poll == null) {
+							queue[index] = null;
+						} else {
+							return poll;
+						}
+					}
+				}
+			}
+		}
+
+		// take an element from any queue still holding elements
+		for (Queue<E> eQueue : queue) {
+			if (eQueue != null) {
+
+				synchronized (eQueue) {
+					synchronized (this) {
+						E poll = eQueue.poll();
+
+						if (poll != null) {
+							return poll;
+						}
+					}
+				}
+			}
+		}
+
+		synchronized (this) {
+			if (iterator.hasNext()) {
+				E poll = iterator.next();
+				return poll;
+			}
+		}
+
+		// the queues and the iterator are empty
+		end = true;
+		return null;
+
+	}
+
+	/**
+	 * @return true once get() found the iterator and the queues empty
+	 */
+	public boolean isEnd() {
+		return end;
+	}
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java
new file mode 100644
index 00000000..72c91101
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java
@@ -0,0 +1,471 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+/**
+ * a utility class to create an iterator from the values produced by another
+ * thread, the values are spread over several queues, so they are not
+ * guaranteed to be returned in the order they were added
+ *
+ * @param <T> the iterator type
+ * @author Antoine Willerval
+ * @author Håvard M. Ottestad
+ */
+
+public class PipedCopyIteratorUnordered<T> extends PipedCopyIterator<T> {
+
+	private static final int CORES = Runtime.getRuntime().availableProcessors();
+
+	/**
+	 * RuntimeException generated by the PipedCopyIterator
+	 *
+	 * @author Antoine Willerval
+	 */
+	public static class PipedIteratorException extends RuntimeException {
+		public PipedIteratorException(String message, Throwable t) {
+			super(message, t);
+		}
+	}
+
+	/**
+	 * Callback for the
+	 * {@link #createUnorderedOfCallback(PipedCopyIteratorUnordered.PipeCallBack)} method
+	 *
+	 * @param <T> the iterator type
+	 * @author Antoine Willerval
+	 */
+	@FunctionalInterface
+	public interface PipeCallBack<T> {
+		/**
+		 * method called from the new thread to generate the new data, at the
+		 * end of the callback, the pipe is closed with or without exception
+		 *
+		 * @param pipe the pipe to fill
+		 * @throws Exception any exception returned by the generator
+		 */
+		void createPipe(PipedCopyIterator<T> pipe) throws Exception;
+	}
+
+	/**
+	 * create a piped iterator from a callback runner, the call to the callback
+	 * should be made in the callbackRunner
+	 *
+	 * @param callbackRunner the callback runner
+	 * @param <T>            type of the iterator
+	 * @return the iterator
+	 */
+	public static <T> PipedCopyIterator<T> createUnorderedOfCallback(PipeCallBack<T> callbackRunner) {
+		PipedCopyIteratorUnordered<T> pipe = new PipedCopyIteratorUnordered<>();
+
+		Thread thread = new Thread(() -> {
+			try {
+				callbackRunner.createPipe(pipe);
+				pipe.closePipe();
+			} catch (Throwable e) {
+				pipe.closePipe(e);
+			}
+		}, "PipeIterator");
+		thread.start();
+
+		// close the thread at end
+		pipe.attachThread(thread);
+
+		return pipe;
+	}
+
+	private interface QueueObject<T> {
+		boolean end();
+
+		T get();
+	}
+
+	private class ElementQueueObject implements QueueObject<T> {
+		private final T obj;
+
+		private ElementQueueObject(T obj) {
+			this.obj = obj;
+		}
+
+		@Override
+		public boolean end() {
+			return false;
+		}
+
+		@Override
+		public T get() {
+			return obj;
+		}
+	}
+
+	private class EndQueueObject implements QueueObject<T> {
+		@Override
+		public boolean end() {
+			return true;
+		}
+
+		@Override
+		public T get() {
+			throw new IllegalArgumentException();
+		}
+	}
+
+	// bounded queues filled by the writers and polled by the reader
+	private final ArrayBlockingQueue<QueueObject<T>>[] queue = new ArrayBlockingQueue[CORES * 2];
+
+	{
+		for (int i = 0; i < queue.length; i++) {
+			queue[i] = new ArrayBlockingQueue<>(16 * 1024);
+		}
+	}
+
+	// queueEnd[i] is set once the end object of queue[i] was read
+	private final AtomicBoolean[] queueEnd = new AtomicBoolean[queue.length];
+
+	{
+		for (int i = 0; i < queueEnd.length; i++) {
+			queueEnd[i] = new AtomicBoolean(false);
+		}
+	}
+
+	private T next;
+	private boolean end;
+	private PipedIteratorException exception;
+
+	private Thread thread;
+
+	// queue polled first by the reader, also set by a writer finding its queue full
+	volatile ArrayBlockingQueue<QueueObject<T>> focusQueue;
+
+	// gives each writer thread its own index, used to select the queue it writes to
+	static final AtomicInteger index = new AtomicInteger(0);
+
+	static final ThreadLocal<Integer> threadLocalIndexW = ThreadLocal.withInitial(() -> index.getAndIncrement());
+
+	/**
+	 * wait for the next element or for the end of all the queues
+	 *
+	 * @return true if an element is available, false if the pipe is ended
+	 * @throws PipedIteratorException if the pipe was closed with an exception
+	 *                                or if the reading thread is interrupted
+	 */
+	@Override
+	public boolean hasNext() {
+		if (end) {
+			return false;
+		}
+		if (next != null) {
+			return true;
+		}
+
+		QueueObject<T> obj;
+		try {
+			obj = useFocusQueue();
+
+			if (obj == null) {
+				obj = useThreadBasedQueue();
+			}
+
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			throw new PipedIteratorException("Can't read pipe", e);
+		}
+
+		// one queue is ended, look for an element or the end object of the other queues
+		if (obj == null || obj.end()) {
+			obj = checkAllQueues(obj);
+		}
+
+		if (obj.end()) {
+			end = true;
+			if (exception != null) {
+				throw exception;
+			}
+			return false;
+		}
+		next = obj.get();
+		return true;
+	}
+
+	/**
+	 * poll the queue associated with the current thread and fall back to the
+	 * other queues if it is empty
+	 *
+	 * @return the object read, never null
+	 * @throws InterruptedException if interrupted while waiting for an object
+	 */
+	private QueueObject<T> useThreadBasedQueue() throws InterruptedException {
+		QueueObject<T> obj;
+		// floorMod keeps the index valid even if the hash code is negative
+		int i = Math.floorMod(Thread.currentThread().hashCode(), queue.length);
+		obj = queue[i].poll();
+		if (obj == null) {
+			obj = iterateThroughAllQueues(obj);
+		} else if (obj.end()) {
+			setQueueEnd(queue[i]);
+		} else if (focusQueue == null) {
+			focusQueue = queue[i];
+		}
+		return obj;
+	}
+
+	/**
+	 * poll every queue that has not ended yet until an element is found or
+	 * all the queues have ended
+	 *
+	 * @param originalObj the object to return if no queue was polled
+	 * @return the element found, or an end object once all the queues ended
+	 */
+	private QueueObject<T> checkAllQueues(QueueObject<T> originalObj) {
+		QueueObject<T> obj = null;
+		boolean done;
+
+		do {
+			done = true;
+			for (int i = 0; i < queue.length; i++) {
+				if (queueEnd[i].get()) {
+					continue;
+				}
+				done = false;
+				ArrayBlockingQueue<QueueObject<T>> queueObjects = queue[i];
+				obj = queueObjects.poll();
+				if (obj == null) {
+					try {
+						Thread.sleep(10);
+					} catch (InterruptedException e) {
+						Thread.currentThread().interrupt();
+						throw new PipedIteratorException("Can't read pipe", e);
+					}
+				} else if (!obj.end()) {
+					return obj;
+				} else {
+					queueEnd[i].set(true);
+				}
+			}
+		} while (!done);
+
+		if (obj == null) {
+			obj = originalObj;
+		}
+
+		return obj;
+	}
+
+	/**
+	 * poll the queues that have not ended until one of them returns an object,
+	 * sleeping between two rounds
+	 *
+	 * @param obj the object already read, returned as is if it isn't null
+	 * @return the object read, it can be an end object
+	 * @throws InterruptedException if interrupted while sleeping
+	 */
+	private QueueObject<T> iterateThroughAllQueues(QueueObject<T> obj) throws InterruptedException {
+		while (obj == null) {
+			for (int i = 0; i < queue.length; i++) {
+				if (queueEnd[i].get()) {
+					continue;
+				}
+				ArrayBlockingQueue<QueueObject<T>> queueObjects = queue[i];
+				obj = queueObjects.poll();
+				if (obj != null) {
+					if (obj.end()) {
+						queueEnd[i].set(true);
+					} else if (focusQueue == null) {
+						focusQueue = queueObjects;
+					}
+					return obj;
+				}
+			}
+			Thread.sleep(10);
+		}
+		return obj;
+	}
+
+	/**
+	 * poll the focus queue if one is set, the focus is dropped when the queue
+	 * is empty
+	 *
+	 * @return the object read or null if no object is available
+	 */
+	private QueueObject<T> useFocusQueue() throws InterruptedException {
+		QueueObject<T> obj;
+		var focusQueue = this.focusQueue;
+		if (focusQueue != null) {
+			QueueObject<T> poll = focusQueue.poll();
+			if (poll != null) {
+				obj = poll;
+				if (obj.end()) {
+					setQueueEnd(focusQueue);
+				}
+			} else {
+				obj = null;
+				this.focusQueue = null;
+			}
+		} else {
+			obj = null;
+		}
+		return obj;
+	}
+
+	/**
+	 * mark a queue as ended
+	 *
+	 * @param focusQueue the queue to mark
+	 */
+	private void setQueueEnd(ArrayBlockingQueue<QueueObject<T>> focusQueue) {
+		for (int i = 0; i < queue.length; i++) {
+			if (queue[i] == focusQueue) {
+				queueEnd[i].set(true);
+				break;
+			}
+		}
+	}
+
+	/**
+	 * @return the next element or null if the end was reached
+	 */
+	@Override
+	public T next() {
+		if (!hasNext()) {
+			return null;
+		}
+		T next = this.next;
+		this.next = null;
+		return next;
+	}
+
+	/**
+	 * close the pipe without exception, see {@link #closePipe(Throwable)}
+	 */
+	public void closePipe() {
+		closePipe(null);
+	}
+
+	/**
+	 * close the pipe by adding an end object to every queue
+	 *
+	 * @param e exception to throw from {@link #hasNext()}, null for a normal
+	 *          end, the pending elements are dropped if it isn't null
+	 */
+	public void closePipe(Throwable e) {
+		if (e != null) {
+			// clear the queue to force the exception
+			for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
+				queueObjects.clear();
+			}
+			if (e instanceof PipedIteratorException) {
+				this.exception = (PipedIteratorException) e;
+			} else {
+				this.exception = new PipedIteratorException("closing exception", e);
+			}
+		}
+		try {
+			for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
+				queueObjects.put(new EndQueueObject());
+			}
+		} catch (InterruptedException ee) {
+			Thread.currentThread().interrupt();
+			throw new PipedIteratorException("Can't close pipe", ee);
+		}
+	}
+
+	/**
+	 * map this iterator to another type
+	 *
+	 * @param mappingFunction the mapping function
+	 * @param <E>             the future type
+	 * @return mapped iterator
+	 */
+	public <E> Iterator<E> map(Function<T, E> mappingFunction) {
+		return new MapIterator<>(this, mappingFunction);
+	}
+
+	/**
+	 * map this iterator to another type
+	 *
+	 * @param mappingFunction the mapping function
+	 * @param <E>             the future type
+	 * @return mapped iterator
+	 */
+	public <E> Iterator<E> mapWithId(MapIterator.MapWithIdFunction<T, E> mappingFunction) {
+		return new MapIterator<>(this, mappingFunction);
+	}
+
+	/**
+	 * add an element to the pipe, wait if all the queues are full
+	 *
+	 * @param node the element to add
+	 */
+	public void addElement(T node) {
+		// floorMod keeps the index valid if the thread index overflows to a negative value
+		ArrayBlockingQueue<QueueObject<T>> currentQueue = queue[Math.floorMod(threadLocalIndexW.get(), queue.length)];
+		try {
+
+			// leave the focus queue to the reader and write to another queue
+			if (currentQueue == focusQueue) {
+				for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
+					if (queueObjects != focusQueue) {
+						currentQueue = queueObjects;
+						break;
+					}
+
+				}
+			}
+
+			boolean success = currentQueue.offer(new ElementQueueObject(node));
+			if (!success) {
+				// the queue is full, ask the reader to empty it and try all the queues
+				focusQueue = currentQueue;
+				while (!success) {
+					for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
+						success = queueObjects.offer(new ElementQueueObject(node), 1, TimeUnit.MILLISECONDS);
+						if (success) {
+							break;
+						}
+					}
+				}
+			}
+
+		} catch (InterruptedException ee) {
+			Thread.currentThread().interrupt();
+			throw new PipedIteratorException("Can't add element to pipe", ee);
+		}
+	}
+
+	/**
+	 * attach a thread to interrupt with this iterator
+	 *
+	 * @param thread the thread
+	 */
+	public void attachThread(Thread thread) {
+		Objects.requireNonNull(thread, "thread can't be null!");
+		if (this.thread != null && this.thread != thread) {
+			throw new IllegalArgumentException("Thread already attached");
+		}
+		this.thread = thread;
+	}
+
+	/**
+	 * Allow receiving again elements after an end node, the end state of every
+	 * queue is cleared so the queues are polled again
+	 */
+	public void reset() {
+		this.end = false;
+		for (AtomicBoolean queueEndFlag : queueEnd) {
+			queueEndFlag.set(false);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (thread != null) {
+			thread.interrupt();
+		}
+	}
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java
index f03ca8f6..af808601 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java
@@ -21,6 +21,7 @@
 
 import com.the_qa_company.qendpoint.core.enums.RDFNotation;
 import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
+import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIteratorUnordered;
 import com.the_qa_company.qendpoint.core.options.HDTOptions;
 import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
 import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser;
@@ -34,7 +35,6 @@
 import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserZip;
 import com.the_qa_company.qendpoint.core.triples.TripleString;
 import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIterator;
-import com.the_qa_company.qendpoint.core.util.string.PrefixesStorage;
 
 import java.io.InputStream;
 
@@ -97,8 +97,8 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio
 	 */
 	public static PipedCopyIterator<TripleString> readAsIterator(RDFParserCallback parser, InputStream stream,
 			String baseUri, boolean keepBNode, RDFNotation notation) {
-		return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(stream, baseUri, notation, keepBNode,
-				(triple, pos) -> pipe.addElement(triple.tripleToString())));
+		return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(stream, baseUri, notation,
+				keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString())));
 	}
 
 	/**
@@ -112,8 +112,8 @@ public static PipedCopyIterator<TripleString> readAsIterator(RDFParserCallback p
 	 */
 	public static PipedCopyIterator<TripleString> readAsIterator(RDFParserCallback parser, String file, String baseUri,
 			boolean keepBNode, RDFNotation notation) {
-		return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(file, baseUri, notation, keepBNode,
-				(triple, pos) -> pipe.addElement(triple.tripleToString())));
+		return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(file, baseUri, notation,
+				keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString())));
 	}
 
 	/**
@@ -127,8 +127,8 @@ public static PipedCopyIterator<TripleString> readAsIterator(RDFParserCallback p
 	 */
 	public static PipedCopyIterator<TripleString> readAsIterator(RDFParserCallback parser, InputStream stream,
 			String baseUri, boolean keepBNode, RDFNotation notation, HDTOptions spec) {
-		return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(stream, baseUri, notation, keepBNode,
-				(triple, pos) -> pipe.addElement(triple.tripleToString())));
+		return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(stream, baseUri, notation,
+				keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString())));
 	}
 
 	/**
@@ -143,8 +143,8 @@ public static PipedCopyIterator<TripleString> readAsIterator(RDFParserCallback p
 	 */
 	public static PipedCopyIterator<TripleString> readAsIterator(RDFParserCallback parser, String file, String baseUri,
 			boolean keepBNode, RDFNotation notation, HDTOptions spec) {
-		return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(file, baseUri, notation, keepBNode,
-				(triple, pos) -> pipe.addElement(triple.tripleToString())));
+		return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(file, baseUri, notation,
+				keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString())));
 	}
 
 }