From 57d2483d909b02f5f8e4c46daff51cecb7401ab9 Mon Sep 17 00:00:00 2001 From: "francois.ancien" <francois.ancien@uni.lu> Date: Tue, 23 Apr 2024 10:26:24 +0200 Subject: [PATCH] Adding scripts to aggregate data and edc-client references --- .../aggregate-central.cwl.yml | 7 +-- .../aggregate-pulldata.cwl.yml | 11 ++-- .../resources/aggregate-central.Dockerfile | 4 ++ workflows/resources/aggregate-central.py | 27 ++++++++-- workflows/resources/aggregate-central.sh | 2 + .../resources/aggregate-remote.Dockerfile | 2 +- workflows/resources/aggregate-remote.py | 52 ++++++++++++++----- workflows/resources/iderha-edc-input.json | 3 +- 8 files changed, 83 insertions(+), 25 deletions(-) create mode 100644 workflows/resources/aggregate-central.sh diff --git a/workflows/iderha-aggregate/aggregate-central.cwl.yml b/workflows/iderha-aggregate/aggregate-central.cwl.yml index 6d5d4c2..5f8687f 100644 --- a/workflows/iderha-aggregate/aggregate-central.cwl.yml +++ b/workflows/iderha-aggregate/aggregate-central.cwl.yml @@ -28,8 +28,9 @@ inputs: outputs: - id: final - type: stdout + type: File doc: "The aggregated values from all partial data" + outputBinding: + glob: avg-ages.json -stdout: stdout.txt -baseCommand: ["python", "aggregate-central.py"] +baseCommand: ["aggregate_central"] diff --git a/workflows/iderha-aggregate/aggregate-pulldata.cwl.yml b/workflows/iderha-aggregate/aggregate-pulldata.cwl.yml index 97ec42d..85e6e07 100644 --- a/workflows/iderha-aggregate/aggregate-pulldata.cwl.yml +++ b/workflows/iderha-aggregate/aggregate-pulldata.cwl.yml @@ -7,7 +7,7 @@ doc: "Task to pull data from EDC" hints: - class: DockerRequirement - dockerPull: gitlab.lcsb.uni.lu:4567/luca.bolzani/iderha-test-deployment/test-aggregate-remote + dockerPull: gitlab.lcsb.uni.lu:4567/luca.bolzani/iderha-test-deployment/edc-client inputs: - id: input @@ -16,8 +16,9 @@ inputs: position: 1 outputs: - - id: datalink - type: stdout + datalink: + type: File + outputBinding: + glob: aggregated-ages.json 
-stdout: stdout.txt -baseCommand: ["aggregate_remote"] +baseCommand: ["/app/edc_client.sh"] diff --git a/workflows/resources/aggregate-central.Dockerfile b/workflows/resources/aggregate-central.Dockerfile index d98826e..b7bbd28 100644 --- a/workflows/resources/aggregate-central.Dockerfile +++ b/workflows/resources/aggregate-central.Dockerfile @@ -1,7 +1,11 @@ FROM repomanager.lcsb.uni.lu:9999/python:3.9 LABEL authors="francois.ancien" +WORKDIR / COPY aggregate-central.py /aggregate-central.py +COPY aggregate-central.sh /usr/local/bin/aggregate_central + +RUN chmod +x /usr/local/bin/aggregate_central ENTRYPOINT ["/usr/bin/bash"] diff --git a/workflows/resources/aggregate-central.py b/workflows/resources/aggregate-central.py index d0e046a..b615821 100644 --- a/workflows/resources/aggregate-central.py +++ b/workflows/resources/aggregate-central.py @@ -1,8 +1,29 @@ #!/usr/bin/env python3.9 -def main(): - print("Hello world! From central node!") +import argparse +import json +import sys + +def main(args=None): + if args is None: + args = sys.argv[1:] + + parser = argparse.ArgumentParser() + parser.add_argument("infiles", metavar="Input", type=str, nargs="+", help="PATH to an input file") + + args = parser.parse_args(args) + + total_sum = 0.0 + total_count = 0.0 + for file_path in args.infiles: + with open(file_path, "r") as f: + data = json.load(f) + total_sum += data["sum"] + total_count += data["count"] + + with open("avg-ages.json", "w") as f: + json.dump({"avg": total_sum / total_count}, f) if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/workflows/resources/aggregate-central.sh b/workflows/resources/aggregate-central.sh new file mode 100644 index 0000000..2bb7938 --- /dev/null +++ b/workflows/resources/aggregate-central.sh @@ -0,0 +1,2 @@ +#!/bin/bash +python /aggregate-central.py "$@" \ No newline at end of file diff --git a/workflows/resources/aggregate-remote.Dockerfile b/workflows/resources/aggregate-remote.Dockerfile index 27113de..09a0df5 
100644 --- a/workflows/resources/aggregate-remote.Dockerfile +++ b/workflows/resources/aggregate-remote.Dockerfile @@ -1,7 +1,7 @@ FROM repomanager.lcsb.uni.lu:9999/python:3.9 LABEL authors="francois.ancien" -RUN pip install requests --no-cache +RUN pip install --no-cache requests WORKDIR / COPY aggregate-remote.py /aggregate-remote.py COPY aggregate-remote.sh /usr/local/bin/aggregate_remote diff --git a/workflows/resources/aggregate-remote.py b/workflows/resources/aggregate-remote.py index 2cf88a7..b43abe3 100644 --- a/workflows/resources/aggregate-remote.py +++ b/workflows/resources/aggregate-remote.py @@ -1,7 +1,8 @@ #!/usr/bin/env python import argparse -import requests +import json +from ftplib import FTP import sys @@ -9,20 +10,47 @@ def main(args=None): if args is None: args = sys.argv[1:] parser = argparse.ArgumentParser() - parser.add_argument("-i", "--input", help="URL to input file") + parser.add_argument("-i", "--input", help="PATH to input file") parsed = parser.parse_args(args) - res = requests.get(parsed.input) - if not res.ok: - issue = res.text - raise requests.HTTPError(f"Error {res.status_code}: {issue}") - - try: - data = res.json() - print(data) - except requests.JSONDecodeError: - print(f"Issue with data in {res.text}. Not a valid json") + data = None + ftp_path = None + with open(parsed.input, "r") as f: + try: + data = json.load(f) + ftp_path = data["FTP_DATA_ADDRESS"] + except json.JSONDecodeError as e: + raise ValueError(f"Impossible to parse data in {parsed.input}. 
Not a valid json.") from e + + # Getting data from ftp source + ftp_host = "ftp-upload-service" + ftp = FTP(ftp_host) + ftp.login(user="ftp_iderha_user", passwd="ftp_iderha_pass") + tmp_path = "tmp_data.json" + with open(tmp_path, "wb") as fh: + ftp.retrbinary(f"RETR {ftp_path}", fh.write) + + # Extracting data from json + with open(tmp_path, "rb") as f: + try: + data = json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Impossible to parse data in {tmp_path}. Not a valid json.") from e + + # Calculating avg username lengths + sum_usrname = 0 + for row in data: + try: + sum_usrname += len(row["username"]) + except (AttributeError, KeyError): + continue + + count_rows = len(data) + + # Saving + with open("aggregated-ages.json", "w") as f: + json.dump({"sum": sum_usrname, "count": count_rows}, f) if __name__ == "__main__": diff --git a/workflows/resources/iderha-edc-input.json b/workflows/resources/iderha-edc-input.json index 30919c4..20ab9b9 100644 --- a/workflows/resources/iderha-edc-input.json +++ b/workflows/resources/iderha-edc-input.json @@ -10,5 +10,6 @@ "EDC_PROVIDER_PROTOCOL_URL": "http://edc-provider:19194/protocol", "EDC_PROVIDER_PUBLIC_URL": "http://edc-provider:19291/public", "EDR_ENDPOINT_URL": "http://edc-consumer:29191/api/edr/query", - "DATA_DESTINATION_URL": "http://ftp-upload-service/api/ftp/upload" + "DATA_DESTINATION_URL": "http://ftp-upload-service/api/ftp/upload", + "FTP_DATA_ADDRESS": "/home/ftp_iderha_user/isst-data.json" } -- GitLab