处理大数据集的建议

发表于:2018-4-11 10:23  作者:不正经数据科学家   来源:个人博客

字体: | 上一篇 | 下一篇 |我要投稿 | 推荐标签: 软件开发 大数据

  一般来说,只有16G的内存的“小”电脑都无法直接处理这种数据集了,本文收集了一些关于处理这种数据的建议,供大家参考。
  1.及时删除无用变量并垃圾回收
  通常我们在特征工程中会涉及大量的转换操作,产生很多的中间变量等,除了使用del以外,使用gc.collect()也是个不错的选择。
  temp = pd.read_csv('../input/train_sample.csv')
  #do something to the file
  temp['os'] = temp['os'].astype('str')
  #delete when no longer needed
  del temp
  #collect residual garbage
  gc.collect()
  2.预定义数据类型
  pandas一般会自己推断数据类型,不过倾向于使用耗费空间大的,如下面例子所示,预定义数据类型节省了超过一半的空间。
  dtypes = {
          'ip'            : 'uint32',
          'app'           : 'uint16',
          'device'        : 'uint16',
          'os'            : 'uint16',
          'channel'       : 'uint16',
          'is_attributed' : 'uint8',
          }
  dtypes2 = {
          'ip'            : 'int32',
          'app'           : 'int16',
          'device'        : 'int16',
          'os'            : 'int16',
          'channel'       : 'int16',
          'is_attributed' : 'int8',
          }
  train = pd.read_csv(train_sample_file,parse_dates=['click_time'])
  #check datatypes:
  train.info()
  train = pd.read_csv(train_sample_file,dtype=dtypes,parse_dates=['click_time'])
  #check datatypes:
  train.info()
  train = pd.read_csv(train_sample_file,dtype=dtypes2,parse_dates=['click_time'])
  #check datatypes:
  train.info()
  '''
  <class 'pandas.core.frame.DataFrame'>
  RangeIndex: 100000 entries, 0 to 99999
  Data columns (total 8 columns):
  ip                 100000 non-null int64
  app                100000 non-null int64
  device             100000 non-null int64
  os                 100000 non-null int64
  channel            100000 non-null int64
  click_time         100000 non-null datetime64[ns]
  attributed_time    227 non-null object
  is_attributed      100000 non-null int64
  dtypes: datetime64[ns](1), int64(6), object(1)
  memory usage: 6.1+ MB
  <class 'pandas.core.frame.DataFrame'>
  RangeIndex: 100000 entries, 0 to 99999
  Data columns (total 8 columns):
  ip                 100000 non-null uint32
  app                100000 non-null uint16
  device             100000 non-null uint16
  os                 100000 non-null uint16
  channel            100000 non-null uint16
  click_time         100000 non-null datetime64[ns]
  attributed_time    227 non-null object
  is_attributed      100000 non-null uint8
  dtypes: datetime64[ns](1), object(1), uint16(4), uint32(1), uint8(1)
  memory usage: 2.8+ MB
  <class 'pandas.core.frame.DataFrame'>
  RangeIndex: 100000 entries, 0 to 99999
  Data columns (total 8 columns):
  ip                 100000 non-null int32
  app                100000 non-null int16
  device             100000 non-null int16
  os                 100000 non-null int16
  channel            100000 non-null int16
  click_time         100000 non-null datetime64[ns]
  attributed_time    227 non-null object
  is_attributed      100000 non-null int8
  dtypes: datetime64[ns](1), int16(4), int32(1), int8(1), object(1)
  memory usage: 2.8+ MB
  '''
  3.只使用csv文件内的指定行
  a) 指定行数
  直接使用nrows指定
  train = pd.read_csv('../input/train.csv', nrows=1e5, dtype=dtypes)
  b) 跳过行数
  比如我们跳过前500w取100w下面保留了head,
  train = pd.read_csv('../input/train.csv', skiprows=range(1, 5000000), nrows=1000000, dtype=dtypes)
  c) sampling
  import subprocess
  print('# Line count:')
  for file in ['train.csv', 'test.csv', 'train_sample.csv']:
      lines = subprocess.run(['wc', '-l', '../input/{}'.format(file)], stdout=subprocess.PIPE).stdout.decode('utf-8')
      print(lines, end='', flush=True)
  '''
  # Line count:
  184903891 ../input/train.csv
  18790470 ../input/test.csv
  100001 ../input/train_sample.csv
  '''
  train一共有lines=184903891 行,那么假设我们需要采样出100w行,那么我们需要跳过lines - 1 - 1000000行,即
  #generate list of lines to skip
  skiplines = np.random.choice(np.arange(1, lines), size=lines-1-1000000, replace=False)
  #sort the list
  skiplines=np.sort(skiplines)
  #check our list
  print('lines to skip:', len(skiplines))
  print('remaining lines in sample:', lines-len(skiplines), '(remember that it includes the heading!)')
  ###################SANITY CHECK###################
  #find lines that weren't skipped by checking difference between each consecutive line
  #how many out of first 100000 will be imported into the csv?
  diff = skiplines[1:100000]-skiplines[2:100001]
  remain = sum(diff!=-1)
  print('Ratio of lines from first 100000 lines:',  '{0:.5f}'.format(remain/100000) ) 
  print('Ratio imported from all lines:', '{0:.5f}'.format((lines-len(skiplines))/lines) )
  train = pd.read_csv('../input/train.csv', skiprows=skiplines, dtype=dtypes)
  train.head()
  del skiplines
  gc.collect()
  4.使用pandas 的生成器,用chunk处理
  这里我们使用np.where过滤掉‘is_attributed’为0的部分(例如[xv if c else yv for (c,xv,yv) in zip(condition,x,y)])
 #set up an empty dataframe
  df_converted = pd.DataFrame()
  #we are going to work with chunks of size 1 million rows
  chunksize = 10 ** 6
  #in each chunk, filter for values that have 'is_attributed'==1, and merge these values into one dataframe
  for chunk in pd.read_csv('../input/train.csv', chunksize=chunksize, dtype=dtypes):
      filtered = (chunk[(np.where(chunk['is_attributed']==1, True, False))])
      df_converted = pd.concat([df_converted, filtered], ignore_index=True, )
  5.只载入若干列
  #wanted columns
  columns = ['ip', 'click_time', 'is_attributed']
  dtypes = {
          'ip'            : 'uint32',
          'is_attributed' : 'uint8',
          }
  ips_df = pd.read_csv('../input/train.csv', usecols=columns, dtype=dtypes)
  print(ips_df.info())
  ips_df.head()
  '''
  <class 'pandas.core.frame.DataFrame'>
  RangeIndex: 184903890 entries, 0 to 184903889
  Data columns (total 3 columns):
  ip               uint32
  click_time       object
  is_attributed    uint8
  dtypes: object(1), uint32(1), uint8(1)
  memory usage: 2.2+ GB
  None'''
  6.结合多种方法创意性的处理数据
  例如无法使用整个数据来groupby那么可以分块来做,
  size=100000
  all_rows = len(ips_df)
  num_parts = all_rows//size
  #generate the first batch
  ip_sums = ips_df[0:size][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()
  #add remaining batches
  for p in range(1,num_parts):
      start = p*size
      end = p*size + size
      if end < all_rows:
          group = ips_df[start:end][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()
      else:
          group = ips_df[start:][['ip', 'is_attributed']].groupby('ip', as_index=False).sum()
      ip_sums = ip_sums.merge(group, on='ip', how='outer')
      ip_sums.columns = ['ip', 'sum1','sum2']
      ip_sums['conversions_per_ip'] = np.nansum((ip_sums['sum1'], ip_sums['sum2']), axis = 0)
      ip_sums.drop(columns=['sum1', 'sum2'], axis = 0, inplace=True)
  7.使用dask代替pandas
  import dask
  import dask.dataframe as dd



上文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理。

评 论

论坛新帖

顶部 底部


建议使用IE 6.0以上浏览器,800×600以上分辨率,法律顾问:上海瀛东律师事务所 张楠律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2018, 沪ICP备05003035号
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪公网安备 31010102002173号

51Testing官方微信

51Testing官方微博

扫一扫 测试知识全知道