Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult;
import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered;
import com.the_qa_company.qendpoint.core.listener.MultiThreadListener;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
Expand Down Expand Up @@ -187,7 +188,7 @@ public CompressTripleMapper compressDictionary(Iterator<TripleString> iterator)
"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true) + "B with "
+ ways + "ways and " + workers + " worker(s)");

AsyncIteratorFetcher<TripleString> source = new AsyncIteratorFetcher<>(iterator);
AsyncIteratorFetcherUnordered<TripleString> source = new AsyncIteratorFetcherUnordered<>(iterator);

profiler.pushSection("section compression");
CompressionResult compressionResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SectionCompressor implements KWayMerger.KWayMergerImpl<TripleString
private final boolean debugSleepKwayDict;
private final boolean quads;
private final CompressionType compressionType;
private final long start = System.currentTimeMillis();

public SectionCompressor(CloseSuppressPath baseFileName, AsyncIteratorFetcher<TripleString> source,
MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict,
Expand Down Expand Up @@ -253,7 +254,10 @@ public void createChunk(SizeFetcher<TripleString> fetcher, CloseSuppressPath out
}

if (tripleID % 100_000 == 0) {
listener.notifyProgress(10, "reading triples " + tripleID);
// use start to measure how many triples are read per second
int triplesPerSecond = (int) (tripleID / ((System.currentTimeMillis() - start) / 1000.0));

listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond);
}
// too much ram allowed?
if (subjects.size() == Integer.MAX_VALUE - 6) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.the_qa_company.qendpoint.core.iterator.utils;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

/**
* Synchronise an iterator
*
* @param <E> iterator type
* @author Håvard M. Ottestad
* @author Antoine Willerval
*/
public class AsyncIteratorFetcherUnordered<E> extends AsyncIteratorFetcher<E> {

private static final int CORES = Runtime.getRuntime().availableProcessors();

public static final int BUFFER = 1024 * 32;
private final Iterator<E> iterator;
private boolean end;
volatile Queue<E>[] queue = new Queue[CORES * 2];

{
for (int i = 0; i < queue.length; i++) {
queue[i] = new ArrayDeque<>(BUFFER);
}
}

public AsyncIteratorFetcherUnordered(Iterator<E> iterator) {
super(iterator);
this.iterator = iterator;
}

/**
* @return an element from the iterator, this method is thread safe
*/
@Override
public E get() {

int index = (int) (Thread.currentThread().getId() % queue.length);

Queue<E> es = queue[index];
if (es == null) {
for (Queue<E> eQueue : queue) {
if (eQueue != null) {
synchronized (eQueue) {
E poll = eQueue.poll();

if (poll != null) {
return poll;
}
}
}
}
}

if (es != null) {
// With this approach there is some risk that a queue is filled but
// never emptied. Maybe we should look for another queue to read
// from
// before filling our own queue?
synchronized (es) {
E poll = es.poll();

if (poll != null) {
return poll;
}

synchronized (this) {
es = queue[index];
if (es != null) {

poll = es.poll();
if (poll == null) {
if (iterator.hasNext()) {
poll = iterator.next();
for (int i = 0; i < BUFFER && iterator.hasNext(); i++) {
es.add(iterator.next());
}
}

}

if (poll == null) {
queue[index] = null;
} else {
return poll;
}
}
}
}
}

for (Queue<E> eQueue : queue) {
if (eQueue != null) {

synchronized (eQueue) {
synchronized (this) {
E poll = eQueue.poll();

if (poll != null) {
return poll;
}
}
}
}
}

synchronized (this) {
if (iterator.hasNext()) {
E poll = iterator.next();
return poll;
}
}

end = true;
return null;

}

/**
* @return is the end
*/
public boolean isEnd() {
return end;
}
}
Loading