tracer

Tracer dataclass

Source code in lineapy/instrumentation/tracer.py
@dataclass
class Tracer:
    db: RelationalLineaDB

    session_type: InitVar[SessionType]
    session_name: InitVar[Optional[str]] = None
    globals_: InitVar[Optional[Dict[str, object]]] = None

    variable_name_to_node: Dict[str, Node] = field(default_factory=dict)
    module_name_to_node: Dict[str, Node] = field(default_factory=dict)

    tracer_context: TracerContext = field(init=False)
    executor: Executor = field(init=False)
    mutation_tracker: MutationTracker = field(default_factory=MutationTracker)
    control_flow_tracker: ControlFlowTracker = field(
        default_factory=ControlFlowTracker
    )

    def __post_init__(
        self,
        session_type: SessionType,
        session_name: Optional[str],
        globals_: Optional[Dict[str, object]],
    ):
        """
        Tracer is internal to Linea and it implements the "hidden APIs"
        that are setup by the transformer.
        It performs the following key functionalities:
        - Creates the graph nodes and inserts into the database.
        - Maintains data structures to help creating the graph IR
          that is used later, which includes:
          - `variable_name_to_id`: for tracking variable/function/module
            to the ID responsible for its creation
        - Executes the program, using the `Executor`.

        Note that we don't currently maintain the variable names in the persisted
        graph (we used to at some point in the past), but we can add a serialized
        version of `variable_name_to_id` to the session if we want to persist
        the information. Which could be useful for e.g., post-hoc lifting of
        linea artifacts.
        """
        self.executor = Executor(self.db, globals_ or globals())

        session_context = SessionContext(
            id=get_new_id(),
            environment_type=session_type,
            python_version=get_system_python_version(),  # up to minor version
            creation_time=datetime.now(),
            working_directory=getcwd(),
            session_name=session_name,
            execution_id=self.executor.execution.id,
        )
        self.db.write_context(session_context)
        self.tracer_context = TracerContext(
            session_context=session_context, db=self.db
        )

    @property
    def values(self) -> Dict[str, object]:
        """
        Returns a mapping of variable names to their values, by joining
        the scoping information with the executor values.
        """
        return {
            k: self.executor.get_value(n.id)
            for k, n in self.variable_name_to_node.items()
        }

    def process_node(self, node: Node) -> None:
        """
        Execute a node, and adds it to the database.
        """

        ##
        # Update the graph from the side effects of the node,
        # If an artifact could not be created, quietly return without saving
        # the node to the DB.
        ##
        logger.debug("Executing node %s", node)
        try:
            side_effects = self.executor.execute_node(
                node,
                {k: v.id for k, v in self.variable_name_to_node.items()},
            )
        except ArtifactSaveException as exc_info:
            logger.error("Artifact could not be saved.")
            logger.debug(exc_info)
            return
        logger.debug("Processing side effects")

        # Iterate through each side effect and process it, depending on its type
        for e in side_effects:
            if isinstance(e, ImplicitDependencyNode):
                self._process_implicit_dependency(
                    node, self._resolve_pointer(e.pointer)
                )
            elif isinstance(e, ViewOfNodes):
                if len(e.pointers) > 0:  # skip if empty
                    self.mutation_tracker.set_as_viewers_of_each_other(
                        *map(self._resolve_pointer, e.pointers)
                    )
            elif isinstance(e, AccessedGlobals):
                self._process_accessed_globals(
                    node.session_id, node, e.retrieved, e.added_or_updated
                )
            # Mutate case
            else:
                mutated_node_id = self._resolve_pointer(e.pointer)
                for (
                    mutate_node_id,
                    source_id,
                ) in self.mutation_tracker.set_as_mutated(mutated_node_id):
                    mutate_node = MutateNode(
                        id=mutate_node_id,
                        session_id=node.session_id,
                        source_id=source_id,
                        call_id=node.id,
                        control_dependency=self.control_flow_tracker.current_control_dependency(),
                    )
                    self.process_node(mutate_node)

        # also special case for import node
        if isinstance(node, ImportNode):
            # must process after the call has been executed
            package_name, version = get_lib_package_version(node.name)
            node.version = version
            node.package_name = package_name

        self.db.write_node(node)

    def _resolve_pointer(self, ptr: ExecutorPointer) -> LineaID:
        if isinstance(ptr, ID):
            return ptr.id
        if isinstance(ptr, Variable):
            return self.variable_name_to_node[ptr.name].id
        # Handle external state case, by making a lookup node for it
        if isinstance(ptr, ExternalState):
            return (
                self.executor.lookup_external_state(ptr)
                or self.lookup_node(ptr.external_state).id
            )
        raise ValueError(f"Unsupported pointer type: {type(ptr)}")

    def _process_implicit_dependency(
        self, node: Node, implicit_dependency_id: LineaID
    ) -> None:
        """
        Add dependency of a node on a global implicit dependency,
        which is a dependency that lineapy has deemed essential in the
        reproduction of an artifact but is not explicitly passed as arguments
        """

        # Only call nodes can refer to implicit dependencies
        assert isinstance(node, CallNode)
        node.implicit_dependencies.append(
            self.mutation_tracker.get_latest_mutate_node(
                implicit_dependency_id
            )
        )

    def _process_accessed_globals(
        self,
        session_id: str,
        node: Node,
        retrieved: List[str],
        added_or_updated: List[str],
    ) -> None:

        # Only call nodes can access globals and have the global_reads attribute
        assert isinstance(node, CallNode)

        # Add the retrieved globals as global reads to the call node
        node.global_reads = {
            var: self.mutation_tracker.get_latest_mutate_node(
                self.variable_name_to_node[var].id
            )
            for var in retrieved
            # Only save reads from variables that we have already saved variables for
            # Assume that all other reads are for variables assigned inside the call
            if var in self.variable_name_to_node
        }

        # Create a new global node for each added/updated
        for var in added_or_updated:
            global_node = GlobalNode(
                id=get_new_id(),
                session_id=session_id,
                name=var,
                call_id=node.id,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
            )
            self.process_node(global_node)
            self.variable_name_to_node[var] = global_node

    def lookup_node(
        self,
        variable_name: str,
        source_location: Optional[SourceLocation] = None,
    ) -> Node:
        """
        Cases for the node that we are looking up:

        - user defined variable & function definitions
        - imported libs
        - unknown runtime magic functions — special case to LookupNode

          - builtin functions, e.g., min
          - custom runtime, e.g., get_ipython

        """
        if variable_name in self.variable_name_to_node:
            # user define var and fun def
            return self.variable_name_to_node[variable_name]
        elif variable_name in self.module_name_to_node:
            return self.module_name_to_node[variable_name]
        else:
            new_node = LookupNode(
                id=get_new_id(),
                session_id=self.get_session_id(),
                name=variable_name,
                source_location=source_location,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
            )
            self.process_node(new_node)
            return new_node

    def import_module(
        self,
        name: str,
        source_location: Optional[SourceLocation] = None,
    ) -> Node:
        """
        Import a module. If we have already imported it, just return its ID.
        Otherwise, create new module nodes for each submodule in its parents and return it.
        """
        if name in self.module_name_to_node:
            return self.module_name_to_node[name]
        # Recursively go up the tree, to try to get parents, and if we don't have them, import them
        *parents, module_name = name.split(".")
        if parents:
            parent_module = self.import_module(
                ".".join(parents),
                source_location,
            )
            node = self.call(
                self.lookup_node(l_import.__name__),
                source_location,
                self.literal(module_name),
                parent_module,
            )
        else:
            node = self.call(
                self.lookup_node(l_import.__name__),
                source_location,
                self.literal(module_name),
            )
        self.module_name_to_node[name] = node
        return node

    def trace_import(
        self,
        name: str,
        source_location: Optional[SourceLocation] = None,
        alias: Optional[str] = None,
        attributes: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Parameters
        ----------
        name: str
            the name of the module
        alias: Optional[str]
            the module could be aliased, e.g., import pandas as pd
        attributes: Optional[Dict[str, str]]
            a list of functions imported from the library.
            It keys the aliased name to the original name.

        ??? note
            - The input args would _either_ have alias or attributes, but not both
            - Didn't call the function import because I think that's a protected name

        note that version and path will be introspected at runtime
        """
        module_node = self.import_module(name, source_location)
        if alias:
            self.assign(alias, module_node, from_import=True)
        elif attributes:
            module_value = self.executor.get_value(module_node.id)
            if IMPORT_STAR in attributes:
                """
                Import the module, get all public attributes, and set them as globals
                """
                # Import star behavior copied from python docs
                # https://docs.python.org/3/reference/simple_stmts.html#the-import-statement
                if hasattr(module_value, "__all__"):
                    public_names = module_value.__all__  # type: ignore
                else:
                    public_names = [
                        attr
                        for attr in dir(module_value)
                        if not attr.startswith("_")
                    ]
                attributes = {attr: attr for attr in public_names}
            """
            load module `x`, check if `y` is an attribute of `x`, otherwise load `x.y`
            If `x.y` is a module, load that, otherwise get the `y` attribute of `x`.
            """
            for alias, attr_or_module in attributes.items():
                if hasattr(module_value, attr_or_module):
                    self.assign(
                        alias,
                        self.call(
                            self.lookup_node(GETATTR),
                            source_location,
                            module_node,
                            self.literal(attr_or_module),
                        ),
                        from_import=True,
                    )
                else:
                    full_name = f"{name}.{attr_or_module}"
                    sub_module_node = self.import_module(
                        full_name, source_location
                    )
                    self.assign(alias, sub_module_node, from_import=True)

        else:
            self.assign(name, module_node, from_import=True)

        node = ImportNode(
            id=get_new_id(),
            name=name,
            session_id=self.get_session_id(),
            source_location=source_location,
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(node)

    def literal(
        self,
        value: object,
        source_location: Optional[SourceLocation] = None,
    ):
        # this literal should be assigned or used later
        node = LiteralNode(
            id=get_new_id(),
            session_id=self.get_session_id(),
            value=value,
            source_location=source_location,
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(node)
        return node

    def __get_positional_arguments(self, arguments):
        for arg in arguments:
            if isinstance(arg, tuple) or isinstance(arg, list):
                yield PositionalArgument(
                    id=self.mutation_tracker.get_latest_mutate_node(arg[1].id),
                    starred=arg[0],
                )

            else:
                yield PositionalArgument(
                    id=self.mutation_tracker.get_latest_mutate_node(arg.id),
                    starred=False,
                )

    def __get_keyword_arguments(self, keyword_arguments):
        for k, n in keyword_arguments.items():
            values = self.mutation_tracker.get_latest_mutate_node(n.id)
            if k.startswith("unpack_"):
                yield KeywordArgument(key="**", value=values, starred=True)
            else:
                yield KeywordArgument(key=k, value=values, starred=False)

    def call(
        self,
        function_node: Node,
        source_location: Optional[SourceLocation],
        # function_name: str,
        *arguments: Union[Node, Tuple[bool, Node]],
        **keyword_arguments: Node,
    ) -> CallNode:
        """
        Parameters
        ----------
        function_node: Node
            the function node to call/execute
        source_location: Optional[SourceLocation]
            the source info from user code
        arguments: Union[Node, Tuple[bool, Node]]
            positional arguments. These are passed as either Nodes (named nodes, constants, etc)
            or tuples (starred, the node) where the starred is a boolean to indicate whether
            the argument is supposed to be splatted before passing to the function (This is
            the case where you might call a function like so ``foo(1, *[2, 3])`` ). The boolean is made
            optional simply to support the legacy way of calling this function and not having to pass
            the tuples for every single case from node_transformer
        keyword_arguments: Node
            keyword arguments. These are passed as a dictionary of keyword arguments to the
            function. Similar to ``*positional_arguments``, the keyword arguments can also be splatted
            by naming the key as ``unpack_<index>`` where <index> is the index of the argument. In this
            case, the dictionary will be unpacked and passed as keyword arguments to the function.
            The keyword arguments are processed in order of passing so any keyword conflicts will
            result in the last value accepted as the value for the keyword.

        Returns
        -------
        CallNode
            a call node

        ??? note
            - It's important for the call to return the call node
            so that we can programmatically chain the the nodes together,
            e.g., for the assignment call to modify the previous call node.
            - The call looks up if it's a locally defined function. We decided
            that this is better for program slicing.
        """

        node = CallNode(
            id=get_new_id(),
            session_id=self.get_session_id(),
            function_id=function_node.id,
            positional_args=self.__get_positional_arguments(arguments),
            keyword_args=self.__get_keyword_arguments(keyword_arguments),
            source_location=source_location,
            global_reads={},
            implicit_dependencies=[],
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(node)
        return node

    def get_control_node(
        self,
        type: NodeType,
        node_id: LineaID,
        companion_id: Optional[LineaID],
        source_location: Optional[SourceLocation] = None,
        test_id: Optional[LineaID] = None,
        unexec_id: Optional[LineaID] = None,
    ) -> ControlFlowContext:
        node: ControlNode
        if type == NodeType.IfNode:
            node = IfNode(
                id=node_id,
                session_id=self.get_session_id(),
                source_location=source_location,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
                unexec_id=unexec_id,
                test_id=test_id,
                companion_id=companion_id,
            )
        elif type == NodeType.ElseNode:
            node = ElseNode(
                id=node_id,
                session_id=self.get_session_id(),
                source_location=source_location,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
                companion_id=companion_id,
                unexec_id=unexec_id,
            )
        else:
            raise NotImplementedError(
                "Requested node type is not implemented as a control flow node type: ",
                type,
            )
        self.process_node(node)
        return ControlFlowContext(node, self.control_flow_tracker)

    def assign(
        self, variable_name: str, value_node: Node, from_import: bool = False
    ) -> None:
        """
        Assign updates a local mapping of variable nodes.
        """
        logger.debug("assigning %s = %s", variable_name, value_node)
        existing_value_node = self.variable_name_to_node.get(
            variable_name, None
        )
        if (
            existing_value_node is None
            or existing_value_node.id != value_node.id
            or not from_import
        ):
            self.variable_name_to_node[variable_name] = value_node
            self.db.write_assigned_variable(value_node.id, variable_name)
        return

    def tuple(
        self, *args: Node, source_location: Optional[SourceLocation] = None
    ) -> CallNode:
        return self.call(
            self.lookup_node(l_tuple.__name__),
            source_location,
            *args,
        )

    # tracer context method wrappers from here on
    def get_session_id(self) -> LineaID:
        return self.tracer_context.get_session_id()

    @property
    def graph(self) -> Graph:
        return self.tracer_context.graph

    def session_artifacts(self) -> List[ArtifactORM]:
        return self.tracer_context.session_artifacts()

    @property
    def artifacts(self) -> Dict[str, str]:
        return self.tracer_context.artifacts

    def slice(self, name: str) -> str:
        return self.tracer_context.slice(name)

    def get_working_dir(self) -> str:
        return self.tracer_context.session_context.working_directory

values: Dict[str, object] property

Returns a mapping of variable names to their values, by joining the scoping information with the executor values.
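
A minimal usage sketch (it assumes an existing Tracer instance named tracer and uses only the API documented on this page): the property joins the tracked variable names with the live values held by the executor.

node = tracer.literal(1)      # create and execute a LiteralNode for the value 1
tracer.assign("x", node)      # record that "x" was produced by that node
print(tracer.values)          # expected to print {'x': 1}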

__post_init__(session_type, session_name, globals_)

Tracer is internal to Linea and implements the "hidden APIs" that are set up by the transformer. It performs the following key functions:

  • Creates the graph nodes and inserts them into the database.
  • Maintains data structures that help build the graph IR used later, including variable_name_to_id, which maps each variable/function/module to the ID of the node responsible for its creation.
  • Executes the program, using the Executor.

Note that we don't currently maintain the variable names in the persisted graph (we did at some point in the past), but we could add a serialized version of variable_name_to_id to the session if we wanted to persist that information, which could be useful, e.g., for post-hoc lifting of linea artifacts.
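
A rough construction sketch. The import path for SessionType and the SCRIPT member are assumptions, not verified against the library; the Tracer path matches the file referenced on this page.

from typing import Dict, Optional

from lineapy.data.types import SessionType          # assumed import path
from lineapy.instrumentation.tracer import Tracer   # path shown on this page


def make_tracer(db, session_name: Optional[str] = None,
                user_globals: Optional[Dict[str, object]] = None) -> Tracer:
    # __post_init__ runs on construction: it builds the Executor, writes a new
    # SessionContext to the database, and wraps it in a TracerContext.
    return Tracer(db, SessionType.SCRIPT, session_name, user_globals)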

Source code in lineapy/instrumentation/tracer.py
def __post_init__(
    self,
    session_type: SessionType,
    session_name: Optional[str],
    globals_: Optional[Dict[str, object]],
):
    """
    Tracer is internal to Linea and it implements the "hidden APIs"
    that are setup by the transformer.
    It performs the following key functionalities:
    - Creates the graph nodes and inserts into the database.
    - Maintains data structures to help creating the graph IR
      that is used later, which includes:
      - `variable_name_to_id`: for tracking variable/function/module
        to the ID responsible for its creation
    - Executes the program, using the `Executor`.

    Note that we don't currently maintain the variable names in the persisted
    graph (we used to at some point in the past), but we can add a serialized
    version of `variable_name_to_id` to the session if we want to persist
    the information. Which could be useful for e.g., post-hoc lifting of
    linea artifacts.
    """
    self.executor = Executor(self.db, globals_ or globals())

    session_context = SessionContext(
        id=get_new_id(),
        environment_type=session_type,
        python_version=get_system_python_version(),  # up to minor version
        creation_time=datetime.now(),
        working_directory=getcwd(),
        session_name=session_name,
        execution_id=self.executor.execution.id,
    )
    self.db.write_context(session_context)
    self.tracer_context = TracerContext(
        session_context=session_context, db=self.db
    )

assign(variable_name, value_node, from_import=False)

Assign updates the local mapping from variable names to the nodes that produced them.
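
A small usage sketch (assumes an existing tracer): assignment only records which node produced the variable; nothing is re-executed here.

node = tracer.literal(42)                               # LiteralNode carrying the value 42
tracer.assign("answer", node)                           # map "answer" -> node and persist the name
tracer.assign("answer_alias", node, from_import=True)   # import-style binding of the same node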

Source code in lineapy/instrumentation/tracer.py
def assign(
    self, variable_name: str, value_node: Node, from_import: bool = False
) -> None:
    """
    Assign updates a local mapping of variable nodes.
    """
    logger.debug("assigning %s = %s", variable_name, value_node)
    existing_value_node = self.variable_name_to_node.get(
        variable_name, None
    )
    if (
        existing_value_node is None
        or existing_value_node.id != value_node.id
        or not from_import
    ):
        self.variable_name_to_node[variable_name] = value_node
        self.db.write_assigned_variable(value_node.id, variable_name)
    return

call(function_node, source_location, *arguments, **keyword_arguments)

Parameters:

  • function_node (Node, required): the function node to call/execute.
  • source_location (Optional[SourceLocation], required): the source info from user code.
  • *arguments (Union[Node, Tuple[bool, Node]], default ()): positional arguments. These are passed either as Nodes (named nodes, constants, etc.) or as (starred, node) tuples, where starred is a boolean indicating whether the argument should be splatted before being passed to the function (the case where you call a function like foo(1, *[2, 3])). The boolean is optional simply to support the legacy way of calling this function, without having to pass tuples for every single case from node_transformer.
  • **keyword_arguments (Node, default {}): keyword arguments, passed as a dictionary to the function. Like positional arguments, keyword arguments can be splatted by naming the key unpack_<index>, where <index> is the index of the argument; in that case the dictionary is unpacked and passed as keyword arguments to the function. Keyword arguments are processed in the order they are passed, so on a keyword conflict the last value wins (see the sketch below).

Returns:

  • CallNode: a call node.

Note
  • It's important for call to return the call node so that we can programmatically chain the nodes together, e.g., for the assignment call to modify the previous call node.
  • The call looks up whether it refers to a locally defined function. We decided that this is better for program slicing.
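
The sketch below shows how a user-level call such as foo(1, *rest, a=2, **extra) could be expressed through this API. It assumes an existing tracer and that foo, rest, and extra already exist in the traced globals; they are hypothetical user names, not part of lineapy.

foo_node = tracer.lookup_node("foo")
one_node = tracer.literal(1)
rest_node = tracer.lookup_node("rest")
a_node = tracer.literal(2)
extra_node = tracer.lookup_node("extra")

result = tracer.call(
    foo_node,
    None,                    # source_location omitted in this sketch
    one_node,                # plain positional argument
    (True, rest_node),       # (starred, node) tuple: splatted like *rest
    a=a_node,                # ordinary keyword argument
    unpack_0=extra_node,     # "unpack_<index>" key: splatted like **extra
)
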
Source code in lineapy/instrumentation/tracer.py
def call(
    self,
    function_node: Node,
    source_location: Optional[SourceLocation],
    # function_name: str,
    *arguments: Union[Node, Tuple[bool, Node]],
    **keyword_arguments: Node,
) -> CallNode:
    """
    Parameters
    ----------
    function_node: Node
        the function node to call/execute
    source_location: Optional[SourceLocation]
        the source info from user code
    arguments: Union[Node, Tuple[bool, Node]]
        positional arguments. These are passed as either Nodes (named nodes, constants, etc)
        or tuples (starred, the node) where the starred is a boolean to indicate whether
        the argument is supposed to be splatted before passing to the function (This is
        the case where you might call a function like so ``foo(1, *[2, 3])`` ). The boolean is made
        optional simply to support the legacy way of calling this function and not having to pass
        the tuples for every single case from node_transformer
    keyword_arguments: Node
        keyword arguments. These are passed as a dictionary of keyword arguments to the
        function. Similar to ``*positional_arguments``, the keyword arguments can also be splatted
        by naming the key as ``unpack_<index>`` where <index> is the index of the argument. In this
        case, the dictionary will be unpacked and passed as keyword arguments to the function.
        The keyword arguments are processed in order of passing so any keyword conflicts will
        result in the last value accepted as the value for the keyword.

    Returns
    -------
    CallNode
        a call node

    ??? note
        - It's important for the call to return the call node
        so that we can programmatically chain the the nodes together,
        e.g., for the assignment call to modify the previous call node.
        - The call looks up if it's a locally defined function. We decided
        that this is better for program slicing.
    """

    node = CallNode(
        id=get_new_id(),
        session_id=self.get_session_id(),
        function_id=function_node.id,
        positional_args=self.__get_positional_arguments(arguments),
        keyword_args=self.__get_keyword_arguments(keyword_arguments),
        source_location=source_location,
        global_reads={},
        implicit_dependencies=[],
        control_dependency=self.control_flow_tracker.current_control_dependency(),
    )
    self.process_node(node)
    return node

import_module(name, source_location=None)

Import a module. If it has already been imported, return the existing module node. Otherwise, create module nodes for it and each of its parent packages, and return the node for the full name.
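
A hedged sketch (assumes an existing tracer and that pandas is installed in the traced environment): importing a dotted name walks up through the parents, so every prefix gets its own cached module node.

frame_node = tracer.import_module("pandas.core.frame")
same_node = tracer.import_module("pandas.core.frame")   # cache hit: the same node is returned
assert frame_node is same_node
assert "pandas" in tracer.module_name_to_node            # parent packages were imported too
assert "pandas.core" in tracer.module_name_to_node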

Source code in lineapy/instrumentation/tracer.py
def import_module(
    self,
    name: str,
    source_location: Optional[SourceLocation] = None,
) -> Node:
    """
    Import a module. If we have already imported it, just return its ID.
    Otherwise, create new module nodes for each submodule in its parents and return it.
    """
    if name in self.module_name_to_node:
        return self.module_name_to_node[name]
    # Recursively go up the tree, to try to get parents, and if we don't have them, import them
    *parents, module_name = name.split(".")
    if parents:
        parent_module = self.import_module(
            ".".join(parents),
            source_location,
        )
        node = self.call(
            self.lookup_node(l_import.__name__),
            source_location,
            self.literal(module_name),
            parent_module,
        )
    else:
        node = self.call(
            self.lookup_node(l_import.__name__),
            source_location,
            self.literal(module_name),
        )
    self.module_name_to_node[name] = node
    return node

lookup_node(variable_name, source_location=None)

Cases for the node that we are looking up:

  • user-defined variables & function definitions
  • imported libs
  • unknown runtime magic functions — special-cased to LookupNode:
      • builtin functions, e.g., min
      • custom runtime, e.g., get_ipython
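
A short sketch of the lookup order (assumes an existing tracer): user-defined variables win, then imported modules, and anything else falls through to a fresh LookupNode (builtins such as min, runtime magics such as get_ipython).

x_node = tracer.literal(3)
tracer.assign("x", x_node)

assert tracer.lookup_node("x") is x_node   # found in variable_name_to_node
min_node = tracer.lookup_node("min")       # not tracked: a new LookupNode is created
assert min_node is not x_node
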
Source code in lineapy/instrumentation/tracer.py
def lookup_node(
    self,
    variable_name: str,
    source_location: Optional[SourceLocation] = None,
) -> Node:
    """
    Cases for the node that we are looking up:

    - user defined variable & function definitions
    - imported libs
    - unknown runtime magic functions &mdash; special case to LookupNode

      - builtin functions, e.g., min
      - custom runtime, e.g., get_ipython

    """
    if variable_name in self.variable_name_to_node:
        # user define var and fun def
        return self.variable_name_to_node[variable_name]
    elif variable_name in self.module_name_to_node:
        return self.module_name_to_node[variable_name]
    else:
        new_node = LookupNode(
            id=get_new_id(),
            session_id=self.get_session_id(),
            name=variable_name,
            source_location=source_location,
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(new_node)
        return new_node

process_node(node)

Executes a node and adds it to the database.

Source code in lineapy/instrumentation/tracer.py
def process_node(self, node: Node) -> None:
    """
    Execute a node, and adds it to the database.
    """

    ##
    # Update the graph from the side effects of the node,
    # If an artifact could not be created, quietly return without saving
    # the node to the DB.
    ##
    logger.debug("Executing node %s", node)
    try:
        side_effects = self.executor.execute_node(
            node,
            {k: v.id for k, v in self.variable_name_to_node.items()},
        )
    except ArtifactSaveException as exc_info:
        logger.error("Artifact could not be saved.")
        logger.debug(exc_info)
        return
    logger.debug("Processing side effects")

    # Iterate through each side effect and process it, depending on its type
    for e in side_effects:
        if isinstance(e, ImplicitDependencyNode):
            self._process_implicit_dependency(
                node, self._resolve_pointer(e.pointer)
            )
        elif isinstance(e, ViewOfNodes):
            if len(e.pointers) > 0:  # skip if empty
                self.mutation_tracker.set_as_viewers_of_each_other(
                    *map(self._resolve_pointer, e.pointers)
                )
        elif isinstance(e, AccessedGlobals):
            self._process_accessed_globals(
                node.session_id, node, e.retrieved, e.added_or_updated
            )
        # Mutate case
        else:
            mutated_node_id = self._resolve_pointer(e.pointer)
            for (
                mutate_node_id,
                source_id,
            ) in self.mutation_tracker.set_as_mutated(mutated_node_id):
                mutate_node = MutateNode(
                    id=mutate_node_id,
                    session_id=node.session_id,
                    source_id=source_id,
                    call_id=node.id,
                    control_dependency=self.control_flow_tracker.current_control_dependency(),
                )
                self.process_node(mutate_node)

    # also special case for import node
    if isinstance(node, ImportNode):
        # must process after the call has been executed
        package_name, version = get_lib_package_version(node.name)
        node.version = version
        node.package_name = package_name

    self.db.write_node(node)

trace_import(name, source_location=None, alias=None, attributes=None)

Parameters:

  • name (str, required): the name of the module.
  • alias (Optional[str], default None): the module may be aliased, e.g., import pandas as pd.
  • attributes (Optional[Dict[str, str]], default None): the functions imported from the library, keyed by aliased name to original name (see the sketch below).

Note
  • The input args would have either alias or attributes, but not both.
  • The method is not named import because that is a reserved word in Python.
  • The module's version and path are introspected at runtime.
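
The sketch below maps common import statements onto this entry point (it assumes an existing tracer and that the modules are installed); the statements in the comments are the user code being traced.

tracer.trace_import("pandas", alias="pd")                 # import pandas as pd
tracer.trace_import("math")                               # import math
tracer.trace_import("os", attributes={"path": "path"})    # from os import path
tracer.trace_import("os", attributes={"p": "path"})       # from os import path as p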

Source code in lineapy/instrumentation/tracer.py
def trace_import(
    self,
    name: str,
    source_location: Optional[SourceLocation] = None,
    alias: Optional[str] = None,
    attributes: Optional[Dict[str, str]] = None,
) -> None:
    """
    Parameters
    ----------
    name: str
        the name of the module
    alias: Optional[str]
        the module could be aliased, e.g., import pandas as pd
    attributes: Optional[Dict[str, str]]
        a list of functions imported from the library.
        It keys the aliased name to the original name.

    ??? note
        - The input args would _either_ have alias or attributes, but not both
        - Didn't call the function import because I think that's a protected name

    note that version and path will be introspected at runtime
    """
    module_node = self.import_module(name, source_location)
    if alias:
        self.assign(alias, module_node, from_import=True)
    elif attributes:
        module_value = self.executor.get_value(module_node.id)
        if IMPORT_STAR in attributes:
            """
            Import the module, get all public attributes, and set them as globals
            """
            # Import star behavior copied from python docs
            # https://docs.python.org/3/reference/simple_stmts.html#the-import-statement
            if hasattr(module_value, "__all__"):
                public_names = module_value.__all__  # type: ignore
            else:
                public_names = [
                    attr
                    for attr in dir(module_value)
                    if not attr.startswith("_")
                ]
            attributes = {attr: attr for attr in public_names}
        """
        load module `x`, check if `y` is an attribute of `x`, otherwise load `x.y`
        If `x.y` is a module, load that, otherwise get the `y` attribute of `x`.
        """
        for alias, attr_or_module in attributes.items():
            if hasattr(module_value, attr_or_module):
                self.assign(
                    alias,
                    self.call(
                        self.lookup_node(GETATTR),
                        source_location,
                        module_node,
                        self.literal(attr_or_module),
                    ),
                    from_import=True,
                )
            else:
                full_name = f"{name}.{attr_or_module}"
                sub_module_node = self.import_module(
                    full_name, source_location
                )
                self.assign(alias, sub_module_node, from_import=True)

    else:
        self.assign(name, module_node, from_import=True)

    node = ImportNode(
        id=get_new_id(),
        name=name,
        session_id=self.get_session_id(),
        source_location=source_location,
        control_dependency=self.control_flow_tracker.current_control_dependency(),
    )
    self.process_node(node)
