ICode9

精准搜索请尝试: 精确搜索
  • python – PySpark,通过JSON文件导入模式2019-10-06 04:57:08

    tbschema.json看起来像这样: [{"TICKET":"integer","TRANFERRED":"string","ACCOUNT":"STRING"}] 我使用以下代码加载它 >>> df2 = sqlContext.jsonFile("tbschema.json") >>> f2.schema StructT

  • python – PySpark – 将列表作为参数传递给UDF2019-10-06 01:55:17

    我需要将列表传递给UDF,列表将确定距离的分数/类别.就目前而言,我很难将所有距离编码为第4分. a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"]) from pyspark.sql.functions import udf def cate(label, feature_list):

  • python – pyspark错误:AttributeError:’SparkSession’对象没有属性’parallelize’2019-10-05 21:55:17

    我在Jupyter笔记本上使用pyspark.以下是Spark设置的方式: import findspark findspark.init(spark_home='/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive', python_path='python2.7') import pyspark from pyspark.sql import * sc

  • python – PySpark:使用过滤函数后取一列的平均值2019-10-05 19:56:31

    我使用以下代码来获得薪水大于某个阈值的人的平均年龄. dataframe.filter(df['salary'] > 100000).agg({"avg": "age"}) 列的年龄是数字(浮点数),但我仍然收到此错误. py4j.protocol.Py4JJavaError: An error occurred while calling o86.agg. : scala.MatchError: age (of cla

  • 如何在Spark SQL中的多个列上进行数据透视?2019-10-04 12:58:12

    我需要在pyspark数据帧中转动多个列.示例数据框, >>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)] >>> mydf = spark.createDataFr

  • python – 向Spark DataFrame添加一个空列2019-10-04 03:58:16

    如在Web上的many other locations中所述,向现有DataFrame添加新列并不简单.不幸的是,拥有此功能非常重要(即使它在分布式环境中效率低下),尤其是在尝试使用unionAll连接两个DataFrame时. 将空列添加到DataFrame以便于unionAll的最优雅的解决方法是什么? 我的版本是这样的: from pysp

  • 如何使用PySpark加载IPython shell2019-10-03 22:05:41

    我想加载IPython shell(不是IPython笔记本),我可以通过命令行使用PySpark.那可能吗? 我安装了Spark-1.4.1.解决方法:如果你使用Spark< 1.2你可以简单地用环境变量IPYTHON = 1执行bin / pyspark. IPYTHON=1 /path/to/bin/pyspark 要么 export IPYTHON=1 /path/to/bin/pyspark 虽

  • python – PySpark用其他列中的值替换列中的null2019-10-02 11:57:27

    我想用一个相邻列中的值替换一列中的空值,例如,如果我有 A|B 0,1 2,null 3,null 4,2 我希望它是: A|B 0,1 2,2 3,3 4,2 试过 df.na.fill(df.A,"B") 但是没有用,它说值应该是float,int,long,string或dict 有任何想法吗?解决方法:最后找到了另一种选择: df.withColumn("B",coales

  • mysql – 使用for循环数组时使用INSERT INTO表ON DUPLICATE KEY时出错2019-10-02 08:16:28

    我正在使用pyspark框架更新mysql数据库,并在AWS Glue服务上运行. 我有一个数据帧如下: df2= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 55","NY"),("xxx2","81A01","TERR NAME 55","NY"),("x103&qu

  • python – 可以在Spark中按组扩展数据吗?2019-10-02 04:59:13

    我想用StandardScaler(来自pyspark.mllib.feature导入StandardScaler)来扩展数据,现在我可以通过将RDD的值传递给transform函数来实现,但问题是我想保留密钥.无论如何,我通过保留其密钥来扩展我的数据? 样本数据集 0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.0

  • python – PySpark DataFrame上的Sum运算在type为fine时给出TypeError2019-10-02 01:57:20

    我在PySpark中有这样的DataFrame(这是take(3)的结果,数据帧非常大): sc = SparkContext() df = [Row(owner=u'u1', a_d=0.1), Row(owner=u'u2', a_d=0.0), Row(owner=u'u1', a_d=0.3)] 同一所有者将拥有更多行.我需要做的是在分组之后将每个所有者的字段a_d的值相加为 b = df.gro

  • python – PySpark:StructField(…,…,False)总是返回`nullable = true`而不是`nullable = false`2019-10-02 01:56:44

    我是PySpark的新手,面临一个奇怪的问题.我正在尝试在加载CSV数据集时将某些列设置为不可为空.我可以用一个非常小的数据集(test.csv)重现我的情况: col1,col2,col3 11,12,13 21,22,23 31,32,33 41,42,43 51,,53 在第5行第2列有一个空值,我不想在我的DF中获得该行.我将所有字段设置

  • python – PySpark:TypeError:condition应该是string或Column2019-10-02 00:56:43

    我试图过滤基于如下的RDD: spark_df = sc.createDataFrame(pandas_df) spark_df.filter(lambda r: str(r['target']).startswith('good')) spark_df.take(5) 但是得到了以下错误: TypeErrorTraceback (most recent call last) <ipython-input-8-86cfb363dd8b> in &l

  • 在从其他列派生的数据框中添加新列(Spark)2019-10-01 08:58:33

    我正在使用Spark 1.3.0和Python.我有一个数据框,我希望添加一个从其他列派生的附加列.像这样, >>old_df.columns [col_1, col_2, ..., col_m] >>new_df.columns [col_1, col_2, ..., col_m, col_n] 哪里 col_n = col_3 - col_4 我如何在PySpark中执行此操作?解决方法:实现这一

  • python – Pyspark从日期到字符串更改列的类型2019-10-01 07:57:29

    我有以下数据帧: corr_temp_df [('vacationdate', 'date'), ('valueE', 'string'), ('valueD', 'string'), ('valueC', 'string'), ('valueB', 'string'), ('value

  • python – 将数据从Dataframe传递到现有ML VectorIndexerModel时出错2019-09-29 20:58:15

    我有一个Dataframe,我想用它来预测现有的模型.使用模型的transform方法时出错. 这就是我处理trainingdata的方法. forecast.printSchema() 我的Dataframe的架构: root |-- PM10: double (nullable = false) |-- rain_3h: double (nullable = false) |-- is_rain: double (null

  • python – 使用UDF忽略条件2019-09-29 20:55:22

    假设您有以下pyspark DataFrame: data= [('foo',), ('123',), (None,), ('bar',)] df = sqlCtx.createDataFrame(data, ["col"]) df.show() #+----+ #| col| #+----+ #| foo| #| 123| #|null| #| bar| #+----+ 接下来的两个代码块应该做同样的事情 – 也就是说,

  • python – pyspark解析固定宽度的文本文件2019-09-29 17:58:21

    试图解析固定宽度的文本文件. 我的文本文件如下所示,我需要一个行id,日期,字符串和整数: 00101292017you1234 00201302017 me5678 我可以使用sc.textFile(path)将文本文件读取到RDD. 我可以使用解析的RDD和模式createDataFrame.这是在这两个步骤之间的解析.解决方法:Spark的substr

  • kafka+pyspark2019-09-29 17:50:44

    安装kafka kafka 三部分 server producer consumer pyspark 监控 一、环境部署 1.导入对应版本的spark-streaming-kafka-*-*.jar 2.相应jar追加到SPARK_DIST_CLASSPATH 二、kafka+spark测试 1.启动kafka的server和producer 2.代码 from pyspark.streaming.kafka import KafkaUtils

  • python – PySpark.将Dataframe传递给pandas_udf并返回一个系列2019-09-29 15:59:51

    我正在使用PySpark的新pandas_udf装饰器,我试图让它将多列作为输入并返回一个系列作为输入,但是,我得到一个TypeError:无效的参数 示例代码 @pandas_udf(df.schema, PandasUDFType.SCALAR) def fun_function(df_in): df_in.loc[df_in['a'] < 0] = 0.0 return (df_in['a'] -

  • python – 遇到丢失的功能时,Apache Spark会抛出NullPointerException2019-09-29 15:57:34

    在为要素中的字符串列编制索引时,我对PySpark有一个奇怪的问题.这是我的tmp.csv文件: x0,x1,x2,x3 asd2s,1e1e,1.1,0 asd2s,1e1e,0.1,0 ,1e3e,1.2,0 bd34t,1e1e,5.1,1 asd2s,1e3e,0.2,0 bd34t,1e2e,4.3,1 我在’x0’中有一个缺失值. 首先,我正在使用pyspark_csv:https://github.co

  • SparkException:Python工作者没有及时连接2019-09-29 14:58:26

    我正在尝试向2个工作节点Spark集群提交Python作业,但我一直看到以下问题,最终导致spark-submit失败: 15/07/04 21:30:40 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 2, workernode0.rhom-spark.b9.internal.cloudapp.net): org.apache.spark.SparkExcep

  • python – Pyspark RDD ReduceByKey多功能2019-09-29 08:56:13

    我有一个名为DF的PySpark DataFrame,带有(K,V)对. 我想用ReduceByKey应用多个函数.例如,我有以下三个简单的功能: def sumFunc(a,b): return a+b def maxFunc(a,b): return max(a,b) def minFunc(a,b): return min(a,b) 当我只应用一个函数时,例如,以下三个函数: DF.reduceByKey

  • python – 从数据帧中获取值2019-09-29 07:56:54

    在Scala中,我可以获取(#)或getAs [Type](#)来获取数据帧中的值.我应该如何在pyspark中做到这一点? 我有两列DataFrame:item(字符串)和salesNum(整数).我做了一个groupby,意思是得到这样的数字的平均值: saleDF.groupBy( “salesNum”).意思是()).收集() 它的工作原理.现在我在数据框中

  • python – PySpark使用dict创建新列2019-09-28 19:59:00

    使用Spark 1.6,我有一个Spark DataFrame列(名为let,比如col1),其值为A,B,C,DS,DNS,E,F,G和H,我想用值创建一个新列(比如col2)从下面的词典中,我该如何映射? (所以f.i.’A’需要映射到’S’等……) dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S'

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有