# -*- orgstrap-cypher: sha256; orgstrap-norm-func-name: orgstrap-norm-func--dprp-1-0; orgstrap-block-checksum: 3fad0d34c32d6bd05ae351bc99f940235289b795ecbef52657c2fde30ea246bc; -*- # [[orgstrap][jump to the orgstrap block for this file]] #+TITLE: sparcur developer guide #+AUTHOR: Tom Gillespie #+OPTIONS: num:nil ^:nil h:7 broken-links:t #+LATEX_HEADER: \usepackage[margin=0.8in]{geometry} #+STARTUP: showall #+link: wt-gh-pull https://github.com/SciCrunch/sparc-curation/blob/0661c3ebd164d0beaa6c57382a459ffd28d9dc92/ # [[file:developer-guide.pdf]] # [[file:developer-guide.html]] #+name: orgstrap-shebang #+begin_src bash :eval never :results none :exports none set -e "-C" "-e" "-e" { null=/dev/null;} > "${null:=/dev/null}" { args=;file=;MyInvocation=;__p=$(mktemp -d);touch ${__p}/=;chmod +x ${__p}/=;__op=$PATH;PATH=${__p}:$PATH;} > "${null}" $file = $MyInvocation.MyCommand.Source { file=$0;PATH=$__op;rm ${__p}/=;rmdir ${__p};} > "${null}" emacs -batch -no-site-file -eval "(let (vc-follow-symlinks) (defun orgstrap--confirm-eval (l _) (not (memq (intern l) '(elisp emacs-lisp)))) (let ((file (pop argv)) enable-local-variables) (find-file-literally file) (end-of-line) (when (eq (char-before) ?\^m) (let ((coding-system-for-read 'utf-8)) (revert-buffer nil t t)))) (let ((enable-local-eval t) (enable-local-variables :all) (major-mode 'org-mode)) (require 'org) (org-set-regexps-and-options) (hack-local-variables)))" "${file}" -- ${args} "${@}" exit <# powershell open #+end_src * Demos ** Remote only connection If you have [[(def_babf)][~sparcur.simple~​]] this is the simplest way to get a remote only connection to Pennsieve. #+name: demo-remote-only #+begin_src python :results drawer :epilogue "import pprint; return pprint.pformat(datasets)" from sparcur.config import auth from sparcur.simple.utils import backend_pennsieve project_id = auth.get('remote-organization') PennsieveRemote = backend_pennsieve(project_id) root = PennsieveRemote(project_id) datasets = list(root.children) #+end_src Otherwise you have to do what backend_pennsieve does for you #+begin_src python :results drawer :epilogue "import pprint; return pprint.pformat(datasets)" from sparcur.paths import PennsieveCache, Path from sparcur.config import auth from sparcur.backends import PennsieveRemote project_id = auth.get('remote-organization') PennsieveRemote = PennsieveRemote._new(Path, PennsieveCache) PennsieveRemote.init(project_id) root = PennsieveRemote(PennsieveRemote.root) datasets = list(root.children) #+end_src ** Validate a dataset :PROPERTIES: :header-args:python: :comments link :exports code :mkdirp yes :header-args:python+: :shebang "#!/usr/bin/env python3" :END: This code is an example of how to use sparcur to get structured metadata from a local dataset. You can run this example block and it will validate the [[file:../resources/DatasetTemplate/][DatasetTemplate]]. # XXX when testing this you need to set an alternate sandboxed home Example usage #+begin_src bash rsync -r path/to/some/curation/dataset test pushd test/dataset SCIGRAPH_API_KEY=$(python -c 'from pyontutils.config import auth; print(auth.get("scigraph-api-key"))') \ HOME=/tmp/test-home python -m sparcur.simple.validate #+end_src Example python usage #+begin_src python from sparcur.simple.validate import main as validate from pathlib import Path path = Path('../resources/DatasetTemplate') blob = validate(path) #+end_src Implementation of =sparcur.simple.validate=. 
# #+header: :epilogue "import pprint; return pprint.pformat(data.keys())" # #+header: :epilogue "import pprint; return pprint.pformat(data)" #+name: validate.py #+begin_src python :results drawer :exports both :cache yes :tangle ../sparcur/simple/validate.py :eval no-export import augpathlib as aug from sparcur import pipelines as pipes from sparcur.paths import PathL, CacheL from sparcur.utils import GetTimeNow def makeValidator(dataset_path, time_now=None): if time_now is None: time_now = GetTimeNow() class CacheX(CacheL): def __new__(cls, *args, **kwargs): return super().__new__(cls, *args, **kwargs) CacheX._bind_flavours() class PathX(PathL): """ Workaround absense of cache. """ _cache_class = CacheX def __new__(cls, *args, **kwargs): return super().__new__(cls, *args, **kwargs) # TODO likely will also need to rebind the cache class as well #@property #def dataset_relative_path(self, __drp=dataset_path): #return self.relative_path_from(self.__class__(__drp)) CacheX._local_class = PathX PathX._bind_flavours() # XXX monkey patch TODO sigh FIXME DatasetStructure calls Path directly inside #PathL.dataset_relative_path = Path.dataset_relative_path # must caste before anything else is done so that anchor and # datasets are known dataset_path = PathX(dataset_path) CacheX._dataset_dirs = [CacheX(dataset_path)] # FIXME this is very much not ideal because we don't actually want # the parent in this case CacheX._asserted_anchor = CacheX(dataset_path.parent) class context: path = dataset_path.resolve() id = path.id uri_api = path.as_uri() uri_human = path.as_uri() class lifters: id = context.id remote = 'local' folder_name = context.path.name uri_api = context.uri_api uri_human = context.uri_human timestamp_export_start = time_now.START_TIMESTAMP affiliations = lambda *args, **kwargs: None techniques = tuple() modality = None organ_term = None protocol_uris = tuple() award_manual = None return pipes.PipelineEnd(dataset_path, lifters, context) return pipes.SDSPipeline(dataset_path, lifters, context) # shouldn't need network def main(path=PathL.cwd(), time_now=None, export_local=False, export_parent_path=None, _entry_point=False, validate=False, **kwargs): # ('../resources/DatasetTemplate') pipeline = makeValidator(path, time_now=time_now) data = pipeline.data if _entry_point: from sparcur.simple.export import export_blob export_blob_path = export_blob( data, 'curation-export.json', time_now=time_now, export_parent_path=export_parent_path if export_parent_path is not None else path, ,**kwargs) return export_blob_path else: return data if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main) #+end_src ** Load Python IR #+begin_src python from sparcur.utils import path_ir ir = path_ir('curation-export.json') #+end_src #+begin_src python def main(): from sparcur.reports import Report from sparcur.utils import path_ir from pyontutils.core import OntResIri ori = OntResIri('https://cassava.ucsd.edu/sparc/preview/exports/curation-export.ttl') graph = ori.graph ir = path_ir('/tmp/curation-export-test.json') rows = Report._hubmap_terms(graph, ir) anat = [r for r in rows if r[1].prefix in ('UBERON', 'FMA', 'ILX')] all_r = expand_label_curie(rows) ana_r = expand_label_curie(anat) return all_r, ana_r if __name__ == '__main__': return main() #+end_src ** Retrieve protocols Create a protocols.io account and get API keys. Then run the following to register with protocols.io. NOTE this is broken at the moment. 
Manual steps can be found in [[./setup.org::#config-templates]] #+begin_src bash python -c "import idlib; idlib.Pio._setup()" #+end_src You can then run the following to retrieve protocol data. #+begin_src python import idlib from pyontutils.core import OntResIri from pyontutils.namespaces import sparc, rdf def getpio(i): try: return idlib.Pio(i) except idlib.exc.IdlibError as e: pass def getdata(s): try: return s.data() except (idlib.exc.NotAuthorizedError) as e: print(e) except (idlib.exc.IdDoesNotExistError) as e: print(e) def main(): ori = OntResIri("https://cassava.ucsd.edu/sparc/preview/exports/protcur.ttl") g = ori.graph pids = list(g[:rdf.type:sparc.Protocol]) streams = [s for i in pids for s in (getpio(i),) if s] datas = [getdata(s) for s in streams] return datas if __name__ == '__main__': main() #+end_src * Extending implementation ** Adding a new xml type #+begin_src python def new_xml_format(path): from sparcur.extract import xml ex = xml.XmlSource(path) top_tag = ex.e.getroot().tag #+end_src * Workflow ** All datasets *** _*Retrieve*_ **** Overview :ignore: The dependency DAG is as follows. # NOTE the workflow for generating these diagrams takes multiple steps # first write the graph in racket, where we can use dashes in names # conver to dot and add clusters as needed #+name: graph-retrieve-all #+header: :wrap "src dot :file ./images/graph-retrieve-all.png :cmdline -Kdot -Tpng :exports results :cache yes" #+begin_src racket :lang racket/base :exports none :noweb no-export :cache yes <> (define g (dag-notation fetch-all -> fetch-metadata-files -> pull -> sparse-materialize -> fetch-remote-metadata fetch-all -> fetch-files -> pull -> clone fetch-all -> fetch-remote-metadata fetch-all -> fetch-annotations )) (graphviz g) #+end_src #+RESULTS[5b1ab6330a12cfe55439af47a6bd717498fc6c7d]: graph-retrieve-all #+begin_src dot :file ./images/graph-retrieve-all.png :cmdline -Kdot -Tpng :exports results :cache yes digraph G { node0 [label="fetch-metadata-files"]; node1 [label="clone"]; node2 [label="fetch-all"]; node3 [label="fetch-remote-metadata"]; node4 [label="pull"]; node5 [label="fetch-annotations"]; node6 [label="sparse-materialize"]; node7 [label="fetch-files"]; subgraph U { edge [dir=none]; } subgraph cluster_F { color=none; node2; } subgraph cluster_D { label="Dataset"; color=green; node0 -> node4; node2 -> node7; node2 -> node0; node2 -> node3; node4 -> node1; node4 -> node6; node6 -> node3; node7 -> node4; } subgraph cluster_P { label="Protcur"; color=purple; node2 -> node5; } } #+end_src #+RESULTS[8febbedbaf66631abc1d1c9ed53915698665c236]: [[file:./images/graph-retrieve-all.png]] **** Bash implementation :ignore: # TODO we need some way to snapshot and deliver the contents of # --cache-path (usually ~/.cache/sparcur/) so that pipelines can be # rerun in a way that is deterministic, this is primarily an issue for # the pennsieve remote metadata that is updated at every run the quick # and dirty fix is to set SPARCUR_CACHE_PATH=${PARENT_PATH}/cache and # then symlink in everything except the blackfynn-meta folder but the # right thing to do is probably to add a configuration option for the # location of each cache folder # FIXME this is really an env file not a bin file ... 
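The bash implementation below realizes this dependency graph by hand with background jobs and ~wait~. As a cross-check of the overview graph above, here is a minimal Python sketch (not part of sparcur; step names are copied from the graph) that derives one valid execution order from the same edges:

#+begin_src python :eval never
# Sketch only: encode the retrieve-all dependency graph shown in the overview
# and compute a valid execution order. Edges in the graph read
# "step -> dependency", so each step maps to the set of its prerequisites.
from graphlib import TopologicalSorter  # python >= 3.9

deps = {
    'fetch-all': {'fetch-metadata-files', 'fetch-files',
                  'fetch-remote-metadata', 'fetch-annotations'},
    'fetch-metadata-files': {'pull'},
    'fetch-files': {'pull'},
    'pull': {'clone', 'sparse-materialize'},
    'sparse-materialize': {'fetch-remote-metadata'},
}

print(list(TopologicalSorter(deps).static_order()))
# steps with no ordering constraint between them (e.g. fetch-annotations and
# fetch-remote-metadata) may run concurrently, which is exactly what the bash
# implementation does with background jobs
#+end_src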
#+name: pipeline-functions-sparc-get-all-remote-data #+begin_src bash -r -l "\([[:space:]]*#[[:space:]]*(ref:%s)\|[[:space:]]*(ref:%s)\)$" :tangle ../bin/pipeline-functions.sh :mkdirp yes function sparc-time-friendly () { local UTC_OFFSET_START local TIME_START_NO_OFFSET # gnu coreutils gdate needed for osx support/freebsd # gdate on darwin only has millisecond resolution? # this also won't work on freebsd without gnu coreutils iso8601millis="+%FT%T,%6N" # FIXME do we _really_ need millis!? yemaybe? concurrent startups? utcoffset="+%:z" # I really hope the utc offset doesn't change between start & end # but laptops and airplains do exist, so it could # also how utterly annoying that the date separator and the # negative utc offset share the same symbol ... talk about # an annoying design flaw that is going to haunt humanity # with double the number of calls to date for # ... as # long as anoyone is writing code to deal with time TIME_START_NO_OFFSET=$(date ${iso8601millis} || gdate ${iso8601millis}) UTC_OFFSET_START=$(date ${utcoffset} || gdate ${utcoffset}) local TIME_START="${TIME_START_NO_OFFSET}${UTC_OFFSET_START}" # XXX unused local TIME_START_NO_OFFSET_FS_OK=${TIME_START_NO_OFFSET//:/} local UTC_OFFSET_START_FS_OK=${UTC_OFFSET_START//:/} local TIME_START_FRIENDLY=${TIME_START_NO_OFFSET_FS_OK}${UTC_OFFSET_START_FS_OK} # So. iso8601 guidance on what to do about subsecond time and the utc offset in the compact # representation is not entirely clear, however I _think_ that %FT%T%H%M%S,%6N%z is ok but # the -/+ must stay between the timezone and the rest, so we will have to grab tz by itself local TIME_START_SAFE=${TIME_START_NO_OFFSET_FS_OK//-/}${UTC_OFFSET_START_FS_OK} # XXX unused mv "$(mktemp --directory sparcur-all-XXXXXX)" "${TIME_START_FRIENDLY}" || \ { CODE=$?; echo 'mv failed'; return $CODE; } echo "${TIME_START_FRIENDLY}" } function sparc-get-all-remote-data () { # NOTE not quite all the remote data, the google sheets # don't have caching functionality yet # parse args local POSITIONAL=() while [[ $# -gt 0 ]] do key="$1" case $key in # (ref:(((((((sigh) --project-id) local PROJECT_ID="${2}"; shift; shift ;; --symlink-objects-to) local SYMLINK_OBJECTS_TO="${2}"; shift; shift ;; --log-path) local LOG_PATH="${2}"; shift; shift ;; --parent-path) local PARENT_PATH="${2}"; shift; shift ;; --only-filesystem) local ONLY_FILESYSTEM="ONLY_FS"; shift ;; -h|--help) echo "${HELP}"; return ;; ,*) POSITIONAL+=("$1"); shift ;; esac done # Why, you might be asking, are we declaring a local project path here without assignment? # Well. Let me tell you. Because local is a command with an exist status. So it _always_ # returns zero. So if you need to check the output of the command running in a subshell # that you are assigning to a local variable _ALWAYS_ set local separately first. # Yes, shellcheck does warn about this. See also https://superuser.com/a/1103711 local PROJECT_PATH if [[ -z "${PARENT_PATH}" ]]; then local PARENT_PATH set -o pipefail PARENT_PATH=$(sparc-time-friendly) || { CODE=$?; echo "Creating "'${PARENT_PATH}'" failed!" 
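        # undo pipefail before returning from the failure branch;
        # the success path below does the same after the assignment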
set +o pipefail return $CODE; } set +o pipefail fi local LOG_PATH=${LOG_PATH:-"${PARENT_PATH}/logs"} #local LOG_PATH=$(python -c "from sparcur.config import auth; print(auth.get_path('log-path'))") local PROJECT_ID=${PROJECT_ID:-$(python -c "from sparcur.config import auth; print(auth.get('remote-organization'))")} local maybe_slot=() if [[ -n "${SYMLINK_OBJECTS_TO}" ]]; then # MUST use arrays to capture optional arguments like this otherwise # arg values with spaces in them will destroy your sanity maybe_slot+=(--symlink-objects-to "${SYMLINK_OBJECTS_TO}") fi echo "${PARENT_PATH}" # needed to be able to follow logs if [ ! -d "${LOG_PATH}" ]; then mkdir "${LOG_PATH}" || { CODE=$?; echo 'mkdir of ${LOG_PATH} failed'; return $CODE; } fi if [[ -z "${ONLY_FILESYSTEM}" ]]; then # fetch annotations (ref:bash-pipeline-fetch-annotations) echo "Fetching annotations metadata" python -m sparcur.simple.fetch_annotations > "${LOG_PATH}/fetch-annotations.log" 2>&1 & local pids_final[0]=$! # fetch remote metadata (ref:bash-pipeline-fetch-remote-metadata-all) # if this fails with 503 errors, check the # remote-backoff-factor config variable echo "Fetching remote metadata" python -m sparcur.simple.fetch_remote_metadata_all \ --project-id "${PROJECT_ID}" \ > "${LOG_PATH}/fetch-remote-metadata.log" 2>&1 & local pids[0]=$! fi local FAIL=0 # clone aka fetch top level # we do not background this assignment because it runs quickly # and everything that follows depends on it finishing, plus we # need it to finish to set the PROJECT_PATH variable here echo python -m sparcur.simple.clone --project-id "${PROJECT_ID}" --parent-path "${PARENT_PATH}" "${maybe_slot[@]}" echo "Cloning top level" set -o pipefail PROJECT_PATH=$(python -m sparcur.simple.clone \ --project-id "${PROJECT_ID}" \ --parent-path "${PARENT_PATH}" \ "${maybe_slot[@]}" \ 2>&1 | tee "${LOG_PATH}/clone.log" | tail -n 1) || { # TODO tee the output when verbose is passed CODE=$?; tail -n 100 "${LOG_PATH}/clone.log"; echo "Clone failed! The last 100 lines of ${LOG_PATH}/clone.log are listed above."; apids=( "${pids[@]}" "${pids_final[@]}" ); for pid in "${apids[@]}"; do kill $pid; done; set +o pipefail return $CODE; } set +o pipefail # explicit export of the current project path for pipelines # ideally we wouldn't need this, and when this pipeline # finished the export pipeline would kick off, or the export # pipeline would search for ... an existing project path ... # by ... oh right, looking for an environment variable or # checksing some other persistent state ... so this is the one # unless some controlling process sets it top down from the start # but we can't assume that export SPARCUR_PROJECT_PATH="${PROJECT_PATH}" for pid in "${pids[@]}"; do wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; } done if [[ $FAIL -ne 0 || -z "${PROJECT_PATH}" ]]; then echo "${FAIL} commands failed. Cannot continue." echo "${PROJECT_PATH}" return 1 fi # pull aka fetch file system metadata echo "Fetching file system metadata" echo python -m sparcur.simple.pull --project-path "${PROJECT_PATH}" python -m sparcur.simple.pull \ --project-path "${PROJECT_PATH}" \ > "${LOG_PATH}/pull.log" 2>&1 || { CODE=$?; tail -n 100 "${LOG_PATH}/pull.log"; echo "Pull failed! 
The last 100 lines of ${LOG_PATH}/pull.log are listed above."; echo "${PROJECT_PATH}"; return $CODE; } # fetch metadata files echo "Fetching metadata files" # have to pass project path as a position argument here so that it # does not try to pull aka fetch the file system metadata again echo python -m sparcur.simple.fetch_metadata_files --project-path "${PROJECT_PATH}" python -m sparcur.simple.fetch_metadata_files \ --project-path "${PROJECT_PATH}" \ > "${LOG_PATH}/fetch-metadata-files.log" 2>&1 & pids_final[1]=$! # fetch files echo "Fetching files" # XXX at some point this will probably also depend on the manifests # so we don't fetch everything with a matching extension # TODO derive --extension from manifests or all it to be passed in echo python -m sparcur.simple.fetch_metadata_files --project-path "${PROJECT_PATH}" --extension xml # FIXME fetch_files fails silently here :/ python -m sparcur.simple.fetch_files \ --project-path "${PROJECT_PATH}" \ --extension xml \ > "${LOG_PATH}/fetch-files.log" 2>&1 & pids_final[2]=$! local FAIL=0 for pid in "${pids_final[@]}"; do wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; } done # FIXME HACK #find -type f -size 0 -exec getfattr -d {} \; #find -type f -size 0 -exec spc fetch --limit=-1 {} \; if [[ $FAIL -ne 0 ]]; then echo "${FAIL} commands failed. Cannot continue." echo "${PROJECT_PATH}" return 1 fi echo "All fetching completed successfully." } #+end_src *** _*Validate*_ **** Overview :ignore: This is the graph of the existing approach more or less as implemented by ~spc export~. A slightly more sane version is being implemented as part of =sparcur.simple= which will sandbox the network dependencies. # runs both but I'm fairly cerain that it fails to update the second code block # #+name: graph-validate-run-both # #+begin_src elisp :var one=graph-validate-all() two=graph-validate-all-dot() :results none # #+end_src #+name: graph-validate-all #+header: :wrap "src dot :file ./images/graph-validate-all.png :cmdline -Kdot -Tpng :exports results :cache yes" #+begin_src racket :lang racket/base :exports none :noweb no-export :cache yes <> (define g (dag-notation ; I had description listed depending on dataset-structure ; but that is really an implementation detail pipeline-end -> pipeline-extras -> sparc-ds -> pipeline-start -> description -> fetch-all pipeline-start -> dataset-structure -> fetch-all pipeline-start -> dataset-metadata -> fetch-all ; note that this is the idealized flow ; the actual flow is through pipeline-start sparc-ds -> submission -> description submission -> fetch-all sparc-ds -> subjects -> description subjects -> fetch-all sparc-ds -> samples -> description samples -> fetch-all sparc-ds -> manifest -> description manifest -> fetch-all pipeline-extras -> submission-normalized -> submission pipeline-extras -> pipeline-files-xml -> cache -> fetch-all ; -> fetch-files pipeline-extras -> contributors -> affiliations #;lifters -> affiliations-sheet -> sheets -> network contributors -> member #;state -> pennsieve-api -> network contributors -> description pipeline-extras -> meta-extra -> dataset-doi -> pipeline-remote-metadata -> cache ; -> fetch-remote-metadata meta-extra -> dataset-remote-readme -> pipeline-remote-metadata meta-extra -> dataset-remote-status -> pipeline-remote-metadata meta-extra -> organ-term #;lifters -> organs-sheet -> sheets meta-extra -> modality #;lifters -> organs-sheet meta-extra -> techniques #;lifters -> organs-sheet meta-extra -> protocol-uris #;lifters -> organs-sheet meta-extra -> award-manual 
#;lifters -> organs-sheet meta-extra -> award-organ #;lifters -> submission-normalized award-organ -> scraped-award-organ pipeline-extras -> pipeline-extras-updates -> identifier-resolution -> network pipeline-extras -> pipeline-protcur -> cache ; -> fetch-annotations ??? -> overview-sheet -> sheets)) ;; subgraphs (define lifters '(affiliations organ-term modality techniques protocol-uris award-manual award-organ)) (define state '(member)) (define network '(network pennsieve-api sheets affiliation-sheet organs-sheet overview-sheet)) (define-vertex-property g vertex-id #:init $id) ; doesn't work to get the graphviz node numbering (define-vertex-property g in-lifters?) (for-each (λ (v) (in-lifters?-set! v #t)) lifters) (define-vertex-property g in-state?) (for-each (λ (v) (in-state?-set! v #t)) state) (define-vertex-property g in-network?) (for-each (λ (v) (in-network?-set! v #t)) network) (graphviz g) #+end_src #+name: graph-validate-all-dot #+RESULTS[16bcd2566c9bc6aca9c4c547144fe50c5a542558]: graph-validate-all #+begin_src dot :file ./images/graph-validate-all.png :cmdline -Kdot -Tpng :exports results :cache yes digraph G { node0 [label="description"]; node1 [label="modality"]; node2 [label="dataset-doi"]; node3 [label="pennsieve-api"]; node4 [label="dataset-metadata"]; node5 [label="samples"]; node6 [label="subjects"]; node7 [label="award-manual"]; node8 [label="submission-normalized"]; node9 [label="organs-sheet"]; node10 [label="scraped-award-organ"]; node11 [label="member"]; node12 [label="sheets"]; node13 [label="award-organ"]; node14 [label="network"]; node15 [label="submission"]; node16 [label="fetch-all"]; node17 [label="manifest"]; node18 [label="techniques"]; node19 [label="overview-sheet"]; node20 [label="pipeline-extras"]; node21 [label="pipeline-end"]; node22 [label="pipeline-start"]; node23 [label="protocol-uris"]; node24 [label="affiliations"]; node25 [label="affiliations-sheet"]; node26 [label="contributors"]; node27 [label="organ-term"]; node28 [label="meta-extra"]; node29 [label="dataset-structure"]; node30 [label="sparc-ds"]; node31 [label="???"]; subgraph U { edge [dir=none]; } subgraph cluster_M { label="Metadata Files"; color=green; node0; node5; node6; node15; node17; } subgraph cluster_L { label="Lifters (bad design)"; color=red; node1; node7; node13; node18; node24; node23; node27; } subgraph D { node0 -> node16; node1 -> node9; node2 -> node3; node3 -> node14; node4 -> node16; node5 -> node0; node5 -> node16; node6 -> node0; node6 -> node16; node7 -> node9; node8 -> node15; node9 -> node12; node11 -> node3; node12 -> node14; node13 -> node10; node13 -> node8; node15 -> node0; node15 -> node16; node17 -> node0; node17 -> node16; node18 -> node9; node19 -> node12; node20 -> node30; node20 -> node28; node20 -> node26; node20 -> node8; node21 -> node20; node22 -> node0; node22 -> node29; node22 -> node4; node23 -> node9; node24 -> node25; node25 -> node12; node26 -> node0; node26 -> node11; node26 -> node24; node27 -> node9; node28 -> node1; node28 -> node2; node28 -> node18; node28 -> node13; node28 -> node23; node28 -> node7; node28 -> node27; node29 -> node16; node30 -> node17; node30 -> node5; node30 -> node15; node30 -> node22; node30 -> node6; node31 -> node19; } } #+end_src #+RESULTS[20008f92af2cbbe5a5aa89221885829ea3bd0f11]: graph-validate-all-dot [[file:./images/graph-validate-all.png]] **** ??? implementation :ignore: *** _*Export*_ **** Overview :ignore: In the current implementation validation and export are conflated. This is bad, and will be changed. 
=spc export= must only be run after =sparc-get-all-remote-data=, otherwise there will be network sandbox violations. For the record there are multiple way invoke =spc export=. #+begin_src bash :eval never # pushd to the project location pushd "${PROJECT_PATH:-SPARCUR_PROJECT_PATH}" spc export popd # pass the project location as a positional argument spc export "${PROJECT_PATH:-SPARCUR_PROJECT_PATH}" # pass the project location as an option spc export --project-path "${PROJECT_PATH:-SPARCUR_PROJECT_PATH}" #+end_src At the moment =sparc-export-all= is just a wrapper around =spc export=. This will change as we move to a single dataset export model. There will then likely be a function that checks for datasets that have changed since last export, updates only those and then collects the outputs. **** Bash implementation :ignore: #+name: pipeline-functions-sparc-export-all #+begin_src bash -r -l "\([[:space:]]*#[[:space:]]*(ref:%s)\|[[:space:]]*(ref:%s)\)$" :tangle ../bin/pipeline-functions.sh function sparc-export-all () { # parse args local POSITIONAL=() while [[ $# -gt 0 ]] do key="$1" case $key in # (ref:(((sigh) --project-path) local PROJECT_PATH="${2}"; shift; shift ;; -h|--help) echo "${HELP}"; return ;; ,*) POSITIONAL+=("$1"); shift ;; esac done local PROJECT_PATH="${PROJECT_PATH:-$SPARCUR_PROJECT_PATH}" spc export --project-path "${PROJECT_PATH}" } #+end_src ** Single dataset *** _*Retrieve*_ **** Overview :ignore: #+name: graph-retrieve-single #+header: :wrap "src dot :file ./images/graph-retrieve-single.png :cmdline -Kdot -Tpng :exports results :cache yes" #+begin_src racket :lang racket/base :exports none :noweb no-export :cache yes <> (define g (dag-notation : pull -> clone digest-2 -> file-contents -> fetch-files -> filter-files -> path-metadata -> pull -> sparse-materialize -> fetch-remote-metadata : fetch-2 -> fetch-files : path-metadata-validate -> path-metadata : digest-1 -> path-metadata : digest-1 -> pipeline-start -> fetch-metadata-files -> pull : fetch-1 -> fetch-metadata-files : fetch-1 -> fetch-annotations )) (graphviz-2 g #:clusters (list (new-cluster "retrieve.py" 'green '(fetch-remote-metadata clone sparse-materialize pull fetch-metadata-files)) (new-cluster "path_metadata_validate.py" 'purple '(path-metadata-validate path-metadata)))) #+end_src #+RESULTS[0a6edf0f4a2695740e58f78d9a31e1c88fb4ba3e]: graph-retrieve-single #+begin_src dot :file ./images/graph-retrieve-single.png :cmdline -Kdot -Tpng :exports results :cache yes digraph G { node0 [label="path-metadata-validate"]; node1 [label="fetch-2"]; node2 [label="digest-2"]; node3 [label="fetch-metadata-files"]; node4 [label="filter-files"]; node5 [label="file-contents"]; node6 [label="sparse-materialize"]; node7 [label="fetch-annotations"]; node8 [label="fetch-files"]; node9 [label="clone"]; node10 [label="fetch-remote-metadata"]; node11 [label="fetch-1"]; node12 [label="digest-1"]; node13 [label="path-metadata"]; node14 [label="pull"]; node15 [label="pipeline-start"]; subgraph U { edge [dir=none]; } subgraph cluster_0 { label="retrieve.py"; color=green; node10; node9; node6; node14; node3; } subgraph cluster_1 { label="path_metadata_validate.py"; color=purple; node0; node13; } subgraph D { node0 -> node13; node1 -> node8; node2 -> node5; node3 -> node14; node4 -> node13; node5 -> node8; node6 -> node10; node8 -> node4; node11 -> node7; node11 -> node3; node12 -> node15; node12 -> node13; node13 -> node14; node14 -> node9; node14 -> node6; node15 -> node3; } } #+end_src 
#+RESULTS[e1bd20c4ba0201415ec2998cd18e8226bc9f801f]: [[file:./images/graph-retrieve-single.png]] **** Bash implementation :ignore: Desired invocation. #+begin_src bash sparc-fexport ${UUID} sparc-fexport dataset:${UUID} sparc-fexport N:dataset:${UUID} sparc-fexport https://api.pennsieve.io/datasets/N:dataset:${UUID} sparc-fexport https://app.pennsieve.io/N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0/datasets/N:dataset:${UUID} #+end_src Desired behavior. | dataset state | objects state | action | outcome | |---------------+-------------------+-------------------+---------------------------| | not-existing | do symlink to | retrieve | spc export; path-metadata | | existing | either don't fail | retrieve or equiv | spc export; path-metadata | Desired output. Dataset to =exports/datasets/${UUID}/${TIMESTAMP_FRIENDLY}/= and is symlinked to =exports/${UUID}/LATEST=. The last run dataset itself goes to =exports/datasets/LATEST=. We don't have to care about the project identifier because each UUID is unique. TODO Logging goes ???? This is a quick and dirty version that should just do the right thing given only the dataset id as an input. Usage =sparc-fexport N:dataset:totally-not-a-uuid=. The id provided may be any of the variants, url, curie, api, human, uuid, etc. #+name: pipeline-functions-sparc-fexport #+begin_src bash :tangle ../bin/pipeline-functions.sh :mkdirp yes function sparc-export () { echo TODO not ready yet return 1 } function sparc-fexport () { local DATASET_ID="${1}" local DATASET_UUID local DATASET_PATH local EXPORT_PATH DATASET_UUID="$(python -m sparcur.simple.utils --dataset-id ${DATASET_ID})" python -m sparcur.simple.retrieve --dataset-id ${DATASET_UUID} && EXPORT_PATH="$(realpath "${DATASET_UUID}/exports")" && DATASET_PATH="$(realpath "${DATASET_UUID}/dataset")" && pushd "${DATASET_PATH}" && # FIXME we shouldn't need this refetch so I think that retrieve is # broken if files/folders already exist python -m sparcur.cli find \ --name '*.xlsx' \ --name '*.xml' \ --name 'submission*' \ --name 'code_description*' \ --name 'dataset_description*' \ --name 'subjects*' \ --name 'samples*' \ --name 'manifest*' \ --name 'resources*' \ --name 'README*' \ --no-network \ --limit -1 \ --fetch wait $! python -m sparcur.cli export --export-path "${EXPORT_PATH}" & # FIXME TODO this conflates phases local pids[0]=$! # FIXME TODO for now export_single_dataset produces this so we don't run it independently # FIXME there is also a difference in the export folder because the path metadata targets # the last updated data and thus overwrites if the data has not changed but the code has #python -m sparcur.simple.path_metadata_validate --export-path "${EXPORT_PATH}" & #local pids[1]=$! local FAIL=0 # TODO log/notify export failure for pid in "${pids[@]}"; do wait $pid || { FAIL=$((FAIL+1)); echo "${pid} failed!"; } done if [[ $FAIL -ne 0 ]]; then echo "${FAIL} commands failed. Cannot continue." echo "${DATASET_UUID}" echo "${DATASET_PATH}" return 1 fi popd # or do it yourself because we might need to explore?? } #+end_src # FIXME TODO #+begin_src bash for d in $(ls *-* -d); do refetch ${d}; done for d in $(ls *-* -d); do find ~/".local/share/sparcur/export/datasets/${d}/LATEST/curation-export.json" -name 'curation-export.json'; done #+end_src *** _*Validate*_ **** Overview :ignore: See ref:graph-validate-all for all the related bits. 
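Once a single dataset has been retrieved and exported (e.g. via ~sparc-fexport~ above), the exported IR can be loaded for inspection. A small sketch, assuming the default export layout mentioned above (the path and uuid are placeholders):

#+begin_src python :eval never
# Sketch: load the most recent curation export for one dataset and list its
# top level keys; adjust the path if your export location differs.
from pathlib import Path
from sparcur.utils import path_ir

dataset_uuid = 'totally-not-a-uuid'  # placeholder uuid from the usage example
latest = (Path.home() / '.local/share/sparcur/export/datasets'
          / dataset_uuid / 'LATEST/curation-export.json')
ir = path_ir(str(latest))
print(sorted(ir))
#+end_src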
#+name: graph-validate-single #+header: :wrap "src dot :file ./images/graph-validate-single.png :cmdline -Kdot -Tpng :exports results :cache yes" #+begin_src racket :lang racket/base :exports none :noweb no-export :cache yes <> (define g (dag-notation ; inside tabular -> json-ir ... it goes ... data-0 -> condense -> cull-rtk -> normalize-values -> clean -> expand-string-lists -> filter -> cull -> transformed -> keyed -> add_missing -> sanity-check -> objectify -> norm_alt_headers -> headers -> tabular ; VERY large number of steps, and I am 99% sure that we need to take normalize-values out of there, but there was a good reason to include them, ; namely that it is way easier to run everything backward from normalize-values to the spreadsheets than it is to go from json-ir all the way back ; to the original sheets, however we need to rearchitect to get there ; another VERY long pipeline that does the json processing, and which desparately needs to be simplified and made properly composable ; (as we are trying to do in this dev guide) data-n -> added -> pipeline-end -> augmented -> updated -> cleaned -> moved -> copied -> subpipelined -> ffail -> previous-pipeline -> data-0 json-ir -> data-n json-ir -> json-raw ttl -> json-ld -> json-ir ttl -> json-ir json-export -> json-ir )) (graphviz-2 g #:clusters '( )) #+end_src #+RESULTS[54c9e97e880b92a4c88f7e34a20be8a06198c76c]: graph-validate-single #+begin_src dot :file ./images/graph-validate-single.png :cmdline -Kdot -Tpng :exports results :cache yes digraph G { node0 [label="cull-rtk"]; node1 [label="tabular"]; node2 [label="subpipelined"]; node3 [label="pipeline-end"]; node4 [label="clean"]; node5 [label="copied"]; node6 [label="moved"]; node7 [label="updated"]; node8 [label="sanity-check"]; node9 [label="objectify"]; node10 [label="keyed"]; node11 [label="ffail"]; node12 [label="cleaned"]; node13 [label="cull"]; node14 [label="normalize-values"]; node15 [label="data-n"]; node16 [label="expand-string-lists"]; node17 [label="previous-pipeline"]; node18 [label="transformed"]; node19 [label="filter"]; node20 [label="data-0"]; node21 [label="json-export"]; node22 [label="ttl"]; node23 [label="augmented"]; node24 [label="condense"]; node25 [label="add_missing"]; node26 [label="headers"]; node27 [label="json-ir"]; node28 [label="json-ld"]; node29 [label="json-raw"]; node30 [label="added"]; node31 [label="norm_alt_headers"]; subgraph U { edge [dir=none]; } subgraph D { node0 -> node14; node2 -> node11; node3 -> node23; node4 -> node16; node5 -> node2; node6 -> node5; node7 -> node12; node8 -> node9; node9 -> node31; node10 -> node25; node11 -> node17; node12 -> node6; node13 -> node18; node14 -> node4; node15 -> node30; node16 -> node19; node17 -> node20; node18 -> node10; node19 -> node13; node20 -> node24; node21 -> node27; node22 -> node28; node22 -> node27; node23 -> node7; node24 -> node0; node25 -> node8; node26 -> node1; node27 -> node29; node27 -> node15; node28 -> node27; node30 -> node3; node31 -> node26; } } #+end_src #+RESULTS[e46fa5a6ebbfedba0f31b96f7f5a9a31a179b71f]: [[file:./images/graph-validate-single.png]] **** Extract #+begin_src bash python -m sparcur.simple.extract "${DATASET_PATH}" #+end_src **** Retrieve 2 *** _*Export*_ ** Future correct single dataset workflow Network access should only be possible during the retrieve phase. The validate step may happen during extract and transform as well since structure or data content issues may be easier to detect during certain phases. 
Ideally this would not be the case, but practically it will take more work than necessary given our use cases. We have to be careful to separate basic validation of the structure of the dataset data from the validation that the identifiers provided in that structure point to values known to their respective remotes. For example, we need to be able to say =you are missing a protocol reference= at a separate point in time from saying =the remote(s) we asked had no record of the protocol reference you provided=.
*** Hypothes.is
This is pulled in bulk independently in a different workflow, but it is probably worth checking to see whether we need to run it again whenever we run a dataset.
*** Structure metadata
**** Retrieve
This is ref:clone.py, ref:fetch_remote_metadata_all.py, and ref:pull.py.
**** Extract
This is now called ref:path_metadata, but it is also dealt with as part of specimen_dirs.
**** Transform
**** Validate
We can't do this right now because the current dataset template cannot be statically validated. Only some of it can be validated once we have the data from the subjects and samples sheets. In this future pipeline the type prefixes will be required so that the structure can be statically verified.
*** Dataset metadata
Run this if the structure metadata is in a state where we can proceed (i.e. there is a dataset_description file).
**** Retrieve
This is ref:fetch_metadata_files.py.
**** Extract
**** Transform
**** Validate
*** File metadata
**** Retrieve
This is ref:fetch_files.py. It depends on the manifest files and the dataset structure.
**** Extract
**** Transform
**** Validate
This is local validation, not remote networked validation.
*** Identifiers and mapping
**** Retrieve
This is the retrieval/dereferencing of identifier metadata. It must happen after the file metadata step has been completed so that e.g. identifiers used in MBF segmentation files can be validated. In this step in particular, validation and retrieval are essentially the same step. If there is an error during retrieval then it must produce a validation error.
*** Protocols
Checking and/or retrieving these depends on three things: the protocols.io group, the hypothesis group, and the dataset metadata.
*** Protc
Needs hypothesis and protocols.
*** Export
**** Convert
**** Serialize
Probably also includes load in some cases, e.g. for the file level metadata that will be attached to package-id file-id pairs.
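To make the intended sandboxing concrete, here is a minimal sketch of how these phases could compose for a single dataset. Every name below is a hypothetical placeholder rather than a current sparcur API; the only point being illustrated is that retrieve is the sole phase allowed to touch the network:

#+begin_src python :eval never
# Hypothetical sketch only: placeholder phase functions, not sparcur APIs.
from pathlib import Path

def retrieve(path):        # the only phase permitted network access
    return {'path': str(path)}

def extract(blob):         # purely local: read structure and metadata files
    return {**blob, 'contents': {}}

def transform(blob):       # purely local: normalize into the internal representation
    return {**blob, 'ir': {}}

def validate(blob):        # local checks; identifier dereference failures
    return []              # surface as validation errors, not crashes

def export(blob, errors):  # convert and serialize, with any errors attached
    return {**blob, 'errors': errors}

def run_single_dataset(dataset_path):
    blob = transform(extract(retrieve(Path(dataset_path))))
    return export(blob, validate(blob))
#+end_src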
# TODO issue with dumping packages into mongo is that we will # have to flag packages and collections as deleted ** Protocols #+name: graph-protocols #+header: :wrap "src dot :file ./images/graph-protocols.png :cmdline -Kdot -Tpng :exports results :cache yes" #+begin_src racket :lang racket/base :exports none :noweb no-export :cache yes <> (define g (dag-notation export-protcur -> pipeline-protcur -> cache-annotations -> fetch-annotations pipeline-protcur -> fetch-protocols-io -> network ; FIXME fetching the protocols.io metadata is a major network sandbox violation ; ideally we can untangle this, but it is difficult, also issues with how we ; are caching put this at risk of going stale )) (graphviz g) #+end_src #+RESULTS[aaeaed353b6b51181c18cdb722696d821a27f63f]: graph-protocols #+begin_src dot :file ./images/graph-protocols.png :cmdline -Kdot -Tpng :exports results :cache yes digraph G { node0 [label="fetch-protocols-io"]; node1 [label="pipeline-protcur"]; node2 [label="network"]; node3 [label="cache-annotations"]; node4 [label="export-protcur"]; node5 [label="fetch-annotations"]; subgraph U { edge [dir=none]; } subgraph D { node0 -> node2; node1 -> node3; node1 -> node0; node3 -> node5; node4 -> node1; } } #+end_src #+RESULTS[e419d8438b4609bab73327984f217d394a78f995]: [[file:./images/graph-protocols.png]] ** Release Since we are moving to run individual datasets the aggregate release process is decouple, and mediated via git:36a749b5c321cdb81ba81f9d35e050ceb8479976 *** Testing **** Code **** Data *** Reporting After a release ** SCKAN :PROPERTIES: :CUSTOM_ID: sckan :END: *** Overview #+name: sckan-ideal-run #+header: :wrap "src dot :file ./images/sckan-ideal-run.png :cmdline -Kdot -Tpng :exports results :cache yes" #+begin_src racket :lang racket/base :exports none :noweb no-export :cache yes <> (define g (dag-notation sckan-release-docker-image -> sckan-scigraph : sckan-scigraph-zip -> sckan-scigraph sckan-release-docker-image -> sckan-blazegraph : sckan-raw-zip -> sckan-blazegraph ; scigraph sckan-scigraph -> apinat-ttl ; FIXME this is bad due to sync issues sckan-scigraph -> nifstd sckan-scigraph -> mkdir -> sys-apps/coreutils -> gentoo : |git clone pyontutils| -> git sckan-scigraph -> run-load-graph-sparc-sckan -> scigraph-readme -> |git clone pyontutils| -> remote : scigraph-readme -> emacs : scigraph-readme -> bash : run-load-graph-sparc-sckan -> ontload -> scigraph-load -> dev-java/scigraph-bin -> tgbugs-overlay : ontload -> dev-python/pyontutils -> tgbugs-overlay sckan-scigraph -> ontologies-sparc-sckan.yaml -> |git clone sparc-curation| -> git -> dev-vcs/git -> gentoo sckan-scigraph -> sparc-sckan.ttl -> sparc-data.ttl -> |git clone sparc-curation| : sparc-sckan.ttl -> sed -> sys-apps/sed -> gentoo : sparc-sckan.ttl -> cat -> sys-apps/coreutils -> gentoo : sparc-sckan.ttl -> cassava-protcur.ttl -> remote : sparc-sckan.ttl -> nif.ttl -> nifstd ; TODO derive the owl import chains by actually reading the file : nif.ttl -> anatomy-bridge.ttl -> uberon-bridge.ttl -> uberon.owl : nif.ttl -> methods-bridge.ttl -> methods.ttl -> approach.ttl : nif.ttl -> neuron-bridge.ttl : sparc-sckan.ttl -> extra.ttl -> nifstd : extra.ttl -> emapa.owl : sparc-sckan.ttl -> scicrunch-registry.ttl -> nifstd : sparc-sckan.ttl -> curation-export-published.owl -> cassava-curation-export-published.ttl : curation-export-published.owl -> ttlfmt -> dev-python/ttlser -> tgbugs-overlay ; blazegraph sckan-blazegraph -> blazegraph-runner -> remote : blazegraph-runner -> dev-java/icedtea-bin sckan-blazegraph -> 
prefixes.conf sckan-blazegraph -> queries.org -> |git clone sparc-curation| -> remote sckan-blazegraph -> approach.ttl -> nifstd : approach.ttl -> nifstd sckan-blazegraph -> protcur.ttl -> cassava-protcur.ttl sckan-blazegraph -> hypothesis-protcur sckan-blazegraph -> sparc-community-terms.ttl -> nifstd : -> sparc-community-terms.ttl -> sparc-community-terms-update -> scigraph-readme : sparc-community-terms-update -> nifstd : sparc-community-terms-update -> |interlex server alt| -> interlex -> dev-python/interlex -> tgbugs-overlay : sparc-community-terms-update -> git : sparc-community-terms-update -> bash : sparc-community-terms-update -> emacs : sparc-community-terms-update -> curl -> net-misc/curl -> gentoo sckan-blazegraph -> cassava-apinatomy-*.ttl sckan-blazegraph -> cassava-curation-export-published.ttl sckan-blazegraph -> res-uberon -> uberon.owl -> remote : res-uberon -> robot -> dev-java/robot-bin -> gentoo -> repos -> remote sckan-blazegraph -> res-emapa -> emapa.owl -> remote : res-emapa -> robot sckan-blazegraph -> res-methods -> methods.ttl -> nifstd : res-methods -> robot sckan-blazegraph -> res-npo -> npo.ttl -> nifstd : npo.ttl -> neuron-bridge.ttl : npo.ttl -> |python -m neurondm.build release| -> |interlex server alt| : |python -m neurondm.build release| -> python : |python -m neurondm.build release| -> dev-python/neurondm : |python -m neurondm.build release| -> |git clone pyontutils| : res-npo -> robot sckan-blazegraph -> res-mis -> sparc-methods.ttl -> nifstd ; XXX not quite acurate since unres is direct : res-mis -> robot sckan-blazegraph -> res ; phony for organization purposes only : res -> res-uberon : res -> res-emapa : res -> res-methods : res -> res-npo : res -> res-mis : res -> robot ; ontology nifstd -> |git clone NIF-Ontology| -> git : |git clone NIF-Ontology| -> remote ; apinat : |git clone apinatomy-models| -> remote cassava-apinatomy-*.ttl -> apinat-ttl -> |apinat-build --all| -> |git clone apinatomy-models| -> git : |apinat-build --all| -> emacs : |apinat-build --all| -> apinat-converter -> dev-node/apinat-converter -> tgbugs-overlay -> repos cassava-apinatomy-*.ttl -> remote ; protcur cassava-protcur.ttl -> remote cassava-protcur.ttl -> curation-export -> python -> dev-lang/python -> gentoo ; XXX not entirely accurate : curation-export -> bash -> app-shells/bash -> gentoo : curation-export -> emacs -> app-editors/emacs -> gentoo : curation-export -> |python -m sparcur.cli| -> dev-python/sparcur -> gentoo ; alt dep ; curation-export cassava-curation-export-published.ttl -> curation-export -> |git clone sparc-curation| cassava-curation-export-published.ttl -> remote )) (graphviz-2 g #:clusters (map (λ (ncl) (apply new-cluster ncl)) '(("packages" yellow ( gentoo tgbugs-overlay repos app-editors/emacs app-shells/bash dev-java/icedtea-bin dev-java/robot-bin dev-java/scigraph-bin dev-lang/python dev-python/pyontutils dev-python/sparcur dev-python/ttlser dev-vcs/git sys-apps/coreutils sys-apps/sed net-misc/curl dev-python/interlex dev-node/apinat-converter dev-python/neurondm )) ("build-graph-sckan" cyan ( sckan-scigraph-zip sckan-scigraph ontologies-sparc-sckan.yaml sparc-sckan.ttl ontload scigraph-load run-load-graph-sparc-sckan )) ("release" cyan ( sckan-blazegraph sckan-raw-zip prefixes.conf queries.org blazegraph-runner )) ("ont" green ( anatomy-bridge.ttl approach.ttl emapa.owl extra.ttl methods-bridge.ttl methods.ttl methods.ttl neuron-bridge.ttl npo.ttl scicrunch-registry.ttl sparc-community-terms.ttl sparc-methods.ttl uberon.owl uberon-bridge.ttl 
nif.ttl #; ; renders better without nifstd )) ("apinat" red ( cassava-apinatomy-*.ttl apinat-ttl |apinat-build --all| apinat-converter |git clone apinatomy-models| )) ("protcur" magenta ( hypothesis-protcur cassava-protcur.ttl protcur.ttl )) ("curation-export" blue ( curation-export cassava-curation-export-published.ttl curation-export-published.owl )) ("res" purple ( res-emapa res-methods res-mis res-npo res-uberon ))))) #+end_src #+RESULTS[52b162604dfc475d3f664c6d68b4b125978871ed]: [[file:./images/sckan-ideal-run.png]] *** Implementation It should be possible to run all of these steps in a container derived from a =tgbugs/musl:kg-dev-user= docker image. See https://github.com/tgbugs/dockerfiles/blob/master/source.org#kg-dev-user https://hub.docker.com/repository/docker/tgbugs/musl/tags?name=kg-dev-user # In fact you should be able to get an interactive version of this file # in those images directly and be able to run these blocks directly from # emacs, but we aren't quite there yet. **** Data synchronization All of these things either run manually or are independent of the SCKAN release process. In most cases manual intervention is needed to ensure that various component sources are up to date. ***** SPARC curation export See [[./setup.org::#export-v4]] for the most recent workflow for batch release. ***** SPARC protcur #+begin_src bash python -m sparcur.simple.fetch_annotations python -m sparcur.cli export protcur #+end_src ***** NPO #+begin_src bash python -m neurondm.build release python -m neurondm.models.apinat_pops_more #+end_src Super manual curation, identifier sync, review, merge, commit, push, etc. # FIXME partial orders process is not documented here # for good reason, it currently requires building scigraph and querying The current workflow used to produce [file:~/git/NIF-Ontology/ttl/generated/neurons/apinat-partial-orders.ttl] is to run [[file:./queries.org::py-nbr-1]] and then [[file:./queries.org::apinat-add-partial-orders]]. Below is the start of some code to extract the partial orders without having to load scigraph first. #+begin_src jupyter-python :session pys #ori = OntResIri('https://cassava.ucsd.edu/ApiNATOMY/ontologies/.ttl') import rdflib from pathlib import Path from pyontutils.core import OntResPath, OntConjunctiveGraph from neurondm import orders as nord b = Path('/tmp/build') rp = sorted(b.glob('release-*-sckan'))[-1] orp = OntResPath(rp / "data/sparc-data.ttl") apinat_imports = [i for i in orp.imports if 'ApiNATOMY' in i.identifier] # FIXME conjunctive graph has to conjoin by sharing a store ??? 
wat apinat_graphs = [i.graph for i in apinat_imports] ocg = OntConjunctiveGraph() _ = [ocg.add((*t, g.boundIdentifier)) for g in apinat_graphs for t in g] set(ocg.predicates()) apinat = rdflib.Namespace('https://apinatomy.org/uris/readable/') elements = rdflib.Namespace('https://apinatomy.org/uris/elements/') # TODO and the we have to recreate the neru-7 query ancsl = list(ocg[:apinat.nextChainStartLevels:]) anext = list(ocg[:apinat.next:]) _soma = OntId('NLX:154731').u somas = [s for s in ocg[:apinat.ontologyTerms:_soma] if (s, rdf.type, elements.Lyph) in ocg] soma_links = set() def soma_link_edges(soma): # XXX uses internalIn instead of housingLyph for link in ocg[soma:apinat.conveys:]: soma_links.add(link) for node_s in ocg[link:apinat.source:]: for other_link in ocg[node_s:apinat.sourceOf:]: # FIXME only by convention that the direction matches if other_link != link: yield link, other_link for node_t in ocg[link:apinat.target:]: for other_link in ocg[node_t:apinat.sourceOf:]: # FIXME only by convention that the direction matches if other_link != link: yield link, other_link soma_edges = [pair for s in somas for pair in soma_link_edges(s)] def get_neuron(link): # FIXME this requires somas for lyph in ocg[link:apinat.conveyingLyph:]: for group in ocg[lyph:apinat.seedIn:]: for ot in ocg[group:apinat.ontologyTerms:]: return ot seeds = {l: neuron for l in soma_links if (neuron := get_neuron(l))} # FIXME right now we always seed on soma, but that is not required link_lookup = {} other_lookup = {} def link_to_ont_region_layer(link): # FIXME this doesn not quite match what we did in cypher # FIXME most importantly it is missing the soma links if link in soma_links: hlii = apinat.internalIn else: hlii = apinat.housingLyph cl = None for cl in ocg[link:apinat.conveyingLyph:]: hl = None for hl in ocg[cl:hlii:]: if hl in other_lookup: link_lookup[link] = other_lookup[hl] continue i = None for i, ot in enumerate(ocg[hl:apinat.ontologyTerms:]): pass if i is None: for i, ot in enumerate(ocg[hl:apinat.inheritedOntologyTerms:]): pass if i is None: for co in ocg[hl:apinat.cloneOf:]: for i, ot in enumerate(ocg[co:apinat.ontologyTerms:]): pass if i is None: for i, ot in enumerate(ocg[co:apinat.inheritedOntologyTerms:]): pass if i is None: log.error(('rol', hl)) continue elif i > 0: log.warning(f'multiple 0 ontology terms {i + 1} for {hl}') li = None for li in ocg[hl:apinat.layerIn:]: j = None for j, liot in enumerate(ocg[li:apinat.ontologyTerms:]): pass if j is None: for co in ocg[li:apinat.cloneOf:]: for j, liot in enumerate(ocg[co:apinat.ontologyTerms:]): pass if j is None: log.error(('r', li)) pass elif j > 0: log.warning(f'multiple 1 ontology terms {j + 1} {li}') if li is None: other_lookup[hl] = link_lookup[link] = nord.rl(region=ot) else: other_lookup[hl] = link_lookup[link] = nord.rl(region=liot, layer=ot) # til that -liot -> rdflib.NegatedPath if hl is None: log.error(f'conveying lyph for link has no housing lyph??? 
{cl} {link}') # FIXME axon-chain-gastric-duodenum-neuron-4 has housingLyphs on the chain # and housingLayers on the chain, but those are not being materialized to the # generated lyphs, which they need to be, also wbkg terms are not being lifted # so they are not appearing in the chain housingLyphs, also reordering of these # on the chain during serialization to ttl is another reason why they need to be # materialized down to the generated lyphs link_lookup[link] = nord.rl(region=rdflib.URIRef(f'ERROR-{link}')) if cl is None: log.error(f'something has gone very wrong {link}') link_lookup[link] = nord.rl(region=rdflib.URIRef(f'EXTREME-ERROR-{link}')) neuron_parts = { OntId('SAO:1770195789').u, # axon OntId('SAO:1211023249').u, # dendrite or sensory axon OntId('SAO:280355188').u, # regional part of axon OntId('SAO:420754792').u, # regional part of dendrite _soma, } def filter_for_neuron_parts(link): for cl in ocg[link:apinat.conveyingLyph:]: for ot in ocg[cl:apinat.ontologyTerms:]: if ot in neuron_parts: return ot for iot in ocg[cl:apinat.inheritedOntologyTerms:]: if iot in neuron_parts: return iot _l_adj = soma_edges + ancsl + anext l_adj = [(a, b) for (a, b) in _l_adj if filter_for_neuron_parts(a) or filter_for_neuron_parts(b)] l_skipped = set(_l_adj) - set(l_adj) # TODO review to make sure there aren't lurking issues torep = set(e for es in l_adj for e in es) _ = [link_to_ont_region_layer(link) for link in torep] l_nst = nord.adj_to_nst(l_adj) # this is the ord over the links we may have to do this first and then replace because different neurons may converge, or do this first, get the distinct graphs per neuron, separate, replace and then h_adj -> h_nst l_split_nst = l_nst[1:] l_split_adj = [nord.nst_to_adj(n) for n in l_split_nst] h_split_adj = [[(link_lookup[a], link_lookup[b]) for a, b in s] for s in l_split_adj] h_split_nst = [nord.adj_to_nst(a) for a in h_split_adj] # h_adj = [(link_lookup[a], link_lookup[b]) for a, b in l_adj] # can't do this only produces 52 distinct graphs # h_nst = nord.adj_to_nst(h_adj) nrns = [set(e for es in al for e in es) for al in l_split_adj] def get_seed_neuron(ns): key = ns & set(seeds) assert len(key) <= 1, len(key) if key: return seeds[next(iter(key))] nrn_index = [get_seed_neuron(ns) for ns in nrns] results = list(zip(nrn_index, h_split_nst)) # TODO export to partial orders file for comparison # a quick spot check suggests that there are issues #+end_src ***** ApiNATOMY #+begin_src bash ~/git/sparc-curation/docs/apinatomy.org --all #+end_src deploy to remote #+begin_src bash apinat-build --deploy # XXX manual and not fully implemented #+end_src For new ApiNATOMY models also update [[../resources/scigraph/sparc-data.ttl]]. ***** NIF-Ontology dev branch Use [[~/git/prrequaestor/prcl.lisp]] with [[~/ni/sparc/sync-specs.lisp]]. =prcl.lisp= is now published, still figuring out how to put all the cross-cutting processes that I oversee in one place, =sync-specs.lisp= will live wherever that is. #+begin_src bash pushd ~/git/prrequaestor ./prcl-build.lisp bin/prcl --specs ~/ni/sparc/sync-specs.lisp --fun nif-sct bin/prcl --specs ~/ni/sparc/sync-specs.lisp --fun nif-scr bin/prcl --specs ~/ni/sparc/sync-specs.lisp --fun nif-slim #+end_src For now manually pull the changes to the working repo. **** Blazegraph and SciGraph builds # SIGH set -e vs putting things in functions ... 
bash why are you so bad #+name: build-sckan-release #+begin_src bash set -e ### make sure all code is up-to-date # scigraph/README.org sh ~/git/pyontutils/nifstd/scigraph/README.org tangle ### release artifacts # TODO prepare all ontology files first up here # TODO add success steps for each of these probably? # release.org to retrieve ~/git/sparc-curation/docs/release.org build --sckan --no-blaze --no-load # XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc # we have a circular dependency issue here, which is that # we want to use the apinatomy models from blazegraph for scigraph # but we also want to use the configured NIF-Ontology repo from # scigraph for blazegraph, too many layers ... ~/git/pyontutils/nifstd/scigraph/bin/run-load-graph-sparc-sckan # if you are iterating hard on apinat model development, this is the point # at which you usually want to deploy to dev scigraph and test # release.org ~/git/sparc-curation/docs/release.org build --sckan --no-blaze --resume # XXX add --no-annos if ~/.ssh/config doesn't have cassava-sparc ### docker image build # source.org build docker images ~/git/dockerfiles/source.org build-image run:sckan-build-save-image ### testing phase 1, probably move to docker # XXX manual echo manual step waiting for signal from human to start next automated portion #+end_src **** Build docker images This has now been integrated into ref:build-sckan-release by using [[../../dockerfiles/source.org]] to run the block(s) directly. #+begin_src bash ~/git/dockerfiles/source.org build-image run:sckan-build-save-image #+end_src ***** old :noexport: # TODO we can absolutely automate this, but bash is a bad top level to # try to express what needs to be done wrt nowebbing blocks in from # other files using the library of babel and also setting the # directory (pushd popd likely ok) run the docker build blocks in [[../../dockerfiles/source.org]] for the sckan release that do all the copying of the build state to the requisite locations order is bottom to top (maybe use ob-lob) [[../../dockerfiles/source.org::&musl-build-sckan-base]] [[../../dockerfiles/source.org::&musl-build-sckan-services]] # FIXME we should be able to defer dumping the images until after # testing however the current docker build process expects the # temporary build dirs to still exist, so we can't clean them up and # can't run the dump independently, which is bad, need two paths, one # for if the build directory exists, another for if it does not, but # and artifact has been specified dump the docker image for the zenodo release [[../../dockerfiles/source.org::&sckan-save-image]] **** Initial testing Deploy to dev scigraph. #+begin_src bash ~/ni/dev/bin/run-deploy-graph-selene #+end_src Deploy to local dev blazegraph. [[./release.org::*Deploy journal and prefixes to local server]] critical checks: 1. run the [[file:queries.org::#apinat-dashboard][ApiNATOMY dashboard]] queries and inspect the numbers 2. check files in data folder for empty classes e.g. via =grep 'a owl:Class \.$'= (can do this before scigraph load) 3. check prov-record matches expected 4. future run the [[file:queries.org::#npo-dashboard][NPO dashboard]] **** Archive release artifacts #+begin_src bash :results drawer # archive all publication artifacts at one of the usual locations e.g. ~/nas/data # FIXME hardcoded paths # FIXME need to set -e on ALL of these blocks I think? 
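# The three artifacts gathered below are, presumably (based on the build
# steps above): the blazegraph release zip, the latest SciGraph build for
# sparc-sckan, and the saved docker data image tarball.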
_archive=~/"nas/data/" _sckanrz="$(ls -d /tmp/build/release-*-sckan.zip | sort -u | tail -n 1)" _sckansz=/tmp/scigraph-build/sparc-sckan/$(readlink /tmp/scigraph-build/sparc-sckan/LATEST) _sckandz="$(ls -d /tmp/docker-sckan-data-*Z.tar.gz | sort -u | tail -n 1)" declare -a _arts=(${_sckanrz} ${_sckansz} ${_sckandz}) for _art in "${_arts[@]}"; do echo $(basename ${_art}) rsync -a ${_art} ${_archive} done #+end_src **** Deploy to staging #+begin_src bash ### deploy to sparc-scigraph # XXX NOTE we NEVER deploy to sckan-scigraph directly always via # sparc-scigraph and then once that passes we do the release ~/git/pyontutils/nifstd/scigraph/bin/run-deploy-graph-sparc-sckan ### update services as needed # ~/git/pyontutils/nifstd/scigraph/bin/run-deploy-services-sparc #+end_src **** Test staging Use mapknowledge to test over all neurons where all neurons is the list from some other query #+begin_src elisp (defun advise--obe-python-path (command &rest args) (let* ((params (cadr args)) (_ (message "%s" params)) (python-path (cdr (assq :python-path params))) (process-environment (or (and python-path (cons (format "PYTHONPATH=%s" python-path) process-environment)) process-environment))) (apply command args))) (advice-add #'org-babel-execute:python :around #'advise--obe-python-path) #+end_src # #+header: :python "/usr/bin/urxvt -e /usr/bin/python -- " # #+header: :epilogue "\nreturn result" #+begin_src python :noweb yes :python-path (expand-file-name "~/git/NOFORK/map-knowledge:") #import sys #return [[p] for p in sys.path] import random from mapknowledge import KnowledgeStore from mapknowledge.scicrunch import SCICRUNCH_PRODUCTION, SCICRUNCH_STAGING, SciCrunch from pyontutils.scigraph_codegen import moduleDirect from pyontutils.config import auth from sparcur.utils import log def test_prod(curie): return test_endpoint(curie, SCICRUNCH_PRODUCTION) def test_stage(curie): return test_endpoint(curie, SCICRUNCH_STAGING) def test_endpoint(curie, endpoint): store = KnowledgeStore(scicrunch_release=endpoint) store._KnowledgeStore__scicrunch._SciCrunch__scicrunch_key = auth.get('scigraph-api-key') # XXX SSIIIIIGH try: wat, *rest = curie.rsplit('/', 1) result = store.entity_knowledge(wat) log.info(result) finally: store.close() # skip = ('id', 'label', 'long-label', 'phenotypes', 'references') out = {k:v for k, v in result.items() if v and k not in skip} return out # !?!??!?!!?! apparently a newline before this breaks everything !??! 
def testn(curie): p = test_prod(curie) s = test_stage(curie) # XXX probably not quite right given ps and s values if p and not s: return False, ('removed', p, s) elif not s: return False, ('missing', p, s) if s and not p: log.info(f'new neuron {curie}') # return True, ('ok', p, s) def testnall(curies): bads = [] for curie in curies: try: ok, data = testn(curie) except Exception as e: ok, data = False, ('error', e, e) if not ok: bads.append((curie, data)) if bads: log.error(bads) # return bads def dewit(endpoint): base = f'https://scicrunch.org/api/1/{endpoint}' # XXX no trailing path scigraphd = moduleDirect(base, 'scigraphd') scigraphd.restService._api_key = auth.get('scigraph-api-key') sgc = scigraphd.Cypher() #sgd = scigraphd.Dynamic() # XXX FIXME all pops no in the dynamic endpoints atm curies_raw = sgc.execute( # XXX must use `org-babel-lob-ingest' on queries.org for this to work """ OPTIONAL MATCH (start:Ontology) <-[:isDefinedBy]-(graph:NamedIndividual) -[:type]->({iri: "https://apinatomy.org/uris/elements/Graph"}) , (start) <-[:isDefinedBy]-(external:Class) -[:subClassOf*]->(:Class {iri: "http://uri.interlex.org/tgbugs/uris/readable/NeuronEBM"}) return external """, limit=99999, output='application/json') #curies_raw = sgd curies = [c['id'] for c in curies_raw['nodes']] #breakpoint() count = 2 sample_size = 15 bads = [] for n in range(count): # randomly sample the subset to test test_set = random.sample(curies, sample_size) _bads = testnall(test_set) bads.extend(_bads) return bads result = dewit(SCICRUNCH_STAGING) #+end_src Future: similar for NPO **** GitHub release Create a new pre-release at https://github.com/SciCrunch/NIF-Ontology/releases Tag ymd should match the results of the release version query from the embedded provenance record. tag: =sckan-%Y-%m-$d= target: =dev= title: =sckan-%Y-%m-$d= Upload the blazegraph, scigraph, and docker archives. Add link to changelog entry. **** Push docker images #+begin_src bash # push docker images docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:base-*Z | sort | tail -n 1 | awk '{ print $2 }') docker push tgbugs/sckan:$(docker image ls tgbugs/sckan:data-*Z | sort | tail -n 1 | awk '{ print $2 }') docker push tgbugs/sckan:base-latest docker push tgbugs/sckan:latest #+end_src **** Test docker images Most of the testing at this stage is of the functionality in tgbugs/musl:kg-release-user so it is ok to push the data images first. #+begin_src bash ### testing echo manual step waiting for signal from human to start next automated portion #+end_src TODO automate. Create the new docker container volume for sckan data run it with the latest version of tgbugs/musl:kg-release-user run all the examples etc. Use the shebang block in queries.org or similar to execute all the blocks and check to make sure they are working out as expected. Better yet if we can write some internal consistency checks i.e. between NPO and ApiNATOMY. **** Update changelog #+begin_src bash ### publication echo manual step waiting for signal from human to start next automated portion #+end_src #+begin_src bash ### changelog # XXX manual #+end_src One way to generate a changelog, not the best, but possible. 
#+begin_src bash pushd ~/git/apinatomy-models git diff $(the last commit before the previous release)..HEAD -- models pushd ~/git/NIF-Ontology git diff $(the last commit before the previous release)..HEAD -- *.ttl ttl #+end_src **** Zenodo release #+begin_src bash # prepare the new zenodo release # deploy scigraph image to production #+end_src **** Promote to production Once all tests have passed and we receive the ok from MAP we promote sparc-scigraph to sckan-scigraph. 1. back up existing prod services.yaml 2. copy data from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph 1. alternately curl from github release or rsync from internal stores, the issue is the release id is not predictable right now 3. copy services.yaml (and the raw) from aws-scigraph-scigraph to aws-scigraph-sckan-scigraph 4. unzip data 5. stop service 6. unlink/ln -s 7. start service #+begin_src bash :tangle ../bin/run-promote-both-sckan-prod :shebang "#!/usr/bin/env bash" echo "not running to avoid accidental deployment" echo "if you want to do this comment out these lines and the exit line" exit 1 source "$(eval echo ~/git/pyontutils/nifstd/scigraph/bin/scigraph-functions.sh)" host_stage=aws-scigraph-scigraph host_prod=aws-scigraph-sckan-scigraph host_prod_sudo=aws-scigraph-sckan path_stamped="$(ssh ${host_stage} "realpath graph")" path_zip="${path_stamped}.zip" path_yaml="$(ssh ${host_stage} "realpath services.yaml")" path_yaml_raw="$(ssh ${host_stage} 'realpath $(head -n 1 services.yaml | cut -d" " -f 2)')" __TEST='echo ${USER}@${HOSTNAME} ' __TEST='' # prod backup existing services ssh ${host_prod} "${__TEST}"'cp services.yaml services.yaml-'"$(date +%s)" || exit $? # stage>prod data and services # XXX no rsync because the service users should not have ssh access to anything # FIXME the egress on this is stupid, ideally run this whole script from within aws paths=("${path_zip}" "${path_yaml}" "${path_yaml_raw}") for path in "${paths[@]}"; do ssh ${host_stage} "${__TEST}"'sha256sum '"${path}"; # XXX FIXME BEWARE cannot test the pipe and redirection easily ssh ${host_stage} "${__TEST}"'cat '"${path}" |\ ssh ${host_prod} 'cat - > '"${path}"; ssh ${host_prod} "${__TEST}"'sha256sum '"${path}"; done # prod unzip ssh ${host_prod} "${__TEST}"'unzip -n '"${path_zip}" || exit $? # prod stop relink start ssh -t ${host_prod_sudo} "$(typeset -f service-manager);"\ "${__TEST}"'service-manager scigraph stop &&'\ "${__TEST}"'sudo unlink /var/lib/scigraph/graph;'\ "${__TEST}"'sudo ln -s '"${path_stamped}"' /var/lib/scigraph/graph;'\ "${__TEST}"'service-manager scigraph start' #+end_src * Internal Structure :PROPERTIES: :header-args:python: :comments link :exports code :mkdirp yes :header-args:python+: :shebang "#!/usr/bin/env python3" :END: ** Pipelines Easier to read, harder to debug. The python paradox. *** _*Retrieve*_ **** _Protocols_ Cache annotations. See [[(bash-pipeline-fetch-annotations)]] for usage. #+name: fetch_annotations.py #+begin_src python :tangle ../sparcur/simple/fetch_annotations.py :mkdirp yes from pathlib import Path from hyputils import hypothesis as hyp from sparcur.config import auth def from_group_name_fetch_annotations(group_name): """ pull hypothesis annotations from remote to local """ group_id = auth.user_config.secrets('hypothesis', 'group', group_name) cache_file = Path(hyp.group_to_memfile(group_id + 'sparcur')) get_annos = hyp.Memoizer(cache_file, group=group_id) get_annos.api_token = auth.get('hypothesis-api-key') # FIXME ? 
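    # calling the memoized getter pulls any annotations missing from the local
    # cache down from the remote group and updates cache_file on disk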
annos = get_annos() return cache_file # needed for next phase, annos are not def from_group_name_cached_annos(group_name): group_id = auth.user_config.secrets('hypothesis', 'group', group_name) cache_file = Path(hyp.group_to_memfile(group_id + 'sparcur')) get_annos = hyp.AnnoReader(cache_file, group=group_id) annos = get_annos() return annos def from_user_name_group_name_h(user_name, group_name): group_id = auth.user_config.secrets('hypothesis', 'group', group_name) h = hyp.HypothesisUtils(username=user_name, group=group_id) h.token = auth.user_config.secrets('hypothesis', 'api', user_name) return h def main(hypothesis_group_name=None, **kwargs): if hypothesis_group_name is None: hypothesis_group_name = 'sparc-curation' from_group_name_fetch_annotations(hypothesis_group_name) if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main) #+end_src Temporary location for some helper code to clone protcur annos to a new group. #+begin_src python from sparcur.simple.fetch_annotations import ( from_group_name_fetch_annotations, from_group_name_cached_annos, from_user_name_group_name_h) from protcur import document as ptcdoc #def main(): annos = from_group_name_cached_annos('sparc-curation') bannos = [ptcdoc.Annotation(a) for a in annos] pool = ptcdoc.Pool(bannos) anno_counts = ptcdoc.AnnoCounts(pool) if False: # sort debug ta = pool.topological_annos def asdf(ta): hrm = [a.id for a in ta] de = [(i, [(hrm.index(pid) if pid in hrm else 'oops') for pid in a.text_parent_ids]) for i, a in enumerate(ta) if a.text_parent_ids] qq = [(i, l) for i, l in de if [_ for _ in l if _ != 'oops' and i < _]] sigh = sorted(set([x for i, l in qq for x in [i] + l if x != 'oops'])) return qq, sigh qq, sigh = asdf(ta) bads = [ta[f] for f in sigh] sbads = ptcdoc.toposort(bads) qq2, sigh2 = asdf(sbads) group_name = 'fixed-sparc-curation' cache_file = from_group_name_fetch_annotations(group_name) html_annos = from_group_name_cached_annos(group_name) html_bannos = [ptcdoc.Annotation(a) for a in html_annos] #html_bannos = [] html_pool = ptcdoc.Pool(html_bannos) h_p_f = from_user_name_group_name_h('protbot', group_name) #pool.clone_to(html_pool, h_p_f) test = False if test: test_bannos = list(pool.bySlugTail('wzuff6w')) # published with most annos #test_bannos = list(pool.bySlugTail('ba8hiht6')) # published with low number of annos test_pool = ptcdoc.Pool(test_bannos) test_pool.clone_to(html_pool, h_p_f, test=False) else: pool.clone_to(html_pool, h_p_f, test=test) [a._row for a in html_pool._annos] # [h_p_f.delete_annotation(a.id) for a in html_pool._annos] if __name__ == '__main__': main() #+end_src **** _Datasets_ ***** Clone This is an example of how to clone the top level of a project. See ref:utils.py for a good way to instantiate =RemotePath=. 
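Example python usage. This is only a sketch: the parent path is hypothetical, the organization id is the example id reused from the retrieve example later in this section, and if =project_id= is omitted it is read from the =remote-organization= config entry.

#+begin_src python
from sparcur.paths import Path
from sparcur.utils import PennsieveId
from sparcur.simple.clone import main as clone

# hypothetical parent folder; clone will create it if it does not exist
parent_path = Path('/tmp/clone-example')
# example organization id, the same one used in the retrieve example below
p = PennsieveId('N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0')
project_path = clone(parent_path=parent_path, project_id=p)
datasets = list(project_path.children)  # first level of children now exists locally
#+end_src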
#+name: clone.py #+begin_src python :tangle ../sparcur/simple/clone.py from pathlib import Path # clone top level def from_path_id_and_backend_project_top_level(parent_path, project_id, RemotePath, symlink_objects_to=None,): """ given the enclosing path to clone to, the project_id, and a fully configured (with Local and Cache) backend remote path, anchor the project pointed to by project_id along with the first level of children """ project_path = _from_path_id_remote_project(parent_path, project_id, RemotePath, symlink_objects_to) return _from_project_path_top_level(project_path) def from_path_project_backend_id_dataset(parent_path, project_id, dataset_id, RemotePath, symlink_objects_to=None,): project_path = _from_path_id_remote_project(parent_path, project_id, RemotePath, symlink_objects_to) return _from_project_path_id_dataset(project_path, dataset_id) def _from_path_id_remote_project(parent_path, project_id, RemotePath, symlink_objects_to): RemotePath.init(project_id) # calling init is required to bind RemotePath._api anchor = RemotePath.smartAnchor(parent_path) anchor.local_data_dir_init(symlink_objects_to=symlink_objects_to) project_path = anchor.local return project_path def _from_project_path_top_level(project_path): """ given a project path with existing cached metadata pull the top level children WARNING: be VERY careful about using this because it does not gurantee that rmeta is available to mark sparse datasets. It may be the case that the process will fail if the rmeta is missing, or it may not. Until we are clear on the behavior this warning will stay in place. """ # this is a separate function in case the previous step fails # which is also why it is hidden, it makes too many assuptions # to be used by itself anchor = project_path.cache list(anchor.children) # this fetchs data from the remote path to the local path return project_path # returned instead of anchor & children because it is needed by next phase def _from_project_path_id_dataset(project_path, dataset_id): anchor = project_path.cache remote = anchor._remote_class(dataset_id) cache = anchor / remote return cache.local def main(parent_path=None, project_id=None, parent_parent_path=Path.cwd(), project_id_auth_var='remote-organization', # FIXME move default to clifun symlink_objects_to=None, id=None, dataset_id=None, ,**kwargs): """ clone a project into a random subfolder of the current folder or specify the parent path to clone into """ from sparcur.simple.utils import backend_pennsieve if parent_path is None: breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX FIXME import tempfile parent_path = Path(tempfile.mkdtemp(dir=parent_parent_path)) if project_id is None: from sparcur.config import auth from sparcur.utils import PennsieveId project_id = auth.get(project_id_auth_var) project_id = PennsieveId(project_id) # FIXME abstract the hardcoded backend RemotePath = backend_pennsieve() if id and dataset_id: # FIXME doesn't check for existing so if the name changes we get duped folders # this issue possibly upstream in retrieve, clone just clones whatever you tell # it to clone, but maybe it should check the existing metadata and fail or warn? 
dataset_path = from_path_project_backend_id_dataset( parent_path, project_id, id, # FIXME multiple datasets RemotePath, symlink_objects_to,) return dataset_path project_path = from_path_id_and_backend_project_top_level( parent_path, project_id, RemotePath, symlink_objects_to,) return project_path if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main, after=print) #+end_src ***** Remote metadata Remote metadata must be retrieved prior to the first pull in order to ensure that large datasets can be marked as sparse datasets before they are pulled. ****** From id Remote metadata can be retrieved using only a project_id. However, for all retrieval after the first pull it is usually more effective to retrieve it at the same time as fetching metadata files since it runs in parallel per dataset. See [[(bash-pipeline-fetch-remote-metadata-all)]] for usage. #+name: fetch_remote_metadata_all.py #+begin_src python :tangle ../sparcur/simple/fetch_remote_metadata_all.py from joblib import Parallel, delayed from sparcur.backends import PennsieveDatasetData from sparcur.simple.utils import backend_pennsieve def from_id_fetch_remote_metadata(id, project_id=None, n_jobs=12): """ given an dataset id fetch its associated dataset metadata """ if id.type == 'organization': RemotePath = backend_pennsieve() project = RemotePath(id) prepared = [PennsieveDatasetData(r) for r in project.children] if n_jobs <= 1: [p() for p in prepared] else: # FIXME Paralle isn't really parallel here ... # can't use multiprocessing due to broken aug.RemotePath implementation # LOL PYTHON everything is an object, except when you want to pickle it # then some objects are more equal than others Parallel(n_jobs=n_jobs)(delayed(p._no_return)() for p in prepared) elif id.type == 'dataset': RemotePath = backend_pennsieve(project_id) dataset = RemotePath(id) bdd = PennsieveDatasetData(dataset) bdd() else: raise NotImplementedError(id) def main(id=None, project_id=None, project_id_auth_var='remote-organization', # FIXME move to clifun all_projects=False, n_jobs=12, ,**kwargs): if all_projects: from sparcur.utils import PennsieveId as RemoteId from sparcur.config import auth project_ids = [ RemoteId(p) for p in auth.get_list('remote-organizations')] for project_id in project_ids: main(id=project_id, project_id=project_id, all_projects=False, n_jobs=n_jobs, **kwargs) return if project_id is None: if id is not None: msg = 'id must be passed with project id' raise TypeError(msg) from sparcur.utils import PennsieveId as RemoteId from sparcur.config import auth project_id = auth.get(project_id_auth_var) project_id = RemoteId(project_id) # FIXME abstract the hardcoded backend if id is None: id = project_id from_id_fetch_remote_metadata(id, project_id=project_id, n_jobs=n_jobs,) if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main) # nothing to print or do after #+end_src ****** From path :PROPERTIES: :CUSTOM_ID: fetch-remote-metadata :END: The implementation of =sparcur.backends.PennsieveDatasetData= supports the ability to retrieve metadata directly from the remote without the need for an intervening local path. However this functionality is obscured here because we want to derive a consistent view of the data from the file system snapshot. 
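For reference, a minimal sketch of that remote only route using =PennsieveDatasetData= directly, mirroring the dataset branch of =fetch_remote_metadata_all= above. It assumes Pennsieve API credentials and =remote-organization= are configured; the dataset id is the example id used elsewhere in this guide.

#+begin_src python
from sparcur.config import auth
from sparcur.utils import PennsieveId
from sparcur.backends import PennsieveDatasetData
from sparcur.simple.utils import backend_pennsieve

project_id = PennsieveId(auth.get('remote-organization'))
dataset_id = PennsieveId('N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18')
RemotePath = backend_pennsieve(project_id)          # binds the local and cache classes
bdd = PennsieveDatasetData(RemotePath(dataset_id))  # no local dataset path required
bdd()  # fetches the remote dataset metadata and writes it to the local cache
#+end_src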
# NOTE check where this is used, currently it is only used in simple.pull.main # which will trigger only if pull is called without any other arguments and not # in an existing project which is not a good way to do any of this # XXXXXXX This is mostly useless, fetch_remote_metadata_all works way better # because it can simply accpet a dataset id #+name: fetch_remote_metadata.py #+begin_src python :tangle ../sparcur/simple/fetch_remote_metadata.py from joblib import Parallel, delayed from sparcur.paths import Path from sparcur.backends import PennsieveDatasetData def _from_project_path_fetch_remote_metadata(project_path, n_jobs=12, cached_ok=False): if n_jobs <= 1: prepared = [PennsieveDatasetData(dataset_path.cache) for dataset_path in project_path.children] [bdd() for bdd in prepared if not (cached_ok and bdd.cache_path.exists())] else: fetch = lambda bdd: bdd() if not (cached_ok and bdd.cache_path.exists()) else None fetch_path = (lambda path: fetch(PennsieveDatasetData(path.cache))) Parallel(n_jobs=n_jobs)(delayed(fetch_path)(dataset_path) for dataset_path in project_path.children) # fetch remote metadata def from_path_fetch_remote_metadata(path, n_jobs=12, cached_ok=False): """ Given a path fetch remote metadata associated with that path. """ cache = path.cache if cache.is_organization(): _from_project_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=cached_ok) else: # dataset_path # TODO more granular rather than roll up to dataset if inside? bdd = PennsieveDatasetData(cache) if not (cached_ok and bdd.cache_path.exists()): bdd() def main(path=Path.cwd(), n_jobs=12, rmeta_cached_ok=False, **kwargs): if path is None or path.find_cache_root() not in (path, *path.parents): from sparcur.simple.clone import main as clone path = clone(path=path, n_jobs=n_jobs, **kwargs) # NOTE path is passed along here, but kwargs is expected to contain # parent_path or parent_parent_path and project_id note that if that # happens then the path returned from clone will change accordingly from_path_fetch_remote_metadata(path, n_jobs=n_jobs, cached_ok=rmeta_cached_ok) return path if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main) # we probably don't print here? #+end_src ***** Pull Pull a single dataset or pull all datasets or clone and pull all datasets. 
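Example python usage. A sketch only: the path is illustrative and follows the =retrieve= layout described later in this section; it assumes the dataset has already been cloned so that cached metadata exists.

#+begin_src python
from sparcur.paths import Path
from sparcur.simple.pull import main as pull

# hypothetical dataset path; pull requires a fully resolved path
dataset_path = Path(
    '~/files/sparc-datasets/21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset'
).expanduser().resolve()
pulled_path = pull(path=dataset_path)
#+end_src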
#+name: pull.py #+begin_src python :tangle ../sparcur/simple/pull.py from joblib import Parallel, delayed from sparcur.paths import Path from sparcur.utils import GetTimeNow # pull dataset def from_path_dataset_file_structure(path, time_now=None, exclude_uploaded=False): """ pull the file structure and file system metadata for a single dataset right now only works from a dataset path """ if time_now is None: time_now = GetTimeNow() path._pull_dataset(time_now, exclude_uploaded) # pull all in parallel def from_path_dataset_file_structure_all(project_path, ,*args, paths=None, time_now=None, n_jobs=12, exclude_uploaded=False): """ pull all of the file structure and file system metadata for a project paths is a keyword argument that accepts a list/tuple of the subset of paths that should be pulled """ if time_now is None: time_now = GetTimeNow() project_path.pull( paths=paths, time_now=time_now, # TODO debug=False, # TODO n_jobs=n_jobs, log_level='DEBUG' if False else 'INFO', # TODO Parallel=Parallel, delayed=delayed, exclude_uploaded=exclude_uploaded,) # mark datasets as sparse def sparse_materialize(path, sparse_limit:int=None): """ given a path mark it as sparse if it is a dataset and beyond the sparse limit """ cache = path.cache if cache.is_organization(): # don't iterate over cache children because that pulls remote data for child in path.children: sparse_materialize(child, sparse_limit=sparse_limit) else: cache._sparse_materialize(sparse_limit=sparse_limit) def main(path=Path.cwd(), time_now=None, sparse_limit:int=None, n_jobs=12, exclude_uploaded=False, no_index=False, ,**kwargs): if path != path.resolve(): raise ValueError(f'Path not resolved! {path}') project_path = None # path could be None so can't find_cache_root here if path is None or path.find_cache_root() not in (path, *path.parents): from sparcur.simple.fetch_remote_metadata import main as remote_metadata project_path = remote_metadata(path=path, **kwargs) # transitively calls clone else: project_path = path.find_cache_root() if path != project_path: # dataset_path case sparse_materialize(path, sparse_limit=sparse_limit) from_path_dataset_file_structure(path, time_now=time_now, exclude_uploaded=exclude_uploaded) if path == Path.cwd(): print('NOTE: you probably need to run `pushd ~/ && popd` ' 'to get a sane view of the filesystem if you ran this' 'from within a dataset folder') return path if not list(project_path.children): raise FileNotFoundError(f'{project_path} has no children.') # somehow clone failed # WARNING if rmeta failed you may get weirdness # FIXME from sparcur.simple.clone import _from_project_path_top_level _from_project_path_top_level(project_path) if no_index: Path._noindex = True sparse_materialize(project_path, sparse_limit=sparse_limit) from_path_dataset_file_structure_all(project_path, time_now=time_now, n_jobs=n_jobs, exclude_uploaded=exclude_uploaded) return project_path if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main, after=print) #+end_src ****** Internal implementation The non-simple implementation of this is quite convoluted so here are links to the current implementation, from outside in. In reverse order the basic steps are pull from dataset packages endpoint, resolve hierarchy, convert to remote paths, and covert to cache paths which materialize the pull as folders or symlinks. 1. [[https://github.com/SciCrunch/sparc-curation/blob/0661c3ebd164d0beaa6c57382a459ffd28d9dc92/sparcur/paths.py#L941-L996][entry point to pull]] 1. 
[[https://github.com/SciCrunch/sparc-curation/blob/0661c3ebd164d0beaa6c57382a459ffd28d9dc92/sparcur/paths.py#L973][the call in pull that actually retrieves data]] 2. [[https://github.com/SciCrunch/sparc-curation/blob/0661c3ebd164d0beaa6c57382a459ffd28d9dc92/sparcur/backends.py#L696-L798][the implementation of rchildren for Pennsieve]] 3. [[https://github.com/SciCrunch/sparc-curation/blob/0661c3ebd164d0beaa6c57382a459ffd28d9dc92/sparcur/backends.py#L777-L792][looping over packages to covert them to paths]] 4. [[https://github.com/SciCrunch/sparc-curation/blob/0661c3ebd164d0beaa6c57382a459ffd28d9dc92/sparcur/monkey.py#L115-L349][transform dataset packages endpoint json into Pennsieve api objects]] ***** Fetch #+caption: NOTE this block is unused! #+begin_src python :tangle ../sparcur/simple/fetch.py :exports none from sparcur.paths import Path from sparcur.simple.fetch_metadata_files import main as files from sparcur.simple.fetch_remote_metadata import main as rmeta def main(path=Path.cwd(), **kwargs): if path is None or not path.find_cache_root() in (path, *path.parents): from sparcur.simple.pull import main as pull path = pull(path=path, n_jobs=n_jobs, **kwargs) # FIXME these can be run in parallel # python is not its own best glue code ... rmeta(path=path) files(path=path) return path if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main, after=print) #+end_src ****** Individual file #+name: fetch_file.py #+begin_src python :tangle ../sparcur/simple/fetch_file.py def main(path=None, **kwargs): if path is not None: # FIXME this will fail if the remote for the file is not in # the current project, or if the cachedir is not a child of # the top level project directory e.g. in .operations/objects cache = path.cache cache.fetch(size_limit_mb=None) if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main) #+end_src ****** Metadata files # ugh I gave myself the name in a loop variable colliding with # name at higher level of indentation still in a loop bug, so # totally will overwrite the name and cause madness to ensue #+name: fetch_metadata_files.py #+begin_src python :tangle ../sparcur/simple/fetch_metadata_files.py from itertools import chain from sparcur import exceptions as exc from sparcur.utils import log, logd from sparcur.paths import Path from sparcur.datasets import DatasetStructure from sparcur.simple.utils import fetch_paths_parallel, rglob # fetch metadata files fetch_prefixes = ( ('dataset_description', 'glob'), ('subjects', 'glob'), ('samples', 'glob'), ('sites', 'glob'), ('performances', 'glob'), ('submission', 'glob'), ('manifest', 'rglob'), # XXX NOTE the rglob here ) def _from_path_fetch_metadata_files_simple(path, fetch=True): """ transitive yield paths to all metadata files, fetch them from the remote if fetch == True """ for glob_prefix, glob_type in fetch_prefixes: if glob_type == 'rglob': gp0 = glob_prefix[0] pattern = f'[{gp0.upper()}{gp0}]{glob_prefix[1:]}*' yield from rglob(path, pattern) continue ds = DatasetStructure(path) for path_to_metadata in ds._abstracted_paths(glob_prefix, glob_type=glob_type, fetch=fetch): # FIXME fetch here is broken yield path_to_metadata def _from_path_fetch_metadata_files_parallel(path, n_jobs=12): """ Fetch all metadata files within the current path in parallel. 
""" paths_to_fetch = _from_path_fetch_metadata_files_simple(path, fetch=False) try: first = next(paths_to_fetch) paths_to_fetch = chain((first,), paths_to_fetch) except StopIteration: log.warning('No paths to fetch, did you pull the file system metadata?') return # FIXME passing in a generator here fundamentally limits the whole fetching # process to a single thread because the generator is stuck feeding from a # single process, IF you materialize the paths first then the parallel fetch # can actually run in parallel, but bugs/errors encountered late in collecting # the paths will force all previous work to be redone # XXX as a result of this we use the posix find command to implement rglob # in a way that is orders of magnitude faster paths_to_fetch = list(paths_to_fetch) fetch_paths_parallel(paths_to_fetch, n_jobs=n_jobs) def from_path_fetch_metadata_files(path, n_jobs=12): """ fetch metadata files located within a path """ #if n_jobs <= 1: #gen = _from_path_fetch_metadata_files_simple(path) # FIXME broken ??? somehow abstracted paths doesn't fetch when # we run in directly, or somehow fetch_paths_parallel does something # different #paths = list(gen) #else: _from_path_fetch_metadata_files_parallel(path, n_jobs=n_jobs) def main(path=Path.cwd(), n_jobs=12, **kwargs): if path is None or path.find_cache_root() not in (path, *path.parents): from sparcur.simple.pull import main as pull path = pull(path=path, n_jobs=n_jobs, **kwargs) from_path_fetch_metadata_files(path, n_jobs=n_jobs) return path if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main, after=print) #+end_src ****** File level metadata extraction Fetch files by extension. #+name: fetch_files.py #+begin_src python :tangle ../sparcur/simple/fetch_files.py import os from functools import wraps from sparcur.paths import Path from sparcur.utils import _find_command, log from sparcur.simple.utils import fetch_paths_parallel def _datasets_with_extension(path, extension): """ Hack around the absurd slowness of python's rglob """ # TODO query multiple extensions with -o at the same time command = fr"""for d in */; do {_find_command} "$d" \( -type l -o -type f \) -name '*.{extension}' \ -exec getfattr -n user.bf.id --only-values "$d" \; -printf '\n' -quit ; done""" with path: with os.popen(command) as p: string = p.read() has_extension = string.split('\n') datasets = [p for p in path.children if p.cache_id in has_extension] return datasets def _from_path_fetch_files_simple(path, filter_function, fetch=True, _ce=False): files = filter_function(path) errors = [] if _ce: def _collect_errors(f, file): @wraps(f) def inner(*args, **kwargs): try: return f(*args, **kwargs) except Exception as e: errors.append((file, e)) return inner else: def _collect_errors(f, file): return f if fetch: # FIXME keep an eye on the gc here [_collect_errors(f.cache.fetch, f)(size_limit_mb=None) for f in files if not f.exists() or f.size == 0 or f.size != f.cache.meta.size] if errors: msg = f'{len(errors)} errors:\n{errors}' log.error(msg) #Async(rate=5)(deferred(f.fetch)(size_limit_mb=None) #for f in files if not f.exists()) return files def _from_path_fetch_files_parallel(path, filter_function, n_jobs=12): paths_to_fetch = _from_path_fetch_files_simple(path, filter_function, fetch=False) fetch_paths_parallel(paths_to_fetch, n_jobs=n_jobs) def filter_extensions(extensions): """ return a function that selects files in a path by extension """ def filter_function(path): cache = path.cache if cache.is_organization(): paths = set() for ext 
in extensions: ds = _datasets_with_extension(path, ext) paths.update(ds) else: # dataset_path paths = path, # FIXME exclude folders, some people put . in a folder name files = [matching # FIXME stream ? for path in paths for ext in extensions for matching in path.rglob(f'*.{ext}') if not matching.is_dir()] return files return filter_function def filter_manifests(dataset_blob): """ return a function that selects certain files listed in manifest records """ # FIXME this needs a way to handle organization level? # NOTE this filter is used during the second fetch phase after the inital # metadata has been ingested to the point where it can be use to guide further fetches # TODO this is going to require the implementation of partial fetching I think # TODO preprocessing here? def filter_function(path): # TODO check that the path and the dataset blob match cache = path.cache if cache.id != dataset_blob['id']: msg = f'Blob is not for this path! {dataset_blob["id"]} != {cache.id}' raise ValueError(msg) files = [] # TODO get_files_for_secondary_fetch(dataset_blob) return files return filter_function def from_path_fetch_files(path, filter_function, n_jobs=12): if n_jobs <= 1: _from_path_fetch_files_simple(path, filter_function) else: _from_path_fetch_files_parallel(path, filter_function, n_jobs=n_jobs) def main(path=Path.cwd(), n_jobs=12, extensions=('xml',), **kwargs): #breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX if path is None or path.find_cache_root() not in (path, *path.parents): from sparcur.simple.pull import main as pull path = pull(path=path, n_jobs=n_jobs, **kwargs) filter_function = filter_extensions(extensions) # remote init outside async to avoid race condition in boto3 on windows # from sparcur.paths.BFPNCacheBase.data if not hasattr(path.cache._remote_class, '_api'): path.cache._remote_class.anchorToCache(path.cache.anchor) from_path_fetch_files(path, filter_function, n_jobs=n_jobs) return path if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main) #+end_src ****** Second fetch Once the initial pass over the dataset is complete extract the list of additional files that need to be retrieved and fetch them. # TODO partial fetch of headers for MBF embedded metadata #+name: fetch_secondary.py #+begin_src python :tangle ../sparcur/simple/fetch_secondary.py from sparcur.paths import Path from sparcur.simple.fetch_files import from_path_fetch_files, filter_manifests def from_blob_fetch_files(dataset_blob, path=None): # should the blob contain a reference to the path # it was derived from? filter_function = filter_manifests(dataset_blob) from_path_fetch_files(path, filter_function, n_jobs=n_jobs) def main(path=Path.cwd(), n_jobs=12, **kwargs): #breakpoint() # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX # if not dataset_blob: get_blob vs path blob pairs? # starting from a partial blob means that this probably # should not kick off from the file system, but we know # that we will want to be able to kick it off from the # file system ... maybe the intermediate blobs can encode # the prov of where the file system reference they were # derived from lives ? 
dataset_blob = get_blob(path.cache_id) # FIXME TODO from_blob_fetch_files(dataset_blob, path) if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main) #+end_src ****** unused :noexport: #+begin_src python from_id_remote_metadata = lambda id: ds.PennsieveDatasetData(id)() compose = lambda f, g: (lambda *x: f(g(*x))) #from_path_remote_metadata = compose(lambda id: from_id_remote_metadata(id), #lambda path: path.cache.id) #+end_src ***** Retrieve Putting it all together into a single command. The behavior of retrieve works exactly as it does for clone the difference is that it runs for just a single dataset and the parent_path is made to be the dataset_id uuid if you are running a single dataset pipeline you will still need the project folder structure for logs and jobs etc. you can also still run all datasets together off of a single SPARC Consoritum folder, in which case all you need to do is pass the communal parent_path Usage example. #+begin_src bash python -m sparcur.simple.retrieve \ --dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \ --symlink-objects-to /mnt/str/tom/cache/bf-object-cache #+end_src Example python usage. #+begin_src python from sparcur.paths import Path from sparcur.utils import PennsieveId from sparcur.simple.retrieve import main as retrieve p = PennsieveId('N:organization:618e8dd9-f8d2-4dc4-9abb-c6aaab2e78a0') d = PennsieveId('N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18') ppp = Path('~/files/sparc-datasets').expanduser().resolve() retrieve(id=d, dataset_id=d, project_id=p, parent_parent_path=ppp) #+end_src =sparcur.simple.retrieve= implementation # FIXME xml fetching still broken sporadically #+name: retrieve.py #+begin_src python :tangle ../sparcur/simple/retrieve.py from sparcur.paths import Path from sparcur.utils import symlink_latest from sparcur.simple.clone import main as clone from sparcur.simple.fetch_remote_metadata_all import main as remote_metadata from sparcur.simple.pull import main as pull from sparcur.simple.fetch_metadata_files import main as fetch_metadata_files from sparcur.simple.fetch_files import main as fetch_files def main(id=None, dataset_id=tuple(), parent_path=None, parent_parent_path=Path.cwd(), path=None, # keep path out of kwargs invariant_local_path='dataset', #extensions=('xml',), # not needed right now ,**kwargs): # FIXME parent_path and project_id seem like they probably need to # be passed here, it would be nice if project_path could just be # the current folder and if the xattrs are missing for the # project_id then ... it is somehow inject from somewhere else? # this doesn't really work, because that would mean that we would # have to carry the project id around in the xattr metadata for # all dataset folders, which might not be the worst thing, but # definitely redundant if id is None: raise TypeError('id is a required argument!') if parent_path is None: uuid = id.uuid # FIXME hardcoded backend assumption parent_path = parent_parent_path / uuid parent_path.mkdir(exist_ok=True) elif not parent_path.exists(): parent_path.mkdir() invariant_path = parent_path / invariant_local_path # XXX for now we do these steps in order here # rather than trusting that calling simple.pull.main will do # the right thing if there is no path ... it should but probably # doesn't right now due to assumptions about paths existing # remote metadata from path (could do from id too?) 
remote_metadata(id=id, **kwargs) # could run parallel to clone, easier in bash # clone single without organization parent somehow seems likely broken? path = clone(id=id, dataset_id=dataset_id, parent_path=parent_path, parent_parent_path=parent_parent_path, ,**kwargs) # XXX symlink_objects_to will just work if you pass it symlink_latest(path, invariant_path) # pull single pull(path=path, **kwargs) # fetch metadata files fetch_metadata_files(path=path, **kwargs) # FIXME symlink_to # XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX fetch_metadata_files does NOT USE the extensions kwarg! # fetch additional files fetch_files(path=path, **kwargs) return path if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main, after=print) #+end_src *** _*Validate*_ **** _Protocols_ **** _Datasets_ :PROPERTIES: :header-args: :shebang "#!/usr/bin/env python3" :END: ***** Extract Usage example. #+begin_src bash python -m sparcur.simple.extract \ --dataset-id N:dataset:21957eae-0824-4fb5-b18f-04d6ed12ce18 \ --export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports #+end_src #+name: dataset-extract #+begin_src python :tangle ../sparcur/simple/extract.py from sparcur import datasets as dat from sparcur import pipelines as pipes from sparcur import exceptions as exc from sparcur.utils import log, logd from sparcur.paths import Path from sparcur.backends import PennsieveDatasetData from sparcur.simple.utils import combinate, multiple, early_failure, DataWrapper from sparcur.simple.fetch_metadata_files import fetch_prefixes class ManifestFiles(DataWrapper): """ wrapper for manifest files. """ def merge_manifests(vs): """ Merge individual manifest records into the same list """ # FIXME massive hack :/ k = 'manifest_file' # FIXME errors key ... ? is it allowed up there? it shouldn't be ... # FIXME {'content': m} return ManifestFiles([m for v in vs for m in v.data[k]]) def object_from_find_path(glob_prefix, object_from_path_function, glob_type='glob', onfail=None): """ Return a function that will find files that start with glob_prefix""" # FIXME should be in utils but depends on fetch_prefixes if glob_prefix not in dict(fetch_prefixes): raise ValueError('glob_prefix not in fetch_prefixes! 
' f'{glob_prefix!r} not in {fetch_prefixes}') def func(path, *args, **kwargs): ds = dat.DatasetStructure(path) rpath = None for rpath in ds._abstracted_paths(glob_prefix, sandbox=True): yield object_from_path_function(rpath, *args, **kwargs) if rpath is None and onfail is not None: raise onfail(f'No match for {glob_prefix} in {path.name}') return func # file type -> dataset blob key indirection _TOP = object() # SIGH SIGH SIGH always need a escape hatch otkm = {ThingFilePath.obj:prefix + '_file' for prefix, ThingFilePath in dat.DatasetStructure.sections.items()} otkm[ManifestFiles] = 'manifest_file' otkm[PennsieveDatasetData] = 'remote_dataset_metadata' #otkm[type(dat.DatasetStructure(None))] = 'structure' # hack around Pathlib type mangling #otkm[type(dat.DatasetMetadata(None))] = _TOP # stream of objects -> place in dataset blob def dataset_raw(*objects, object_type_key_map=otkm): data = {} log.debug(objects) #path_structure, description, subjects, samples, submission, manifests, *rest = objects for obj in objects: log.debug(obj) key = object_type_key_map[type(obj)] try: if key is not _TOP: data.update({key: obj.data}) else: data.update(obj.data) except Exception as e: # FIXME current things that leak through # MalformedHeaderError # something in the objects list is a dict breakpoint() pass return data # path + version -> python object # TODO how to attach and validate schemas orthogonally in this setting? # e.g. so that we can write dataset_1_0_0 dataset_1_2_3 etc. # we capture version as early as possible in the process, yes we # could also gather all the files and folders and then pass the version # in as an argument when we validate their structure, but there are # elements of the locations or names of those files that might depend # on the template version, therefore we get maximum flexibility by only # need to look for the dataset description file def description(path): return dat.DatasetDescriptionFilePath(path).object def submission(path, version): return dat.SubmissionFilePath(path).object_new(version) def subjects(path, version): return dat.SubjectsFilePath(path).object_new(version) def samples(path, version): return dat.SamplesFilePath(path).object_new(version) def manifest(path, version): return dat.ManifestFilePath(path).object_new(version) # dataset path -> python object def from_path_remote_metadata(path): return PennsieveDatasetData(path.cache) def from_path_local_metadata(path): return dat.DatasetMetadata(path) from_path_dataset_description = object_from_find_path('dataset_description', description, onfail=exc.MissingFileError) comb_metadata = combinate( # dataset description is not included here because it is special # see from_path_dataset_raw for details from_path_remote_metadata, from_path_local_metadata, ) # dataset path + version -> python object def from_path_dataset_path_structure(path, version): return dat.DatasetStructure(path) from_path_subjects = object_from_find_path('subjects', subjects) from_path_samples = object_from_find_path('samples', samples) from_path_submission = object_from_find_path('submission', submission) from_path_manifests = multiple(object_from_find_path('manifest', manifest, 'rglob'), merge_manifests) # combinate all the individual dataset path + version -> data functions comb_dataset = combinate( #from_path_dataset_description, # must be run prior to combination to get version from_path_dataset_path_structure, from_path_subjects, from_path_samples, from_path_submission, from_path_manifests, #from_path_file_metadata, # this must wait until 
2nd fetch phase ) # dataset path -> raw data def from_path_dataset_raw(dataset_path): """ Main entry point for getting dataset metadata from a path. """ gen = from_path_dataset_description(dataset_path) try: ddo = dataset_description_object = next(gen) except exc.MissingFileError as e: # TODO return a stub with embedded error logd.critical(e) dataset_blob = dataset_raw(*comb_metadata(dataset_path)) return early_failure(dataset_path, e, dataset_blob) try: next(gen) # TODO return a stub with embedded error except StopIteration: pass data = ddo.data ddod = type('ddod', tuple(), {'data': data}) dtsv = data['template_schema_version'] return dataset_raw(ddo, *comb_metadata(dataset_path), *comb_dataset(dataset_path, dtsv)) # unused def from_path_file_metadata(path, _version): # FIXME doesn't go in this file probably # FIXME this is going to depend on the manifests # and a second fetch step where we kind of cheat # and prefetch file files we know we will need pass def from_export_path_protocols_io_data(curation_export_json_path): pass def protocols_io_ids(datasets): pass def protocols_io_data(protocols_io_ids): pass def from_group_name_protcur(group_name): pass def protcur_output(): pass def summary(datasets, protocols_io_data, protcur_output): pass def from_path_summary(project_path): dataset_path_structure summary(( dataset( dataset_path_structure, dataset_description, subjects, samples, submission, manifests, ,*rest ))) def main(path=Path.cwd(), id=None, dataset_id=tuple(), time_now=None, export_path=None, export_parent_path=None, _entry_point=False, **kwargs): # TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output? # TODO parallelize if multiple paths # This assumes that all retrieve operations have # finished successfully for the current dataset from sparcur.simple.export import prepare_dataset_export, export_blob if id is not None: # XXX note that id should be dataset_id # TODO parent_path ?? uuid = id.uuid # FIXME hardcoded backend assumption path = path / uuid / 'dataset' # FIXME hardcoded see invariant_path path = path.resolve() # make sure that invariant_path is expanded as we expect. cache = path.cache if not cache.is_dataset(): raise TypeError('can only run on a single dataset') if _entry_point: if export_parent_path is None: export_parent_path = prepare_dataset_export(path, export_path) kwargs.update({'export_path': export_path, 'export_parent_path': export_parent_path, 'time_now': time_now,}) dataset_raw = from_path_dataset_raw(path) if _entry_point: export_blob_path = export_blob(dataset_raw, 'ir.json', **kwargs) return export_blob_path else: return dataset_raw if __name__ == '__main__': import pprint from sparcur.simple.utils import pipe_main pipe_main(main, after=pprint.pprint) #+end_src ***** Transform There is not a clean separation between transformation and validation because there are multiple transformation and validation steps that are interleaved. 
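A sketch of how the pipeline below is driven, which is essentially what =main= does minus the trailing =breakpoint()=: extract the raw blob for a dataset and run it through =core_pipeline=, whose steps interleave moves, derives, and schema validation. The dataset path is illustrative.

#+begin_src python
from sparcur.paths import Path
from sparcur.simple.extract import main as extract
from sparcur.simple.transform import core_pipeline, execute_pipeline

# illustrative dataset path following the retrieve layout
dataset_path = Path(
    '~/files/sparc-datasets/21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset'
).expanduser().resolve()
dataset_raw = extract(dataset_path)                  # raw metadata, no transforms applied
data = execute_pipeline(core_pipeline, dataset_raw)  # transforms interleaved with schema validation
#+end_src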
#+name: dataset-transform #+begin_src python :tangle ../sparcur/simple/transform.py from pathlib import Path from sparcur import schemas as sc from sparcur import pipelines as pipes from sparcur.core import DictTransformer as DT def __apply_step(step, spec, data, **kwargs): return step(data, spec, **kwargs) def popl(data, pops, source_key_optional=False): popped = list(DT.pop(data, pops, source_key_optional)) return data def simple_add(data, adds): pass def execute_pipeline(pipeline, data): for func, *args, kwargs in pipeline: # man variable arity is a pain to deal with here # where are lambda lists when you need them :/ # FIXME maybe we can make steps functional instead of mutating? if not kwargs: kwargs = {} func(data, *args, **kwargs) return data def __wd(transformer): # not needed atm since transformers do in place modification def inner(data, *args, **kwargs): transformer(data, *args, **kwargs) return data def schema_validate(data, schema, fail=False, pipeline_stage_name=None): if isinstance(schema, type): # we were passed a class so init it # doing it this way makes it easier to # use remote schemas that hit the network # since the network call doesn't have to # happen at the top level but can mutate # the class later before we init here schema = schema() ok, norm_or_error, data = schema.validate(data) if not ok: if fail: logd.error('schema validation has failed and fail=True') raise norm_or_error if 'errors' not in data: data['errors'] = [] if pipeline_stage_name is None: pipeline_stage_name = f'Unknown.checked_by.{schema.__class__.__name__}' data['errors'] += norm_or_error.json(pipeline_stage_name) # TODO make sure the step is noted even if the schema is the same simple_moves = ( [['structure', 'dirs',], ['meta', 'dirs']], # FIXME not quite right ... 
[['structure', 'files',], ['meta', 'files']], [['structure', 'size',], ['meta', 'size']], [['remote_dataset_metadata'], ['inputs', 'remote_dataset_metadata']], ,*pipes.SDSPipeline.moves[3:] ) # function, *args, **kwargs # functions should accept data as the first argument core_pipeline = ( # FIXME validation of dataset_raw is not being done right now (DT.copy, pipes.SDSPipeline.copies, dict(source_key_optional=True)), (DT.move, simple_moves, dict(source_key_optional=True)), # TODO clean has custom behavior (popl, pipes.SDSPipeline.cleans, dict(source_key_optional=True)), (DT.derive, pipes.SDSPipeline.derives, dict(source_key_optional=True)), #(DT.add, pipes.SDSPipeline.adds), # FIXME lifter issues (schema_validate, sc.DatasetOutSchema, None), # extras (missing) # xml files # contributors # submission (DT.copy, pipes.PipelineExtras.copies, dict(source_key_optional=True)), # TODO clean has custom behavior (DT.update, pipes.PipelineExtras.updates, dict(source_key_optional=True)), (DT.derive, pipes.PipelineExtras.derives, dict(source_key_optional=True)), #(DT.add, pipes.PipelineExtras.adds), # TODO and lots of evil custom behavior here # TODO filter_failures (schema_validate, sc.DatasetOutSchema, None), (pipes.PipelineEnd._indexes, None), # FIXME this is conditional and in adds # TODO pipeline end has a bunch of stuff in here (schema_validate, sc.PostSchema, dict(fail=True)), ) def main(path=Path.cwd(), path_dataset_raw=None, dataset_raw=None, **kwargs): # FIXME TODO need to follow the behavior of main in extract if dataset_raw is None: if path_dataset_raw is None: cache = path.cache if not cache.is_dataset(): raise TypeError('can only run on a single dataset') from sparcur.simple.extract import main as extract dataset_raw = extract(path) else: from sparcur.utils import path_ir dataset_raw = path_ir(path_dataset_raw) data = execute_pipeline(core_pipeline, dataset_raw) breakpoint() if __name__ == '__main__': from sparcur.simple.utils import pipe_main pipe_main(main, after=print) #+end_src **** Network resources **** _Dataset Path Metadata_ This is a bit out of order since validation is run after an initial export. #+name: dataset-path-metadata-validate #+begin_src python :tangle ../sparcur/simple/path_metadata_validate.py # FIXME this is not really doing what we want yet from sparcur.paths import Path from sparcur.simple import path_metadata def from_path_validated_json_metadata(path): tm = path_metadata.from_path_transitive_metadata(path) from_blob_validated_json_metadata(tm) return tm def from_blob_validated_json_metadata(blob): """ Mutates in place. """ Path.validate_path_json_metadata(blob) # perferred # accepted # banned # known # unknown def main(_entry_point=False, **kwargs): # FIXME we want to be able to accept # --dataset-id, , and some other things probably? 
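    # NB: everything after the return below is unreachable scaffolding from an
    # earlier draft (export_path and the from_*_validate_* names are undefined here)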
return path_metadata.main(validate=True, _entry_point=_entry_point, **kwargs) if export_path: from_blob_validate_path_json_metadata else: from_path_validate_path_json_metadata(path) if __name__ == '__main__': #import pprint from sparcur.simple.utils import pipe_main pipe_main(main)#, after=pprint.pprint) #+end_src #+name: dataset-path-data #+begin_src python :tangle ../sparcur/simple/path_data.py #+end_src *** _*Export*_ **** _Setup for per-dataset export_ #+name: export.py #+begin_src python :tangle ../sparcur/simple/export.py import json from socket import gethostname import augpathlib as aug from pyontutils.utils import timeformat_friendly import sparcur from sparcur.core import JEncode from sparcur.paths import Path from sparcur.utils import loge, symlink_latest from sparcur.config import auth def prepare_dataset_export(path, export_path=None): # FIXME do we need export_base? if export_path is None: # FIXME confusing and breaks w/ convention -> Options maybe? export_path = Path(auth.get_path('export-path')) # FIXME allow alt? from sparcur.utils import PennsieveId identifier = PennsieveId(path.cache.id) uuid = identifier.uuid # we don't use cache._fs_safe_id here because we know the # identifier type from the folder structure # FIXME dataset metadata export setup basically needs to do all of this first # set latest run and then latest complete at the end, but complete is kind of arbitrary # from the atomic point of view tupdated = path.updated_cache_transitive() # FIXME this causes us to traverse all files twice # XXX TODO I think that the dataset updated date is now transitive as well? though the dataset # updated timestamp seems to happen a bit before the created/updated date on the file itself? # FIXME somehow tupdated can be None !??!?! XXX yes, it happens on empty sparse datasets export_dataset_folder = export_path / 'datasets' / uuid export_parent = export_dataset_folder / timeformat_friendly(tupdated) if not export_parent.exists(): export_parent.mkdir(parents=True) export_latest_run = export_dataset_folder / 'LATEST_RUN' symlink_latest(export_parent, export_latest_run) # FIXME need to symlink to LATEST return export_parent def export_blob(blob, blob_file_name, time_now=None, export_path=None, export_parent_path=None, **kwargs): if export_parent_path is None: # NOTE if export_parent_path is not None then it is up to the user # to make sure that the contents of the dataset directory do not change # resulting in confusion about mismatched transitive update dates export_parent_path = prepare_dataset_export(path, export_path) elif not export_parent_path.exists(): # safe to mkdir here since outside has a record of the path name export_parent_path.mkdir(parents=True) export_blob_path = export_parent_path / blob_file_name add_prov(blob, time_now) with open(export_blob_path, 'wt') as f: json.dump(blob, f, indent=2, cls=JEncode) loge.info(f'path metadata exported to {export_blob_path}') return export_blob_path def add_prov(blob, time_now): """ Mutate blob in place to add prov. 
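    The prov record includes the export start timestamp, the exporting host and
    system identifier, and the sparcur version, plus the active git commit when
    sparcur is running from a repository checkout.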
""" # FIXME this will klobber cases where prov was embedded by the pipelines blob['prov'] = {'timestamp_export_start': time_now.START_TIMESTAMP, 'export_system_identifier': Path.sysid, 'export_hostname': gethostname(), 'sparcur_version': sparcur.__version__, 'sparcur_internal_version': sparcur.__internal_version__, } rp = aug.RepoPath(sparcur.core.__file__) if rp.working_dir is not None: blob['prov']['sparcur_commit'] = rp.repo.active_branch.commit.hexsha def main(path=Path.cwd(), export_path=None, **kwargs): return prepare_dataset_export(path, export_path=export_path) if __name__ == '__main__': import pprint from sparcur.simple.utils import pipe_main pipe_main(main, after=pprint.pprint) #+end_src **** _Dataset Path Metadata_ :PROPERTIES: :header-args: :shebang "#!/usr/bin/env python3" :END: In principle this could run as part of the dataset metadata export, however since it produces separate files and can run at the same time, it is its own module. Usage example #+begin_src bash find -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -exec python -m sparcur.simple.path_metadata {} \; #+end_src =xargs= variant. #+begin_src bash find -maxdepth 1 -type d -not -path '.operations*' -not -path '.' -print0 | \ xargs -0 -I{} -P8 -r -n 1 python -m sparcur.simple.path_metadata {} #+end_src Alternate example #+begin_src bash python -m sparcur.simple.path_metadata \ 21957eae-0824-4fb5-b18f-04d6ed12ce18/dataset \ --export-parent-path 21957eae-0824-4fb5-b18f-04d6ed12ce18/exports/ #+end_src # : path_metadata.py #+name: dataset-path-metadata-extract #+begin_src python :tangle ../sparcur/simple/path_metadata.py from pathlib import Path def from_path_transitive_metadata(path): tml = path._transitive_metadata() # FIXME TODO handle sparse cases return {'type': 'path-metadata', 'data': tml,} def main(path=Path.cwd(), id=None, time_now=None, parent_path=None, invariant_local_path='dataset', parent_parent_path=Path.cwd(), export_local=False, export_path=None, export_parent_path=None, _entry_point=False, validate=False, **kwargs): from sparcur.paths import Path from sparcur.simple.export import prepare_dataset_export, export_blob if path == Path.cwd() and (id is not None or parent_path is not None): if parent_path is None: uuid = id.uuid parent_path = parent_parent_path / uuid invariant_path = parent_path / invariant_local_path path = invariant_path.expanduser().resolve() else: parent_parent_path = None # TODO path from dataset_id and retrieve conventions? or just pass path from retrieve final output? # TODO parallelize if multiple paths # This assumes that all retrieve operations have # finished successfully for the current dataset # FIXME Options calls resolve on all paths but not if Path.cwd slips through ... path = Path(path) # FIXME even here some paths don't have caches ?! cache = path.cache # XXX this should have errored when Path was applied below !?!?!??! pipe_main wat ??? 
if not cache.is_dataset(): raise TypeError('can only run on a single dataset') if _entry_point: if export_parent_path is None: export_parent_path = prepare_dataset_export(path, export_path) kwargs.update({'export_path': export_path, 'export_parent_path': export_parent_path, 'time_now': time_now,}) tm = from_path_transitive_metadata(path) # FIXME TODO validate file formats, which means this also needs to support the remoteless case # FIXME TODO process metadata for each timepoint when things enter should go in prov I think # or we need to be collecting prov along the way, we don't have an overseer or conductor # so we can't keep everything embedded # FIXME TODO embed the transitive cache updated value that is used in prepare_dataset_export if validate: # FIXME raw vs validated and FIXME pipeline from sparcur import schemas as sc from sparcur.simple.transform import schema_validate Path.validate_path_json_metadata(tm) schema_validate(tm, sc.PathTransitiveMetadataSchema) if _entry_point: export_blob_path = export_blob(tm, 'path-metadata.json', **kwargs) # FIXME naming for raw vs validated return export_blob_path else: return tm if __name__ == '__main__': #import pprint from sparcur.simple.utils import pipe_main pipe_main(main)#, after=pprint.pprint) #+end_src *** _*Combine*_ This is equivalent to creating objects that match the summary schema for legacy organization level curation-export.json files. FIXME it might be the case that individual datasets were exported under different schemas, which means that there would no longer be a single consistent schema for checking the merge, this is why we sometimes need to rerun all datasets when there is a schema change even if ther is not a schema change it very well may be the case that indvidiual datasets will have been run on different versions of the export pipeline, and slightly different images #+name: combine.py #+begin_src python :tangle ../sparcur/simple/combine.py import json import rdflib from pathlib import Path from dateutil import parser as dateparser from pyontutils.core import OntResPath from pyontutils.utils import TZLOCAL, timeformat_friendly from pyontutils.namespaces import TEMP, rdf, sparc #from sparcur.utils import fromJson, register_all_types # FIXME this should also go in sparcron from sparcur.export.triples import TriplesExportSummary from sparcur.export.published import _merge_graphs from sparcur.simple.export import add_prov tu = 'timestamp_updated' tuc = 'timestamp_updated_contents' ip = 'inputs' rmk = 'remote_dataset_metadata' class TriplesExportSummaryPublished(TriplesExportSummary): @property def ontid(self): return rdflib.URIRef(super().ontid.replace('ontologies/', 'ontologies/published/')) @property def header_label(self): return rdflib.Literal(f'{self.folder_name} curation export published graph') def max_dataset_or_contents_updated(datasets_list): return max(set.union( set([a['meta'][tuc] for a in datasets_list if tuc in a['meta'] and a['meta'][tuc]]), set([a['meta'][tu] for a in datasets_list if tu in a['meta'] and a['meta'][tu]]))) def from_dataset_export_path_snapshot(dataset_export_path, snapshots_path, time_now): derefs = [l.resolve() for l in [c / 'LATEST' for c in dataset_export_path.children] if l.exists()] snapshot_path = snapshots_path / time_now.START_TIMESTAMP_LOCAL_FRIENDLY snapshot_path.mkdir(parents=True) [(snapshot_path / d.parts[-2]).symlink_to((snapshot_path / 'asdf').relative_path_to(d)) for d in derefs] return snapshot_path def from_snapshot_path_datasets_lists(snapshot_path): alld = [] pubd = [] 
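    # alld collects every dataset export blob found in the snapshot; pubd keeps
    # only those whose remote dataset metadata carries an id_published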
for uuid_path in snapshot_path.children: with open(uuid_path / 'curation-export.json', 'rt') as f: j = json.load(f) # TODO validate the load XXX this should probably # happen as a final roundtrip check during export # TODO filter by organization alld.append(j) if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]: pubd.append(j) return alld, pubd def from_snapshot_path_summary_json(snapshot_path, project_id, time_now): l_all, l_pub = from_snapshot_path_datasets_lists(snapshot_path) sum_all = from_datasets_list_summary_json(l_all, project_id, time_now) sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now) return sum_all, sum_pub def from_snapshot_path_summary_ttl(snapshot_path, project_id, time_now, blob): tes = TriplesExportSummary(blob) tesp = TriplesExportSummaryPublished(blob) orps = [OntResPath(uuid_path / 'curation-export.ttl') for uuid_path in sorted(snapshot_path.children, key=lambda p: p.name)] graphs_all = [o.graph for o in orps] graphs_pub = [ g for g, uripub in [(g, list(g[ds:TEMP.hasUriPublished])) for g in graphs_all for ds in g[:rdf.type:sparc.Dataset]] if uripub] merged_all = _merge_graphs(graphs_all) merged_all.populate_from_triples(tes.triples_header) merged_pub = _merge_graphs(graphs_pub) merged_pub.populate_from_triples(tesp.triples_header) return merged_all, merged_pub, graphs_all, graphs_pub def from_snapshot_path_summary_ttl_BAD(snapshot_path, project_id, time_now, blob): # this variant is too complex, trying to reuse the published graph as the all graph # and the implementation of the OntConjunctiveGraph is not far enough along to do it tes = TriplesExportSummary(blob) # FIXME nasty annoying dep graph_meta = OntGraph() graph_meta.populate_from_triples(tes._triples_header(tes.ontid, time_now._start_time)) rev_replace_pairs = _fix_for_pub(graph_meta, graph_meta) replace_pairs = tuple([(b, a) for a, b in rev_replace_pairs]) orps = [OntResPath(uuid_path / 'curation_export.ttl') for uuid_path in snapshot_path.children] graphs = [o.graph for o in orps] # FIXME should use id_published here as well, but that isn't being # propagated at the moment graphs_pub = [] graphs_nop = [] for g, uripub in [(g, list(g[ds:TEMP.hasUriPublished])) for g in graphs for ds in g[:rdf.type:sparc.Dataset]]: if uripub: graphs_pub.append(g) else: graphs_nop.add(g) graph_pub = _merge_graphs(published_graphs) graph_pub.populate_from(graph_meta) # FIXME this is manually aligned with TriplesExport.triples_header graph_pub.asdf for g in graphs_nop: graph_pub.namespace_manager.populate_from( {k:v for k, v in dict(g.namespace_manager).items() if k not in ('contributor', 'sample', 'subject')}) ttl_all = None ttl_pub = _populate_published(curation_export, graph) def from_dataset_export_path_datasets_lists(dataset_export_path): dep = dataset_export_path alld = [] pubd = [] derefs = [l.resolve() for l in [c / 'LATEST' for c in dep.children] if l.exists()] # TODO consider creating a folder that is just symlinks before this for lp in sorted(derefs, key=lambda p: p.name): with open(lp / 'curation-export.json', 'rt') as f: j = json.load(f) # TODO validate the load XXX this should probably # happen as a final roundtrip check during export # TODO filter by organization alld.append(j) if ip in j and rmk in j[ip] and 'id_published' in j[ip][rmk]: pubd.append(j) return alld, pubd def from_datasets_list_summary_json(datasets_list, project_id, time_now): # XXX FIXME issue with datasets from multiple projects fn = Path(datasets_list[0]['prov']['export_project_path']).name out = { 'id': 
project_id.id, 'meta': { 'count': len(datasets_list), 'folder_name': fn, # WHAT A RELIEF we don't need network 'uri_api': project_id.uri_api, 'uri_human': project_id.uri_human(), }, 'datasets': datasets_list, } add_prov(out, time_now) # some potential prov summaries, but lets not do that here # most prov stats should be on the single dataset level #'export_timestamp_start_min': min(tes), #'export_timestamp_start_max': max(tes), return out def from_dataset_export_path_summary_json(dataset_export_path, project_id, time_now): l_all, l_pub = from_dataset_export_path_datasets_lists(dataset_export_path) #[a['meta']['timestamp_updated'] < a['meta']['timestamp_updated_contents'] #for a in l_all if a['meta']['timestamp_updated_contents']] sum_all = from_datasets_list_summary_json(l_all, project_id, time_now) sum_pub = from_datasets_list_summary_json(l_pub, project_id, time_now) return sum_all, sum_pub def main(project_id=None, export_path=None, time_now=None, project_id_auth_var='remote-organization', # FIXME move to clifun disco=False, **kwargs): from sparcur.paths import Path from sparcur.config import auth from sparcur.core import OntTerm OntTerm._nofetch = True if project_id is None: from sparcur.config import auth from sparcur.utils import PennsieveId project_id = auth.get(project_id_auth_var) project_id = PennsieveId(project_id) # FIXME abstract the hardcoded backend if export_path is None: # XXX see issues mentioned above export_path = Path(auth.get_path('export-path')) dataset_export_path = export_path / 'datasets' snapshots_path = export_path / 'snapshots' snapshot_path = from_dataset_export_path_snapshot( dataset_export_path, snapshots_path, time_now) sum_all, sum_pub = from_snapshot_path_summary_json( snapshot_path, project_id, time_now) # write symlink LATEST_PARTIAL ttl_all, ttl_pub, graphs_all, graphs_pub = from_snapshot_path_summary_ttl( snapshot_path, project_id, time_now, sum_all) # write symlink LATEST maxdt = max_dataset_or_contents_updated(sum_all['datasets']) dt_maxdt = dateparser.parse(maxdt) dt_maxdt_local = dt_maxdt.astimezone(TZLOCAL()) friendly_maxdt_local = timeformat_friendly(dt_maxdt_local) # FIXME there are some bad assumptions in here that should be refactored out # at some point, but for now we implicitly assume that all datasets come from # the same organization, which can easily be violated because we don't check # however the existing internal schema requires an id for the summary which is # currenty the organization id # FIXME summary is a hardcoded path # XXX WARNING it is possible to overwrite since maxdt might not change between runs # this is desirable behavior for development, but could cause issues in other cases pexpath = export_path / 'summary' / project_id.uuid latest = pexpath / 'LATEST' npath = pexpath / friendly_maxdt_local snapshot_link = npath / 'snapshot' if not npath.exists(): npath.mkdir(parents=True) else: # FIXME not sure if this makes sense? 
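        # npath already exists, most likely because a previous run produced the
        # same friendly max-updated timestamp; the export is refreshed in place
        # (see the overwrite WARNING above) and the snapshot link is repointed below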
if snapshot_link.is_symlink(): snapshot_link.unlink() snapshot_link.symlink_to(snapshot_link.relative_path_to(snapshot_path)) npath_ce = npath / 'curation-export.json' npath_cep = npath / 'curation-export-published.json' for path, blob in ((npath_ce, sum_all), (npath_cep, sum_pub)): with open(path, 'wt') as f: json.dump(blob, f, indent=2) npath_ttl = npath / 'curation-export.ttl' npath_ttlp = npath / 'curation-export-published.ttl' ttl_all.write(npath_ttl) ttl_pub.write(npath_ttlp) if disco: # export xml and tsv for disco from sparcur.simple.disco import from_curation_export_json_path_datasets_json_xml_and_disco from_curation_export_json_path_datasets_json_xml_and_disco( npath_ce, sum_all['datasets'], graphs_all) if latest.is_symlink(): latest.unlink() latest.symlink_to(friendly_maxdt_local) return npath_ce, sum_all, sum_pub, graphs_all, graphs_pub if __name__ == '__main__': #import pprint from sparcur.simple.utils import pipe_main # these are really big, don't print them # pipe_main(main, after=pprint.pprint) pipe_main(main) #+end_src *** disco Legacy export for disco. #+name: disco.py #+begin_src python :tangle ../sparcur/simple/disco.py import json from pyontutils.core import OntGraph from sparcur import export as ex from sparcur.utils import PennsieveId from sparcur.utils import fromJson, register_all_types # FIXME this should also go in sparcron def from_curation_export_json_path_xml_and_disco(curation_export_json_path): with open(curation_export_json_path, 'rt') as f: summary = json.load(f) datasets_json = summary['datasets'] from_curation_export_json_path_datasets_json_xml_and_disco( curation_export_json_path, datasets_json) def from_curation_export_json_path_datasets_json_xml_and_disco( curation_export_json_path, datasets_json, graphs=None): # FIXME need the snapshot linked somehow, export time started if we are using summary # or summary prov timestamp_export_start will give us the snapshot path as well if we # parse it back to a date if not graphs: snapshot_path = curation_export_json_path.parent / 'snapshot' paths = [(snapshot_path / PennsieveId(d['id']).uuid / 'curation-export.ttl') for d in datasets_json] graphs = [OntGraph().parse(path) for path in paths] datasets_ir = fromJson(datasets_json) ex.export_xml(curation_export_json_path, datasets_ir) ex.export_disco(curation_export_json_path, datasets_ir, graphs) # XXX not doing jsonld here, it will be combined # from single dataset jsonld or similar def main(path=None, **kwargs): register_all_types() # FIXME the problem with this approach is that can cannot run # multiple downstream steps from the same upstream step, we would # need a compositional way to tell each downstream which upstreams # we wanted to run in any given situation, all to save additional # reads from disk if path is None: # assume the user wants to run combine first from sparcur.simple.combine import main as combine curation_export_json_path, summary_all, _, graphs_all, _ = combine(**kwargs) datasets_json = summary_all['datasets'] from_curation_export_json_path_datasets_json_xml_and_disco( curation_export_json_path, datasets_json, graphs) else: curation_export_json_path = path from_curation_export_json_path_xml_and_disco(curation_export_json_path) if __name__ == '__main__': #import pprint from sparcur.simple.utils import pipe_main # these are really big, don't print them # pipe_main(main, after=pprint.pprint) pipe_main(main) #+end_src *** compact find all export dataset folders that are older than the current snapshot and are not in a snapshot that is in a 
summary compact them #+name: compact.py #+begin_src python :tangle ../sparcur/simple/compact.py """ Examples: # look at the second one and then run the first one since it is easier and safer to stop and resume # XZ_OPT=-e9 python -m sparcur.simple.compact | xargs -P4 -r -I{} sh -c 'tar -cvJf "{}.tar.xz" "{}" && rm -r "{}"' python -m sparcur.simple.compact | xargs -P12 -r -I{} echo tar -cvJf '{}.tar.xz' '{}' # python -m sparcur.simple.compact | xargs -P6 -r -I{} echo rm -r '{}' """ from sparcur.paths import Path from sparcur.config import auth __dep_cache = {} def latest_snapped(dataset_export_path, snapped): if dataset_export_path not in __dep_cache: cs = set(c for c in dataset_export_path.children if c.is_dir() and not c.is_symlink()) csnap = cs.intersection(snapped) if not csnap: # no snap, pretend that latest is snapped # this can happen if there is no LATEST because we # were e.g. just exporting path metadata and nothing else maxsnap = sorted(cs, key=lambda p: p.name)[-1] else: maxsnap = sorted(csnap, key=lambda p: p.name)[-1] __dep_cache[dataset_export_path] = maxsnap.name return __dep_cache[dataset_export_path] def main(): export_path = Path(auth.get_path('export-path')) summary_path = export_path / 'summary' snapshots_path = export_path / 'snapshots' datasets_path = export_path / 'datasets' snap_shotted = [ dss.resolve() for d in summary_path.rchildren_dirs for l in d.rchildren if l.is_symlink() and l.name == 'snapshot' for dss in l.resolve().children] snapped = set(snap_shotted) latest_sums = [ d.resolve() for c in summary_path.children for d in (c / 'LATEST',) if d.exists()] all_builds = [ build for date in datasets_path.children if date.is_dir() and not date.is_symlink() for build in date.children if build.is_dir() and not build.is_symlink()] older_not_snap = [ a for a in all_builds if a not in snapped and a.name < latest_snapped(a.parent, snapped)] assert not snapped.intersection(older_not_snap) # newer = set(all_builds) - snapped - set(older_not_snap) _ = [print(p) for p in older_not_snap] if __name__ == '__main__': main() #+end_src ** Utility *** _*Init*_ #+begin_src python :tangle ../sparcur/simple/__init__.py :exports none :shebang "" #+end_src *** _*Utils*_ #+name: utils.py #+begin_src python :tangle ../sparcur/simple/utils.py :noweb yes from sparcur.config import auth __doc__ = f"""Common command line options for all sparcur.simple modules Usage: sparcur-simple manifest [options] [...] sparcur-simple get-uuid ... sparcur-simple datasets sparcur-simple for-racket sparcur-simple for-racket (meta|diff) [options] [ ...] sparcur-simple for-racket make-push-manifest [options] [ ...] sparcur-simple for-racket push [options] [ ...] sparcur-simple check-names sparcur-simple git-repos [update] [options] sparcur-simple [options] [...] sparcur-simple [options] [--dataset-id=...] sparcur-simple [options] [--extension=...] [...] Commands: manifest generate manifest files for path for-racket print data for reading into Racket Options: -h --help show this --hypothesis-group-name=NAME the hypotheis group name --project-id=ID the project id --dataset-id=... one or more datset ids --project-id-auth-var=VAR name of the auth variable holding the project-id --all-projects run for all projects listed in config remote-organizations --project-path=PATH the project path, will be path if ... 
is empty --parent-path=PATH the parent path where the project will be cloned to --parent-parent-path=PATH parent in which a random tempdir is generated or the dataset uuid is used as the parent path, don't use this ... --invariant-local-path=PATH path relative to parent path for dataset [default: dataset] --export-local set export-parent-path to {{:parent-path}}/exports/ --export-path=PATH base export path containing the standard path structure [default: {auth.get_path('export-path')}] --export-parent-path=PATH direct parent path into which exports will be placed --for-template=TEMPLATE run regularization steps when generating templates --cleaned-path=PATH base cleaned path containing the standard path structure [default: {auth.get_path('cleaned-path')}] --cleaned-output-path=PATH directory path into which cleaned paths will be placed --extension=... one or more file extensions to fetch -j --jobs=N number joblib jobs [default: 12] --exclude-uploaded do not pull files from remote marked as uploaded --sparse-limit=N package count that forces a sparse pull [default: {auth.get('sparse-limit')}] --no-index do not generate index files e.g. on pull --symlink-objects-to=PATH path to an existing objects directory --log-level=LEVEL log level info [default: INFO] --open=PROGRAM show file with PROGRAM --show show file with xopen --pretend show what would be done for update """ # XXX FIXME --dataset-id=... breaks the use of [options] ??? import os import sys from types import GeneratorType from pyontutils import clifun as clif from sparcur import exceptions as exc from sparcur.utils import _find_command from sparcur.utils import log, logd, loge, GetTimeNow, PennsieveId from sparcur.paths import Path, PennsieveCache from sparcur.backends import PennsieveRemote def backend_pennsieve(project_id=None, Local=Path, Cache=PennsieveCache): # (ref:def_babf) """return a configured pennsieve backend calling this is sufficient to get everything set up correclty You must call RemotePath.init(project_id) before using RemotePath. Passing the project_id argument to this function will do that for you. It is not required because there are cases where the project_id may not be known at the time that this function is called. 
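
    Illustrative sketch of typical use (assumes project_id is a valid
    organization identifier from config; see main() in this module for a
    real call site):

        RemotePath = backend_pennsieve(project_id)
        root = RemotePath(project_id)
        datasets = list(root.children)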
""" RemotePath = PennsieveRemote._new(Local, Cache) if project_id is not None: RemotePath.init(project_id) return RemotePath class Options(clif.Options): @property def id(self): # dataset_id has priority since project_id can occure without a # dataset_id, but dataset_id may sometimes come with a project_id # in which case the dataset_id needs priority for functions that # branch on the most granular identifier type provided return (self.dataset_id[0] if self.dataset_id else (self.project_id if self.project_id else None)) @property def project_id(self): if not hasattr(self, '_cache_project_id'): id = self._args['--project-id'] if id is not None: identifier = PennsieveId(id, type='organization') self._cache_project_id = identifier else: return return self._cache_project_id @property def dataset_id(self): if not hasattr(self, '_cache_dataset_id'): ids = self._args['--dataset-id'] positional = self._args[''] if ids: identifiers = [PennsieveId(id, type='dataset') for id in ids] self._cache_dataset_id = identifiers elif positional: return [PennsieveId(positional, type='dataset')] else: return ids return self._cache_dataset_id @property def remote_id(self): if not hasattr(self, '_cache_remote_id'): ids = self._args[''] if ids: identifiers = [PennsieveId(id) for id in ids] self._cache_remote_id = identifiers else: return ids return self._cache_remote_id @property def jobs(self): return int(self._args['--jobs']) n_jobs = jobs # match internal kwargs conventions @property def paths(self): return [Path(p).expanduser().resolve() for p in self._args['']] @property def path(self): paths = self.paths if paths: return paths[0] elif self.project_path: return self.project_path else: # if no paths were listed default to cwd # consistent with how the default kwargs # are set on a number of mains # this is preferable to allow path=None # to be overwritten by the conventions of # individual pipeline mains return Path.cwd() @property def project_path(self): pp = self._args['--project-path'] if pp: return Path(pp).expanduser().resolve() @property def parent_parent_path(self): ppp = self._args['--parent-parent-path'] if ppp: return Path(ppp).expanduser().resolve() else: return Path.cwd() @property def parent_path(self): pap = self._args['--parent-path'] did = self.dataset_id if pap: return Path(pap).expanduser().resolve() elif did: id = self.id # dataset_id is a list so use id which handles that uuid = id.uuid return (self.parent_parent_path / uuid).expanduser().resolve() @property def export_path(self): ep = self._args['--export-path'] epp = self.export_parent_path if ep and epp: raise TypeError('Only one of --export-path and --export-parent-path may be set.') elif ep: return Path(ep).expanduser().resolve() else: raise Exception('should not happen') @property def export_parent_path(self): epp = self._args['--export-parent-path'] el = self.export_local pap = self.parent_path if epp and el: raise TypeError('Only one of --export-local and --export-parent-path may be set.') elif epp: return Path(epp).expanduser().resolve() elif el and pap: # it is ok to do this here becuase the TypeError above prevents # the case where both epp and el are set, so even though epp # is no longer always what was set on the command line, it is # it is the case that there won't be conflicting sources return pap / 'exports' @property def cleaned_path(self): cp = self._args['--cleaned-path'] cop = self.cleaned_output_path if cp and cop and cp != self._defaults['--cleaned-path']: # XXX NOTE if the default path is passed as the cleaned path 
this will not error # that is the cost of showing the user the default value in cli, we can't tell # whether the user set the option in that case, probably a good tradeoff # the alternative is to detect that the value is None and set it later, which # is done routinely here, but is somewhat opaque raise TypeError('Only one of --cleaned-path and --cleaned-output-path may be set.') elif cp: return Path(cp).expanduser().resolve() else: raise Exception('should not happen') @property def cleaned_output_path(self): cop = self._args['--cleaned-output-path'] if cop: return Path(cop).expanduser().resolve() @property def extensions(self): return self.extension @property def symlink_objects_to(self): slot = self._args['--symlink-objects-to'] if slot: return Path(slot).expanduser() @property def sparse_limit(self): # FIXME not being pulled in by asKwargs ?? return int(self._args['--sparse-limit']) @property def time_now(self): # FIXME make it possible to pass this in? if not hasattr(self, '_time_now'): self._time_now = GetTimeNow() return self._time_now @property def log_level(self): # FIXME not being pulled in by asKwargs ?? ll = self._args['--log-level'] if ll.isdigit() or ll[0] == '-' and ll[1:].isdigit(): return int(ll) else: return ll def pipe_main(main, after=None, argv=None): options, args, defaults = Options.setup(__doc__, argv=argv) # _entry_point is used as a way to switch behavior when a # pipeline main is run directly or actually in a pipeline try: log.setLevel(options.log_level) logd.setLevel(options.log_level) loge.setLevel(options.log_level) out = main(_entry_point=True, **options.asKwargs()) except Exception as e: log.exception(e) log.error(options.path) raise e if after: after(out) return out def rglob(path, pattern): """ Hack around the absurd slowness of python's rglob """ if sys.platform == 'win32': log.warning('no findutils on windows, watch out for unexpected files') return list(path.rglob(pattern)) doig = (hasattr(path, 'cache') and path.cache and path.cache.cache_ignore) exclude = ' '.join([f"-not -path './{p}*'" for p in path.cache.cache_ignore]) if doig else '' command = f"""{_find_command} {exclude} -name {pattern!r}""" with path: with os.popen(command) as p: string = p.read() path_strings = string.split('\n') # XXX posix path names can contain newlines paths = [path / s for s in path_strings if s] return paths def _fetch(cache): # sigh joblib multiprocessing pickling # lambda functions are great right up until you have to handle an # error function inside of them ... thanks python for yet another # failure to be homogenous >_< meta = cache.meta try: size_mb = meta.size.mb except AttributeError as e: if meta.errors: logd.debug(f'remote errors {meta.errors} for {cache!r}') return else: raise exc.StopTheWorld(cache) from e return cache.fetch(size_limit_mb=None) # FIXME somehow this is not working !? 
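# NOTE _fetch (above) and _fetch_path (below) are deliberately top level
# functions rather than lambdas or closures so that joblib can pickle them
# when fetch_paths_parallel runs with the multiprocessing backend.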
def _fetch_path(path): # sigh joblib multiprocessing pickling path = Path(path) if path.is_dir(): msg = f'trying to fetch a directory: {path}' log.error(msg) return cache = path.cache if path.exists() and path.size == cache.meta.size: return # already got this one if cache is None: raise exc.NoCachedMetadataError(path) # do not return to avoid cost of serialization back to the control process _fetch(cache) def fetch_paths_parallel(paths, n_jobs=12, use_async=True): if n_jobs <= 1: [_fetch_path(path) for path in paths] elif use_async: from pyontutils.utils import Async, deferred Async()(deferred(_fetch_path)(path) for path in paths) else: import pathlib from joblib import Parallel, delayed backend = 'multiprocessing' if hasattr(sys, 'pypy_version_info') else None # FIXME FIXME FIXME somehow this can result in samples.xlsx being fetched to subjects.xlsx !?!??!! # XXX either a race condition on our end or something insane from the remote Parallel(n_jobs=n_jobs, backend=backend)(delayed(_fetch_path)(pathlib.Path(path)) for path in paths) #Parallel(n_jobs=n_jobs)(delayed(fetch_path)(path) for path in paths) def combinate(*functions): def combinator(*args, **kwargs): for f in functions: # NOTE no error handling is needed here # in no cases should the construction of # the python object version of a path fail # all failures should happen _after_ construction # the way we have implemented this they fail when # the data attribute is accessed obj = f(*args, **kwargs) if isinstance(obj, GeneratorType): yield from obj # FIXME last one wins, vs yield tuple vs ...? # FIXME manifests are completely broken for this else: yield obj return combinator def multiple(func, merge=None): """ combine multiple results """ def express(*args, **kwargs): vs = tuple(func(*args, **kwargs)) if merge is not None: yield merge(vs) else: yield vs return express def early_failure(path, error, dataset_blob=None): # these are the 9999 5555 and 4444 errors # TODO match the minimal schema reqs as # we did in pipelines if dataset_blob is None: cache = path.cache return {'id': cache.id, 'meta': {'uri_api': cache.uri_api, 'uri_human': cache.uri_human,}, #'status': 'early_failure', # XXX note that status is not requried # if we cannot compute it, but if there are inputs there should be # a status 'errors': [error], # FIXME format errro } else: if 'errors' not in datset_blob: dataset_blob['errors'] = [] datset_blob['errors'].append(error) return dataset_blob class DataWrapper: # sigh patterns are stupid, move this to elsewhere so it doesn't taint everything def __init__(self, data): self.data = data def main(id=None, **kwargs): def ik(key): return key in kwargs and kwargs[key] if id is not None: print(id.uuid) if ik('get_uuid'): for id in kwargs['remote_id']: print(id.uuid) return if (ik('datasets') or (ik('for_racket') and not (ik('diff') or ik('make_push_manifest') or ik('push'))) or ik('check_names')): log.setLevel(60) # logging.CRITICAL = 50 from sparcur.config import auth from sparcur.simple.utils import backend_pennsieve if ik('project_id'): pass # project_id from command line else: project_id = auth.get('remote-organization') project_ids = auth.get_list('remote-organizations') if not project_ids: project_ids = [project_id] datasets = [] project_meta = [] for project_id in project_ids: PennsieveRemote = backend_pennsieve(project_id) root = PennsieveRemote(project_id) project_meta.append((root.id, root.name)) datasets.extend(root.children) if ik('datasets'): print('\n'.join([d.id for d in datasets])) if ik('for_racket'): import 
json from sxpyr import sxpyr from augpathlib import meta as apmeta from pyontutils.utils_fast import isoformat from dateutil import parser as dateparser if ik('meta'): paths = kwargs['paths'] if kwargs['paths'] else [kwargs['path']] for path in paths: cmeta = Path(path).cache_meta out = '(' rest = False ebf = apmeta._EncodeByField() for field in apmeta._PathMetaAsXattrs.fields: v = getattr(cmeta, field) if v or v == 0: if rest: out += '\n ' else: rest = True if field == 'checksum': v = v.hex() # bytes are annoying to port elif field in ('updated', 'created'): v = ebf._datetime(v) out += f':{field} {json.dumps(v)}' out += ')' print(out) elif ik('diff'): # FIXME own file? # FIXME TODO based on dataset-id path = kwargs['path'] dataset_id, updated_transitive, diff = path.diff() blob = { 'dataset-id': dataset_id.id, 'updated-transitive': isoformat(updated_transitive), 'diff': diff, } pl = sxpyr.python_to_sxpr(blob, str_as_string=True) sxpr = pl._print(sxpyr.configure_print_plist(newline_keyword=False)) print(sxpr) elif ik('make_push_manifest'): # FIXME own file? # this is separate from push because checking confim is a separate action from pushing dataset_id = id updated = dateparser.parse(kwargs['updated']) push_id = kwargs['push_id'] path = kwargs['path'] path.make_push_manifest(dataset_id, updated, push_id) log.info(f'make push manifest finished for {dataset_id}') # read list of files + expected ops to change # generate the full change manifest elif ik('push'): # FIXME own file? dataset_id = id updated = dateparser.parse(kwargs['updated']) push_id = kwargs['push_id'] path = kwargs['path'] # TODO error handling and retry or resume path.push_from_manifest(dataset_id, updated, push_id) log.info(f'push from manifest finished for {dataset_id}') # FIXME TODO consider pulling incremental index entries # for only the paths in the manifest so that the diff # doesn't go stale, also need to pull the latest remote # metadata for the updated files else: key = lambda d: d.updated _dsmeta = '\n'.join([("(" f"{json.dumps(d.id)} " f"{json.dumps(d.name)} " f"{json.dumps(d.updated)} " f"{json.dumps(d.owner.name_or_email)} " f"{json.dumps(d.parent_id)} " f"{json.dumps(d.bfobject.publication_status)}" ")") for d in sorted(datasets, key=key, reverse=True)]) dsmeta = f"({_dsmeta})" _projmeta = '\n'.join([f"({json.dumps(id)} {json.dumps(name)})" for id, name in project_meta]) dsmeta += f"\n({_projmeta})" print(dsmeta) if ik('check_names'): # you will want to run sparcur.simple.fetch_remote_metadata_all from pathlib import PurePosixPath def report(pid, exp, pub): pname = pub['name'] name_mismatch = ( False if exp['basename'] == pname else (exp['basename'], pname)) # [6:] to strip files/ ppname = PurePosixPath(pub['path']).name pathname_mismatch = ( False if exp['basename'] == ppname else (exp['basename'], ppname)) eppp = PurePosixPath(exp['dataset_relative_path']).parent.as_posix() pppp = PurePosixPath(pub['path'][6:]).parent.as_posix() parent_path_mismatch = ( False if eppp == pppp else (eppp, pppp)) # once we fix things on our end names should match # parent paths should match # name and pathname might match but do not have to match pid = f' {pid}' nsp = '\n ' if name_mismatch: if pathname_mismatch and pname != ppname: return (f'{pid} name mismatch and pathname mismatch ' f'{nsp}{nsp.join(name_mismatch)}{nsp}{nsp.join(pathname_mismatch)}') else: return (f'{pid} name mismatch ' f'{nsp}{nsp.join(name_mismatch)}') if parent_path_mismatch: if True: return (f'{pid} parent mismatch' 
f'{nsp}{nsp.join(parent_path_mismatch)}') if True: if True: return '' #return (f'{pid} ok ') import json from sparcur.backends import PennsieveDatasetData export_path = kwargs['export_path'] epd = export_path / 'datasets' for dataset in datasets: latest = epd / dataset.identifier.uuid / 'LATEST' export_path_metadata = latest / 'path-metadata.json' exported = export_path_metadata.exists() # pass the remote not just the id so that bfobject is # accessible to the RemoteDatasetData class pdd = PennsieveDatasetData(dataset) rmeta = pdd.fromCache() published = 'id_published' in rmeta if exported and published: with open(export_path_metadata, 'rt') as f: j = json.load(f) epni = {'N:' + o['remote_id']:o for o in j['data'] if o['remote_id'].startswith('package:')} ppni = pdd._published_package_name_index() se, sp = set(epni), set(ppni) e_missing = sp - se p_missing = se - sp s_common = sp & se rep = [report(c, epni[c], ppni[c]) for c in s_common] repstr = '\n'.join([r for r in rep if r]) if repstr: print(dataset.id, 'bad') print(repstr) else: print(dataset.id, 'ok') elif exported: print(dataset.id, 'unpublished') elif published: print(dataset.id, 'unexported') else: print(dataset.id, 'unpublished and unexported') if ik('git_repos'): import augpathlib as aug import sparcur from sparcur.config import auth from importlib import metadata d = metadata.distribution(sparcur.__package__) rps = [p for p in [aug.RepoPath(d._path) for d in d.discover()] if p.working_dir] setups = [p for p in [p.parent / 'setup.py' for p in rps] if p.exists()] wds = sorted(set([p.working_dir for p in rps])) never_update = auth.get('never-update') pretend=ik('pretend') or never_update if pretend: if never_update: print(f'never-update: true is set in {auth.user_config._path}') print('These are the commands that would be run.') def doupdate(rp): if pretend: print(f'git -C {rp.as_posix()} stash') print(f'git -C {rp.as_posix()} pull') return print(f'Pulling {rp.as_posix()}') print(rp.repo.git.stash()) # TODO checkout to a safety branch and tag for rollback print(rp.repo.git.pull()) if ik('update'): for wd in wds: if 'homebrew' in wd.as_posix(): continue doupdate(wd) # indescriminately run setup.py with --release set to tangle from importlib import util as imu oldargv = sys.argv try: for s in setups: if pretend: print(f'pushd {s.parent.as_posix()}; python setup.py --release; popd') continue sys.argv = ['setup.py', '--release'] # reset every time since most will mutate print(f'Maybe tangling via {s.as_posix()}') spec = imu.spec_from_file_location(f'setup_{s.parent.name}', s) mod = imu.module_from_spec(spec) try: with s.parent: # ah relative paths spec.loader.exec_module(mod) except SystemExit: pass except Exception as e: log.exception(e) finally: sys.argv = oldargv if ik('manifest'): from sparcur.utils import write_manifests paths = kwargs['paths'] if not paths: paths = [Path.cwd()] manifests_rendered = write_manifests(parents=paths) manifests, rendered = zip(*manifests_rendered) nl = '\n' print(f'generated manifests at:\n{nl.join([m.as_posix() for m in manifests])}') if ik('open'): cmd = kwargs['open'] [m.xopen(cmd) for m in manifests] elif ik('show'): [m.xopen() for m in manifests] if __name__ == '__main__': pipe_main(main) #+end_src *** _*Test*_ #+begin_src python :tangle ../test/simple/__init__.py #+end_src #+begin_src python :tangle ../test/simple/test_utils.py from .. 
import common from sparcur.simple.utils import pipe_main def test_pipe_main(): def main(id=None, project_path=None, **kwargs): print(id, project_path, kwargs) pipe_main(main, argv=['sparcur-simple']) #+end_src *** _*Clean metadata files*_ dataset id dataset path dataset relative paths cleaned file root metadata file objects cleaned objects write objects cleaned files at cleaned file paths # TODO consider whether we try to do replace automatically since we can track the old metadata #+name: clean_metadata_files.py #+begin_src python :tangle ../sparcur/simple/clean_metadata_files.py :noweb yes from types import MappingProxyType from sparcur.paths import Path from sparcur import datasets as dat from sparcur.utils import symlink_latest, merge_template_stems from sparcur.config import auth from sparcur.simple.utils import rglob, log _dmpt = MappingProxyType({}) def prepare_dataset_cleaned(dataset_path, cleaned_path=None, time_now=None): if cleaned_path is None: # FIXME confusing and breaks w/ convention -> Options maybe? cleaned_path = Path(auth.get_path('cleaned-path')) from sparcur.utils import PennsieveId identifier = PennsieveId(dataset_path.cache.id) uuid = identifier.uuid cleaned_dataset_folder = cleaned_path / uuid cleaned_output_path = cleaned_dataset_folder / time_now.START_TIMESTAMP_LOCAL_FRIENDLY if not cleaned_output_path.exists(): cleaned_output_path.mkdir(parents=True) cleaned_latest = cleaned_dataset_folder / 'LATEST' # FIXME do we symlink before we know things have succeeded ??? symlink_latest(cleaned_output_path, cleaned_latest) return cleaned_output_path def from_dataset_path_metadata_file_paths(dataset_path): matches = [] for candidate in rglob(dataset_path, '*.xlsx'): rp = candidate.relative_path_from(dataset_path) if not rp.parent.name or 'anifest' in rp.name: matches.append(candidate) return matches def from_path_cleaned_object(path, for_template=False, known_templates=_dmpt): t = dat.Tabular(path) sheet, wb, sparse_empty_rows = t._openpyxl_fixes(for_template=for_template, known_templates=known_templates) return wb def from_file_paths_objects(paths, for_template=False, known_templates=_dmpt): if for_template: template = known_templates[for_template] if for_template in known_templates else None tmpl = merge_template_stems(template, known_templates) if template else None # FIXME this gets run again stems = None if tmpl is None or 'stems' not in tmpl else tmpl['stems'] else: stems = None for path in paths: if stems and path.stem not in stems: yield None continue if path.suffix == '.xlsx': cleaned = from_path_cleaned_object(path, for_template, known_templates) yield cleaned else: yield None def from_dataset_path_cleaned_files(dataset_path, cleaned_output_path, for_template=False, known_templates=_dmpt): "NOTE this actually does the cleaning" paths = from_dataset_path_metadata_file_paths(dataset_path) for path, obj in zip(paths, from_file_paths_objects(paths, for_template, known_templates)): if obj is not None: drp = path.relative_path_from(dataset_path) target = cleaned_output_path / drp if not target.parent.exists(): target.parent.mkdir(parents=True) obj.save(target) def main(path=Path.cwd(), id=None, time_now=None, parent_path=None, invariant_local_path='dataset', parent_parent_path=Path.cwd(), cleaned_path=None, cleaned_output_path=None, for_template=False, **kwargs): # note that cleaned_path is better though of as cleaned_base_path # setup taken from path_metadata.py::main so check the notes there if path == Path.cwd() and (id is not None or parent_path is not None): if 
parent_path is None: uuid = id.uuid parent_path = parent_parent_path / uuid invariant_path = parent_path / invariant_local_path path = invariant_path.expanduser().resolve() else: parent_parent_path = None path = Path(path) cache = path.cache if cache is None: if cleaned_output_path is None: raise ValueError( 'No remote cached data and cleaned_output_path not set. Pass ' '--cleaned-output-path=path/to/cleaned-for-this-dataset') if not for_template: log.warning('running in the wild without cached metadata') cleaned_output_path.mkdir(parents=True, exist_ok=True) elif not cache.is_dataset(): raise TypeError('can only run on a single dataset') else: cleaned_output_path = prepare_dataset_cleaned(path, cleaned_path, time_now) if for_template: from sparcur.config import auth from orthauth.utils import sxpr_to_python resources = auth.get_path('resources') with open(resources / 'templates.sxpr') as f: known_templates = sxpr_to_python(f.read()) # TODO source externally _known_templates = { 'clean': False, 'default': { 'stems-closed': False, 'stems': { 'dataset_description': { 'header': 'row', 'remove': ['device_.+'], }}}} else: known_templates = None from_dataset_path_cleaned_files(path, cleaned_output_path, for_template, known_templates) if __name__ == '__main__': import pprint from sparcur.simple.utils import pipe_main pipe_main(main) #+end_src * Code :noexport: See also https://docs.racket-lang.org/graphviz/index.html =raco pkg install racket-graphviz= for more direct mapping of graphviz functionality but one that is also way more verbose. # XXX NOTE make sure that this block is never tangled, it complicates setup # #+header: :tangle ./y-u-no-compile-from-buffer.rkt #+header: :tangle no #+name: racket-graph-helper #+header: :prologue "#lang racket/base" :comments link #+begin_src racket :lang racket/base :exports none (require graph ; raco pkg install graph ;(only-in graph/adjlist-utils rename-vertex@) ; THIS IS WHAT HAPPENS WHEN YOU MAKE THINGS PRIVATE THAT SHOULD NOT BE (only-in racket/unsafe/ops unsafe-struct*-ref) ; THIS TOO racket/pretty racket/set racket/list (only-in racket/format ~a) (only-in racket/port with-output-to-string) (only-in racket/string string-trim string-replace) (for-syntax racket/base syntax/parse)) (define-for-syntax (list-to-pairs l) (for/list ([a l] [b (cdr l)]) (list a b))) (define-syntax (dag-notation stx) "Write out directed graphs using a -> b -> c syntax. You can use a colon : at the start of a line to preserve spacing." ; TODO <- ?? ; TODO (-> edge-type) syntax ; TODO a -> (b c d e) syntax for multiple terminal deps (syntax-parse stx #:datum-literals (-> :) [(_ (~seq (~optional :) left (~seq -> right) ...) ...) #:with (pairs ...) (datum->syntax this-syntax (apply append (map list-to-pairs (syntax->datum #'((left right ...) ...))))) #'(unweighted-graph/directed (quote (pairs ...)))])) (define (rename-vertex@ adj old new) "an implementation of rename-vertex! that correctly handles renamings that merge two verticies" (hash-set! adj new (set-union (hash-ref adj old) (set-remove (hash-ref adj new (set)) new))) (hash-remove! adj old) (for ([(u vs) (in-hash adj)] #:when (set-member? vs old)) (hash-set! adj u (set-add (set-remove vs old) new)))) (define (collapse-clusters! g clusters #:only [only-these #f]) ; prepare clusters ; string->symbol cluster invert mapping to get replacement table ; rename-vertex! 
for each vertex in the cluster to the cluster name (for ([cluster clusters]) (when (or (not only-these) (member (cluster-label cluster) only-these)) (let ((sym-label (string->symbol (cluster-label cluster)))) (for ([vert (cluster-members cluster)]) ;(with-output-to-file "/tmp/col-clu.rktd" #:exists 'append (λ () (pretty-print (list vert sym-label)))) (rename-vertex@ (unsafe-struct*-ref g 0) vert sym-label)))))) (define (subgraph->graphviz subgraph->hash) (let ([members (for/list ([(k v) (in-hash (subgraph->hash))] #:when v) k)] [label (string-replace (string-trim (symbol->string (object-name subgraph->hash)) "->hash") #rx"[-_?>]" "_")]) (string-append (format "subgraph cluster_~a" label) ; FIXME this won't quite work because we need to know ; the ids to which the nodes were assigned :/ ) )) (define (graphviz-subgraphs graph #:subgraphs [subgraph->hash-functions '()]) "wrap graphviz since it is too simple for our needs at the moment subgraphs should be specified using vertext properties or edge properties" ;; XXX really more clusters (define s (graphviz graph)) (let* ([sl (string-length s)] [split-at (- sl 2)] [start (substring s 0 split-at)] [end (substring s split-at sl)] [extra (map subgraph->graphviz subgraph->hash-functions)]) (apply string-append `(,start ,@extra ,end)))) (struct cluster (label color members labeljust)) (define (new-cluster label color members [labeljust "c"]) (cluster label color members labeljust)) ; it is extremely annoying that you have to locally redefine a ; function just because someone upstream forgot to provide it >_< (define (sanatize-name name) (cond [(string? name) (with-output-to-string (λ () (write name)))] [(symbol? name) (sanatize-name (symbol->string name))] [(number? name) (sanatize-name (number->string name))] [else (sanatize-name (with-output-to-string (λ () (pretty-print name))))])) (define (graphviz-2 g #:clusters [clusters-in '()] #:collapse-clusters [collapse #f] #:colors [colors #f] #:output [port #f]) (when collapse (collapse-clusters! g clusters-in #:only (and (list? collapse) (not (null? collapse)) collapse))) (define clusters (if collapse (if (and (list? collapse) (not (null? collapse))) (for/list ([c clusters-in] #:unless (member (cluster-label c) collapse)) c) '()) clusters-in)) (define (generate-graph) (parameterize ([current-output-port (or port (current-output-port))]) (define weighted? (weighted-graph? g)) (define cluster-count 0) (define (make-new-cluster) (begin0 (format "cluster_~a" cluster-count) (set! cluster-count (add1 cluster-count)))) (define node-count 0) (define node-id-table (make-hash)) (define (node-id-table-ref! node) (hash-ref! node-id-table node (λ () (begin0 (format "node~a" node-count) (set! node-count (add1 node-count)))))) (printf "digraph G {\n") ; Add vertices, color them using evenly spaced HSV colors if given colors (define color-count (and colors (add1 (apply max (hash-values colors))))) (for ([v (in-vertices g)]) (cond [(and color-count (hash-ref colors v #f)) (printf "\t~a [label=~a,color=\"~a 1.0 1.0\"];\n" (node-id-table-ref! v) (sanatize-name v) (~a #:max-width 5 (exact->inexact (/ (hash-ref colors v #f) color-count))))] [else (printf "\t~a [label=~a];\n" (node-id-table-ref! v) (sanatize-name v))])) ; Write undirected edges as one subgraph (printf "\tsubgraph U {\n") (printf "\t\tedge [dir=none];\n") (define undirected-edges (for/fold ([added (set)]) ([e (in-edges g)] #:when (and (not (set-member? added e)) (has-edge? g (second e) (first e)) (equal? 
(edge-weight g (first e) (second e)) (edge-weight g (second e) (first e))))) (printf "\t\t~a -> ~a~a;\n" (node-id-table-ref! (first e)) (node-id-table-ref! (second e)) (if weighted? (format " [label=\"~a\"]" (edge-weight g (first e) (second e))) "")) (set-add (set-add added e) (list (second e) (first e))))) (printf "\t}\n") ; write defined clusters (define defined-edges (for/fold ([added (set)]) ([cluster clusters]) (printf "\tsubgraph ~a {\n\t\tlabel=\"~a\";\n\t\tlabeljust=\"~a\";\n" (make-new-cluster) (cluster-label cluster) (cluster-labeljust cluster)) (when (cluster-color cluster) (printf "\t\tcolor=~a;\n" (cluster-color cluster))) (begin0 (for*/fold ([added added]) ([vertex-start (cluster-members cluster)] #; [vertex-end (in-neighbors g vertex-start)]) ; FIXME node vs edge clusterings ; node clustering is what we have right now and it works ; as expected for layout, but at some point we may want ; to enable explicit edge clustering as well, in the current ; impl clusters should be non-overlapping (let ([e '(1 2) #;(if (has-edge? g vertex-start vertex-end) (list vertex-start vertex-end) (list vertex-end vertex-start))]) (printf "\t\t~a;\n" (node-id-table-ref! vertex-start)) #; (printf "\t\t~a -> ~a~a;\n" (node-id-table-ref! (first e)) (node-id-table-ref! (second e)) (if weighted? (format " [label=\"~a\"]" (edge-weight g (first e) (second e))) "")) (set-add (set-add added e) (list (second e) (first e))))) (printf "\t}\n")))) ; Write directed edges as another subgraph (printf "\tsubgraph D {\n") (for ([e (in-edges g)] #:unless (or (set-member? undirected-edges e) #;(set-member? defined-edges e))) (printf "\t\t~a -> ~a~a;\n" (node-id-table-ref! (first e)) (node-id-table-ref! (second e)) (if weighted? (format " [label=\"~a\"]" (edge-weight g (first e) (second e))) ""))) (printf "\t}\n") (printf "}\n"))) (if port (generate-graph) (with-output-to-string generate-graph))) (module+ test (define g (dag-notation a -> b -> c b -> d -> e -> f)) #; (pretty-print g) (define gg (graphviz-2 g #:clusters (list (new-cluster "test" #f '(b c d))))) (display gg) (define ggc (graphviz-2 g #:collapse-clusters #t #:clusters (list (new-cluster "test" #f '(b c d))))) (display ggc) ) #; (module+ test-sigh ;; unfortunately the implementation of vertex and edge properties ;; is written as a macro so they are not composable and you cannot ;; iterate over multiple runtime specified properties so we have ;; to use something else (define-vertex-property g my-subgraph) (for-each (λ (v) (my-subgraph-set! v #t)) '(b c d)) (define sgh (list my-subgraph->hash)) (define ggs (graphviz-subgraphs g #:subgraphs sgh)) (display ggs) ) #; (module+ test ; TODO explor possibility of using -^ or -> ^ or | syntax ; to point up to the most recent definition chain containing ; the start of the chain in question and having one more ; elment than the current chain #; (dag-notation ; for example a -> b -> c -> d b -> q | b -> p | ; expands to a -> b -> c -> d b -> q -> d b -> p -> d) ; in theory this notation could also be used in reverse, but I'm worried about ; accidental hard to debug errors if a line accidentally ends with an arrow #; (dag-notation ; clearly confusing a -> b -> c -> d | -> d | -> e ; this would probably read as a -> b -> c -> d a -> d a -> e ; hrm a -> b -> c -> d a | | e ; not sure if I like this pretty sure I dont ... 
) ) #+end_src * Bootstrap :noexport: #+name: test-fail-block #+begin_src bash set -e echo prepare for FAIL echo ERROR 1>&2 false echo this should not print echo ERROR ERROR 1>&2 #+end_src #+name: orgstrap #+begin_src elisp :results none :exports none :lexical yes :noweb yes <> (org-babel-do-load-languages 'org-babel-load-languages `((python . t) (shell . t) ,@org-babel-load-languages)) (when noninteractive (let ((tangle (member "tangle" argv)) (sckan-release (member "sckan-release" argv))) (message "%S" sckan-release) (condition-case err (progn (when tangle (let (enable-local-eval) ;; this pattern is required when tangling to avoid infinite loops (revert-buffer nil t nil) (setq-local find-file-literally nil)) (org-babel-tangle)) (when sckan-release (message "running fail block expecting fail") (ow-babel-eval ;; "test-fail-block" "build-sckan-release" nil 'error-on-fail))) (error (message "%s" (cadr err)) (kill-emacs 1))))) #+end_src #+name: reval-setup #+begin_src elisp ;; minimal reval (unless (featurep 'reval) (defvar reval-cache-directory (concat user-emacs-directory "reval/cache/")) (defun reval-minimal (cypher checksum path-or-url &rest alternates) "Simplified and compact implementation of reval." (let* (done (o url-handler-mode) (csn (symbol-name checksum)) (cache-path (concat reval-cache-directory (substring csn 0 2) "/" csn "-" (file-name-nondirectory path-or-url)))) (url-handler-mode) (unwind-protect (cl-loop for path-or-url in (cons cache-path (cons path-or-url alternates)) do (when (file-exists-p path-or-url) (let* ((buffer (find-file-noselect path-or-url)) (buffer-checksum (intern (secure-hash cypher buffer)))) (if (eq buffer-checksum checksum) (progn (unless (string= path-or-url cache-path) (let ((parent-path (file-name-directory cache-path)) make-backup-files) (unless (file-directory-p parent-path) (make-directory parent-path t)) (with-current-buffer buffer (write-file cache-path)))) (eval-buffer buffer) (setq done t)) (kill-buffer buffer) ; kill so cannot accidentally evaled (error "reval: checksum mismatch! %s" path-or-url)))) until done) (unless o (url-handler-mode 0))))) (defalias 'reval #'reval-minimal) (reval 'sha256 '3620321396c967395913ff19ce507555acb92335b0545e4bd05ec0e673a0b33b "https://raw.githubusercontent.com/tgbugs/orgstrap/300b1d5518af53d76d950097bcbcd7046cfa2285/reval.el")) (let ((ghost "https://raw.githubusercontent.com/tgbugs/orgstrap/")) (unless (featurep 'ow) (reval 'sha256 '670c68e5649987fb64a93a7b5610ace0f18a0b71f376faf7499de933247931f2 (concat ghost "021b66c8f1dd4bf55714a4de889f31741f8460f6" "/ow.el")))) #+end_src ** Local Variables :ARCHIVE: # close powershell comment #> # Local Variables: # eval: (progn (setq-local orgstrap-min-org-version "9.3.8") (let ((a (org-version)) (n orgstrap-min-org-version)) (or (fboundp #'orgstrap--confirm-eval) (not n) (string< n a) (string= n a) (error "Your Org is too old! 
%s < %s" a n))) (defun orgstrap-norm-func--dprp-1-0 (body) (let ((p (read (concat "(progn\n" body "\n)"))) (m '(defun defun-local defmacro defvar defvar-local defconst defcustom)) print-quoted print-length print-level) (cl-labels ((f (b) (cl-loop for e in b when (listp e) do (or (and (memq (car e) m) (let ((n (nthcdr 4 e))) (and (stringp (nth 3 e)) (or (cl-subseq m 3) n) (f n) (or (setcdr (cddr e) n) t)))) (f e))) p)) (prin1-to-string (f p))))) (unless (boundp 'orgstrap-norm-func) (defvar-local orgstrap-norm-func orgstrap-norm-func-name)) (defun orgstrap-norm-embd (body) (funcall orgstrap-norm-func body)) (unless (fboundp #'orgstrap-norm) (defalias 'orgstrap-norm #'orgstrap-norm-embd)) (defun orgstrap--confirm-eval-minimal (lang body) (not (and (member lang '("elisp" "emacs-lisp")) (eq orgstrap-block-checksum (intern (secure-hash orgstrap-cypher (orgstrap-norm body))))))) (unless (fboundp #'orgstrap--confirm-eval) (defalias 'orgstrap--confirm-eval #'orgstrap--confirm-eval-minimal)) (let (enable-local-eval) (vc-find-file-hook)) (let ((ocbe org-confirm-babel-evaluate) (obs (org-babel-find-named-block "orgstrap"))) (if obs (unwind-protect (save-excursion (setq-local orgstrap-norm-func orgstrap-norm-func-name) (setq-local org-confirm-babel-evaluate #'orgstrap--confirm-eval) (goto-char obs) (org-babel-execute-src-block)) (when (eq org-confirm-babel-evaluate #'orgstrap--confirm-eval) (setq-local org-confirm-babel-evaluate ocbe)) (ignore-errors (org-set-visibility-according-to-property))) (warn "No orgstrap block.")))) # End: