从pyspark.ml和管道API开始,我发现自己为典型的预处理任务编写了自定义转换器,以便在管道中使用它们.例子:
Getting started with pyspark.ml and the pipelines API, I find myself writing custom transformers for typical preprocessing tasks in order to use them in a pipeline. Examples:
from pyspark.ml import Pipeline, Transformer class CustomTransformer(Transformer): # lazy workaround - a transformer needs to have these attributes _defaultParamMap = dict() _paramMap = dict() _params = dict() class ColumnSelector(CustomTransformer): """Transformer that selects a subset of columns - to be used as pipeline stage""" def __init__(self, columns): self.columns = columns def _transform(self, data): return data.select(self.columns) class ColumnRenamer(CustomTransformer): """Transformer renames one column""" def __init__(self, rename): self.rename = rename def _transform(self, data): (colNameBefore, colNameAfter) = self.rename return data.withColumnRenamed(colNameBefore, colNameAfter) class NaDropper(CustomTransformer): """ Drops rows with at least one not-a-number element """ def __init__(self, cols=None): self.cols = cols def _transform(self, data): dataAfterDrop = data.dropna(subset=self.cols) return dataAfterDrop class ColumnCaster(CustomTransformer): def __init__(self, col, toType): self.col = col self.toType = toType def _transform(self, data): return data.withColumn(self.col, data[self.col].cast(self.toType))它们可以工作,但是我想知道这是模式还是反模式-这样的转换器是使用管道API的好方法吗?是否有必要实现它们,还是在其他地方提供了等效功能?
They work, but I was wondering if this is a pattern or antipattern - are such transformers a good way to work with the pipeline API? Was it necessary to implement them, or is equivalent functionality provided somewhere else?
推荐答案我会说它主要是基于意见的,尽管它看起来不必要冗长,并且Python Transformers与Pipeline的其余部分集成得不好API.
I'd say it is primarily opinion based, although it looks unnecessarily verbose and Python Transformers don't integrate well with the rest of the Pipeline API.
还值得指出的是,使用SQLTransformer可以轻松实现此处的所有功能.例如:
It is also worth pointing out that everything you have here can be easily achieved with SQLTransformer. For example:
from pyspark.ml.feature import SQLTransformer def column_selector(columns): return SQLTransformer( statement="SELECT {} FROM __THIS__".format(", ".join(columns)) )或
def na_dropper(columns): return SQLTransformer( statement="SELECT * FROM __THIS__ WHERE {}".format( " AND ".join(["{} IS NOT NULL".format(x) for x in columns]) ) )只需花费一点点精力,您就可以将SQLAlchemy与Hive方言结合使用,以避免手写SQL.
With a little bit of effort you can use SQLAlchemy with Hive dialect to avoid handwritten SQL.
更多推荐
pyspark.ml管道:基本预处理任务是否需要自定义转换器?
发布评论