openEHR - ETL - sharing is caring

At DIPS AS we have been working on making openEHR data available for data analytics. The structured clinical data must be aligned with other (non-openEHR) data sources, so we need ETL transformations on openEHR data.

AQL is a great query language for openEHR data. We use AQL as the key building block of our ETL process. To support the target system (a relational database) we annotate the AQL.

Here I will share one example to illustrate how it works. The example uses the traditional “Hello world!” of openEHR: blood pressure.

You will see how we define attributes on the AQL paths and how we define demographic metadata to be added at runtime.

--@etl.definition (schema_version=1, id=no.dips.arena.blood_pressure, version=1.0.0, namespace=at_blood_pressure, authors="bna@dips.no", copyright="DIPS AS", tags="blood_pressure, blood_pressure_v2")
--@etl.query (id=blood_pressure, target_name=blood_pressure, name="blood_pressure", description="The local measurement of arterial blood pressure which is a surrogate for arterial pressure in the systemic circulation.")
--@etl.query.metadata (type=episodeofcareid, target_name=episodeofcareid)
--@etl.query.metadata (type=dim_patientid, target_name=dim_patientid)
--@etl.query.metadata (type=dim_orgunitid, target_name=dim_orgunitid)
--@etl.query.metadata (type=dim_wardid, target_name=dim_wardid)
SELECT
   --@etl.query.column (target_name=origin, type=ISO8601_DATE_TIME, index=true, description="History Structural node.")
   o/data[at0001]/origin/value AS 'origin',
   --@etl.query.column (target_name=time, type=ISO8601_DATE_TIME, index=true, description="Default, unspecified point in time or time interval, which can be defined more explicitly in a template or in an application.")
   pe/time/value AS 'time',
   --@etl.query.column (target_name=systolic, type=Double, description="Peak systemic arterial blood pressure - measured in systolic or contraction phase of the heart cycle.")
   pe/data[at0003]/items[at0004]/value/magnitude AS 'systolic',
   --@etl.query.column (target_name=systolic_units, type=String, max_length=255, description="Units for systolic")
   pe/data[at0003]/items[at0004]/value/units AS 'systolic units',
   --@etl.query.column (target_name=diastolic, type=Double, description="Minimum systemic arterial blood pressure - measured in the diastolic or relaxation phase of the heart cycle.")
   pe/data[at0003]/items[at0005]/value/magnitude AS 'diastolic',
   --@etl.query.column (target_name=diastolic_units, type=String, max_length=255, description="Units for diastolic")
   pe/data[at0003]/items[at0005]/value/units AS 'diastolic (units)',
   --@etl.query.column (target_name=mean_arterial_pressure, type=Double, description="The average arterial pressure that occurs over the entire course of the heart contraction and relaxation cycle.")
   pe/data[at0003]/items[at1006]/value/magnitude AS 'mean_arterial_pressure',
   --@etl.query.column (target_name=mean_arterial_pressure_units, type=String, max_length=255, description="Units for the mean arterial pressure")
   pe/data[at0003]/items[at1006]/value/units AS 'mean_arterial_pressure (units)',
   --@etl.query.column (target_name=position, type=String, max_length=255, description="The position of the individual at the time of measurement.")
   pe/state[at0007]/items[at0008]/value/value AS 'position',
   --@etl.query.column (target_name=position_code_string, type=String, max_length=255, description="The position of the individual at the time of measurement.")
   pe/state[at0007]/items[at0008]/value/defining_code/code_string AS 'position (code_string)',
   --@etl.query.column (target_name=position_terminology_id, type=String, max_length=255, description="The position of the individual at the time of measurement.")
   pe/state[at0007]/items[at0008]/value/defining_code/terminology_id/value AS 'position (terminology_id)'
FROM
   COMPOSITION c[openEHR-EHR-COMPOSITION.encounter.v1]
      CONTAINS OBSERVATION o[openEHR-EHR-OBSERVATION.blood_pressure.v2]
         CONTAINS POINT_EVENT pe
ORDER BY o/data/origin/value desc

@bna is this creating some kind of table schema and loading that table with the extracted data?

Yes. The annotated AQL is a primary building block of the DIPS openEHR ETL service. When one such AQL is installed (deployed) on the system, multiple tables are created, additional fields are added, grants are given, and so on.

In addition, the new definition is registered on the asynchronous messaging bus, both for a full reload and for continuously adding new data.
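
To make the table-creation step a bit more concrete, here is a minimal Python sketch of how `--@etl.query.column` annotations could be read and turned into a table definition. This is not the DIPS implementation: the function names, the `SQL_TYPES` mapping and the generated DDL are assumptions for illustration only. Using the namespace as the schema and the `target_name` as the table name follows the `at_test_01.problem` naming used in the SQL at the end of this thread.

```python
import re

# Hypothetical mapping from annotation types to SQL types (an assumption,
# not the mapping used by the real ETL service).
SQL_TYPES = {
    "ISO8601_DATE_TIME": "TIMESTAMP",
    "Double": "DOUBLE PRECISION",
    "String": "VARCHAR",
}

ANNOTATION = re.compile(r"--@etl\.query\.column\s*\((.*)\)\s*$")
# key=value pairs; a value is either a quoted string or a bare token
PAIR = re.compile(r'(\w+)\s*=\s*("([^"]*)"|[^,\s]+)')


def parse_column(line):
    """Return the key/value pairs of one column annotation line, or None."""
    match = ANNOTATION.search(line.strip())
    if match is None:
        return None
    attrs = {}
    for key, raw, quoted in PAIR.findall(match.group(1)):
        attrs[key] = quoted if raw.startswith('"') else raw
    return attrs


def column_ddl(attrs):
    """Build one column definition from parsed annotation attributes."""
    sql_type = SQL_TYPES[attrs["type"]]
    if attrs["type"] == "String" and "max_length" in attrs:
        sql_type = f"VARCHAR({attrs['max_length']})"
    return f"{attrs['target_name']} {sql_type}"


def create_table(schema, table, aql_text):
    """Collect all column annotations in the AQL text into a CREATE TABLE."""
    parsed = (parse_column(line) for line in aql_text.splitlines())
    columns = [column_ddl(attrs) for attrs in parsed if attrs]
    return f"CREATE TABLE {schema}.{table} (\n    " + ",\n    ".join(columns) + "\n)"


EXAMPLE = '''
SELECT
   --@etl.query.column (target_name=systolic, type=Double, description="Peak systemic arterial blood pressure")
   pe/data[at0003]/items[at0004]/value/magnitude AS 'systolic',
   --@etl.query.column (target_name=systolic_units, type=String, max_length=255, description="Units for systolic")
   pe/data[at0003]/items[at0004]/value/units AS 'systolic units'
'''

print(create_table("at_blood_pressure", "blood_pressure", EXAMPLE))
```

The sketch only covers column annotations. The metadata columns, indexes and grants mentioned above are left out.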

Very interesting, thanks for sharing.
How do you handle repeating clusters in an archetype when transforming to a fixed table schema?

Great question. That’s one of the key features we’ve implemented. There are different strategies. One is the capability to use nested queries, which create join tables. This is great for repeating structures.

A very simple example below.

The structure in the template is:

  • One problem/diagnosis defining the problem (e.g. prostate cancer)
  • A repeated DV_CODED_TEXT for referral reasons/indications

We want one row in the table for each problem. Each of the reasons should be added to a join table to make it possible to aggregate, count, etc. the different indications.

Note how the CLUSTER aliased as cl is selected with no annotations. This tells the engine to pass it down to the next sub-query.

--@etl.definition (schema_version=1, id=no.dips.arena.test.01, version=1.1.0, namespace=at_test_01, name="Test parent-child", description="An example on how to extract repeating structures in a separate join table", authors="bna@dips.no", copyright="DIPS AS", tags="example, join, parent_child")
--@etl.query (id=parent, target_name=problem, name="problem")
SELECT
   --@etl.query.column (target_name=cuid, type=String, max_length=255, description="CUID")
   c/uid/value as 'cuid',
   --@etl.query.column (target_name=problem, type=String, max_length=255, description="The problem defined with some terminology")
   e/data[at0001]/items[at0002]/value as 'Problem',
   cl
FROM
   COMPOSITION c[openEHR-EHR-COMPOSITION.encounter.v1]
      CONTAINS EVALUATION e[openEHR-EHR-EVALUATION.problem_diagnosis.v1]
         CONTAINS CLUSTER cl[openEHR-EHR-CLUSTER.problem_diagnosis_dips.v1]


--@etl.query (id=parent.child, target_name=reasons, name="Reasons")
SELECT
   --@etl.query.column (target_name=reason, type=String, max_length=255, description="0..1 reasons for each problem")
   cl/items[at0001]/value AS 'Reason'
FROM
   CLUSTER cl

Based on this you might do a simple join-and-count query like:

select
    p.problem as problem,
    count(r.reason) as reason_count
from
    at_test_01.problem p
    left outer join at_test_01.reasons r on r.sourceid = p.sourceid
group by
    p.problem
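
To try the query without the ETL service, here is a small Python sketch that runs it against an in-memory SQLite database. The table layout (a `sourceid` column on both tables) is taken from the join condition above; the sample rows are invented for illustration, and SQLite stands in for whatever relational database the service actually targets.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Attach a second in-memory database so the schema-qualified table names work.
conn.execute("ATTACH DATABASE ':memory:' AS at_test_01")
conn.execute("CREATE TABLE at_test_01.problem (sourceid TEXT, problem TEXT)")
conn.execute("CREATE TABLE at_test_01.reasons (sourceid TEXT, reason TEXT)")

# Invented sample data: one problem with two reasons, one problem with none.
conn.executemany(
    "INSERT INTO at_test_01.problem VALUES (?, ?)",
    [("c1", "Prostate cancer"), ("c2", "Hypertension")],
)
conn.executemany(
    "INSERT INTO at_test_01.reasons VALUES (?, ?)",
    [("c1", "Elevated PSA"), ("c1", "Urinary symptoms")],
)

rows = conn.execute(
    """
    select p.problem as problem, count(r.reason) as reason_count
    from at_test_01.problem p
    left outer join at_test_01.reasons r on r.sourceid = p.sourceid
    group by p.problem
    order by p.problem
    """
).fetchall()

print(rows)  # [('Hypertension', 0), ('Prostate cancer', 2)]
```

Because of the left outer join, a problem with no reasons still shows up, with a count of 0.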