Customizing your own dataset

The first thing to know about Graph4NLP's Dataset class is that its basic element is DataItem, which can be an arbitrary collection of data instances, including natural language sentences (strings), token lists (lists), or graphs. In this guide we use the JOBS dataset as an example to walk readers through the process of customizing a dataset.

Customizing DataItem

The base class for data items is DataItem. It declares an abstract method extract(), which returns the input and output tokens. To create your own DataItem class, simply inherit from the base class and implement extract().
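
Its skeleton looks roughly like the following (a simplified sketch based on how the subclasses use it, not the verbatim library source):

import abc

class DataItem(object):
    def __init__(self, input_text, tokenizer):
        self.input_text = input_text
        self.tokenizer = tokenizer

    @abc.abstractmethod
    def extract(self):
        """Return the tokens used to build the vocabulary."""
        raise NotImplementedError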

The Jobs dataset inherits from the Text2TextDataset base class, which uses Text2TextDataItem as its composing element. Text2TextDataItem implements its own extract() method, which returns the list(s) of tokens contained in the text graph.

class Text2TextDataItem(DataItem):
    def __init__(self, input_text, output_text, tokenizer, share_vocab=True):
        super(Text2TextDataItem, self).__init__(input_text, tokenizer)
        self.output_text = output_text
        self.share_vocab = share_vocab

    def extract(self):
        g: GraphData = self.graph

        # Collect the tokens stored on the nodes of the text graph.
        input_tokens = []
        for i in range(g.get_node_num()):
            if self.tokenizer is None:
                tokenized_token = g.node_attributes[i]['token'].strip().split(' ')
            else:
                tokenized_token = self.tokenizer(g.node_attributes[i]['token'])

            input_tokens.extend(tokenized_token)

        # Tokenize the target side.
        if self.tokenizer is None:
            output_tokens = self.output_text.strip().split(' ')
        else:
            output_tokens = self.tokenizer(self.output_text)

        # With a shared vocabulary, source and target tokens are pooled together.
        if self.share_vocab:
            return input_tokens + output_tokens
        else:
            return input_tokens, output_tokens
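
To customize, follow the same pattern. For instance, a hypothetical classification-style data item (the class and field names below are illustrative, not part of the library) could look like:

class Text2LabelDataItem(DataItem):
    def __init__(self, input_text, output_label, tokenizer):
        super(Text2LabelDataItem, self).__init__(input_text, tokenizer)
        self.output_label = output_label

    def extract(self):
        # Tokens come from the nodes of the constructed text graph;
        # the label itself does not enter the vocabulary.
        g = self.graph
        input_tokens = []
        for i in range(g.get_node_num()):
            if self.tokenizer is None:
                input_tokens.extend(g.node_attributes[i]['token'].strip().split(' '))
            else:
                input_tokens.extend(self.tokenizer(g.node_attributes[i]['token']))
        return input_tokens

The JobsDataset class itself mainly forwards its configuration to the Text2TextDataset constructor: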


class JobsDataset(Text2TextDataset):
    def __init__(self, root_dir,
                 topology_builder, topology_subdir,
                 pretrained_word_emb_name="6B",
                 pretrained_word_emb_url=None,
                 pretrained_word_emb_cache_dir=None,
                 graph_type='static',
                 merge_strategy="tailhead", edge_strategy=None,
                 seed=None,
                 word_emb_size=300, share_vocab=True, lower_case=True,
                 thread_number=1, port=9000,
                 dynamic_graph_type=None,
                 dynamic_init_topology_builder=None,
                 dynamic_init_topology_aux_args=None,
                 for_inference=None,
                 reused_vocab_model=None):
        # Initialize the dataset. If the preprocessed files are not found,
        # do the preprocessing and save them.
        super(JobsDataset, self).__init__(root_dir=root_dir, topology_builder=topology_builder,
                                          topology_subdir=topology_subdir, graph_type=graph_type,
                                          edge_strategy=edge_strategy, merge_strategy=merge_strategy,
                                          share_vocab=share_vocab, lower_case=lower_case,
                                          pretrained_word_emb_name=pretrained_word_emb_name,
                                          pretrained_word_emb_url=pretrained_word_emb_url,
                                          pretrained_word_emb_cache_dir=pretrained_word_emb_cache_dir,
                                          seed=seed, word_emb_size=word_emb_size,
                                          thread_number=thread_number, port=port,
                                          dynamic_graph_type=dynamic_graph_type,
                                          dynamic_init_topology_builder=dynamic_init_topology_builder,
                                          dynamic_init_topology_aux_args=dynamic_init_topology_aux_args,
                                          for_inference=for_inference,
                                          reused_vocab_model=reused_vocab_model)
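
A typical instantiation could look like the following. DependencyBasedGraphConstruction is one of the built-in topology builders; the exact import path may vary across library versions, and root_dir and port should match your environment:

from graph4nlp.pytorch.modules.graph_construction.dependency_graph_construction import DependencyBasedGraphConstruction

jobs_dataset = JobsDataset(root_dir='data/jobs',
                           topology_builder=DependencyBasedGraphConstruction,
                           topology_subdir='DependencyGraph',
                           graph_type='static',
                           port=9000)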

Customizing downloading

Downloading can be decomposed into two steps: 1) check whether the files exist, and 2) download the missing files. To customize the checking, the file names must be specified:

@property
def raw_file_names(self):
    """3 reserved keys: 'train', 'val' (optional), 'test'. Represent the split of dataset."""
    return {'train': 'train.txt', 'test': 'test.txt'}

The file names are concatenated with self.raw_dir to compose the complete file paths. To customize downloading, simply override the download() method; the root downloading method in the base class Dataset is defined as follows:

class Dataset:
    def _download(self):
        if all([os.path.exists(raw_path) for raw_path in self.raw_file_paths.values()]):
            return

        os.makedirs(self.raw_dir, exist_ok=True)
        self.download()

    @abc.abstractmethod
    def download(self):
        """Download the raw data from the Internet."""
        raise NotImplementedError
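
For Jobs, a minimal override inside the dataset subclass might fetch each missing raw file into self.raw_dir. This is only a sketch: the base URL below is a placeholder, so substitute wherever your raw files are actually hosted:

import os
import urllib.request

class JobsDataset(Text2TextDataset):
    def download(self):
        base_url = 'https://example.com/jobs/'  # placeholder; replace with the real host
        for file_name in self.raw_file_names.values():
            raw_path = os.path.join(self.raw_dir, file_name)
            if not os.path.exists(raw_path):
                # Fetch the file over HTTP(S) into the raw directory.
                urllib.request.urlretrieve(base_url + file_name, raw_path)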

Customizing processing

Processing is customized in much the same way as downloading and consists of a similar set of sub-steps, plus an additional check for the split ratio. It first checks whether the processed files exist and loads them directly from the file system if they do. Otherwise it performs several pre-processing steps, namely build_topology, build_vocab and vectorization.

def _process(self):
    if all([os.path.exists(processed_path) for processed_path in self.processed_file_paths.values()]):
        if 'val_split_ratio' in self.__dict__:
            # Warn the user (requires `import warnings` at module level).
            warnings.warn(
                "Loading existing processed files on disk. Your `val_split_ratio` might not work since the data "
                "have already been split.")
        return
    if self.for_inference and \
            all([(os.path.exists(processed_path) or self.processed_file_names['data'] not in processed_path) for
                 processed_path in self.processed_file_paths.values()]):
        return

    os.makedirs(self.processed_dir, exist_ok=True)

    self.read_raw_data()

    if self.for_inference:
        self.test = self.build_topology(self.test)
        self.vectorization(self.test)
        data_to_save = {'test': self.test}
        torch.save(data_to_save, self.processed_file_paths['data'])
    else:
        self.train = self.build_topology(self.train)
        self.test = self.build_topology(self.test)
        if 'val' in self.__dict__:
            self.val = self.build_topology(self.val)

        self.build_vocab()

        self.vectorization(self.train)
        self.vectorization(self.test)
        if 'val' in self.__dict__:
            self.vectorization(self.val)

        data_to_save = {'train': self.train, 'test': self.test}
        if 'val' in self.__dict__:
            data_to_save['val'] = self.val
        torch.save(data_to_save, self.processed_file_paths['data'])

        vocab_to_save = self.vocab_model
        torch.save(vocab_to_save, self.processed_file_paths['vocab'])

build_topology builds a text graph for each DataItem in the dataset and binds it to the corresponding DataItem object. This routine usually involves functions provided by the GraphConstruction module. Moreover, since each individual text graph is constructed independently of the others, multiple graphs can be built concurrently using Python's multiprocessing module.

build_vocab takes all the tokens that appear in the data items and builds a vocabulary out of them. By default, graph4nlp.utils.vocab_utils.VocabModel is responsible for constructing the vocabulary, and it also represents the vocabulary itself. The constructed vocabulary becomes a member of the Dataset instance.

vectorization is a lookup step which converts tokens from raw strings to their vocabulary indices (token IDs), ready to be mapped to word embeddings by the model. Since there are various ways to assign indices and embedding vectors to tokens, this step is usually overridden by the downstream classes.

In Jobs, these pre-processing steps are implemented in its base classes, Text2TextDataset and Dataset:

class Dataset:
    def build_topology(self, data_items):
        """
        Build graph topology for each item in the dataset. The generated graph is bound to the `graph` attribute of the
        DataItem.
        """
        total = len(data_items)
        thread_number = min(total, self.thread_number)
        pool = Pool(thread_number)
        res_l = []
        for i in range(thread_number):
            start_index = total * i // thread_number
            end_index = total * (i + 1) // thread_number

            """
            data_items, topology_builder,
                                graph_type, dynamic_graph_type, dynamic_init_topology_builder,
                                merge_strategy, edge_strategy, dynamic_init_topology_aux_args,
                                lower_case, tokenizer, port, timeout
            """
            r = pool.apply_async(self._build_topology_process,
                                 args=(data_items[start_index:end_index], self.topology_builder, self.graph_type,
                                       self.dynamic_graph_type, self.dynamic_init_topology_builder,
                                       self.merge_strategy, self.edge_strategy, self.dynamic_init_topology_aux_args,
                                       self.lower_case, self.tokenizer, self.port, self.timeout))
            res_l.append(r)
        pool.close()
        pool.join()

        data_items = []
        for i in range(thread_number):
            res = res_l[i].get()
            for data in res:
                if data.graph is not None:
                    data_items.append(data)

        return data_items

    def build_vocab(self):
        """
        Build the vocabulary. If `self.use_val_for_vocab` is `True`, use both training set and validation set for building
        the vocabulary. Otherwise only the training set is used.

        """
        data_for_vocab = self.train
        if self.use_val_for_vocab:
            data_for_vocab = self.val + data_for_vocab

        vocab_model = VocabModel.build(saved_vocab_file=self.processed_file_paths['vocab'],
                                       data_set=data_for_vocab,
                                       tokenizer=self.tokenizer,
                                       lower_case=self.lower_case,
                                       max_word_vocab_size=self.max_word_vocab_size,
                                       min_word_vocab_freq=self.min_word_vocab_freq,
                                       share_vocab=self.share_vocab,
                                       pretrained_word_emb_name=self.pretrained_word_emb_name,
                                       pretrained_word_emb_url=self.pretrained_word_emb_url,
                                       pretrained_word_emb_cache_dir=self.pretrained_word_emb_cache_dir,
                                       target_pretrained_word_emb_name=self.target_pretrained_word_emb_name,
                                       target_pretrained_word_emb_url=self.target_pretrained_word_emb_url,
                                       word_emb_size=self.word_emb_size)
        self.vocab_model = vocab_model

        return self.vocab_model

class Text2TextDataset:
    def vectorization(self, data_items):
        # IE-based graph construction may attach multi-token attributes that need padding.
        use_ie = self.topology_builder == IEBasedGraphConstruction
        for item in data_items:
            graph: GraphData = item.graph
            token_matrix = []
            for node_idx in range(graph.get_node_num()):
                node_token = graph.node_attributes[node_idx]['token']
                node_token_id = self.vocab_model.in_word_vocab.getIndex(node_token, use_ie)
                graph.node_attributes[node_idx]['token_id'] = node_token_id

                token_matrix.append([node_token_id])
            if self.topology_builder == IEBasedGraphConstruction:
                for i in range(len(token_matrix)):
                    token_matrix[i] = np.array(token_matrix[i][0])
                token_matrix = pad_2d_vals_no_size(token_matrix)
                token_matrix = torch.tensor(token_matrix, dtype=torch.long)
                graph.node_features['token_id'] = token_matrix
            else:
                token_matrix = torch.tensor(token_matrix, dtype=torch.long)
                graph.node_features['token_id'] = token_matrix

            if use_ie and 'token' in graph.edge_attributes[0].keys():
                edge_token_matrix = []
                for edge_idx in range(graph.get_edge_num()):
                    edge_token = graph.edge_attributes[edge_idx]['token']
                    edge_token_id = self.vocab_model.in_word_vocab.getIndex(edge_token, use_ie)
                    graph.edge_attributes[edge_idx]['token_id'] = edge_token_id
                    edge_token_matrix.append([edge_token_id])
                if self.topology_builder == IEBasedGraphConstruction:
                    for i in range(len(edge_token_matrix)):
                        edge_token_matrix[i] = np.array(edge_token_matrix[i][0])
                    edge_token_matrix = pad_2d_vals_no_size(edge_token_matrix)
                    edge_token_matrix = torch.tensor(edge_token_matrix, dtype=torch.long)
                    graph.edge_features['token_id'] = edge_token_matrix

            # Convert the target text into an index sequence terminated by EOS.
            tgt = item.output_text
            tgt_token_id = self.vocab_model.out_word_vocab.to_index_sequence(tgt)
            tgt_token_id.append(self.vocab_model.out_word_vocab.EOS)
            tgt_token_id = np.array(tgt_token_id)
            item.output_np = tgt_token_id
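
If your task needs a different scheme, override vectorization in your own dataset class. Below is a simplified sketch that skips the IE-specific padding branches above; the names follow the code in this guide:

class MyText2TextDataset(Text2TextDataset):
    def vectorization(self, data_items):
        for item in data_items:
            graph = item.graph
            # Map every node token to its vocabulary index.
            token_matrix = [[self.vocab_model.in_word_vocab.getIndex(
                                graph.node_attributes[i]['token'], False)]
                            for i in range(graph.get_node_num())]
            graph.node_features['token_id'] = torch.tensor(token_matrix, dtype=torch.long)

            # Convert the target text into an index sequence terminated by EOS.
            tgt_token_id = self.vocab_model.out_word_vocab.to_index_sequence(item.output_text)
            tgt_token_id.append(self.vocab_model.out_word_vocab.EOS)
            item.output_np = np.array(tgt_token_id)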

Customizing batching

Runtime iteration over the dataset is performed by PyTorch's DataLoader. Since the basic composing element is DataItem, it is our job to convert the low-level list of DataItem objects fetched by torch.utils.data.DataLoader into the batched data we want. Dataset.collate_fn() is designed to do this job.

class Text2TextDataset:
    @staticmethod
    def collate_fn(data_list: List[Text2TextDataItem]):  # requires: from typing import List
        graph_list = [item.graph for item in data_list]
        graph_data = to_batch(graph_list)

        output_numpy = [deepcopy(item.output_np) for item in data_list]
        output_str = [deepcopy(item.output_text.lower().strip()) for item in data_list]
        output_pad = pad_2d_vals_no_size(output_numpy)

        tgt_seq = torch.from_numpy(output_pad).long()
        return {
            "graph_data": graph_data,
            "tgt_seq": tgt_seq,
            "output_str": output_str
        }

It takes in a list of DataItem objects and returns the data in the type expected by the model. Interested readers may refer to the examples provided in the source code for practical usage.
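
For example, assuming the jobs_dataset instance from earlier and that its splits have been processed, training batches can be fetched like this (the batch size and shuffling below are arbitrary choices):

from torch.utils.data import DataLoader

train_loader = DataLoader(jobs_dataset.train, batch_size=24, shuffle=True,
                          collate_fn=jobs_dataset.collate_fn)

for batch in train_loader:
    graph_data = batch['graph_data']  # batched GraphData for the graph encoder
    tgt_seq = batch['tgt_seq']        # padded target token ids
    output_str = batch['output_str']  # raw target strings, e.g. for evaluation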