HDFS(GFS) Python实现
实现了HDFS的最基本的功能。具体的每一步的实现过程可以在这里看到https://github.com/DanielJyc/HDFS。每一个commit都可以执行,算是记录了自己的实现过程。总体的设计框图如下:
总体设计
操作:
- 上传文件到HDFS:输入 upload,再按提示输入本地文件名
- 从HDFS下载文件:输入 download,再按提示输入文件名
- 删除HDFS的文件:输入 delete,再按提示输入文件名
- 列出HDFS文件:ls
- 退出HDFS:exits
实现代码
# -*- coding: UTF-8 -*-
"""Toy single-process HDFS/GFS model.

A ``Namenode`` keeps the metadata (file -> chunk ids, chunk id -> primary
datanode), each ``Datanode`` stores chunks as files in its own local
directory, and ``Client`` splits files into fixed-size chunks and stores
every chunk on two datanodes.  ``Command`` is a small interactive shell.

The code runs unchanged on Python 2 and Python 3.
"""
import os
import uuid
import math
import time

try:
    _prompt = raw_input  # Python 2
except NameError:
    _prompt = input  # Python 3


def _to_text(data):
    """Return *data* as something printable on both Python 2 and 3."""
    if isinstance(data, str):
        return data
    return data.decode("utf-8", "replace")


class Client(object):
    """Front end that splits, places, reads back and deletes file chunks."""

    def __init__(self, namenode):
        self.namenode = namenode

    def _chunk_locations(self, chunk_uuid):
        """Return the datanode ids holding *chunk_uuid*: primary, then replica.

        The replica lives on the datanode following the primary (wrapping
        around).  With a single datanode both coincide, so only one id is
        returned and the chunk is not written or deleted twice.
        """
        primary = self.namenode.chunktable[chunk_uuid]
        replica = primary % self.namenode.num_datanodes + 1
        if replica == primary:
            return [primary]
        return [primary, replica]

    def _read_chunk(self, chunk_uuid):
        """Return the chunk's bytes from the first healthy copy, else None."""
        for attempt, loc in enumerate(self._chunk_locations(chunk_uuid)):
            if attempt:
                # The previous copy was unreadable; fall back to the replica.
                print('Current chunk is broken.')
            chunk = self.namenode.datanodes[loc].read(chunk_uuid)
            if isinstance(chunk, bytes):  # Datanode.read returns -1 on failure
                return chunk
        return None

    def write(self, filename, data):
        """Store *data* under *filename*, two copies per chunk.

        Text input is encoded as UTF-8.  An existing file with the same
        name is deleted first so that its chunks are not orphaned.
        """
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        if self.namenode.exits(filename):
            self.delete(filename)
        size = self.namenode.chunksize
        chunks = [data[i:i + size] for i in range(0, len(data), size)]
        # Allocate chunk ids and record the placement in the metadata.
        chunk_uuids = self.namenode.alloc(filename, self.get_num_chunks(data))
        for chunk_uuid, chunk in zip(chunk_uuids, chunks):
            for loc in self._chunk_locations(chunk_uuid):
                self.namenode.datanodes[loc].write(chunk_uuid, chunk)

    def read(self, filename):
        """Return the file's content as bytes.

        Returns None (after printing a message) when the file is unknown
        or when some chunk cannot be read from either of its copies.
        """
        if not self.namenode.exits(filename):
            print("The file: \"" + filename + "\" is not exits.")
            return None
        parts = []
        for chunk_uuid in self.namenode.filetable[filename]:
            chunk = self._read_chunk(chunk_uuid)
            if chunk is None:
                print("The HDFS is broken.")
                return None
            parts.append(chunk)
        return b"".join(parts)

    def delete(self, filename):
        """Remove the file's chunks from the datanodes, then its metadata."""
        if not self.namenode.exits(filename):
            print("The file: \"" + filename + "\" is not exits.")
            return
        for chunk_uuid in self.namenode.filetable[filename]:
            for loc in self._chunk_locations(chunk_uuid):
                self.namenode.datanodes[loc].delete(chunk_uuid)  # physical
        self.namenode.delete(filename)  # logical: drop the metadata

    def list_files(self):
        """Print the names of all stored files."""
        print("Files:")
        for name in self.namenode.filetable:
            print(name)

    def get_num_chunks(self, data):
        """Return how many chunks are needed to hold *data*."""
        size = self.namenode.chunksize
        return (len(data) + size - 1) // size


class Namenode(object):
    """Metadata server: file table, chunk table and the datanode registry."""

    def __init__(self, num_datanodes=3, chunksize=10, fs_root="D:/HDFSTemp"):
        """Create the metadata tables and the datanodes.

        num_datanodes -- number of datanodes, numbered from 1 (must be >= 1)
        chunksize     -- chunk length in bytes (must be >= 1)
        fs_root       -- directory under which the datanode folders live
        """
        if num_datanodes < 1:
            raise ValueError("num_datanodes must be at least 1")
        if chunksize < 1:
            raise ValueError("chunksize must be at least 1")
        self.num_datanodes = num_datanodes
        self.chunksize = chunksize
        self.fs_root = fs_root
        self.filetable = {}   # filename -> list of chunk uuids, in order
        self.chunktable = {}  # chunk uuid -> primary datanode id
        self.datanodes = {}   # datanode id -> Datanode
        self.init_datanodes()

    def init_datanodes(self):
        """Create one Datanode per id in 1..num_datanodes."""
        for loc in range(1, self.num_datanodes + 1):
            self.datanodes[loc] = Datanode(loc, self.fs_root)

    def alloc(self, filename, num_chunks):
        """Allocate *num_chunks* chunk ids for *filename* and return them.

        Primary datanodes are assigned round-robin starting at node 1.
        Metadata of a previous file with the same name is dropped first.
        """
        if filename in self.filetable:
            self.delete(filename)
        chunkloc = 1
        chunk_uuids = []
        for _ in range(num_chunks):
            chunk_uuid = uuid.uuid1()
            chunk_uuids.append(chunk_uuid)
            self.chunktable[chunk_uuid] = chunkloc
            # Datanode ids are 1-based, hence the "+ 1" after the modulo.
            chunkloc = chunkloc % self.num_datanodes + 1
        self.filetable[filename] = chunk_uuids
        print(self.filetable)
        return chunk_uuids

    def delete(self, filename):
        """Drop the metadata of *filename*; raises KeyError if unknown."""
        for chunk_uuid in self.filetable[filename]:
            self.chunktable.pop(chunk_uuid)
        self.filetable.pop(filename)

    def exits(self, filename):
        """Return True when *filename* is present in the file table."""
        return filename in self.filetable


class Datanode(object):
    """Chunk store backed by one local directory per datanode."""

    def __init__(self, chunkloc, fs_root="D:/HDFSTemp"):
        self.chunkloc = chunkloc
        # A separate directory per id stands in for a separate machine.
        self.local_fs_root = fs_root + "/Datanode" + str(chunkloc)
        if not os.path.isdir(self.local_fs_root):
            os.makedirs(self.local_fs_root)

    def _chunk_path(self, chunk_uuid):
        """Return the path of the local file that holds *chunk_uuid*."""
        return self.local_fs_root + "/" + str(chunk_uuid)

    def write(self, chunk_uuid, chunk):
        """Store *chunk* (bytes); print a message if the write fails."""
        try:
            # Binary mode: text mode would rewrite newlines on Windows.
            with open(self._chunk_path(chunk_uuid), "wb") as fw:
                fw.write(chunk)
        except IOError:
            print("The HDFS is broken.")

    def read(self, chunk_uuid):
        """Return the chunk's bytes, or -1 when it cannot be read."""
        try:
            with open(self._chunk_path(chunk_uuid), "rb") as fr:
                return fr.read()
        except IOError:
            return -1

    def delete(self, chunk_uuid):
        """Remove the chunk file; print a message if it is already gone."""
        try:
            os.remove(self._chunk_path(chunk_uuid))
        except OSError:  # WindowsError is a Windows-only subclass of OSError
            print("Filename:" + self._chunk_path(chunk_uuid) + 'dose not exits.')


class Command(object):
    """Interactive shell: upload, download, delete, ls, exits."""

    def __init__(self, client):
        self.client = client

    def command_line(self):
        """Read and execute commands until the user enters 'exits'."""
        while True:
            cmd = _prompt('Input your command:\n')
            if cmd == 'exits':
                break
            if cmd == 'upload':
                self.upload_cmd()
            elif cmd == 'download':
                self.download_cmd()
            elif cmd == 'delete':
                filename = _prompt('Input the filename which you want to delete in HDFS:\n')
                self.client.delete(filename)
            elif cmd == 'ls':
                self.client.list_files()
            else:
                print("Wrong command. \n")

    def upload_cmd(self):
        """Ask for a local file and store its content in HDFS."""
        filename = _prompt('Input the filename which you want to upload in local:\n')
        try:
            with open(filename, "rb") as fr:
                data = fr.read()
        except IOError:
            print("No such file in local.")
            return
        self.client.write(filename, data)

    def download_cmd(self):
        """Ask for an HDFS file, print it and save it locally."""
        filename = _prompt('Input the filename which you want to download in HDFS:\n')
        data = self.client.read(filename)
        if data is None:
            # Nothing was read; leave any existing local file untouched.
            return
        print(_to_text(data))
        with open(filename, "wb") as fw:
            fw.write(data)


def main():
    """Start the interactive shell on a fresh namenode."""
    nd = Namenode()
    client = Client(nd)
    command = Command(client)
    command.command_line()


if __name__ == '__main__':
    main()