Skip to content

inspect_function

logger = logging.getLogger(__name__) module-attribute

helper functions

FunctionInspector dataclass

The FunctionInspector does two different loading steps.

  1. Load all the specs from disk with get_specs. This happens once on creation of the object.
  2. On initialization, and before every spec call, go through all the specs and "parse" any for modules we have already imported, which means turning the criteria into in-memory objects that we can compare against when inspecting.
Source code in lineapy/execution/inspect_function.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
@dataclass
class FunctionInspector:
    """
    Looks up the side effects annotated for a function call.

    Loading happens in two stages:

    1. When the object is created, `get_specs` supplies every spec, keyed by module name.
    2. Right after creation, and again at the start of every `inspect` call, the specs whose
       module can be found by `get_imported_module` are moved out of `specs` and handed to
       `parsed`, which keeps them in a form that can be matched against a call.
    """

    # Specs still waiting to be parsed, keyed by the name of a module that is not imported yet
    specs: Dict[str, List[Annotation]] = field(default_factory=get_specs)
    # Lookup tables built from the specs whose module has already been imported
    parsed: FunctionInspectorParsed = field(
        default_factory=FunctionInspectorParsed
    )

    def _parse(self) -> None:
        """
        Move the specs of every module that is now available from `specs` into `parsed`.
        """
        # Snapshot the names first, because entries are removed from `specs` inside the loop
        pending_names = list(self.specs)
        for pending_name in pending_names:
            imported = get_imported_module(pending_name)
            if imported:
                # Removing the entry guarantees each spec is parsed only once
                annotations = self.specs.pop(pending_name)
                self.parsed.add_annotations(imported, annotations)

    def __post_init__(self):
        # Parse straight away whatever is already importable
        self._parse()

    def reload_annotations(self) -> None:
        """
        Read the specs again with `get_specs` and parse the ones that are available.
        """
        self.specs = get_specs()
        self._parse()

    def inspect(
        self,
        function: Callable,
        args: list[object],
        kwargs: dict[str, object],
        result: object,
    ) -> Iterable[InspectFunctionSideEffect]:
        """
        Yield the side effects of one function call: how it mutates the args/result
        and which view relationships it creates between them.

        Every annotation matched by `parsed` is passed through `process_side_effect`
        together with the call's args, kwargs and result; falsy outcomes are dropped.
        """
        # More modules may have been imported since the previous call, so parse again first
        self._parse()
        matched = self.parsed.inspect(function, kwargs)
        if not matched:
            return
        for raw_effect in matched:
            resolved = process_side_effect(raw_effect, args, kwargs, result)
            if resolved:
                yield resolved

inspect(function, args, kwargs, result)

Inspects a function and returns how calling it mutates the args/result and creates view relationships between them.

Source code in lineapy/execution/inspect_function.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def inspect(
    self,
    function: Callable,
    args: list[object],
    kwargs: dict[str, object],
    result: object,
) -> Iterable[InspectFunctionSideEffect]:
    """
    Yield the side effects of one function call: how it mutates the args/result
    and which view relationships it creates between them.

    Every annotation matched by `parsed` is passed through `process_side_effect`
    together with the call's args, kwargs and result; falsy outcomes are dropped.
    """
    # More modules may have been imported since the previous call, so parse again first
    self._parse()
    matched = self.parsed.inspect(function, kwargs)
    if not matched:
        return
    for raw_effect in matched:
        resolved = process_side_effect(raw_effect, args, kwargs, result)
        if resolved:
            yield resolved

FunctionInspectorParsed dataclass

Contains the parsed function inspector criteria.

Source code in lineapy/execution/inspect_function.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
@dataclass
class FunctionInspectorParsed:
    """
    Contains the parsed function inspector criteria.

    One lookup table is kept per kind of criteria: plain functions, methods looked up
    by name and owning type, and methods looked up by a keyword argument name/value
    pair and owning type.
    """

    # Function criteria
    function_to_side_effects: Dict[
        Callable, List[InspectFunctionSideEffect]
    ] = field(default_factory=lambda: defaultdict(list))
    # Method criteria
    method_name_to_type_to_side_effects: Dict[
        str, Dict[type, List[InspectFunctionSideEffect]]
    ] = field(default_factory=lambda: defaultdict(lambda: defaultdict(list)))
    # Method keyword argument criteria
    keyword_name_and_value_to_type_to_side_effects: Dict[
        Tuple[str, Hashable], Dict[type, List[InspectFunctionSideEffect]]
    ] = field(default_factory=lambda: defaultdict(lambda: defaultdict(list)))

    def inspect(
        self, fn: Callable, kwargs: Dict[str, object]
    ) -> Optional[List[InspectFunctionSideEffect]]:
        """
        Inspect a function call and return a list of side effects, if it matches any of the annotations

        Returns None when nothing matches. Lookups never add entries to the tables.
        """
        # We assume a function is a method if it has a __self__ and the __self__ is not a Module.
        # Note that functions defined in C, like `setitem`, have a __self__, but it is the
        # module they were defined in (`operator` in the case of `setitem`), which is why the
        # isinstance check is needed.
        obj = getattr(fn, "__self__", None)
        is_method = obj is not None and not isinstance(obj, ModuleType)

        # If it's a function, we just do a simple lookup to see if it's exactly equal to any functions we saved
        if not is_method:
            try:
                return self.function_to_side_effects.get(fn, None)
            except TypeError:
                # An unhashable callable can never have been stored as a key
                return None
        # If it's a class instance however, we have to consider superclasses, so we first do a lookup
        # on the name, then check for isinstance
        method_name = fn.__name__
        # Use `.get` instead of indexing: the tables are defaultdicts, and indexing would
        # insert an empty entry for every method name that is ever inspected.
        by_type = self.method_name_to_type_to_side_effects.get(method_name, {})
        for tp, side_effects in by_type.items():
            if isinstance(obj, tp):
                return side_effects
        # Finally, if we haven't found something yet, try the keyword names mapping on the method
        for k, v in kwargs.items():
            # Ignore any non hashable keyword args we pass in. Hashing is attempted directly,
            # because `isinstance(v, Hashable)` is also true for e.g. a tuple holding a list.
            try:
                hash(v)
            except TypeError:
                continue
            # `.get` again, so that call-time values are not stored (and kept alive) as keys
            by_type = self.keyword_name_and_value_to_type_to_side_effects.get(
                (k, v), {}
            )
            for tp, side_effects in by_type.items():
                if isinstance(obj, tp):
                    return side_effects
        return None

    def add_annotations(
        self, module: ModuleType, annotations: List[Annotation]
    ) -> None:
        """
        Parse a list of annotations and look them up to add them to our parsed criteria.
        """
        for annotation in annotations:
            self._add_annotation(
                module, annotation.criteria, annotation.side_effects
            )

    def _add_annotation(
        self,
        module: ModuleType,
        criteria: Criteria,
        side_effects: List[InspectFunctionSideEffect],
    ) -> None:
        """
        Resolve the names in `criteria` as attributes of `module` and store `side_effects`
        in the table matching the criteria type.

        Names that are not attributes of `module` are skipped silently.
        Raises NotImplementedError for an unknown criteria type.
        """
        if isinstance(criteria, KeywordArgumentCriteria):
            class_ = getattr(module, criteria.class_instance, None)
            if class_ is None:
                return None
            self.keyword_name_and_value_to_type_to_side_effects[
                (criteria.keyword_arg_name, criteria.keyword_arg_value)
            ][class_] = side_effects
        elif isinstance(criteria, FunctionNames):
            for name in criteria.function_names:
                fn = getattr(module, name, None)
                if fn is None:
                    # Skip only the missing name; the remaining names must still be registered
                    continue
                self.function_to_side_effects[fn] = side_effects
        elif isinstance(criteria, FunctionName):
            fn = getattr(module, criteria.function_name, None)
            if fn is None:
                return
            self.function_to_side_effects[fn] = side_effects
        elif isinstance(criteria, ClassMethodName):
            tp = getattr(module, criteria.class_instance, None)
            if tp is None:
                return
            self.method_name_to_type_to_side_effects[
                criteria.class_method_name
            ][tp] = side_effects
        elif isinstance(criteria, ClassMethodNames):
            tp = getattr(module, criteria.class_instance, None)
            if tp is None:
                return
            for name in criteria.class_method_names:
                self.method_name_to_type_to_side_effects[name][
                    tp
                ] = side_effects
        else:
            raise NotImplementedError(criteria)

add_annotations(module, annotations)

Parse a list of annotations and look them up to add them to our parsed criteria.

Source code in lineapy/execution/inspect_function.py
259
260
261
262
263
264
265
266
267
268
def add_annotations(
    self, module: ModuleType, annotations: List[Annotation]
) -> None:
    """
    Register every annotation in `annotations` for `module`.

    Each annotation's criteria and side effects are forwarded, in order, to `_add_annotation`.
    """
    register = self._add_annotation
    for entry in annotations:
        register(module, entry.criteria, entry.side_effects)

inspect(fn, kwargs)

Inspect a function call and return a list of side effects, if it matches any of the annotations

Source code in lineapy/execution/inspect_function.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def inspect(
    self, fn: Callable, kwargs: Dict[str, object]
) -> Optional[List[InspectFunctionSideEffect]]:
    """
    Inspect a function call and return a list of side effects, if it matches any of the annotations

    Returns None when nothing matches. Lookups never add entries to the tables.
    """
    # We assume a function is a method if it has a __self__ and the __self__ is not a Module.
    # Note that functions defined in C, like `setitem`, have a __self__, but it is the
    # module they were defined in (`operator` in the case of `setitem`), which is why the
    # isinstance check is needed.
    obj = getattr(fn, "__self__", None)
    is_method = obj is not None and not isinstance(obj, ModuleType)

    # If it's a function, we just do a simple lookup to see if it's exactly equal to any functions we saved
    if not is_method:
        try:
            return self.function_to_side_effects.get(fn, None)
        except TypeError:
            # An unhashable callable can never have been stored as a key
            return None
    # If it's a class instance however, we have to consider superclasses, so we first do a lookup
    # on the name, then check for isinstance
    method_name = fn.__name__
    # Use `.get` instead of indexing: the tables are defaultdicts, and indexing would
    # insert an empty entry for every method name that is ever inspected.
    by_type = self.method_name_to_type_to_side_effects.get(method_name, {})
    for tp, side_effects in by_type.items():
        if isinstance(obj, tp):
            return side_effects
    # Finally, if we haven't found something yet, try the keyword names mapping on the method
    for k, v in kwargs.items():
        # Ignore any non hashable keyword args we pass in. Hashing is attempted directly,
        # because `isinstance(v, Hashable)` is also true for e.g. a tuple holding a list.
        try:
            hash(v)
        except TypeError:
            continue
        # `.get` again, so that call-time values are not stored (and kept alive) as keys
        by_type = self.keyword_name_and_value_to_type_to_side_effects.get(
            (k, v), {}
        )
        for tp, side_effects in by_type.items():
            if isinstance(obj, tp):
                return side_effects
    return None

get_imported_module(name)

Return a module, if it has been imported.

Also handles the corner case where a submodule has not been imported, but is accessible as an attribute on the parent module. This is needed for the example tensorflow.keras.utils, which is not imported when importing tensorflow, but is accessible as a property of tensorflow.

Source code in lineapy/execution/inspect_function.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def get_imported_module(name: str) -> Optional[ModuleType]:
    """
    Look up an already imported module by its dotted name, without importing anything.

    A name registered in `sys.modules` is returned directly. Otherwise the parent
    module is resolved the same way and the last name component is read from it as an
    attribute. This covers submodules such as `tensorflow.keras.utils`, which is not
    imported when importing `tensorflow`, but is accessible as a property of `tensorflow`.

    Returns None when neither lookup succeeds.
    """
    try:
        return sys.modules[name]
    except KeyError:
        pass
    parent_name, dot, attribute = name.rpartition(".")
    if not dot:
        # A top-level name that is not in sys.modules has simply not been imported
        return None
    parent = get_imported_module(parent_name)
    return getattr(parent, attribute, None) if parent else None

get_specs()

YAML specs are for non-built-in functions. Captures all the .annotations.yaml files in the lineapy directory.

Source code in lineapy/execution/inspect_function.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def get_specs() -> Dict[str, List[Annotation]]:
    """
    Load all annotation specs from disk, grouped by the module they annotate.

    yaml specs are for non-built in functions.
    Captures all the .annotations.yaml files in the lineapy directory, plus the files
    ending in CUSTOM_ANNOTATIONS_EXTENSION_NAME found directly inside the folder
    configured as "customized_annotation_folder".

    Items rejected by `validate` are skipped. Empty YAML files contribute nothing.
    """
    relative_path = "../annotations/**/*.annotations.yaml"

    # `**` only spans directories when `recursive=True`; without it glob treats it like `*`
    paths = glob.glob(
        os.path.join(os.path.dirname(__file__), relative_path),
        recursive=True,
    )
    paths.extend(
        glob.glob(
            os.path.join(
                Path(
                    options.safe_get("customized_annotation_folder")
                ).resolve(),
                "./*" + CUSTOM_ANNOTATIONS_EXTENSION_NAME,
            )
        )
    )
    valid_specs: Dict[str, List[Annotation]] = defaultdict(list)

    for filename in paths:
        # Read as UTF-8 explicitly instead of relying on the platform default encoding
        with open(filename, "r", encoding="utf-8") as f:
            doc = yaml.safe_load(f)
        # An empty (or comment-only) YAML file loads as None, which is not iterable
        for item in doc or []:
            v = validate(item)
            if v is None:
                continue
            valid_specs[v.module].extend(v.annotations)

    return valid_specs

is_mutable(obj)

Returns true if the object is mutable.

Note that currently, tempfile.NamedTemporaryFile() is not mutable, and the semantics is actually correct, because it doesn't end up changing the file system. However, the following registers as normal files (which are mutable).

filename = NamedTemporaryFile().name
handle = open(filename, "wb")
Source code in lineapy/execution/inspect_function.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def is_mutable(obj: object) -> bool:
    """
    Tell whether `obj` should be treated as mutable.

    An object counts as mutable when it is an instance of one of a few known
    mutable-but-hashable types, or when hashing it fails. Everything else is
    assumed to be immutable.

    Note that currently, `tempfile.NamedTemporaryFile()` is not mutable, and
    the semantics is actually correct, because it doesn't end up changing the
    file system. However, the following registers as normal files (which
    are mutable).

    ```python
    filename = NamedTemporaryFile().name
    handle = open(filename, "wb")
    ```
    """

    # Hashable types that are mutable anyway. There is no way to find out a priori
    # whether a class is mutable, so a list like this could also live in our annotations.
    list_iterator_type = type(iter([]))
    known_mutable: Tuple[type, ...] = (
        ModuleType,
        type,
        list_iterator_type,
        IOBase,
    )
    # sklearn estimators are only considered when sklearn.base has already been imported
    if "sklearn.base" in sys.modules:
        estimator_base = sys.modules["sklearn.base"].BaseEstimator  # type: ignore
        known_mutable = (*known_mutable, estimator_base)

    if isinstance(obj, known_mutable):
        return True

    # For everything else, being hashable is taken to mean being immutable
    try:
        hash(obj)
    except Exception:
        return True
    else:
        return False

new_side_effect_without_all_positional_arg(side_effect, args)

This method must NOT modify the original side_effect, since these annotations are dependent on the runtime values that are different for each call — AllPositionalArgs will have a different set of arguments.

Note that we might need to add something like "all keyword arguments", but that use case hasn't come up yet.

Source code in lineapy/execution/inspect_function.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def new_side_effect_without_all_positional_arg(
    side_effect: ViewOfValues,
    args: list,
) -> ViewOfValues:
    """
    Build a copy of `side_effect` in which the first AllPositionalArgs view is
    replaced by one PositionalArg per entry of `args`, appended at the end.

    The original side_effect must NOT be modified: the annotation is shared, while
    the expansion depends on runtime values that differ for each call, so
    AllPositionalArgs stands for a different set of arguments every time.

    Note that we might need to add something like "all keyword arguments", but
    that use case hasn't come up yet.
    """
    expanded = ViewOfValues(views=[])
    # Deep copies keep the caller's views untouched
    expanded.views.extend(
        original_view.copy(deep=True) for original_view in side_effect.views
    )
    for position, view in enumerate(expanded.views):
        if not isinstance(view, AllPositionalArgs):
            continue
        # Drop the placeholder, then list every positional argument explicitly
        del expanded.views[position]
        expanded.views.extend(
            [
                PositionalArg(positional_argument_index=arg_index)
                for arg_index, _ in enumerate(args)
            ]
        )
        break
    return expanded

validate(item)

We cannot filter the specs by module, because it might be loaded later. This causes a bit of inefficiency in our function inspection, but we can fix it later if it's a problem.

Source code in lineapy/execution/inspect_function.py
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def validate(item: Dict) -> Optional[ModuleAnnotation]:
    """
    Build a ModuleAnnotation from one raw spec item, or return None if it is invalid.

    An invalid item is reported with a warning instead of an exception, so that one
    bad entry does not stop the remaining specs from loading.

    We cannot filter the specs by module, because it might be loaded later.
    This causes a bit of inefficiency in our function inspection, but we
    can fix later if it's a problem.
    """
    try:
        return ModuleAnnotation(**item)
    # TypeError covers items that cannot be unpacked as keyword arguments at all
    # (not a mapping, or a mapping with non-string keys).
    except (ValidationError, TypeError) as e:
        # want to warn the user but not break the whole thing
        logger.warning(
            f"Validation failed parsing {item} as annotation spec: {e}"
        )
        return None

Was this helpful?

Help us improve docs with your feedback!