精通IPFS:IPFS 保存内容之上篇

经过前面的分析,我们已经明白了 IPFS 启动过程,从今天起,我会分析一些常见的命令或动作,希望大家喜欢。

在开始真正分析这些命令/动作之前,先要对 pull-stream 类库进行简单介绍,如果不熟悉这个类库,接下来就没办法进行。

pull-stream 是一个新型的流库,数据被从源中拉取到目的中,它有两种基本类型的流:Source 源和 Sink 接收器。除此之外,有两种复合类型的流:Through 通道流(比如转换)和 Duplex 双向流。

source 流,这类流返回一个匿名函数,这个匿名函数被称为 read 函数,它被后续的 sink 流函数或 through 流函数调用,从而读取 source 流中的内容。

sink 流,这类流最终都返回内部 drain.js 中的 sink 函数。这类流主要是读取数据,并且对每一个读取到的数据进行处理,如果流已经结束,则调用用户指定结束函数进行处理。

through 流,这类流的函数会返回嵌套的匿名函数,第一层函数接收一个 source 流的 read 函数或其他 through 函数返回的第一层函数为参数,第二层函数接收最终 sink 提供的写函数或其他 through 返回的第二层函数,第二层函数内部调用 read 函数,从而直接或间接从 source 中取得数据,获取数据后直接或间接调用 sink 函数,从而把数据写入到目的地址。

在 pull-streams 中,数据在流动之前,必须有一个完整的管道,这意味着一个源、零个或多个通道、一个接收器。但是仍然可以创建一个部分化的管道,这非常有用。也就是说可以创建一个完整的管道,比如 pull(source, sink) => undefined,也可以部分化的管道,比如pull(through, sink) => sink,或者pull(through1, through2) => through,我们在下面会大量遇到这种部分化的管道。

今天,我们看下第一个最常用的 add命令/动作,我们使用 IPFS 就是为了把文件保存到 IPFS,自然少不了保存操作,add命令就是干这个的,闲话少数,我们来看一段代码。

const {createNode} = require('ipfs')

const node = createNode({ libp2p:{ config:{ dht:{ enabled:true } } }})

node.on('ready', async => {

const content = `我爱黑萤`;

const filesAdded = await node.add({ content: Buffer.from(content) },{ chunkerOptions:{ maxChunkSize:1000, avgChunkSize:1000 } })

console.log('Added file:', filesAdded[0].path, filesAdded[0].hash)})

这次我们没有完全使用默认配置,开启了 DHT,看过我文章的读者都知道 DHT 是什么东东,这里不详细解释。在程序中,通过调用 IPFS 节点的 add方法来上传内容,内容可以是文件,也可以是直接的内容,两者有稍微的区别,在讲到相关代码时,我们指出这种区别的,这里我们为了简单直接上传内容为例来说明。

add方法位于core/components/files-regular/add.js文件中,在 《精通IPFS:系统启动之概览》 那篇文章中,我们说过,系统会把core/components/files-regular目录下的所有文件扩展到 IPFS 对象上面,这其中自然包括这里的add.js文件。下面,我们直接看这个函数的执行流程。

这个函数返回了一个内部定义的函数,在这个内部定义的函数中对参数做了一些处理,然后就调用内部的 add函数,后者才是主体,它的逻辑如下:

  1. 首先,检查选项对象是否为函数,如果是,则重新生成相关的变量。
    if (typeof options === 'function') { callback = options options = {} } 
  2. 定义检测内容的工具函数来检测我们要上传的内容。
    const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj) const isContentObject = obj => { if (typeof obj !== 'object') return false if (obj.content) return isBufferOrStream(obj.content) return Boolean(obj.path) && typeof obj.path === 'string' }

    const isInput = obj => isBufferOrStream(obj) || isContentObject(obj) const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))

    if (!ok) { return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects')) }

  3. 接下来,执行 pull-stream 类库提供的 pull函数。我们来看pull函数的主要内容。它的第一个参数是pull.values函数执行的结果,这个values函数就是一个 source 流,它返回一个称为read的函数来读取我们提供的数据。这个read函数从数组中读取当前索引位置的值,以此值为参数,调用它之后的 through 函数第二层函数内部定义的回调函数或最终的 sink 函数内部定义的回调函数。如果数组已经读取完成,则直接以 true 为参数进行调用。

    第二个参数是 IPFS 对象的 addPullStream方法,这个方法也是在启动时候使用同样的方法扩展到 IPFS 对象,它的主体是当前目录的add-pull-stream.js文件中的函数。接下来,我们会详细看这个函数,现在我们只需要知道这个函数返回了一个部分化的管道。

    第三个参数是 pull-sort中定义的函数,这是一个依赖于pull-stream的库,根据一定规则来排序,这个函数我们不用管。

    最后一个参数是 pull.collect函数执行的结果,这个collect函数就是一个 sink 流。它把最终的结果放入一个数组中,然后调用回调函数。我们在前面代码中看到的filesAdded之所以是一个数组就是拜这个函数所赐。

    上面逻辑的代码如下:

    pull( pull.values([data]), self.addPullStream(options), sort((a, b) => { if (a.path < b.path) return 1 if (a.path > b.path) return -1 return 0 }), pull.collect(callback) ) 

    在上面的代码中,我们把要保存的内容构成一个数组,具体原因下面解释。

现在,我们来看 addPullStream方法,这个方法是保存内容的主体,add方法是只开胃小菜。addPullStream方法执行逻辑如下:

  1. 调用 parseChunkerString函数,处理内容分块相关的选项。这个函数位于相同目录下的utils.js文件中,它检查用户指定的分块算法。如果用户没有指定,则使用固定分块算法,大小为系统默认的 262144;如果指定了大小,则使用固定分块算法,但大小为用户指定大小;如果指定为rabin类分割法,即变长分割法,则调用内部函数来生成对应的分割选项。上面逻辑代码如下:
    parseChunkerString = (chunker) => { if (!chunker) { return { chunker: 'fixed' } } else if (chunker.startsWith('size-')) { const sizeStr = chunker.split('-')[1] const size = parseInt(sizeStr) if (isNaN(size)) { throw new Error('Chunker parameter size must be an integer') } return { chunker: 'fixed', chunkerOptions: { maxChunkSize: size } } } else if (chunker.startsWith('rabin')) { return { chunker: 'rabin', chunkerOptions: parseRabinString(chunker) } } else { throw new Error(Unrecognized chunker option: ${chunker}) } }

    注意:我们也可以通过重写这个函数来增加自己的分割算法。

  2. 合并整理选项变量。
    const opts = Object.assign({}, { shardSplitThreshold: self._options.EXPERIMENTAL.sharding ? 1000 : Infinity }, options, chunkerOptions) 
  3. 设置默认的 CID 版本号。如果指定了 Hash 算法,但是 CID 版本又不是 1,则强制设为 1。CID 是分布式系统的自描述内容寻址标识符,目前有两个版本 0 和 1,版本 0 是一个向后兼容的版本,只支持 sha256 哈希算法,并且不能指定。
    if (opts.hashAlg && opts.cidVersion !== 1) { opts.cidVersion = 1 } 
  4. 设置进度处理函数,默认空实现。
    const prog = opts.progress || noop const progress = (bytes) => { total += bytes prog(total) }

    opts.progress = progress

  5. pull函数返回一个部分化的 pull-stream 流。这个部分化的 pull-stream 流是处理文件/内容保存的关键,我们仔细研究下。
    1. 首先调用 pull.map方法对保存的内容进行处理。pull.map方法是 pull-stream 流中的一个 source 流,它对数组中的每个元素使用指定的处理函数进行处理。这就是我们在add函数中把需要保存的内容转化为数组的原因。在这里,对每个数组元素进行处理的函数是normalizeContent。这个函数定义在同一个文件中,它首先检查保存的内容是否为数组,如果不是则转化为数组;然后,对数组中的每一个元素进行处理,具体如下:
      • 如果保存的内容是 Buffer 对象,则把要保存的内容转化为路径为空字符串,内容为 pull-stream 流的对象。
        if (Buffer.isBuffer(data)) { data = { path: '', content: pull.values([data]) } } 
      • 如果保存的内容是一个 Node.js 可读流,比如文件,则把要保存的转化为路径为空字符串,内容使用 stream-to-pull-stream 类的 source方法库把 Node.js 可读流转化为 pull-stream 的 source 流对象。
        if (isStream.readable(data)) { data = { path: '', content: toPull.source(data) } } 
      • 如果保存的内容是 pull-stream 的 source 流,则把要保存的内容转化为路径为空字符串,内容不变的对象。
        if (isSource(data)) { data = { path: '', content: data } } 
      • 如果要保存的内容是一个对象,并且 content属性存在,且不是函数,则进行如下处理:
        if (data && data.content && typeof data.content !== 'function') { if (Buffer.isBuffer(data.content)) { data.content = pull.values([data.content]) }

        if (isStream.readable(data.content)) { data.content = toPull.source(data.content) } }

      • 如果指定的是路径,则进行下面的处理。
        if (opts.wrapWithDirectory && !data.path) { throw new Error('Must provide a path when wrapping with a directory') }

        if (opts.wrapWithDirectory) { data.path = WRAPPER + data.path }

      • 返回最终生成的要保存的内容。
    2. 调用 pull.flatten方法,把前上步生成的数组进行扁平化处理。flatten方法是一个 through 流,主要是把多个流或数组流转换为一个流,比如把多个数组转换成一个数组,比如:
      [ [1, 2, 3], [4, 5, 6], [7, 8, 9] ] 

      这样的数组使用这个方法处理后,最终会变成下面的数组

      [1, 2, 3, 4, 5, 6, 7, 8, 9] 
    3. 调用 importer函数来保存内容。这个函数定义在ipfs-unixfs-importer类库中,这个类库是 IPFS 用于处理文件的布局和分块机制的 JavaScript 实现,具体如何保存内容,如何分块我们将在下篇文章中进行详细分析。
    4. 调用 pull.asyncMap方法,对已经保存的文件/内容进行预处理,生成用户看到的内容。当程序执行到这里时,我们要保存的文件或内容已经保存在本地 IPFS 仓库,已经可以用使用catgetls等命令来 API 来查看我们保存的内容或文件了。asyncMap方法是一个 through 流,类似于map流,但是有更好的性能。它会对每一个数组元素进行处理,这里处理函数为prepareFile

      这个函数定义在同一个文件中,它的处理具体如下:

      • 使用已经生成文件的 multihash内容生成 CID 对象。
        let cid = new CID(file.multihash) 

        CID 构造方法会检查传入的参数,如果是 CID 对象,则直接从对象中取出版本号、编码方式、多哈希等属性;如果是字符串,则又分为是否被 multibase 编码过,如果是则需要先解码,然后再分离出各种属性,如果没有经过 multibase 编码,那么肯定是 base58 字符串,则设置版本为0,编码方式为dag-pb,再从 base58 串中获取多哈希值;如果是缓冲对象,则取得第一个字节,并按十六进制转化成整数,如果第一个字节是 0或1,则生成各自属性,否则为多哈希,则设置版本为0,编码方式为dag-pb

      • 如果用户指定 CID 版本为 1,则生成 CID 对象到版本1.
        if (opts.cidVersion === 1) { cid = cid.toV1 } 
      • 接下来,调用 waterfall方法,顺序处理它指定的函数。第一个函数,检查配置选项是否指定了onlyHash,即不实际地上传文件到IFS网络,仅仅计算一下这个文件的 HASH,那么直接调用第二个函数,否则,调用 IPFS 对象的object.get方法来获取指定文件在仓库中保存的节点信息。这个方法我们后面会专门讲解,这里略去不讲。第二个函数,生成最终返回给用户的对象,这个对象包括了:path、size、hash 等。

        上面代码如下,比较简单,可自己阅读。

         waterfall([ (cb) => opts.onlyHash ? cb(null, file) : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb), (node, cb) => { const b58Hash = cid.toBaseEncodedString()
         let size = node.size

        if (Buffer.isBuffer(node)) { size = node.length }

        cb(null, { path: opts.wrapWithDirectory ? file.path.substring(WRAPPER.length) : (file.path || b58Hash), hash: b58Hash, size }) }

         ], callback) 
    5. 调用 pull.map方法,把已经保存到本地的文件预加载到指定节点。map是一个 through 流,它会对每一个数组元素进行处理,这里处理函数为preloadFile。这个函数定义在同一个文件中,这会把已经保存的文件预加载到指定的节点,具体保存在哪些节点,可以参考《精通IPFS:系统启动之概览》篇中preload.addresses,也可以手动指定。
    6. 调用 pull.asyncMap方法,把已经保存到本地的文件长期保存在本地,确保不被垃圾回收。asyncMap方法是一个 through 流,这里处理函数为pinFile。pin 操作后面我们会详细分析,这里略过不提,读者可以自行阅读相关代码。