diff --git a/src/main/java/db/SlowCompletableFutureDb.java b/src/main/java/db/SlowCompletableFutureDb.java index 4d2b373..b64c412 100755 --- a/src/main/java/db/SlowCompletableFutureDb.java +++ b/src/main/java/db/SlowCompletableFutureDb.java @@ -7,7 +7,6 @@ import java.util.concurrent.*; public class SlowCompletableFutureDb implements DataStorage, Closeable { - private volatile Map values; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); diff --git a/src/main/java/part2/cache/CachingDataStorageImpl.java b/src/main/java/part2/cache/CachingDataStorageImpl.java index a2ae460..6bc8902 100755 --- a/src/main/java/part2/cache/CachingDataStorageImpl.java +++ b/src/main/java/part2/cache/CachingDataStorageImpl.java @@ -1,7 +1,6 @@ package part2.cache; import db.DataStorage; -import db.SlowCompletableFutureDb; import java.util.concurrent.*; @@ -32,12 +31,27 @@ public CachingDataStorageImpl(DataStorage db, int timeout, TimeUnit t @Override public OutdatableResult getOutdatable(String key) { - // TODO implement - // TODO use ScheduledExecutorService to remove outdated result from cache - see SlowCompletableFutureDb implementation - // TODO complete OutdatableResult::outdated after removing outdated result from cache - // TODO don't use obtrudeException on result - just don't - // TODO use remove(Object key, Object value) to remove target value - // TODO Start timeout after receiving result in CompletableFuture, not after receiving CompletableFuture itself - throw new UnsupportedOperationException(); + final OutdatableResult newResult + = new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>()); + + final OutdatableResult cashed = + cache.putIfAbsent(key, newResult); + if (cashed != null) { + return cashed; + } + + db.get(key).whenComplete((res, ex) -> { + if (ex != null) { + newResult.getResult().completeExceptionally(ex); + } else { + newResult.getResult().complete(res); + } + scheduledExecutorService.schedule(() -> { + cache.remove(key, newResult); + newResult.getOutdated().complete(null); + }, timeout, timeoutUnits); + }); + + return newResult; } } diff --git a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java index e9049a5..b452470 100755 --- a/src/main/java/part2/cache/TypedEmployeeCachedStorage.java +++ b/src/main/java/part2/cache/TypedEmployeeCachedStorage.java @@ -2,14 +2,24 @@ import data.typed.Employee; import data.typed.Employer; +import data.typed.JobHistoryEntry; import data.typed.Position; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.function.Function; + +import static java.util.stream.Collectors.toList; + public class TypedEmployeeCachedStorage implements CachingDataStorage { private final CachingDataStorage employeeStorage; private final CachingDataStorage positionStorage; private final CachingDataStorage employerStorage; + public TypedEmployeeCachedStorage(CachingDataStorage employeeStorage, CachingDataStorage positionStorage, CachingDataStorage employerStorage) { @@ -18,9 +28,69 @@ public TypedEmployeeCachedStorage(CachingDataStorage empl this.employerStorage = employerStorage; } + private OutdatableResult asyncToTyped(data.Employee e) { + + final List> jobHistoryFutures = + e.getJobHistory().stream() + .map(this::asyncToTyped) + .collect(toList()); + + final List outdatedList = e.getJobHistory().stream() + .map(this::getOutDated) + .collect(toList()); + + + return new OutdatableResult<>( + CompletableFuture.allOf(jobHistoryFutures.toArray(new CompletableFuture[0])) + .thenApplyAsync(x -> { + final List jobHistory = jobHistoryFutures.stream() + .map(this::getOrNull) + .collect(toList()); + return new data.typed.Employee(e.getPerson(), jobHistory); + }) + .thenApply(Function.identity()), + CompletableFuture.anyOf(outdatedList.toArray(new CompletableFuture[0])) + .thenApply(x -> null) + ); + } + + private CompletableFuture asyncToTyped(data.JobHistoryEntry j) { + return employerStorage.get(j.getEmployer()) + .thenCombine( + positionStorage.get(j.getPosition()), + (e, p) -> new JobHistoryEntry(p, e, j.getDuration())); + } + + private CompletableFuture getOutDated(data.JobHistoryEntry j) { + return CompletableFuture.anyOf(positionStorage.getOutdatable(j.getPosition()).getOutdated(), + employerStorage.getOutdatable(j.getEmployer()).getOutdated()); + } + + private T getOrNull(Future f) { + try { + return f.get(); + } catch (InterruptedException | ExecutionException e1) { + e1.printStackTrace(); + return null; + } + } + @Override public OutdatableResult getOutdatable(String key) { - // TODO note that you don't know timeouts for different storage. And timeouts can be different. - throw new UnsupportedOperationException(); + final OutdatableResult outdatable = employeeStorage.getOutdatable(key); + + final CompletableFuture> future = outdatable.getResult().thenApply(this::asyncToTyped); + final OutdatableResult result = new OutdatableResult<>(new CompletableFuture<>(), new CompletableFuture<>()); + future.whenComplete((res, ex) -> { + if (ex != null) { + result.getResult().completeExceptionally(ex); +// result.getOutdated().completeExceptionally(ex); + } else { + result.getResult().complete(getOrNull(res.getResult())); + outdatable.getOutdated().runAfterEither(res.getOutdated(), () -> result.getOutdated().complete(null)); + } + }); + return result; } } + diff --git a/src/test/java/part2/cache/CachingDataStorageImplTest.java b/src/test/java/part2/cache/CachingDataStorageImplTest.java index 041370d..d0b3173 100755 --- a/src/test/java/part2/cache/CachingDataStorageImplTest.java +++ b/src/test/java/part2/cache/CachingDataStorageImplTest.java @@ -5,10 +5,10 @@ import data.typed.Employer; import data.typed.Position; import db.SlowCompletableFutureDb; -import part2.cache.CachingDataStorage.OutdatableResult; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import part2.cache.CachingDataStorage.OutdatableResult; import java.io.IOException; import java.util.Arrays; @@ -83,5 +83,4 @@ public void expiration() throws InterruptedException, ExecutionException, Timeou assertEquals(person2, result3.getResult().get().getPerson()); } - } diff --git a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java index fecb3b4..a22762d 100755 --- a/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java +++ b/src/test/java/part2/cache/TypedEmployeeCachedStorageTest.java @@ -1,6 +1,8 @@ package part2.cache; import data.Employee; +import data.JobHistoryEntry; +import data.Person; import data.typed.Employer; import data.typed.Position; import db.SlowCompletableFutureDb; @@ -10,18 +12,22 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; import static java.util.stream.Collectors.toMap; +import static org.junit.Assert.assertEquals; public class TypedEmployeeCachedStorageTest { private static SlowCompletableFutureDb employeeDb; private static SlowCompletableFutureDb employerDb; private static SlowCompletableFutureDb positionDb; + @BeforeClass public static void defore() { final Map employerMap = @@ -49,7 +55,7 @@ public static void after() { } @Test - public void expiration() { + public void expiration() throws ExecutionException, InterruptedException { final CachingDataStorageImpl employeeCache = new CachingDataStorageImpl<>(employeeDb, 1, TimeUnit.SECONDS); @@ -59,9 +65,35 @@ public void expiration() { final CachingDataStorageImpl positionCache = new CachingDataStorageImpl<>(positionDb, 100, TimeUnit.MILLISECONDS); + Map employeeTmp = new HashMap<>(); + + final Person person1 = new Person("John", "Doe", 30); + employeeTmp.put("a", new Employee(person1, + Collections.singletonList(new JobHistoryEntry(1, Position.BA.name(), Employer.EPAM.name())))); + employeeDb.setValues(employeeTmp); + final TypedEmployeeCachedStorage typedCache = new TypedEmployeeCachedStorage(employeeCache, positionCache, employerCache); - // TODO check than cache gets outdated with the firs outdated inner cache + final CachingDataStorage.OutdatableResult aPerson = typedCache.getOutdatable("a"); + + assertEquals(aPerson.getResult().get().getPerson(), person1); + assertEquals(aPerson.getResult().get().getJobHistoryEntries(), + Collections.singletonList(new data.typed.JobHistoryEntry(Position.BA, Employer.EPAM, 1))); + + Thread.sleep(50); + + employeeTmp = new HashMap<>(); + final Person person2 = new Person("Dagni", "Taggart", 30); + employeeTmp.put("a", new Employee(person2, Collections.emptyList())); + employeeDb.setValues(employeeTmp); + + final CachingDataStorage.OutdatableResult aPerson2 = typedCache.getOutdatable("a"); + assertEquals(aPerson2.getResult().get().getPerson(), person1); + + Thread.sleep(100); + final CachingDataStorage.OutdatableResult aPerson3 = typedCache.getOutdatable("a"); + + assertEquals(aPerson3.getResult().get().getPerson(), person2); } }