1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
17 const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
18 const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
20 export const processLogsPanelActions = unionize({
21 RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
22 INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
23 SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
24 ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
27 // Max size of logs to fetch in bytes
28 const maxLogFetchSize: number = 128 * 1000;
30 type FileWithProgress = {
35 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
37 export const setProcessLogsPanelFilter = (filter: string) =>
38 processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
40 export const initProcessLogsPanel = (processUuid: string) =>
41 async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
42 dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
43 const process = getProcess(processUuid)(getState().resources);
44 if (process?.containerRequest?.uuid) {
45 // Get log file size info
46 const logFiles = await loadContainerLogFileList(process.containerRequest.uuid, logService);
48 // Populate lastbyte 0 for each file
49 const filesWithProgress = logFiles.map((file) => ({file, lastByte: 0}));
51 // Fetch array of LogFragments
52 const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
54 // Populate initial state with filters
55 const initialState = createInitialLogPanelState(logFiles, logLines);
56 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
60 export const pollProcessLogs = (processUuid: string) =>
61 async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
63 // Get log panel state and process from store
64 const currentState = getState().processLogsPanel;
65 const process = getProcess(processUuid)(getState().resources);
67 // Check if container request is present and initial logs state loaded
68 if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
69 const logFiles = await loadContainerLogFileList(process.containerRequest.uuid, logService);
71 // Determine byte to fetch from while filtering unchanged files
72 const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
73 // Fetch last byte or 0 for new log files
74 const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
76 const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
77 const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
79 if (isNew || isChanged) {
80 return acc.concat({file: updatedFile, lastByte: currentStateLogLastByte});
84 }, [] as FileWithProgress[]);
86 // Perform range request(s) for each file
87 const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
89 if (logFragments.length) {
90 // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
91 const groupedLogs = groupLogs(logFiles, logFragments);
92 await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
95 return Promise.resolve();
97 return Promise.reject();
101 const loadContainerLogFileList = async (containerUuid: string, logService: LogService) => {
102 const logCollectionContents = await logService.listLogFiles(containerUuid);
104 // Filter only root directory files matching log event types which have bytes
105 return logCollectionContents.filter((file): file is CollectionFile => (
106 file.type === CollectionFileType.FILE &&
107 file.path === `/arvados/v1/container_requests/${containerUuid}/log` &&
108 PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
114 * Loads the contents of each file from each file's lastByte simultaneously
115 * while respecting the maxLogFetchSize by requesting the start and end
116 * of the desired block and inserting a snipline.
117 * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
120 * @returns LogFragment[] containing a single LogFragment corresponding to each input file
122 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
123 (await Promise.all(logFilesWithProgress.filter(({file}) => file.size > 0).map(({file, lastByte}) => {
124 const requestSize = file.size - lastByte;
125 if (requestSize > maxLogFetchSize) {
126 const chunkSize = Math.floor(maxLogFetchSize / 2);
127 const firstChunkEnd = lastByte+chunkSize-1;
129 logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, firstChunkEnd),
130 logService.getLogFileContents(process.containerRequest.uuid, file, file.size-chunkSize, file.size-1)
131 ] as Promise<(LogFragment | undefined)>[]);
133 return Promise.all([logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, file.size-1)]);
135 }))).filter((logResponseSet) => ( // Top level filter ensures array of LogFragments is not empty and contains 1 or more fragments containing log lines
136 logResponseSet.length && logResponseSet.some(logFragment => logFragment && logFragment.contents.length)
137 )).map((logResponseSet)=> {
138 // Remove fragments from subarrays that are undefined or have no lines
139 const responseSet = logResponseSet.filter((logFragment): logFragment is LogFragment => (
140 !!logFragment && logFragment.contents.length > 0
143 // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
144 // Don't add snip line as a separate line so that sorting won't reorder it
145 for (let i = 1; i < responseSet.length; i++) {
146 const fragment = responseSet[i-1];
147 const lastLineIndex = fragment.contents.length-1;
148 const lastLineContents = fragment.contents[lastLineIndex];
149 const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
151 responseSet[i-1].contents[lastLineIndex] = newLastLine;
154 // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
155 return responseSet.reduce((acc, curr: LogFragment) => ({
156 logType: curr.logType,
157 contents: [...(acc.contents || []), ...curr.contents]
158 }), {} as LogFragment);
162 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): {filters: string[], logs: ProcessLogs} => {
163 const logs = groupLogs(logFiles, logFragments);
164 const filters = Object.keys(logs);
165 return { filters, logs };
169 * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
171 * @param logFragments
172 * @returns ProcessLogs for the store
174 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
175 const sortableLogFragments = mergeMultilineLoglines(logFragments);
177 const allLogs = mergeSortLogFragments(sortableLogFragments);
178 const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
180 const groupedLogs = logFragments.reduce((grouped, fragment) => ({
182 [fragment.logType as string]: {lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents}
186 [MAIN_FILTER_TYPE]: {lastByte: undefined, contents: mainLogs},
187 [ALL_FILTER_TYPE]: {lastByte: undefined, contents: allLogs},
193 * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
194 * If there is no previous line (first line has no timestamp), the line is deleted.
195 * Only used for combined logs that need sorting by timestamp after merging
196 * @param logFragments
197 * @returns Modified LogFragment[]
199 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
200 logFragments.map((fragment) => {
201 // Avoid altering the original fragment copy
202 let fragmentCopy: LogFragment = {
203 logType: fragment.logType,
204 contents: [...fragment.contents],
206 // Merge any non-timestamped lines in sortable log types with previous line
207 if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
208 for (let i = 0; i < fragmentCopy.contents.length; i++) {
209 const lineContents = fragmentCopy.contents[i];
210 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
211 // Partial line without timestamp detected
213 // If not first line, copy line to previous line
214 const previousLineContents = fragmentCopy.contents[i-1];
215 const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
216 fragmentCopy.contents[i-1] = newPreviousLineContents;
218 // Delete the current line and prevent iterating
219 fragmentCopy.contents.splice(i, 1);
229 * Merges log lines of different types and sorts types that contain timestamps (are sortable)
230 * @param logFragments
231 * @returns string[] of merged and sorted log lines
233 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
234 const sortableLines = fragmentsToLines(logFragments
235 .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType))));
237 const nonSortableLines = fragmentsToLines(logFragments
238 .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
239 .sort((a, b) => (a.logType.localeCompare(b.logType))));
241 return [...sortableLines.sort(sortLogLines), ...nonSortableLines]
244 const sortLogLines = (a: string, b: string) => {
245 return a.localeCompare(b);
248 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
249 fragments.reduce((acc, fragment: LogFragment) => (
250 acc.concat(...fragment.contents)
254 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
255 return logFiles.find((file) => (file.name.startsWith(key)))?.size
258 export const navigateToLogCollection = (uuid: string) =>
259 async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
261 await services.collectionService.get(uuid);
262 dispatch<any>(navigateTo(uuid));
264 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Could not request collection', hideDuration: 2000, kind: SnackbarKind.ERROR }));
268 const ALL_FILTER_TYPE = 'All logs';
270 const MAIN_FILTER_TYPE = 'Main logs';
271 const MAIN_EVENT_TYPES = [
272 LogEventType.CRUNCH_RUN,
277 const PROCESS_PANEL_LOG_EVENT_TYPES = [
278 LogEventType.ARV_MOUNT,
279 LogEventType.CRUNCH_RUN,
280 LogEventType.CRUNCHSTAT,
281 LogEventType.DISPATCH,
282 LogEventType.HOSTSTAT,
283 LogEventType.NODE_INFO,
286 LogEventType.CONTAINER,
287 LogEventType.KEEPSTORE,
290 const NON_SORTED_LOG_TYPES = [
291 LogEventType.NODE_INFO,
292 LogEventType.CONTAINER,