Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@

All notable changes to this project will be documented in this file.

## [Unreleased]

### 🚀 Features

- *(health)* Return 503 error if spark context is stopped
- Add local development documentation (#21)

### 🚜 Refactor

- Mep BC 3.1 - add _key field in list table omop pg
- Resource pmsi changes (#16)

### 📚 Documentation

- Add missing mode documentation

## [2.10.0] - 2025-07-01

### 🚀 Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
import org.hl7.fhir.r4.model.ListResource.ListMode

/**
* @param pg pgTool obj
*/
* @param pg pgTool obj
*/
class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
private final val cohort_item_table_rw = AppConfig.get.pg.get.cohortConfig.cohortItemsTableName
private final val cohort_table_rw = AppConfig.get.pg.get.cohortConfig.cohortTableName
Expand All @@ -33,11 +33,12 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
} else {
("", "")
}
val key_col = baseCohortId.map(_.toString).getOrElse("-1")
val stmt =
s"""
|insert into ${cohort_table_rw}
|(hash, title, ${note_text_column_name},${indentifier_col} _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size)
|values (-1, ?, ?,${identifier_val} ?, ?, '$cohort_provider_name', 'Practitioner', '${mode.toCode}', '${CohortStatus.RUNNING}', ?, now(), ?)
|insert into $cohort_table_rw
|(hash, title, $note_text_column_name,$indentifier_col _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size, _key)
|values (-1, ?, ?,$identifier_val ?, ?, '$cohort_provider_name', 'Practitioner', '${mode.toCode}', '${CohortStatus.RUNNING}', ?, now(), ?, '$key_col')
|returning id
|""".stripMargin
val result = pg
Expand All @@ -47,7 +48,7 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
cohortDefinitionName,
cohortDefinitionSyntax,
ownerEntityId,
s"Practitioner/${ownerEntityId}",
s"Practitioner/$ownerEntityId",
resourceType,
size
)
Expand All @@ -58,8 +59,8 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
}

/**
* This loads both a cohort and its definition into postgres and solr
*/
* This loads both a cohort and its definition into postgres and solr
*/
override def updateCohort(cohortId: Long,
cohort: DataFrame,
sourcePopulation: SourcePopulation,
Expand Down Expand Up @@ -122,11 +123,11 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
private def readCohortDiffEntries(cohortId: Long): DataFrame = {
val stmt =
s"""
|select date,_itemreferenceid,deleted
|from ${cohort_item_table_rw}
|join ${cohort_table_rw} on ${cohort_table_rw}.id = ${cohort_item_table_rw}._listid
|where ${cohort_table_rw}.identifier___official__value = '$cohortId'
|""".stripMargin
|select date,_itemreferenceid,deleted
|from ${cohort_item_table_rw}
|join ${cohort_table_rw} on ${cohort_table_rw}.id = ${cohort_item_table_rw}._listid
|where ${cohort_table_rw}.identifier___official__value = '$cohortId'
|""".stripMargin
pg.sqlExecWithResult(stmt)
.select(col("date"), col("_itemreferenceid"), col("deleted"))
.orderBy(col("date").asc)
Expand All @@ -140,7 +141,7 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
for (sc_id <- sourcePopulation.cohortList.get) {
val (list_list_id, list_relationship_concept_id) =
(List(List(cohortDefinitionId, sc_id), List(sc_id, cohortDefinitionId)),
List(44818821, 44818823))
List(44818821, 44818823))
for ((list_id, relationship_concept_id) <- list_list_id zip list_relationship_concept_id) {
val stmt =
s"""
Expand All @@ -155,24 +156,24 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
}

/**
* This loads only a cohort definition into postgres.
*
* @param cohortDefinitionId id of the cohort
* @param count nb of patient of the cohort
*/
* This loads only a cohort definition into postgres.
*
* @param cohortDefinitionId id of the cohort
* @param count nb of patient of the cohort
*/
private def uploadCount(
cohortDefinitionId: Long,
count: Long
): Unit = {
cohortDefinitionId: Long,
count: Long
): Unit = {
setOmopCohortSize(cohortDefinitionId, count)
setOmopCohortStatus(cohortDefinitionId, CohortStatus.FINISHED)
setOmopCohortActive(cohortDefinitionId, status = true)
}

private def setOmopCohortSize(
cohortDefinitionId: Long,
count: Long
): Unit = {
cohortDefinitionId: Long,
count: Long
): Unit = {
val stmt =
s"""
|update ${cohort_table_rw}
Expand All @@ -183,9 +184,9 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
}

private def setOmopCohortStatus(
cohortDefinitionId: Long,
status: CohortStatus.Value
): Unit = {
cohortDefinitionId: Long,
status: CohortStatus.Value
): Unit = {
val stmt =
s"""
|update ${cohort_table_rw}
Expand All @@ -196,9 +197,9 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
}

private def setOmopCohortActive(
cohortDefinitionId: Long,
status: Boolean
): Unit = {
cohortDefinitionId: Long,
status: Boolean
): Unit = {
val mode = if (status) "working" else "snapshot"
val stmt =
s"""
Expand All @@ -220,22 +221,22 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
"cohort dataframe shall have _listid, _provider, _provider and item__reference"
)
pg.outputBulk(cohort_item_table_rw,
dfAddHash(df),
Some(4),
primaryKeys = Seq("_listid", "_itemreferenceid", "_provider"))
dfAddHash(df),
Some(4),
primaryKeys = Seq("_listid", "_itemreferenceid", "_provider"))
}

/**
* Adds a hash column based on several other columns
*
* @param df DataFrame
* @param columnsToExclude List[String] the columns not to be hashed
* @return DataFrame
*/
* Adds a hash column based on several other columns
*
* @param df DataFrame
* @param columnsToExclude List[String] the columns not to be hashed
* @return DataFrame
*/
private def dfAddHash(
df: DataFrame,
columnsToExclude: List[String] = Nil
): DataFrame = {
df: DataFrame,
columnsToExclude: List[String] = Nil
): DataFrame = {
df.withColumn(
"hash",
hash(
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/testCases/occurences/resource_2.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
patient;_ref.encounter.period.start;_ref.encounter.period.end;recordedDate
patient;_ref.encounter.period.start;_ref.encounter.period.end;onsetDateTime
22;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z
22;2016-03-07T12:04:00Z;2016-03-07T12:04:00Z;2016-03-07T12:04:00Z
23;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
patient;_ref.encounter.period.start;_ref.encounter.period.end;recordedDate
patient;_ref.encounter.period.start;_ref.encounter.period.end;onsetDateTime
22;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z
23;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z
24;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z;2016-03-06T12:04:00Z
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class PGCohortCreationTest
pgTools.sqlExecWithResult(
"""
|insert into list_cohort360
|(hash, title, note__text, _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size)
|values (-1, ?, ?, ?, ?, 'Cohort360', 'Practitioner', 'snapshot', 'retired', ?, now(), ?)
|(hash, title, note__text, _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size, _key)
|values (-1, ?, ?, ?, ?, 'Cohort360', 'Practitioner', 'snapshot', 'retired', ?, now(), ?, '-1')
|returning id
|""".stripMargin,
List("test", "test", "test", "Practitioner/test", "test", 1)
Expand All @@ -57,6 +57,31 @@ class PGCohortCreationTest
1)
}

test("testCreateCohortWithBaseCohort") {
  // Class under test, wired to a mocked PGTool so no database is touched.
  val pgToolMock = mock[PGTool]
  val cohortCreation = new PGCohortCreation(pgToolMock)

  // Result handed back by the stubbed insert; collecting it yields one row holding 1L.
  val insertResult: Dataset[Row] = mock[Dataset[Row]]
  when(insertResult.collect()).thenReturn(Array(Row(1L)))

  // Statement expected for a base cohort id of 42: the id shows up as the
  // `identifier` value and, quoted, as the `_key` value.
  val expectedInsert =
    """
      |insert into list_cohort360
      |(hash, title, note__text, identifier, _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size, _key)
      |values (-1, ?, ?, 42, ?, ?, 'Cohort360', 'Practitioner', 'snapshot', 'retired', ?, now(), ?, '42')
      |returning id
      |""".stripMargin

  // The stub only answers when both the statement text and the bound parameters match.
  when(
    pgToolMock.sqlExecWithResult(
      expectedInsert,
      List("test", "test", "test", "Practitioner/test", "test", 1)
    )
  ).thenReturn(insertResult)

  // No explicit assertion follows: the test only exercises createCohort against the stub above.
  cohortCreation.createCohort(
    "test",
    Some("test"),
    "test",
    "test",
    "test",
    Some(42L),
    ListMode.SNAPSHOT,
    1
  )
}

test("testUpdateCohort") {
val pgTools = mock[PGTool]

Expand Down
Loading