# Source code for xorbits.core.execution

# Copyright 2022-2023 XProbe Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import builtins

from .adapter import MarsEntity, mars_execute
from .data import DataRef


def _get_mars_entity(ref: DataRef) -> MarsEntity:
    """
    Fetch the Mars entity attached to the data object behind ``ref``.

    Parameters
    ----------
    ref : DataRef
        Reference whose ``data`` attribute is looked up for ``_mars_entity``.

    Returns
    -------
    MarsEntity
        The value of ``ref.data._mars_entity``.

    Raises
    ------
    NotImplementedError
        If ``ref.data`` has no ``_mars_entity`` attribute or it is ``None``.
    """
    entity = getattr(ref.data, "_mars_entity", None)
    if entity is None:  # pragma: no cover
        raise NotImplementedError(
            f"Unable to execute an instance of {type(ref).__name__} "
        )
    return entity

def run(obj: DataRef | list[DataRef] | tuple[DataRef, ...], **kwargs) -> None:
    """
    Manually trigger execution.

    In an interactive IPython session, not-yet-executed DataRefs found in the
    user namespace that appear in the computation graph of ``obj`` are
    executed together with it.

    Parameters
    ----------
    obj : DataRef or collection of DataRefs
        DataRef or collection of DataRefs to execute.
    **kwargs
        Forwarded unchanged to ``mars_execute``.
    """
    refs = [obj] if isinstance(obj, DataRef) else list(obj)

    # Keyed by id() so the same DataRef object is never submitted twice.
    refs_to_execute = _collect_executable_user_ns_refs(refs)
    for ref in refs:
        if id(ref) not in refs_to_execute and need_to_execute(ref):
            refs_to_execute[id(ref)] = ref

    mars_tileables = [_get_mars_entity(ref) for ref in refs_to_execute.values()]
    # Nothing pending means nothing to submit.
    if mars_tileables:
        mars_execute(mars_tileables, **kwargs)
def need_to_execute(ref: DataRef) -> bool:
    """
    Tell whether the Mars entity behind ``ref`` still has to be executed.

    Parameters
    ----------
    ref : DataRef
        The reference to inspect.

    Returns
    -------
    bool
        True only when the Mars entity has an ``_executed_sessions``
        attribute and that collection is empty.
    """
    mars_entity = _get_mars_entity(ref)
    if not hasattr(mars_entity, "_executed_sessions"):
        return False
    return len(getattr(mars_entity, "_executed_sessions")) == 0


def _has_upstream_key(roots: list, target_key: object) -> bool:
    """
    Search the graphs rooted at ``roots`` for an entity with ``target_key``.

    The graph is walked depth-first through each entity's ``inputs``. Every
    entity object is expanded at most once, so inputs shared by several
    consumers do not cause repeated traversal.
    """
    # Map id -> entity; holding the entity keeps its id from being reused
    # by another object while the walk is in progress.
    seen = {}
    stack = list(roots)
    while stack:
        entity = stack.pop()
        if id(entity) in seen:
            continue
        seen[id(entity)] = entity
        if entity.key == target_key:
            return True
        stack.extend(entity.inputs)
    return False


def _is_in_final_results(ref: DataRef, results: list[DataRef]) -> bool:
    """
    Check whether ``ref`` is one of ``results`` or one of their upstream inputs.

    Entities are matched by their ``key`` attribute.
    """
    mars_entity = _get_mars_entity(ref)
    result_mars_entities = [_get_mars_entity(result) for result in results]
    return _has_upstream_key(result_mars_entities, mars_entity.key)


def _collect_executable_user_ns_refs(result_refs: list[DataRef]) -> dict[int, DataRef]:
    """
    Collect DataRefs defined in user's interactive namespace that are able to
    execute.

    Parameters
    ----------
    result_refs : list of DataRef
        The refs about to be executed; only namespace refs found in their
        computation graphs are collected.

    Returns
    -------
    dict
        Mapping from ``id(ref)`` to the ref. Empty when not running in an
        interactive session or when IPython is not available.
    """

    def _is_ipython_output_cache(name: str) -> bool:
        # _, __, ___ or _<n> where <n> stands for the output line number.
        return name in ("_", "__", "___") or (
            name.startswith("_") and name[1:].isdigit()
        )

    if not _is_interactive() or not _is_ipython_available():
        return {}

    ipython = getattr(builtins, "get_ipython")()
    return {
        id(v): v
        for k, v in ipython.user_ns.items()
        if isinstance(v, DataRef)
        and not _is_ipython_output_cache(k)
        and need_to_execute(v)
        and _is_in_final_results(v, result_refs)
    }


def _is_interactive() -> bool:
    """Return True when ``sys.ps1`` is defined."""
    import sys

    # See: https://stackoverflow.com/a/64523765/7098025
    return hasattr(sys, "ps1")


def _is_ipython_available() -> bool:
    """Return True when ``builtins.get_ipython`` exists and is not ``None``."""
    return getattr(builtins, "get_ipython", None) is not None