文章目录
  1. 1. 总体设计
  2. 2. 操作:
  3. 3. 实现代码

用 Python 模拟实现了 HDFS 最基本的功能。每一步的具体实现过程可以在这里看到:https://github.com/DanielJyc/HDFS 。每一个 commit 都可以单独运行,算是记录了自己的实现过程。总体设计框图如下:

总体设计




操作:

  • 上传文件到HDFS:upload(随后按提示输入 filename)
  • 从HDFS下载文件:download(随后按提示输入 filename)
  • 删除HDFS的文件:delete(随后按提示输入 filename)
  • 列出HDFS文件:ls
  • 退出HDFS:exits

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# -*- coding: UTF-8 -*-
import os
import uuid
import math
import time

class Client(object):
	"""Client-side entry point of the simulated HDFS.

	Splits file contents into fixed-size chunks, stores every chunk on two
	datanodes (a primary and a backup copy) and reads or deletes files using
	the metadata kept by the namenode.
	"""

	def __init__(self, namenode):
		"""Remember the namenode used for metadata and datanode access."""
		self.namenode = namenode

	def _backup_loc(self, chunkloc):
		"""Return the datanode location that holds the second copy of a chunk.

		The backup lives on the location following ``chunkloc``, wrapping
		from ``num_datanodes`` back to 1.
		"""
		return chunkloc % self.namenode.num_datanodes + 1

	def write(self, filename, data):
		"""Store ``data`` under ``filename``, keeping two copies of each chunk.

		If ``filename`` is already stored, its old chunks are deleted first so
		that they are not left orphaned on the datanodes and in the metadata.
		"""
		if self.namenode.exits(filename):
			self.delete(filename)
		chunksize = self.namenode.chunksize
		chunks = [data[i:i + chunksize] for i in range(0, len(data), chunksize)]
		# Allocate one uuid per chunk; this also updates the namenode metadata.
		chunk_uuids = self.namenode.alloc(filename, len(chunks))
		for chunk_uuid, chunk in zip(chunk_uuids, chunks):
			# Use the location recorded by the namenode so that read() and
			# delete() look for the chunk exactly where it was written.
			chunkloc = self.namenode.chunktable[chunk_uuid]
			self.namenode.datanodes[chunkloc].write(chunk_uuid, chunk)
			# Second copy goes to the following datanode.
			self.namenode.datanodes[self._backup_loc(chunkloc)].write(chunk_uuid, chunk)

	def read(self, filename):
		"""Return the contents of ``filename``, or ``None`` if it cannot be read.

		Each chunk is read from its primary datanode; when that read reports
		-1 (chunk missing) the backup copy is used instead. ``None`` is
		returned when the file is unknown or when both copies of some chunk
		are missing.
		"""
		if not self.namenode.exits(filename):
			print("The file: \"" + filename + "\" is not exits.")
			return None
		pieces = []
		for chunk_uuid in self.namenode.filetable[filename]:
			chunkloc = self.namenode.chunktable[chunk_uuid]
			piece = self.namenode.datanodes[chunkloc].read(chunk_uuid)
			if piece == -1:
				# Primary copy is gone: fall back to the backup datanode.
				print('Current chunk is broken.')
				piece = self.namenode.datanodes[self._backup_loc(chunkloc)].read(chunk_uuid)
				if piece == -1:
					# Both copies are lost; concatenating -1 would raise TypeError.
					print("Both copies of a chunk are missing, cannot read: \"" + filename + "\"")
					return None
			pieces.append(piece)
		return ''.join(pieces)

	def delete(self, filename):
		"""Delete ``filename``: remove both copies of each chunk, then the metadata."""
		if not self.namenode.exits(filename):
			print("The file: \"" + filename + "\" is not exits.")
			return
		for chunk_uuid in self.namenode.filetable[filename]:
			chunkloc = self.namenode.chunktable[chunk_uuid]
			self.namenode.datanodes[chunkloc].delete(chunk_uuid)  # primary copy
			self.namenode.datanodes[self._backup_loc(chunkloc)].delete(chunk_uuid)  # backup copy
		self.namenode.delete(filename)  # finally drop the metadata entries

	def list_files(self):
		"""Print the names of all files registered on the namenode."""
		print("Files:")
		for name in self.namenode.filetable:
			print(name)

	def get_num_chunks(self, data):
		"""Return how many chunks of ``chunksize`` are needed to hold ``data``."""
		chunksize = self.namenode.chunksize
		# Integer ceiling division; avoids going through floating point.
		return (len(data) + chunksize - 1) // chunksize

class Namenode(object):
	"""Metadata server of the simulated HDFS.

	Keeps two tables: ``filetable`` maps a filename to the ordered list of its
	chunk uuids, and ``chunktable`` maps a chunk uuid to the location number
	of the datanode holding its primary copy. Also owns the datanodes,
	keyed by location number 1..num_datanodes.
	"""

	def __init__(self, num_datanodes=3, chunksize=10):
		"""Create empty metadata tables and the datanodes.

		Args:
			num_datanodes: number of datanodes to create (default 3).
			chunksize: length of each slice a file is split into (default 10).
		"""
		self.num_datanodes = num_datanodes
		self.chunksize = chunksize
		self.filetable = {}
		self.chunktable = {}
		self.datanodes = {}
		self.init_datanodes()  # build the location -> datanode mapping

	def init_datanodes(self):
		"""Create one Datanode per location, numbered from 1."""
		for loc in range(1, self.num_datanodes + 1):
			self.datanodes[loc] = Datanode(loc)

	def alloc(self, filename, num_chunks):
		"""Register ``filename`` with ``num_chunks`` fresh chunk uuids.

		Chunks are assigned to locations 1, 2, ..., num_datanodes, 1, ... in
		order. If ``filename`` was already registered, the chunk-table entries
		of its previous chunks are dropped so they do not accumulate.

		Returns:
			The list of newly created chunk uuids, in chunk order.
		"""
		for stale_uuid in self.filetable.get(filename, []):
			self.chunktable.pop(stale_uuid, None)
		chunk_uuids = []
		for index in range(num_chunks):
			chunk_uuid = uuid.uuid1()
			chunk_uuids.append(chunk_uuid)
			# The "+ 1" keeps locations in 1..num_datanodes (they are 1-based).
			self.chunktable[chunk_uuid] = index % self.num_datanodes + 1
		self.filetable[filename] = chunk_uuids
		print(self.filetable)
		return chunk_uuids

	def delete(self, filename):
		"""Remove ``filename`` and all of its chunks from both tables.

		Raises:
			KeyError: if ``filename`` is not registered.
		"""
		for chunk_uuid in self.filetable[filename]:
			self.chunktable.pop(chunk_uuid)
		self.filetable.pop(filename)

	def exits(self, filename):
		"""Return True if ``filename`` is registered, False otherwise."""
		return filename in self.filetable
		
class Datanode(object):
	"""One storage node of the simulated HDFS.

	Each datanode is imitated by its own directory on the local file system;
	every chunk is stored there as a file named after the chunk uuid.
	"""

	def __init__(self, chunkloc, base_dir="D:/HDFSTemp"):
		"""Create the datanode and make sure its directory exists.

		Args:
			chunkloc: location number of this datanode; used in the directory name.
			base_dir: directory under which the ``Datanode<chunkloc>`` directory
				is created. Defaults to the original hard-coded ``D:/HDFSTemp``.
		"""
		self.chunkloc = chunkloc
		# A separate directory per location imitates a separate datanode.
		self.local_fs_root = base_dir + "/Datanode" + str(chunkloc)
		if not os.path.isdir(self.local_fs_root):
			os.makedirs(self.local_fs_root)

	def _chunk_path(self, chunk_uuid):
		"""Return the path of the file that stores chunk ``chunk_uuid``."""
		return self.local_fs_root + "/" + str(chunk_uuid)

	def write(self, chunk_uuid, chunk):
		"""Write ``chunk`` to this datanode; print a message if the write fails."""
		try:
			with open(self._chunk_path(chunk_uuid), "w") as fw:
				fw.write(chunk)
		except IOError:
			print("The HDFS is broken.")

	def read(self, chunk_uuid):
		"""Return the stored chunk, or -1 if its file cannot be read."""
		try:
			with open(self._chunk_path(chunk_uuid), "r") as fr:
				return fr.read()
		except IOError:
			# Callers test for -1 to detect a missing chunk.
			return -1

	def delete(self, chunk_uuid):
		"""Remove the chunk file; print a message if it cannot be removed."""
		try:
			os.remove(self._chunk_path(chunk_uuid))
		except OSError:
			# OSError is the portable parent of WindowsError, which is only
			# defined on Windows and would raise NameError elsewhere.
			print("Filename:" + self._chunk_path(chunk_uuid) + 'dose not exits.')

def _prompt(message):
	"""Show ``message`` and return one line typed by the user.

	Uses ``raw_input`` where it exists (Python 2) and falls back to ``input``
	(Python 3), so the command loop runs on both.
	"""
	try:
		reader = raw_input
	except NameError:
		reader = input
	return reader(message)


class Command(object):
	"""Interactive command loop that drives a Client.

	Supported commands: ``upload``, ``download``, ``delete``, ``ls`` and
	``exits``. Commands that need a file ask for its name in a second prompt.
	"""

	def __init__(self, client):
		"""Remember the client that carries out the HDFS operations."""
		self.client = client

	def command_line(self):
		"""Read and execute commands until ``exits`` is entered or input ends."""
		while True:
			try:
				cmd = _prompt('Input your command:\n')
			except EOFError:
				# End of input (closed or exhausted stdin): leave the loop
				# instead of crashing with a traceback.
				break
			if 'upload' == cmd:
				self.upload_cmd()
			elif 'download' == cmd:
				self.download_cmd()
			elif 'delete' == cmd:
				filename = _prompt('Input the filename which you want to delete in HDFS:\n')
				self.client.delete(filename)
			elif 'ls' == cmd:
				self.client.list_files()
			elif 'exits' == cmd:
				break
			else:
				print("Wrong command. \n")

	def upload_cmd(self):
		"""Ask for a local filename and store that file's contents in HDFS."""
		filename = _prompt('Input the filename which you want to upload in local:\n')
		try:
			with open(filename, "r") as fr:  # read the local file
				data = fr.read()
		except IOError:
			print("No such file in local.")
			return
		# Kept outside the try block so that only a failure to read the local
		# file is reported as "No such file in local."
		self.client.write(filename, data)

	def download_cmd(self):
		"""Ask for an HDFS filename, print its contents and save them locally."""
		filename = _prompt('Input the filename which you want to download in HDFS:\n')
		data = self.client.read(filename)
		if data is None:
			# Nothing was read. Opening the local file for writing would
			# truncate an existing file of the same name, so stop here.
			return
		print(data)
		with open(filename, "w") as fw:  # write the local copy
			fw.write(data)

def main():
	"""Build a namenode, attach a client to it and start the command loop."""
	namenode = Namenode()
	shell = Command(Client(namenode))
	shell.command_line()


if __name__ == "__main__":
	main()
文章目录
  1. 1. 总体设计
  2. 2. 操作:
  3. 3. 实现代码