
Bulk Loading Strategies

Learn efficient techniques for loading large datasets into Apache AGE using ageSchemaClient.

Overview

Bulk loading is essential when working with large datasets. This guide covers strategies to optimize performance and memory usage.

Basic Batch Loading

Using the Batch Loader

import { AgeSchemaClient } from 'age-schema-client';

const client = new AgeSchemaClient({
  connectionString: 'postgresql://user:pass@localhost:5432/graphdb',
  graphName: 'large_graph'
});

// Create batch loader
const batchLoader = client.createBatchLoader();

// Prepare data
const graphData = {
  vertices: [
    { label: 'Person', properties: { id: 1, name: 'Alice', age: 30 } },
    { label: 'Person', properties: { id: 2, name: 'Bob', age: 25 } },
    // ... thousands more
  ],
  edges: [
    {
      label: 'KNOWS',
      from: { label: 'Person', properties: { id: 1 } },
      to: { label: 'Person', properties: { id: 2 } },
      properties: { since: '2020' }
    },
    // ... thousands more
  ]
};

// Load data
await batchLoader.load(graphData);
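
For a one-off import, you may also want to release the client's connection pool once the load finishes. A minimal sketch of that variant, using the same close() call that appears in the parallel-loading example later in this guide:

// Variant of the load call above that also releases the pool when done
try {
  await batchLoader.load(graphData);
} finally {
  await client.close();
}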

Chunked Loading

Processing Large Datasets in Chunks

async function loadLargeDataset(data: any[], chunkSize: number = 1000) {
  const batchLoader = client.createBatchLoader();

  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = data.slice(i, i + chunkSize);

    const graphData = {
      vertices: chunk.map(item => ({
        label: item.type,
        properties: item.properties
      })),
      edges: [] // Process edges separately
    };

    await batchLoader.load(graphData);

    console.log(`Processed ${Math.min(i + chunkSize, data.length)} of ${data.length} items`);

    // Optional: Add delay to prevent overwhelming the database
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

Memory-Efficient Streaming

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

async function loadFromCSV(filePath: string) {
  const fileStream = createReadStream(filePath);
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  const batchLoader = client.createBatchLoader();
  let batch: any[] = [];
  const BATCH_SIZE = 1000;

  for await (const line of rl) {
    // Assumes a headerless CSV with columns: id,name,age
    if (!line.trim()) continue; // Skip blank lines

    const [id, name, age] = line.split(',');

    batch.push({
      label: 'Person',
      properties: { id: parseInt(id), name, age: parseInt(age) }
    });

    if (batch.length >= BATCH_SIZE) {
      await batchLoader.load({ vertices: batch, edges: [] });
      batch = [];
    }
  }

  // Process remaining items
  if (batch.length > 0) {
    await batchLoader.load({ vertices: batch, edges: [] });
  }
}
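
The same streaming pattern works for other line-oriented formats. Below is a minimal sketch for JSON Lines input; the function name loadFromJSONL and the per-line object shape (one vertex object per line) are assumptions, and the example reuses the fs/readline imports and the client from the CSV example above:

async function loadFromJSONL(filePath: string) {
  // Reuses createReadStream/createInterface imported in the CSV example above
  const rl = createInterface({
    input: createReadStream(filePath),
    crlfDelay: Infinity
  });

  const batchLoader = client.createBatchLoader();
  let batch: any[] = [];
  const BATCH_SIZE = 1000;

  for await (const line of rl) {
    if (!line.trim()) continue;
    // Each line is expected to look like: {"label":"Person","properties":{...}}
    batch.push(JSON.parse(line));

    if (batch.length >= BATCH_SIZE) {
      await batchLoader.load({ vertices: batch, edges: [] });
      batch = [];
    }
  }

  if (batch.length > 0) {
    await batchLoader.load({ vertices: batch, edges: [] });
  }
}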

Optimized Loading Patterns

Vertices First, Then Edges

async function loadVerticesAndEdges(vertexData: any[], edgeData: any[]) {
  const batchLoader = client.createBatchLoader();

  // 1. Load all vertices first (edges reference existing vertices)
  console.log('Loading vertices...');
  await batchLoader.load({ vertices: vertexData, edges: [] });

  // 2. Load edges in chunks
  console.log('Loading edges...');
  const EDGE_CHUNK_SIZE = 500; // Smaller chunks for edges

  for (let i = 0; i < edgeData.length; i += EDGE_CHUNK_SIZE) {
    const edgeChunk = edgeData.slice(i, i + EDGE_CHUNK_SIZE);
    await batchLoader.load({ vertices: [], edges: edgeChunk });

    console.log(`Loaded ${Math.min(i + EDGE_CHUNK_SIZE, edgeData.length)} of ${edgeData.length} edges`);
  }
}

Parallel Loading with Connection Pools

async function parallelLoad(datasets: any[][]) {
  const promises = datasets.map(async (dataset, index) => {
    // Each parallel operation gets its own client
    const parallelClient = new AgeSchemaClient({
      connectionString: 'postgresql://user:pass@localhost:5432/graphdb',
      graphName: 'large_graph'
    });

    const batchLoader = parallelClient.createBatchLoader();

    try {
      await batchLoader.load({
        vertices: dataset,
        edges: []
      });
      console.log(`Parallel batch ${index} completed`);
    } finally {
      await parallelClient.close();
    }
  });

  await Promise.all(promises);
}
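
To produce the datasets argument, split one large vertex array into shards before calling parallelLoad. A minimal sketch; the splitIntoShards helper, the shard count, and the allVertices variable are illustrative, not part of the library API:

function splitIntoShards<T>(items: T[], shardCount: number): T[][] {
  const shards: T[][] = Array.from({ length: shardCount }, () => []);
  items.forEach((item, i) => {
    shards[i % shardCount].push(item); // Round-robin assignment keeps shards balanced
  });
  return shards;
}

// Example: load one big vertex array with 4 parallel clients
await parallelLoad(splitIntoShards(allVertices, 4));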

Data Preparation Strategies

Pre-processing for Efficiency

// Group data by label for better performance
function groupDataByLabel(rawData: any[]): Record<string, any[]> {
  return rawData.reduce<Record<string, any[]>>((acc, item) => {
    if (!acc[item.label]) {
      acc[item.label] = [];
    }
    acc[item.label].push(item);
    return acc;
  }, {});
}

// Load grouped data
async function loadGroupedData(rawData: any[]) {
  const grouped = groupDataByLabel(rawData);
  const batchLoader = client.createBatchLoader();

  for (const [label, items] of Object.entries(grouped)) {
    console.log(`Loading ${items.length} ${label} vertices...`);

    await batchLoader.load({
      vertices: items,
      edges: []
    });
  }
}

Data Validation and Cleaning

function validateAndCleanData(data: any[]) {
  return data
    .filter(item => {
      // Remove invalid items
      if (!item.properties || !item.properties.id) {
        console.warn(`Skipping invalid item: ${JSON.stringify(item)}`);
        return false;
      }
      return true;
    })
    .map(item => ({
      ...item,
      properties: {
        // Remove null/undefined values first...
        ...Object.fromEntries(
          Object.entries(item.properties).filter(([_, v]) => v != null)
        ),
        // ...then normalize data types (applied last so they are not overwritten)
        id: parseInt(item.properties.id),
        name: String(item.properties.name).trim()
      }
    }));
}
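
Putting the pieces together, a minimal sketch of a clean-then-load pipeline built from the helpers above; the cleanAndLoad name and the chunk size are illustrative:

async function cleanAndLoad(rawVertices: any[]) {
  const cleaned = validateAndCleanData(rawVertices);
  const CHUNK_SIZE = 1000;
  const batchLoader = client.createBatchLoader();

  // Load the validated vertices in chunks
  for (let i = 0; i < cleaned.length; i += CHUNK_SIZE) {
    await batchLoader.load({
      vertices: cleaned.slice(i, i + CHUNK_SIZE),
      edges: []
    });
  }
}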

Performance Monitoring

Progress Tracking

class LoadingProgressTracker {
  private startTime: number;
  private totalItems: number;
  private processedItems: number = 0;

  constructor(totalItems: number) {
    this.totalItems = totalItems;
    this.startTime = Date.now();
  }

  update(itemsProcessed: number) {
    this.processedItems += itemsProcessed;
    const elapsed = Date.now() - this.startTime;
    const rate = this.processedItems / (elapsed / 1000);
    const remaining = this.totalItems - this.processedItems;
    const eta = remaining / rate;

    console.log(`Progress: ${this.processedItems}/${this.totalItems} (${(this.processedItems / this.totalItems * 100).toFixed(1)}%)`);
    console.log(`Rate: ${rate.toFixed(1)} items/sec, ETA: ${eta.toFixed(0)}s`);
  }
}

// Usage
async function loadWithProgress(data: any[]) {
  const tracker = new LoadingProgressTracker(data.length);
  const CHUNK_SIZE = 1000;

  for (let i = 0; i < data.length; i += CHUNK_SIZE) {
    const chunk = data.slice(i, i + CHUNK_SIZE);

    await client.createBatchLoader().load({
      vertices: chunk,
      edges: []
    });

    tracker.update(chunk.length);
  }
}

Memory Usage Monitoring

function logMemoryUsage(label: string) {
  const used = process.memoryUsage();
  console.log(`${label} - Memory usage:`);
  for (const [key, bytes] of Object.entries(used)) {
    console.log(`  ${key}: ${Math.round(bytes / 1024 / 1024 * 100) / 100} MB`);
  }
}

async function loadWithMemoryMonitoring(data: any[]) {
  logMemoryUsage('Before loading');

  const CHUNK_SIZE = 1000;
  for (let i = 0; i < data.length; i += CHUNK_SIZE) {
    const chunk = data.slice(i, i + CHUNK_SIZE);

    await client.createBatchLoader().load({
      vertices: chunk,
      edges: []
    });

    if (i % (CHUNK_SIZE * 10) === 0) {
      logMemoryUsage(`After ${i} items`);

      // Force garbage collection if available (requires node --expose-gc)
      if (global.gc) {
        global.gc();
      }
    }
  }

  logMemoryUsage('After loading');
}

Error Handling and Recovery

Robust Error Handling

async function loadWithErrorHandling(data: any[]) {
  const batchLoader = client.createBatchLoader();
  const CHUNK_SIZE = 1000;
  const failedChunks: any[] = [];

  for (let i = 0; i < data.length; i += CHUNK_SIZE) {
    const chunk = data.slice(i, i + CHUNK_SIZE);

    try {
      await batchLoader.load({
        vertices: chunk,
        edges: []
      });
      console.log(`Successfully loaded chunk ${i / CHUNK_SIZE + 1}`);
    } catch (error) {
      console.error(`Failed to load chunk ${i / CHUNK_SIZE + 1}:`, error);
      failedChunks.push({ chunk, index: i });
    }
  }

  // Retry failed chunks with smaller batch sizes
  if (failedChunks.length > 0) {
    console.log(`Retrying ${failedChunks.length} failed chunks...`);

    for (const { chunk, index } of failedChunks) {
      await retryChunkWithSmallerBatches(chunk, index);
    }
  }
}

async function retryChunkWithSmallerBatches(chunk: any[], originalIndex: number) {
  const SMALL_BATCH_SIZE = 100;

  for (let i = 0; i < chunk.length; i += SMALL_BATCH_SIZE) {
    const smallBatch = chunk.slice(i, i + SMALL_BATCH_SIZE);

    try {
      await client.createBatchLoader().load({
        vertices: smallBatch,
        edges: []
      });
    } catch (error) {
      console.error(`Failed to load small batch at ${originalIndex + i}:`, error);
      // Log individual items for manual inspection
      console.error('Failed items:', smallBatch);
    }
  }
}
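
If failures are transient (for example, dropped connections), it can also help to retry a chunk a few times before treating it as failed. A minimal sketch of such a wrapper; the loadChunkWithRetry name, attempt count, and backoff delays are illustrative, not part of the library API:

async function loadChunkWithRetry(chunk: any[], maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await client.createBatchLoader().load({ vertices: chunk, edges: [] });
      return;
    } catch (error) {
      if (attempt === maxAttempts) throw error;
      // Exponential backoff between attempts: 1s, 2s, 4s, ...
      await new Promise(resolve => setTimeout(resolve, 1000 * 2 ** (attempt - 1)));
    }
  }
}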

Best Practices

Configuration Optimization

// Optimized client configuration for bulk loading
const bulkLoadClient = new AgeSchemaClient({
  connectionString: 'postgresql://user:pass@localhost:5432/graphdb',
  graphName: 'large_graph',
  poolConfig: {
    max: 20, // Increase connection pool size
    idleTimeoutMillis: 30000,
    connectionTimeoutMillis: 10000
  }
});

Transaction Management

async function loadInTransactions(data: any[]) {
  const TRANSACTION_SIZE = 5000;

  for (let i = 0; i < data.length; i += TRANSACTION_SIZE) {
    const chunk = data.slice(i, i + TRANSACTION_SIZE);

    await client.transaction(async (tx) => {
      const batchLoader = tx.createBatchLoader();
      await batchLoader.load({
        vertices: chunk,
        edges: []
      });
    });

    console.log(`Committed transaction for items ${i} to ${i + chunk.length}`);
  }
}

Next Steps